ndi: Update to git version of the bindings

This commit is contained in:
Vivia Nikolaidou 2022-10-12 19:36:43 +03:00 committed by Sebastian Dröge
parent 18cbb587ba
commit 77a5e35081
7 changed files with 181 additions and 211 deletions

View file

@ -8,11 +8,11 @@ description = "NewTek NDI Plugin"
edition = "2018" edition = "2018"
[dependencies] [dependencies]
glib = "0.15" glib = { git = "https://github.com/gtk-rs/gtk-rs-core"}
gst = { package = "gstreamer", version = "0.18", features = ["v1_12"] } gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-base = { package = "gstreamer-base", version = "0.18" } gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-audio = { package = "gstreamer-audio", version = "0.18" } gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-video = { package = "gstreamer-video", version = "0.18", features = ["v1_12"] } gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
byte-slice-cast = "1" byte-slice-cast = "1"
once_cell = "1.0" once_cell = "1.0"
byteorder = "1.0" byteorder = "1.0"
@ -24,7 +24,7 @@ gst-plugin-version-helper = "0.7"
[features] [features]
default = ["interlaced-fields", "reference-timestamps", "sink"] default = ["interlaced-fields", "reference-timestamps", "sink"]
interlaced-fields = ["gst/v1_16", "gst-video/v1_16"] interlaced-fields = ["gst/v1_16", "gst-video/v1_16"]
reference-timestamps = ["gst/v1_14"] reference-timestamps = []
sink = ["gst/v1_18", "gst-base/v1_18"] sink = ["gst/v1_18", "gst-base/v1_18"]
advanced-sdk = [] advanced-sdk = []

View file

@ -1,6 +1,6 @@
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_error, gst_log, gst_trace}; use gst::{error, log, trace};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
@ -60,7 +60,7 @@ impl DeviceProviderImpl for DeviceProvider {
Some(&*METADATA) Some(&*METADATA)
} }
fn probe(&self, _device_provider: &Self::Type) -> Vec<gst::Device> { fn probe(&self) -> Vec<gst::Device> {
self.current_devices self.current_devices
.lock() .lock()
.unwrap() .unwrap()
@ -69,10 +69,11 @@ impl DeviceProviderImpl for DeviceProvider {
.collect() .collect()
} }
fn start(&self, device_provider: &Self::Type) -> Result<(), gst::LoggableError> { fn start(&self) -> Result<(), gst::LoggableError> {
let mut thread_guard = self.thread.lock().unwrap(); let mut thread_guard = self.thread.lock().unwrap();
let device_provider = self.instance();
if thread_guard.is_some() { if thread_guard.is_some() {
gst_log!(CAT, obj: device_provider, "Device provider already started"); log!(CAT, obj: device_provider, "Device provider already started");
return Ok(()); return Ok(());
} }
@ -90,13 +91,13 @@ impl DeviceProviderImpl for DeviceProvider {
{ {
let mut find_guard = imp.find.lock().unwrap(); let mut find_guard = imp.find.lock().unwrap();
if find_guard.is_some() { if find_guard.is_some() {
gst_log!(CAT, obj: &device_provider, "Already started"); log!(CAT, obj: &device_provider, "Already started");
return; return;
} }
let find = match ndi::FindInstance::builder().build() { let find = match ndi::FindInstance::builder().build() {
None => { None => {
gst_error!(CAT, obj: &device_provider, "Failed to create Find instance"); error!(CAT, obj: &device_provider, "Failed to create Find instance");
return; return;
} }
Some(find) => find, Some(find) => find,
@ -123,7 +124,7 @@ impl DeviceProviderImpl for DeviceProvider {
Ok(()) Ok(())
} }
fn stop(&self, _device_provider: &Self::Type) { fn stop(&self) {
if let Some(_thread) = self.thread.lock().unwrap().take() { if let Some(_thread) = self.thread.lock().unwrap().take() {
self.is_running.store(false, atomic::Ordering::SeqCst); self.is_running.store(false, atomic::Ordering::SeqCst);
// Don't actually join because that might take a while // Don't actually join because that might take a while
@ -140,7 +141,7 @@ impl DeviceProvider {
}; };
if !find.wait_for_sources(if first { 1000 } else { 5000 }) { if !find.wait_for_sources(if first { 1000 } else { 5000 }) {
gst_trace!(CAT, obj: device_provider, "No new sources found"); trace!(CAT, obj: device_provider, "No new sources found");
return; return;
} }
@ -156,8 +157,8 @@ impl DeviceProvider {
let old_device_imp = Device::from_instance(old_device); let old_device_imp = Device::from_instance(old_device);
let old_source = old_device_imp.source.get().unwrap(); let old_source = old_device_imp.source.get().unwrap();
if !sources.contains(&*old_source) { if !sources.contains(old_source) {
gst_log!( log!(
CAT, CAT,
obj: device_provider, obj: device_provider,
"Source {:?} disappeared", "Source {:?} disappeared",
@ -184,7 +185,7 @@ impl DeviceProvider {
// Now go through all new devices and announce them // Now go through all new devices and announce them
for source in sources { for source in sources {
gst_log!(CAT, obj: device_provider, "Source {:?} appeared", source); log!(CAT, obj: device_provider, "Source {:?} appeared", source);
let device = super::Device::new(&source); let device = super::Device::new(&source);
device_provider.device_add(&device); device_provider.device_add(&device);
current_devices_guard.push(device); current_devices_guard.push(device);
@ -215,11 +216,7 @@ impl ObjectImpl for Device {}
impl GstObjectImpl for Device {} impl GstObjectImpl for Device {}
impl DeviceImpl for Device { impl DeviceImpl for Device {
fn create_element( fn create_element(&self, name: Option<&str>) -> Result<gst::Element, gst::LoggableError> {
&self,
_device: &Self::Type,
name: Option<&str>,
) -> Result<gst::Element, gst::LoggableError> {
let source_info = self.source.get().unwrap(); let source_info = self.source.get().unwrap();
let element = glib::Object::with_type( let element = glib::Object::with_type(
crate::ndisrc::NdiSrc::static_type(), crate::ndisrc::NdiSrc::static_type(),
@ -229,7 +226,6 @@ impl DeviceImpl for Device {
("url-address", &source_info.url_address()), ("url-address", &source_info.url_address()),
], ],
) )
.unwrap()
.dynamic_cast::<gst::Element>() .dynamic_cast::<gst::Element>()
.unwrap(); .unwrap();
@ -258,8 +254,7 @@ impl super::Device {
("display-name", &display_name), ("display-name", &display_name),
("device-class", &device_class), ("device-class", &device_class),
("properties", &extra_properties), ("properties", &extra_properties),
]) ]);
.unwrap();
let device_impl = Device::from_instance(&device); let device_impl = Device::from_instance(&device);
device_impl.source.set(source.to_owned()).unwrap(); device_impl.source.set(source.to_owned()).unwrap();

View file

@ -1,7 +1,7 @@
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_info, gst_trace}; use gst::{debug, error, info, trace};
use gst_base::prelude::*; use gst_base::prelude::*;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
@ -76,13 +76,7 @@ impl ObjectImpl for NdiSink {
PROPERTIES.as_ref() PROPERTIES.as_ref()
} }
fn set_property( fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
&self,
_obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() { match pspec.name() {
"ndi-name" => { "ndi-name" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
@ -94,7 +88,7 @@ impl ObjectImpl for NdiSink {
}; };
} }
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() { match pspec.name() {
"ndi-name" => { "ndi-name" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
@ -176,7 +170,7 @@ impl ElementImpl for NdiSink {
} }
impl BaseSinkImpl for NdiSink { impl BaseSinkImpl for NdiSink {
fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { fn start(&self) -> Result<(), gst::ErrorMessage> {
let mut state_storage = self.state.lock().unwrap(); let mut state_storage = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
@ -195,30 +189,30 @@ impl BaseSinkImpl for NdiSink {
audio_info: None, audio_info: None,
}; };
*state_storage = Some(state); *state_storage = Some(state);
gst_info!(CAT, obj: element, "Started"); info!(CAT, obj: self.instance(), "Started");
Ok(()) Ok(())
} }
fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { fn stop(&self) -> Result<(), gst::ErrorMessage> {
let mut state_storage = self.state.lock().unwrap(); let mut state_storage = self.state.lock().unwrap();
*state_storage = None; *state_storage = None;
gst_info!(CAT, obj: element, "Stopped"); info!(CAT, obj: self.instance(), "Stopped");
Ok(()) Ok(())
} }
fn unlock(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> { fn unlock(&self) -> Result<(), gst::ErrorMessage> {
Ok(()) Ok(())
} }
fn unlock_stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> { fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> {
Ok(()) Ok(())
} }
fn set_caps(&self, element: &Self::Type, caps: &gst::Caps) -> Result<(), gst::LoggableError> { fn set_caps(&self, caps: &gst::Caps) -> Result<(), gst::LoggableError> {
gst_debug!(CAT, obj: element, "Setting caps {}", caps); debug!(CAT, obj: self.instance(), "Setting caps {}", caps);
let mut state_storage = self.state.lock().unwrap(); let mut state_storage = self.state.lock().unwrap();
let state = match &mut *state_storage { let state = match &mut *state_storage {
@ -244,11 +238,8 @@ impl BaseSinkImpl for NdiSink {
Ok(()) Ok(())
} }
fn render( fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
&self, let element = self.instance();
element: &Self::Type,
buffer: &gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state_storage = self.state.lock().unwrap(); let mut state_storage = self.state.lock().unwrap();
let state = match &mut *state_storage { let state = match &mut *state_storage {
None => return Err(gst::FlowError::Error), None => return Err(gst::FlowError::Error),
@ -260,11 +251,11 @@ impl BaseSinkImpl for NdiSink {
for (buffer, info, timecode) in audio_meta.buffers() { for (buffer, info, timecode) in audio_meta.buffers() {
let frame = crate::ndi::AudioFrame::try_from_buffer(info, buffer, *timecode) let frame = crate::ndi::AudioFrame::try_from_buffer(info, buffer, *timecode)
.map_err(|_| { .map_err(|_| {
gst_error!(CAT, obj: element, "Unsupported audio frame"); error!(CAT, obj: element, "Unsupported audio frame");
gst::FlowError::NotNegotiated gst::FlowError::NotNegotiated
})?; })?;
gst_trace!( trace!(
CAT, CAT,
obj: element, obj: element,
"Sending audio buffer {:?} with timecode {} and format {:?}", "Sending audio buffer {:?} with timecode {} and format {:?}",
@ -297,17 +288,17 @@ impl BaseSinkImpl for NdiSink {
let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, info) let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, info)
.map_err(|_| { .map_err(|_| {
gst_error!(CAT, obj: element, "Failed to map buffer"); error!(CAT, obj: element, "Failed to map buffer");
gst::FlowError::Error gst::FlowError::Error
})?; })?;
let frame = crate::ndi::VideoFrame::try_from_video_frame(&frame, timecode) let frame = crate::ndi::VideoFrame::try_from_video_frame(&frame, timecode)
.map_err(|_| { .map_err(|_| {
gst_error!(CAT, obj: element, "Unsupported video frame"); error!(CAT, obj: element, "Unsupported video frame");
gst::FlowError::NotNegotiated gst::FlowError::NotNegotiated
})?; })?;
gst_trace!( trace!(
CAT, CAT,
obj: element, obj: element,
"Sending video buffer {:?} with timecode {} and format {:?}", "Sending video buffer {:?} with timecode {} and format {:?}",
@ -337,11 +328,11 @@ impl BaseSinkImpl for NdiSink {
let frame = let frame =
crate::ndi::AudioFrame::try_from_buffer(info, buffer, timecode).map_err(|_| { crate::ndi::AudioFrame::try_from_buffer(info, buffer, timecode).map_err(|_| {
gst_error!(CAT, obj: element, "Unsupported audio frame"); error!(CAT, obj: element, "Unsupported audio frame");
gst::FlowError::NotNegotiated gst::FlowError::NotNegotiated
})?; })?;
gst_trace!( trace!(
CAT, CAT,
obj: element, obj: element,
"Sending audio buffer {:?} with timecode {} and format {:?}", "Sending audio buffer {:?} with timecode {} and format {:?}",

View file

@ -2,7 +2,7 @@ use glib::prelude::*;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_trace, gst_warning}; use gst::{debug, error, trace, warning};
use gst_base::prelude::*; use gst_base::prelude::*;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
@ -55,10 +55,11 @@ impl ObjectSubclass for NdiSinkCombiner {
} }
impl ObjectImpl for NdiSinkCombiner { impl ObjectImpl for NdiSinkCombiner {
fn constructed(&self, obj: &Self::Type) { fn constructed(&self) {
let obj = self.instance();
obj.add_pad(&self.video_pad).unwrap(); obj.add_pad(&self.video_pad).unwrap();
self.parent_constructed(obj); self.parent_constructed();
} }
} }
@ -147,12 +148,12 @@ impl ElementImpl for NdiSinkCombiner {
PAD_TEMPLATES.as_ref() PAD_TEMPLATES.as_ref()
} }
fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) { fn release_pad(&self, pad: &gst::Pad) {
let mut audio_pad_storage = self.audio_pad.lock().unwrap(); let mut audio_pad_storage = self.audio_pad.lock().unwrap();
if audio_pad_storage.as_ref().map(|p| p.upcast_ref()) == Some(pad) { if audio_pad_storage.as_ref().map(|p| p.upcast_ref()) == Some(pad) {
gst_debug!(CAT, obj: element, "Release audio pad"); debug!(CAT, obj: self.instance(), "Release audio pad");
self.parent_release_pad(element, pad); self.parent_release_pad(pad);
*audio_pad_storage = None; *audio_pad_storage = None;
} }
} }
@ -161,21 +162,21 @@ impl ElementImpl for NdiSinkCombiner {
impl AggregatorImpl for NdiSinkCombiner { impl AggregatorImpl for NdiSinkCombiner {
fn create_new_pad( fn create_new_pad(
&self, &self,
agg: &Self::Type,
templ: &gst::PadTemplate, templ: &gst::PadTemplate,
_req_name: Option<&str>, _req_name: Option<&str>,
_caps: Option<&gst::Caps>, _caps: Option<&gst::Caps>,
) -> Option<gst_base::AggregatorPad> { ) -> Option<gst_base::AggregatorPad> {
let agg = self.instance();
let mut audio_pad_storage = self.audio_pad.lock().unwrap(); let mut audio_pad_storage = self.audio_pad.lock().unwrap();
if audio_pad_storage.is_some() { if audio_pad_storage.is_some() {
gst_error!(CAT, obj: agg, "Audio pad already requested"); error!(CAT, obj: agg, "Audio pad already requested");
return None; return None;
} }
let sink_templ = agg.pad_template("audio").unwrap(); let sink_templ = agg.pad_template("audio").unwrap();
if templ != &sink_templ { if templ != &sink_templ {
gst_error!(CAT, obj: agg, "Wrong pad template"); error!(CAT, obj: agg, "Wrong pad template");
return None; return None;
} }
@ -183,12 +184,12 @@ impl AggregatorImpl for NdiSinkCombiner {
gst::PadBuilder::<gst_base::AggregatorPad>::from_template(templ, Some("audio")).build(); gst::PadBuilder::<gst_base::AggregatorPad>::from_template(templ, Some("audio")).build();
*audio_pad_storage = Some(pad.clone()); *audio_pad_storage = Some(pad.clone());
gst_debug!(CAT, obj: agg, "Requested audio pad"); debug!(CAT, obj: agg, "Requested audio pad");
Some(pad) Some(pad)
} }
fn start(&self, agg: &Self::Type) -> Result<(), gst::ErrorMessage> { fn start(&self) -> Result<(), gst::ErrorMessage> {
let mut state_storage = self.state.lock().unwrap(); let mut state_storage = self.state.lock().unwrap();
*state_storage = Some(State { *state_storage = Some(State {
audio_info: None, audio_info: None,
@ -197,48 +198,48 @@ impl AggregatorImpl for NdiSinkCombiner {
current_audio_buffers: Vec::new(), current_audio_buffers: Vec::new(),
}); });
gst_debug!(CAT, obj: agg, "Started"); debug!(CAT, obj: self.instance(), "Started");
Ok(()) Ok(())
} }
fn stop(&self, agg: &Self::Type) -> Result<(), gst::ErrorMessage> { fn stop(&self) -> Result<(), gst::ErrorMessage> {
// Drop our state now // Drop our state now
let _ = self.state.lock().unwrap().take(); let _ = self.state.lock().unwrap().take();
gst_debug!(CAT, obj: agg, "Stopped"); debug!(CAT, obj: self.instance(), "Stopped");
Ok(()) Ok(())
} }
fn next_time(&self, _agg: &Self::Type) -> Option<gst::ClockTime> { fn next_time(&self) -> Option<gst::ClockTime> {
// FIXME: What to do here? We don't really know when the next buffer is expected // FIXME: What to do here? We don't really know when the next buffer is expected
gst::ClockTime::NONE gst::ClockTime::NONE
} }
fn clip( fn clip(
&self, &self,
agg: &Self::Type,
agg_pad: &gst_base::AggregatorPad, agg_pad: &gst_base::AggregatorPad,
mut buffer: gst::Buffer, mut buffer: gst::Buffer,
) -> Option<gst::Buffer> { ) -> Option<gst::Buffer> {
let agg = self.instance();
let segment = match agg_pad.segment().downcast::<gst::ClockTime>() { let segment = match agg_pad.segment().downcast::<gst::ClockTime>() {
Ok(segment) => segment, Ok(segment) => segment,
Err(_) => { Err(_) => {
gst_error!(CAT, obj: agg, "Only TIME segments supported"); error!(CAT, obj: agg, "Only TIME segments supported");
return Some(buffer); return Some(buffer);
} }
}; };
let pts = buffer.pts(); let pts = buffer.pts();
if pts.is_none() { if pts.is_none() {
gst_error!(CAT, obj: agg, "Only buffers with PTS supported"); error!(CAT, obj: agg, "Only buffers with PTS supported");
return Some(buffer); return Some(buffer);
} }
let duration = buffer.duration(); let duration = buffer.duration();
gst_trace!( trace!(
CAT, CAT,
obj: agg_pad, obj: agg_pad,
"Clipping buffer {:?} with PTS {} and duration {}", "Clipping buffer {:?} with PTS {} and duration {}",
@ -273,7 +274,7 @@ impl AggregatorImpl for NdiSinkCombiner {
unreachable!() unreachable!()
}; };
gst_debug!( debug!(
CAT, CAT,
obj: agg_pad, obj: agg_pad,
"Clipping buffer {:?} with PTS {} and duration {}", "Clipping buffer {:?} with PTS {} and duration {}",
@ -312,15 +313,12 @@ impl AggregatorImpl for NdiSinkCombiner {
} }
} }
fn aggregate( fn aggregate(&self, timeout: bool) -> Result<gst::FlowSuccess, gst::FlowError> {
&self,
agg: &Self::Type,
timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
// FIXME: Can't really happen because we always return NONE from get_next_time() but that // FIXME: Can't really happen because we always return NONE from get_next_time() but that
// should be improved! // should be improved!
assert!(!timeout); assert!(!timeout);
let agg = self.instance();
// Because peek_buffer() can call into clip() and that would take the state lock again, // Because peek_buffer() can call into clip() and that would take the state lock again,
// first try getting buffers from both pads here // first try getting buffers from both pads here
let video_buffer_and_segment = match self.video_pad.peek_buffer() { let video_buffer_and_segment = match self.video_pad.peek_buffer() {
@ -329,7 +327,7 @@ impl AggregatorImpl for NdiSinkCombiner {
let video_segment = match video_segment.downcast::<gst::ClockTime>() { let video_segment = match video_segment.downcast::<gst::ClockTime>() {
Ok(video_segment) => video_segment, Ok(video_segment) => video_segment,
Err(video_segment) => { Err(video_segment) => {
gst_error!( error!(
CAT, CAT,
obj: agg, obj: agg,
"Video segment of wrong format {:?}", "Video segment of wrong format {:?}",
@ -342,7 +340,7 @@ impl AggregatorImpl for NdiSinkCombiner {
Some((video_buffer, video_segment)) Some((video_buffer, video_segment))
} }
None if !self.video_pad.is_eos() => { None if !self.video_pad.is_eos() => {
gst_trace!(CAT, obj: agg, "Waiting for video buffer"); trace!(CAT, obj: agg, "Waiting for video buffer");
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
} }
None => None, None => None,
@ -354,7 +352,7 @@ impl AggregatorImpl for NdiSinkCombiner {
Some(audio_buffer) if audio_buffer.size() == 0 => { Some(audio_buffer) if audio_buffer.size() == 0 => {
// Skip empty/gap audio buffer // Skip empty/gap audio buffer
audio_pad.drop_buffer(); audio_pad.drop_buffer();
gst_trace!(CAT, obj: agg, "Empty audio buffer, waiting for next"); trace!(CAT, obj: agg, "Empty audio buffer, waiting for next");
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
} }
Some(audio_buffer) => { Some(audio_buffer) => {
@ -362,7 +360,7 @@ impl AggregatorImpl for NdiSinkCombiner {
let audio_segment = match audio_segment.downcast::<gst::ClockTime>() { let audio_segment = match audio_segment.downcast::<gst::ClockTime>() {
Ok(audio_segment) => audio_segment, Ok(audio_segment) => audio_segment,
Err(audio_segment) => { Err(audio_segment) => {
gst_error!( error!(
CAT, CAT,
obj: agg, obj: agg,
"Audio segment of wrong format {:?}", "Audio segment of wrong format {:?}",
@ -375,7 +373,7 @@ impl AggregatorImpl for NdiSinkCombiner {
Some((audio_buffer, audio_segment, audio_pad)) Some((audio_buffer, audio_segment, audio_pad))
} }
None if !audio_pad.is_eos() => { None if !audio_pad.is_eos() => {
gst_trace!(CAT, obj: agg, "Waiting for audio buffer"); trace!(CAT, obj: agg, "Waiting for audio buffer");
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
} }
None => None, None => None,
@ -396,7 +394,7 @@ impl AggregatorImpl for NdiSinkCombiner {
match state.current_video_buffer { match state.current_video_buffer {
None => { None => {
gst_trace!(CAT, obj: agg, "First video buffer, waiting for second"); trace!(CAT, obj: agg, "First video buffer, waiting for second");
state.current_video_buffer = Some((video_buffer, video_running_time)); state.current_video_buffer = Some((video_buffer, video_running_time));
drop(state_storage); drop(state_storage);
self.video_pad.drop_buffer(); self.video_pad.drop_buffer();
@ -411,7 +409,7 @@ impl AggregatorImpl for NdiSinkCombiner {
} else { } else {
match (&state.current_video_buffer, &audio_buffer_segment_and_pad) { match (&state.current_video_buffer, &audio_buffer_segment_and_pad) {
(None, None) => { (None, None) => {
gst_trace!( trace!(
CAT, CAT,
obj: agg, obj: agg,
"All pads are EOS and no buffers are queued, finishing" "All pads are EOS and no buffers are queued, finishing"
@ -428,7 +426,7 @@ impl AggregatorImpl for NdiSinkCombiner {
let video_segment = match video_segment.downcast::<gst::ClockTime>() { let video_segment = match video_segment.downcast::<gst::ClockTime>() {
Ok(video_segment) => video_segment, Ok(video_segment) => video_segment,
Err(video_segment) => { Err(video_segment) => {
gst_error!( error!(
CAT, CAT,
obj: agg, obj: agg,
"Video segment of wrong format {:?}", "Video segment of wrong format {:?}",
@ -440,7 +438,7 @@ impl AggregatorImpl for NdiSinkCombiner {
let video_pts = let video_pts =
video_segment.position_from_running_time(audio_running_time); video_segment.position_from_running_time(audio_running_time);
if video_pts.is_none() { if video_pts.is_none() {
gst_warning!(CAT, obj: agg, "Can't output more audio after video EOS"); warning!(CAT, obj: agg, "Can't output more audio after video EOS");
return Err(gst::FlowError::Eos); return Err(gst::FlowError::Eos);
} }
@ -460,7 +458,7 @@ impl AggregatorImpl for NdiSinkCombiner {
let audio_info = match state.audio_info { let audio_info = match state.audio_info {
Some(ref audio_info) => audio_info, Some(ref audio_info) => audio_info,
None => { None => {
gst_error!(CAT, obj: agg, "Have no audio caps"); error!(CAT, obj: agg, "Have no audio caps");
return Err(gst::FlowError::NotNegotiated); return Err(gst::FlowError::NotNegotiated);
} }
}; };
@ -487,7 +485,7 @@ impl AggregatorImpl for NdiSinkCombiner {
}) })
.unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize); .unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize);
gst_trace!( trace!(
CAT, CAT,
obj: agg, obj: agg,
"Including audio buffer {:?} with timecode {}: {} <= {}", "Including audio buffer {:?} with timecode {}: {} <= {}",
@ -528,7 +526,7 @@ impl AggregatorImpl for NdiSinkCombiner {
drop(state_storage); drop(state_storage);
} }
gst_trace!( trace!(
CAT, CAT,
obj: agg, obj: agg,
"Finishing video buffer {:?}", "Finishing video buffer {:?}",
@ -537,13 +535,9 @@ impl AggregatorImpl for NdiSinkCombiner {
agg.finish_buffer(current_video_buffer) agg.finish_buffer(current_video_buffer)
} }
fn sink_event( fn sink_event(&self, pad: &gst_base::AggregatorPad, event: gst::Event) -> bool {
&self,
agg: &Self::Type,
pad: &gst_base::AggregatorPad,
event: gst::Event,
) -> bool {
use gst::EventView; use gst::EventView;
let agg = self.instance();
match event.view() { match event.view() {
EventView::Caps(caps) => { EventView::Caps(caps) => {
@ -559,7 +553,7 @@ impl AggregatorImpl for NdiSinkCombiner {
let info = match gst_video::VideoInfo::from_caps(&caps) { let info = match gst_video::VideoInfo::from_caps(&caps) {
Ok(info) => info, Ok(info) => info,
Err(_) => { Err(_) => {
gst_error!(CAT, obj: pad, "Failed to parse caps {:?}", caps); error!(CAT, obj: pad, "Failed to parse caps {:?}", caps);
return false; return false;
} }
}; };
@ -587,7 +581,7 @@ impl AggregatorImpl for NdiSinkCombiner {
let info = match gst_audio::AudioInfo::from_caps(&caps) { let info = match gst_audio::AudioInfo::from_caps(&caps) {
Ok(info) => info, Ok(info) => info,
Err(_) => { Err(_) => {
gst_error!(CAT, obj: pad, "Failed to parse caps {:?}", caps); error!(CAT, obj: pad, "Failed to parse caps {:?}", caps);
return false; return false;
} }
}; };
@ -598,36 +592,31 @@ impl AggregatorImpl for NdiSinkCombiner {
// The video segment is passed through as-is and the video timestamps are preserved // The video segment is passed through as-is and the video timestamps are preserved
EventView::Segment(segment) if pad == &self.video_pad => { EventView::Segment(segment) if pad == &self.video_pad => {
let segment = segment.segment(); let segment = segment.segment();
gst_debug!(CAT, obj: agg, "Updating segment {:?}", segment); debug!(CAT, obj: agg, "Updating segment {:?}", segment);
agg.update_segment(segment); agg.update_segment(segment);
} }
_ => (), _ => (),
} }
self.parent_sink_event(agg, pad, event) self.parent_sink_event(pad, event)
} }
fn sink_query( fn sink_query(&self, pad: &gst_base::AggregatorPad, query: &mut gst::QueryRef) -> bool {
&self, use gst::QueryViewMut;
agg: &Self::Type,
pad: &gst_base::AggregatorPad,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryView;
match query.view_mut() { match query.view_mut() {
QueryView::Caps(_) if pad == &self.video_pad => { QueryViewMut::Caps(_) if pad == &self.video_pad => {
// Directly forward caps queries // Directly forward caps queries
let srcpad = agg.static_pad("src").unwrap(); let srcpad = self.instance().static_pad("src").unwrap();
return srcpad.peer_query(query); return srcpad.peer_query(query);
} }
_ => (), _ => (),
} }
self.parent_sink_query(agg, pad, query) self.parent_sink_query(pad, query)
} }
fn negotiate(&self, _agg: &Self::Type) -> bool { fn negotiate(&self) -> bool {
// No negotiation needed as the video caps are just passed through // No negotiation needed as the video caps are just passed through
true true
} }

View file

@ -1,6 +1,6 @@
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error}; use gst::{debug, error};
use gst_base::prelude::*; use gst_base::prelude::*;
use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
@ -184,8 +184,9 @@ impl ObjectImpl for NdiSrc {
PROPERTIES.as_ref() PROPERTIES.as_ref()
} }
fn constructed(&self, obj: &Self::Type) { fn constructed(&self) {
self.parent_constructed(obj); self.parent_constructed();
let obj = self.instance();
// Initialize live-ness and notify the base class that // Initialize live-ness and notify the base class that
// we'd like to operate in Time format // we'd like to operate in Time format
@ -193,18 +194,13 @@ impl ObjectImpl for NdiSrc {
obj.set_format(gst::Format::Time); obj.set_format(gst::Format::Time);
} }
fn set_property( fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
&self, let obj = self.instance();
obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() { match pspec.name() {
"ndi-name" => { "ndi-name" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let ndi_name = value.get().unwrap(); let ndi_name = value.get().unwrap();
gst_debug!( debug!(
CAT, CAT,
obj: obj, obj: obj,
"Changing ndi-name from {:?} to {:?}", "Changing ndi-name from {:?} to {:?}",
@ -216,7 +212,7 @@ impl ObjectImpl for NdiSrc {
"url-address" => { "url-address" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let url_address = value.get().unwrap(); let url_address = value.get().unwrap();
gst_debug!( debug!(
CAT, CAT,
obj: obj, obj: obj,
"Changing url-address from {:?} to {:?}", "Changing url-address from {:?} to {:?}",
@ -228,7 +224,7 @@ impl ObjectImpl for NdiSrc {
"receiver-ndi-name" => { "receiver-ndi-name" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let receiver_ndi_name = value.get::<Option<String>>().unwrap(); let receiver_ndi_name = value.get::<Option<String>>().unwrap();
gst_debug!( debug!(
CAT, CAT,
obj: obj, obj: obj,
"Changing receiver-ndi-name from {:?} to {:?}", "Changing receiver-ndi-name from {:?} to {:?}",
@ -241,7 +237,7 @@ impl ObjectImpl for NdiSrc {
"connect-timeout" => { "connect-timeout" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let connect_timeout = value.get().unwrap(); let connect_timeout = value.get().unwrap();
gst_debug!( debug!(
CAT, CAT,
obj: obj, obj: obj,
"Changing connect-timeout from {} to {}", "Changing connect-timeout from {} to {}",
@ -253,7 +249,7 @@ impl ObjectImpl for NdiSrc {
"timeout" => { "timeout" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let timeout = value.get().unwrap(); let timeout = value.get().unwrap();
gst_debug!( debug!(
CAT, CAT,
obj: obj, obj: obj,
"Changing timeout from {} to {}", "Changing timeout from {} to {}",
@ -265,7 +261,7 @@ impl ObjectImpl for NdiSrc {
"max-queue-length" => { "max-queue-length" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let max_queue_length = value.get().unwrap(); let max_queue_length = value.get().unwrap();
gst_debug!( debug!(
CAT, CAT,
obj: obj, obj: obj,
"Changing max-queue-length from {} to {}", "Changing max-queue-length from {} to {}",
@ -277,7 +273,7 @@ impl ObjectImpl for NdiSrc {
"bandwidth" => { "bandwidth" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get().unwrap(); let bandwidth = value.get().unwrap();
gst_debug!( debug!(
CAT, CAT,
obj: obj, obj: obj,
"Changing bandwidth from {} to {}", "Changing bandwidth from {} to {}",
@ -289,7 +285,7 @@ impl ObjectImpl for NdiSrc {
"color-format" => { "color-format" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let color_format = value.get().unwrap(); let color_format = value.get().unwrap();
gst_debug!( debug!(
CAT, CAT,
obj: obj, obj: obj,
"Changing color format from {:?} to {:?}", "Changing color format from {:?} to {:?}",
@ -301,7 +297,7 @@ impl ObjectImpl for NdiSrc {
"timestamp-mode" => { "timestamp-mode" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let timestamp_mode = value.get().unwrap(); let timestamp_mode = value.get().unwrap();
gst_debug!( debug!(
CAT, CAT,
obj: obj, obj: obj,
"Changing timestamp mode from {:?} to {:?}", "Changing timestamp mode from {:?} to {:?}",
@ -309,7 +305,7 @@ impl ObjectImpl for NdiSrc {
timestamp_mode timestamp_mode
); );
if settings.timestamp_mode != timestamp_mode { if settings.timestamp_mode != timestamp_mode {
let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); let _ = obj.post_message(gst::message::Latency::builder().src(&*obj).build());
} }
settings.timestamp_mode = timestamp_mode; settings.timestamp_mode = timestamp_mode;
} }
@ -317,7 +313,7 @@ impl ObjectImpl for NdiSrc {
} }
} }
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() { match pspec.name() {
"ndi-name" => { "ndi-name" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
@ -394,7 +390,6 @@ impl ElementImpl for NdiSrc {
fn change_state( fn change_state(
&self, &self,
element: &Self::Type,
transition: gst::StateChange, transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition { match transition {
@ -416,34 +411,34 @@ impl ElementImpl for NdiSrc {
_ => (), _ => (),
} }
self.parent_change_state(element, transition) self.parent_change_state(transition)
} }
} }
impl BaseSrcImpl for NdiSrc { impl BaseSrcImpl for NdiSrc {
fn negotiate(&self, element: &Self::Type) -> Result<(), gst::LoggableError> { fn negotiate(&self) -> Result<(), gst::LoggableError> {
element self.instance()
.set_caps(&gst::Caps::builder("application/x-ndi").build()) .set_caps(&gst::Caps::builder("application/x-ndi").build())
.map_err(|_| gst::loggable_error!(CAT, "Failed to negotiate caps",)) .map_err(|_| gst::loggable_error!(CAT, "Failed to negotiate caps",))
} }
fn unlock(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { fn unlock(&self) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Unlocking",); debug!(CAT, obj: self.instance(), "Unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() { if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(true); controller.set_flushing(true);
} }
Ok(()) Ok(())
} }
fn unlock_stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stop unlocking",); debug!(CAT, obj: self.instance(), "Stop unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() { if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(false); controller.set_flushing(false);
} }
Ok(()) Ok(())
} }
fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { fn start(&self) -> Result<(), gst::ErrorMessage> {
*self.state.lock().unwrap() = Default::default(); *self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap().clone(); let settings = self.settings.lock().unwrap().clone();
@ -455,7 +450,7 @@ impl BaseSrcImpl for NdiSrc {
} }
let receiver = Receiver::connect( let receiver = Receiver::connect(
element.upcast_ref(), self.instance().upcast_ref(),
settings.ndi_name.as_deref(), settings.ndi_name.as_deref(),
settings.url_address.as_deref(), settings.url_address.as_deref(),
&settings.receiver_ndi_name, &settings.receiver_ndi_name,
@ -483,7 +478,7 @@ impl BaseSrcImpl for NdiSrc {
} }
} }
fn stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> { fn stop(&self) -> Result<(), gst::ErrorMessage> {
if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() { if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() {
controller.shutdown(); controller.shutdown();
} }
@ -491,16 +486,16 @@ impl BaseSrcImpl for NdiSrc {
Ok(()) Ok(())
} }
fn query(&self, element: &Self::Type, query: &mut gst::QueryRef) -> bool { fn query(&self, query: &mut gst::QueryRef) -> bool {
use gst::QueryView; use gst::QueryViewMut;
match query.view_mut() { match query.view_mut() {
QueryView::Scheduling(ref mut q) => { QueryViewMut::Scheduling(ref mut q) => {
q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0); q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
q.add_scheduling_modes(&[gst::PadMode::Push]); q.add_scheduling_modes(&[gst::PadMode::Push]);
true true
} }
QueryView::Latency(ref mut q) => { QueryViewMut::Latency(ref mut q) => {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
@ -518,9 +513,9 @@ impl BaseSrcImpl for NdiSrc {
let max = settings.max_queue_length as u64 * latency; let max = settings.max_queue_length as u64 * latency;
gst_debug!( debug!(
CAT, CAT,
obj: element, obj: self.instance(),
"Returning latency min {} max {}", "Returning latency min {} max {}",
min, min,
max max
@ -531,23 +526,23 @@ impl BaseSrcImpl for NdiSrc {
false false
} }
} }
_ => BaseSrcImplExt::parent_query(self, element, query), _ => BaseSrcImplExt::parent_query(self, query),
} }
} }
fn create( fn create(
&self, &self,
element: &Self::Type,
_offset: u64, _offset: u64,
_buffer: Option<&mut gst::BufferRef>, _buffer: Option<&mut gst::BufferRef>,
_length: u32, _length: u32,
) -> Result<CreateSuccess, gst::FlowError> { ) -> Result<CreateSuccess, gst::FlowError> {
let element = self.instance();
let recv = { let recv = {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
match state.receiver.take() { match state.receiver.take() {
Some(recv) => recv, Some(recv) => recv,
None => { None => {
gst_error!(CAT, obj: element, "Have no receiver"); error!(CAT, obj: element, "Have no receiver");
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
} }
} }
@ -616,7 +611,7 @@ impl BaseSrcImpl for NdiSrc {
drop(state); drop(state);
if latency_changed { if latency_changed {
let _ = element.post_message( let _ = element.post_message(
gst::message::Latency::builder().src(element).build(), gst::message::Latency::builder().src(&*element).build(),
); );
} }

View file

@ -1,6 +1,6 @@
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_log}; use gst::{debug, error, log};
use std::sync::Mutex; use std::sync::Mutex;
@ -45,14 +45,14 @@ impl ObjectSubclass for NdiSrcDemux {
NdiSrcDemux::catch_panic_pad_function( NdiSrcDemux::catch_panic_pad_function(
parent, parent,
|| Err(gst::FlowError::Error), || Err(gst::FlowError::Error),
|self_, element| self_.sink_chain(pad, element, buffer), |self_| self_.sink_chain(pad, &self_.instance(), buffer),
) )
}) })
.event_function(|pad, parent, event| { .event_function(|pad, parent, event| {
NdiSrcDemux::catch_panic_pad_function( NdiSrcDemux::catch_panic_pad_function(
parent, parent,
|| false, || false,
|self_, element| self_.sink_event(pad, element, event), |self_| self_.sink_event(pad, &self_.instance(), event),
) )
}) })
.build(); .build();
@ -65,10 +65,10 @@ impl ObjectSubclass for NdiSrcDemux {
} }
impl ObjectImpl for NdiSrcDemux { impl ObjectImpl for NdiSrcDemux {
fn constructed(&self, obj: &Self::Type) { fn constructed(&self) {
self.parent_constructed(obj); self.parent_constructed();
obj.add_pad(&self.sinkpad).unwrap(); self.instance().add_pad(&self.sinkpad).unwrap();
} }
} }
@ -127,10 +127,10 @@ impl ElementImpl for NdiSrcDemux {
#[allow(clippy::single_match)] #[allow(clippy::single_match)]
fn change_state( fn change_state(
&self, &self,
element: &Self::Type,
transition: gst::StateChange, transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
let res = self.parent_change_state(element, transition)?; let element = self.instance();
let res = self.parent_change_state(transition)?;
match transition { match transition {
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
@ -157,13 +157,13 @@ impl NdiSrcDemux {
element: &super::NdiSrcDemux, element: &super::NdiSrcDemux,
mut buffer: gst::Buffer, mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
let meta = buffer let meta = buffer
.make_mut() .make_mut()
.meta_mut::<ndisrcmeta::NdiSrcMeta>() .meta_mut::<ndisrcmeta::NdiSrcMeta>()
.ok_or_else(|| { .ok_or_else(|| {
gst_error!(CAT, obj: element, "Buffer without NDI source meta"); error!(CAT, obj: element, "Buffer without NDI source meta");
gst::FlowError::Error gst::FlowError::Error
})?; })?;
@ -178,7 +178,7 @@ impl NdiSrcDemux {
if let Some(ref pad) = state.audio_pad { if let Some(ref pad) = state.audio_pad {
srcpad = pad.clone(); srcpad = pad.clone();
} else { } else {
gst_debug!(CAT, obj: element, "Adding audio pad with caps {}", caps); debug!(CAT, obj: element, "Adding audio pad with caps {}", caps);
let klass = element.element_class(); let klass = element.element_class();
let templ = klass.pad_template("audio").unwrap(); let templ = klass.pad_template("audio").unwrap();
@ -219,7 +219,7 @@ impl NdiSrcDemux {
} }
if state.audio_caps.as_ref() != Some(&caps) { if state.audio_caps.as_ref() != Some(&caps) {
gst_debug!(CAT, obj: element, "Audio caps changed to {}", caps); debug!(CAT, obj: element, "Audio caps changed to {}", caps);
events.push(gst::event::Caps::new(&caps)); events.push(gst::event::Caps::new(&caps));
state.audio_caps = Some(caps); state.audio_caps = Some(caps);
} }
@ -228,7 +228,7 @@ impl NdiSrcDemux {
if let Some(ref pad) = state.video_pad { if let Some(ref pad) = state.video_pad {
srcpad = pad.clone(); srcpad = pad.clone();
} else { } else {
gst_debug!(CAT, obj: element, "Adding video pad with caps {}", caps); debug!(CAT, obj: element, "Adding video pad with caps {}", caps);
let klass = element.element_class(); let klass = element.element_class();
let templ = klass.pad_template("video").unwrap(); let templ = klass.pad_template("video").unwrap();
@ -269,7 +269,7 @@ impl NdiSrcDemux {
} }
if state.video_caps.as_ref() != Some(&caps) { if state.video_caps.as_ref() != Some(&caps) {
gst_debug!(CAT, obj: element, "Video caps changed to {}", caps); debug!(CAT, obj: element, "Video caps changed to {}", caps);
events.push(gst::event::Caps::new(&caps)); events.push(gst::event::Caps::new(&caps));
state.video_caps = Some(caps); state.video_caps = Some(caps);
} }
@ -299,7 +299,7 @@ impl NdiSrcDemux {
fn sink_event(&self, pad: &gst::Pad, element: &super::NdiSrcDemux, event: gst::Event) -> bool { fn sink_event(&self, pad: &gst::Pad, element: &super::NdiSrcDemux, event: gst::Event) -> bool {
use gst::EventView; use gst::EventView;
gst_log!(CAT, obj: pad, "Handling event {:?}", event); log!(CAT, obj: pad, "Handling event {:?}", event);
if let EventView::Eos(_) = event.view() { if let EventView::Eos(_) = event.view() {
if element.num_src_pads() == 0 { if element.num_src_pads() == 0 {
// error out on EOS if no src pad are available // error out on EOS if no src pad are available

View file

@ -1,6 +1,6 @@
use glib::prelude::*; use glib::prelude::*;
use gst::prelude::*; use gst::prelude::*;
use gst::{gst_debug, gst_error, gst_log, gst_trace, gst_warning}; use gst::{debug, error, log, trace, warning};
use gst_video::prelude::*; use gst_video::prelude::*;
use byte_slice_cast::*; use byte_slice_cast::*;
@ -291,7 +291,7 @@ impl Observations {
let mut inner = self.0.borrow_mut(); let mut inner = self.0.borrow_mut();
gst_trace!( trace!(
CAT, CAT,
obj: element, obj: element,
"Local time {}, remote time {}, slope correct {}/{}", "Local time {}, remote time {}, slope correct {}/{}",
@ -325,7 +325,7 @@ impl Observations {
match (inner.base_remote_time, inner.base_local_time) { match (inner.base_remote_time, inner.base_local_time) {
(Some(remote), Some(local)) => (remote, local), (Some(remote), Some(local)) => (remote, local),
_ => { _ => {
gst_debug!( debug!(
CAT, CAT,
obj: element, obj: element,
"Initializing base time: local {}, remote {}", "Initializing base time: local {}, remote {}",
@ -373,7 +373,7 @@ impl Observations {
// Check for some obviously wrong slopes and try to correct for that // Check for some obviously wrong slopes and try to correct for that
if !(0.5..1.5).contains(&scaled_slope) { if !(0.5..1.5).contains(&scaled_slope) {
gst_warning!( warning!(
CAT, CAT,
obj: element, obj: element,
"Too small/big slope {}, resetting", "Too small/big slope {}, resetting",
@ -414,7 +414,7 @@ impl Observations {
.unwrap() .unwrap()
.0 .0
.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?; .mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
gst_debug!( debug!(
CAT, CAT,
obj: element, obj: element,
"Initializing base time: local {}, remote {}, slope correction {}/{}", "Initializing base time: local {}, remote {}, slope correction {}/{}",
@ -435,7 +435,7 @@ impl Observations {
let local_diff = local_time.saturating_sub(base_local_time); let local_diff = local_time.saturating_sub(base_local_time);
let delta = (local_diff as i64) - (remote_diff as i64); let delta = (local_diff as i64) - (remote_diff as i64);
gst_trace!( trace!(
CAT, CAT,
obj: element, obj: element,
"Local diff {}, remote diff {}, delta {}", "Local diff {}, remote diff {}, delta {}",
@ -447,7 +447,7 @@ impl Observations {
if (delta > inner.skew && delta - inner.skew > 1_000_000_000) if (delta > inner.skew && delta - inner.skew > 1_000_000_000)
|| (delta < inner.skew && inner.skew - delta > 1_000_000_000) || (delta < inner.skew && inner.skew - delta > 1_000_000_000)
{ {
gst_warning!( warning!(
CAT, CAT,
obj: element, obj: element,
"Delta {} too far from skew {}, resetting", "Delta {} too far from skew {}, resetting",
@ -457,7 +457,7 @@ impl Observations {
let discont = !inner.deltas.is_empty(); let discont = !inner.deltas.is_empty();
gst_debug!( debug!(
CAT, CAT,
obj: element, obj: element,
"Initializing base time: local {}, remote {}", "Initializing base time: local {}, remote {}",
@ -511,14 +511,14 @@ impl Observations {
out_time + (inner.skew as u64) out_time + (inner.skew as u64)
}; };
gst_trace!( trace!(
CAT, CAT,
obj: element, obj: element,
"Skew {}, min delta {}", "Skew {}, min delta {}",
inner.skew, inner.skew,
inner.min_delta inner.min_delta
); );
gst_trace!( trace!(
CAT, CAT,
obj: element, obj: element,
"Outputting {}", "Outputting {}",
@ -563,7 +563,7 @@ impl Drop for ReceiverInner {
let element = self.element.upgrade(); let element = self.element.upgrade();
if let Some(ref element) = element { if let Some(ref element) = element {
gst_debug!(CAT, obj: element, "Closed NDI connection"); debug!(CAT, obj: element, "Closed NDI connection");
} }
} }
} }
@ -684,11 +684,11 @@ impl Receiver {
timeout: u32, timeout: u32,
max_queue_length: usize, max_queue_length: usize,
) -> Option<Self> { ) -> Option<Self> {
gst_debug!(CAT, obj: element, "Starting NDI connection..."); debug!(CAT, obj: element, "Starting NDI connection...");
assert!(ndi_name.is_some() || url_address.is_some()); assert!(ndi_name.is_some() || url_address.is_some());
gst_debug!( debug!(
CAT, CAT,
obj: element, obj: element,
"Connecting to NDI source with NDI name '{:?}' and URL/Address {:?}", "Connecting to NDI source with NDI name '{:?}' and URL/Address {:?}",
@ -754,13 +754,13 @@ impl Receiver {
let flushing = { let flushing = {
let queue = (receiver.0.queue.0).0.lock().unwrap(); let queue = (receiver.0.queue.0).0.lock().unwrap();
if queue.shutdown { if queue.shutdown {
gst_debug!(CAT, obj: &element, "Shutting down"); debug!(CAT, obj: &element, "Shutting down");
break; break;
} }
// If an error happened in the meantime, just go out of here // If an error happened in the meantime, just go out of here
if queue.error.is_some() { if queue.error.is_some() {
gst_error!(CAT, obj: &element, "Error while waiting for connection"); error!(CAT, obj: &element, "Error while waiting for connection");
return; return;
} }
@ -775,7 +775,7 @@ impl Receiver {
let res = match recv.capture(50) { let res = match recv.capture(50) {
_ if flushing => { _ if flushing => {
gst_debug!(CAT, obj: &element, "Flushing"); debug!(CAT, obj: &element, "Flushing");
Err(gst::FlowError::Flushing) Err(gst::FlowError::Flushing)
} }
Err(_) => { Err(_) => {
@ -787,11 +787,11 @@ impl Receiver {
Err(gst::FlowError::Error) Err(gst::FlowError::Error)
} }
Ok(None) if timeout > 0 && timer.elapsed().as_millis() >= timeout as u128 => { Ok(None) if timeout > 0 && timer.elapsed().as_millis() >= timeout as u128 => {
gst_debug!(CAT, obj: &element, "Timed out -- assuming EOS",); debug!(CAT, obj: &element, "Timed out -- assuming EOS",);
Err(gst::FlowError::Eos) Err(gst::FlowError::Eos)
} }
Ok(None) => { Ok(None) => {
gst_debug!(CAT, obj: &element, "No frame received yet, retry"); debug!(CAT, obj: &element, "No frame received yet, retry");
continue; continue;
} }
Ok(Some(Frame::Video(frame))) => { Ok(Some(Frame::Video(frame))) => {
@ -824,7 +824,7 @@ impl Receiver {
} }
Ok(Some(Frame::Metadata(frame))) => { Ok(Some(Frame::Metadata(frame))) => {
if let Some(metadata) = frame.metadata() { if let Some(metadata) = frame.metadata() {
gst_debug!( debug!(
CAT, CAT,
obj: &element, obj: &element,
"Received metadata at timecode {}: {}", "Received metadata at timecode {}: {}",
@ -841,7 +841,7 @@ impl Receiver {
Ok(item) => { Ok(item) => {
let mut queue = (receiver.0.queue.0).0.lock().unwrap(); let mut queue = (receiver.0.queue.0).0.lock().unwrap();
while queue.buffer_queue.len() > receiver.0.max_queue_length { while queue.buffer_queue.len() > receiver.0.max_queue_length {
gst_warning!( warning!(
CAT, CAT,
obj: &element, obj: &element,
"Dropping old buffer -- queue has {} items", "Dropping old buffer -- queue has {} items",
@ -854,7 +854,7 @@ impl Receiver {
timer = time::Instant::now(); timer = time::Instant::now();
} }
Err(gst::FlowError::Eos) => { Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: &element, "Signalling EOS"); debug!(CAT, obj: &element, "Signalling EOS");
let mut queue = (receiver.0.queue.0).0.lock().unwrap(); let mut queue = (receiver.0.queue.0).0.lock().unwrap();
queue.timeout = true; queue.timeout = true;
(receiver.0.queue.0).1.notify_one(); (receiver.0.queue.0).1.notify_one();
@ -868,7 +868,7 @@ impl Receiver {
timer = time::Instant::now(); timer = time::Instant::now();
} }
Err(err) => { Err(err) => {
gst_error!(CAT, obj: &element, "Signalling error"); error!(CAT, obj: &element, "Signalling error");
let mut queue = (receiver.0.queue.0).0.lock().unwrap(); let mut queue = (receiver.0.queue.0).0.lock().unwrap();
if queue.error.is_none() { if queue.error.is_none() {
queue.error = Some(err); queue.error = Some(err);
@ -898,7 +898,7 @@ impl Receiver {
}; };
let timecode = gst::ClockTime::from_nseconds(timecode as u64 * 100); let timecode = gst::ClockTime::from_nseconds(timecode as u64 * 100);
gst_log!( log!(
CAT, CAT,
obj: element, obj: element,
"Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}", "Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
@ -927,7 +927,7 @@ impl Receiver {
TimestampMode::ReceiveTimeTimecode => match res_timecode { TimestampMode::ReceiveTimeTimecode => match res_timecode {
Some((pts, duration, discont)) => (pts, duration, discont), Some((pts, duration, discont)) => (pts, duration, discont),
None => { None => {
gst_warning!(CAT, obj: element, "Can't calculate timestamp"); warning!(CAT, obj: element, "Can't calculate timestamp");
(receive_time, duration, false) (receive_time, duration, false)
} }
}, },
@ -935,7 +935,7 @@ impl Receiver {
Some((pts, duration, discont)) => (pts, duration, discont), Some((pts, duration, discont)) => (pts, duration, discont),
None => { None => {
if timestamp.is_some() { if timestamp.is_some() {
gst_warning!(CAT, obj: element, "Can't calculate timestamp"); warning!(CAT, obj: element, "Can't calculate timestamp");
} }
(receive_time, duration, false) (receive_time, duration, false)
@ -966,7 +966,7 @@ impl Receiver {
} }
}; };
gst_log!( log!(
CAT, CAT,
obj: element, obj: element,
"Calculated PTS {}, duration {}", "Calculated PTS {}, duration {}",
@ -982,12 +982,12 @@ impl Receiver {
element: &gst_base::BaseSrc, element: &gst_base::BaseSrc,
video_frame: VideoFrame, video_frame: VideoFrame,
) -> Result<Buffer, gst::FlowError> { ) -> Result<Buffer, gst::FlowError> {
gst_debug!(CAT, obj: element, "Received video frame {:?}", video_frame); debug!(CAT, obj: element, "Received video frame {:?}", video_frame);
let (pts, duration, discont) = self let (pts, duration, discont) = self
.calculate_video_timestamp(element, &video_frame) .calculate_video_timestamp(element, &video_frame)
.ok_or_else(|| { .ok_or_else(|| {
gst_debug!(CAT, obj: element, "Flushing, dropping buffer"); debug!(CAT, obj: element, "Flushing, dropping buffer");
gst::FlowError::Flushing gst::FlowError::Flushing
})?; })?;
@ -1001,7 +1001,7 @@ impl Receiver {
.set_flags(gst::BufferFlags::RESYNC); .set_flags(gst::BufferFlags::RESYNC);
} }
gst_log!(CAT, obj: element, "Produced video buffer {:?}", buffer); log!(CAT, obj: element, "Produced video buffer {:?}", buffer);
Ok(Buffer::Video(buffer, info)) Ok(Buffer::Video(buffer, info))
} }
@ -1203,7 +1203,7 @@ impl Receiver {
.contains(&fourcc) .contains(&fourcc)
{ {
let compressed_packet = video_frame.compressed_packet().ok_or_else(|| { let compressed_packet = video_frame.compressed_packet().ok_or_else(|| {
gst_error!( error!(
CAT, CAT,
obj: element, obj: element,
"Video packet doesn't have compressed packet start" "Video packet doesn't have compressed packet start"
@ -1214,7 +1214,7 @@ impl Receiver {
})?; })?;
if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_H264 { if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_H264 {
gst_error!(CAT, obj: element, "Non-H264 video packet"); error!(CAT, obj: element, "Non-H264 video packet");
gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]); gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]);
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
@ -1241,7 +1241,7 @@ impl Receiver {
.contains(&fourcc) .contains(&fourcc)
{ {
let compressed_packet = video_frame.compressed_packet().ok_or_else(|| { let compressed_packet = video_frame.compressed_packet().ok_or_else(|| {
gst_error!( error!(
CAT, CAT,
obj: element, obj: element,
"Video packet doesn't have compressed packet start" "Video packet doesn't have compressed packet start"
@ -1252,7 +1252,7 @@ impl Receiver {
})?; })?;
if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_HEVC { if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_HEVC {
gst_error!(CAT, obj: element, "Non-H265 video packet"); error!(CAT, obj: element, "Non-H265 video packet");
gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]); gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]);
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
@ -1483,7 +1483,7 @@ impl Receiver {
#[cfg(feature = "advanced-sdk")] #[cfg(feature = "advanced-sdk")]
VideoInfo::SpeedHQInfo { .. } => { VideoInfo::SpeedHQInfo { .. } => {
let data = video_frame.data().ok_or_else(|| { let data = video_frame.data().ok_or_else(|| {
gst_error!(CAT, obj: element, "Video packet has no data"); error!(CAT, obj: element, "Video packet has no data");
gst::element_error!( gst::element_error!(
element, element,
gst::StreamError::Format, gst::StreamError::Format,
@ -1498,7 +1498,7 @@ impl Receiver {
#[cfg(feature = "advanced-sdk")] #[cfg(feature = "advanced-sdk")]
VideoInfo::H264Info { .. } | VideoInfo::H265Info { .. } => { VideoInfo::H264Info { .. } | VideoInfo::H265Info { .. } => {
let compressed_packet = video_frame.compressed_packet().ok_or_else(|| { let compressed_packet = video_frame.compressed_packet().ok_or_else(|| {
gst_error!( error!(
CAT, CAT,
obj: element, obj: element,
"Video packet doesn't have compressed packet start" "Video packet doesn't have compressed packet start"
@ -1533,12 +1533,12 @@ impl Receiver {
element: &gst_base::BaseSrc, element: &gst_base::BaseSrc,
audio_frame: AudioFrame, audio_frame: AudioFrame,
) -> Result<Buffer, gst::FlowError> { ) -> Result<Buffer, gst::FlowError> {
gst_debug!(CAT, obj: element, "Received audio frame {:?}", audio_frame); debug!(CAT, obj: element, "Received audio frame {:?}", audio_frame);
let (pts, duration, discont) = self let (pts, duration, discont) = self
.calculate_audio_timestamp(element, &audio_frame) .calculate_audio_timestamp(element, &audio_frame)
.ok_or_else(|| { .ok_or_else(|| {
gst_debug!(CAT, obj: element, "Flushing, dropping buffer"); debug!(CAT, obj: element, "Flushing, dropping buffer");
gst::FlowError::Flushing gst::FlowError::Flushing
})?; })?;
@ -1552,7 +1552,7 @@ impl Receiver {
.set_flags(gst::BufferFlags::RESYNC); .set_flags(gst::BufferFlags::RESYNC);
} }
gst_log!(CAT, obj: element, "Produced audio buffer {:?}", buffer); log!(CAT, obj: element, "Produced audio buffer {:?}", buffer);
Ok(Buffer::Audio(buffer, info)) Ok(Buffer::Audio(buffer, info))
} }
@ -1608,7 +1608,7 @@ impl Receiver {
use std::convert::TryInto; use std::convert::TryInto;
let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| { let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| {
gst_error!( error!(
CAT, CAT,
obj: element, obj: element,
"Audio packet doesn't have compressed packet start" "Audio packet doesn't have compressed packet start"
@ -1619,7 +1619,7 @@ impl Receiver {
})?; })?;
if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_AAC { if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_AAC {
gst_error!(CAT, obj: element, "Non-AAC audio packet"); error!(CAT, obj: element, "Non-AAC audio packet");
gst::element_error!(element, gst::StreamError::Format, ["Invalid audio packet"]); gst::element_error!(element, gst::StreamError::Format, ["Invalid audio packet"]);
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
@ -1717,7 +1717,7 @@ impl Receiver {
#[cfg(feature = "advanced-sdk")] #[cfg(feature = "advanced-sdk")]
AudioInfo::OpusInfo { .. } => { AudioInfo::OpusInfo { .. } => {
let data = audio_frame.data().ok_or_else(|| { let data = audio_frame.data().ok_or_else(|| {
gst_error!(CAT, obj: element, "Audio packet has no data"); error!(CAT, obj: element, "Audio packet has no data");
gst::element_error!( gst::element_error!(
element, element,
gst::StreamError::Format, gst::StreamError::Format,
@ -1732,7 +1732,7 @@ impl Receiver {
#[cfg(feature = "advanced-sdk")] #[cfg(feature = "advanced-sdk")]
AudioInfo::AacInfo { .. } => { AudioInfo::AacInfo { .. } => {
let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| { let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| {
gst_error!( error!(
CAT, CAT,
obj: element, obj: element,
"Audio packet doesn't have compressed packet start" "Audio packet doesn't have compressed packet start"