From 34858762f7c654dd4d0061c5683ee647a608c540 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 17 Jul 2019 10:40:26 +0300 Subject: [PATCH] Consolidate element properties stream-name is called ndi-name everywhere in the NDI SDK and documentation ip is called ip-address everywhere Rename loss-threshold to timeout and change it to be in milliseconds instead of iterations. Add connect-timeout for timeout during connection Add bandwidth and receiver-ndi-name properties, and initialize the latter with a reasonable default value. --- README.md | 13 ++- src/lib.rs | 68 ++++++++------- src/ndi.rs | 76 ++++++++++++++--- src/ndiaudiosrc.rs | 198 ++++++++++++++++++++++++++++++------------ src/ndisys.rs | 14 ++- src/ndivideosrc.rs | 208 +++++++++++++++++++++++++++++++++------------ 6 files changed, 406 insertions(+), 171 deletions(-) diff --git a/README.md b/README.md index d9694a9b..86b0fa58 100644 --- a/README.md +++ b/README.md @@ -16,12 +16,12 @@ gst-inspect-1.0 ndivideosrc gst-inspect-1.0 ndiaudiosrc #Video pipeline -gst-launch-1.0 ndivideosrc stream-name="GC-DEV2 (OBS)" ! autovideosink +gst-launch-1.0 ndivideosrc ndi-name="GC-DEV2 (OBS)" ! autovideosink #Audio pipeline -gst-launch-1.0 ndiaudiosrc stream-name="GC-DEV2 (OBS)" ! autoaudiosink +gst-launch-1.0 ndiaudiosrc ndi-name="GC-DEV2 (OBS)" ! autoaudiosink #Video and audio pipeline -gst-launch-1.0 ndivideosrc stream-name="GC-DEV2 (OBS)" ! autovideosink ndiaudiosrc stream-name="GC-DEV2 (OBS)" ! autoaudiosink +gst-launch-1.0 ndivideosrc ndi-name="GC-DEV2 (OBS)" ! autovideosink ndiaudiosrc ndi-name="GC-DEV2 (OBS)" ! autoaudiosink ``` Feel free to contribute to this project. Some ways you can contribute are: @@ -63,11 +63,8 @@ gst-inspect-1.0 ndi More info about GStreamer plugins written in Rust: ---------------------------------- -https://github.com/sdroege/gstreamer-rs -https://github.com/sdroege/gst-plugin-rs - -https://coaxion.net/blog/2018/01/how-to-write-gstreamer-elements-in-rust-part-1-a-video-filter-for-converting-rgb-to-grayscale/ -https://coaxion.net/blog/2018/02/how-to-write-gstreamer-elements-in-rust-part-2-a-raw-audio-sine-wave-source/ +https://gitlab.freedesktop.org/gstreamer/gstreamer-rs +https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs License diff --git a/src/lib.rs b/src/lib.rs index 4bef1639..3ef1ee65 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,7 @@ use ndisys::*; use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; +use std::time; #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)] #[repr(u32)] @@ -44,8 +45,8 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { struct ReceiverInfo { id: usize, - stream_name: String, - ip: String, + ndi_name: String, + ip_address: String, video: bool, audio: bool, ndi_instance: RecvInstance, @@ -57,6 +58,10 @@ lazy_static! { Mutex::new(m) }; + static ref DEFAULT_RECEIVER_NDI_NAME: String = { + format!("GStreamer NDI Source {}-{}", env!("CARGO_PKG_VERSION"), env!("COMMIT_ID")) + }; + #[cfg(feature = "reference-timestamps")] static ref TIMECODE_CAPS: gst::Caps = { gst::Caps::new_simple("timestamp/x-ndi-timecode", &[]) @@ -73,8 +78,11 @@ static ID_RECEIVER: AtomicUsize = AtomicUsize::new(0); fn connect_ndi( cat: gst::DebugCategory, element: &gst_base::BaseSrc, - ip: &str, - stream_name: &str, + ip_address: Option<&str>, + ndi_name: Option<&str>, + receiver_ndi_name: &str, + connect_timeout: u32, + bandwidth: NDIlib_recv_bandwidth_e, ) -> Option { gst_debug!(cat, obj: element, "Starting NDI connection..."); @@ -83,7 +91,7 @@ fn connect_ndi( let video = element.get_type() == ndivideosrc::NdiVideoSrc::get_type(); for val in receivers.values_mut() { - if val.ip == ip || val.stream_name == stream_name { + if Some(val.ip_address.as_str()) == ip_address || Some(val.ndi_name.as_str()) == ndi_name { if (val.video || !video) && (val.audio || video) { continue; } else { @@ -109,47 +117,45 @@ fn connect_ndi( Some(find) => find, }; - // TODO Sleep 1s to wait for all sources - find.wait_for_sources(2000); + let timeout = time::Instant::now(); + let source = loop { + find.wait_for_sources(50); - let sources = find.get_current_sources(); + let sources = find.get_current_sources(); - // We need at least one source - if sources.is_empty() { - gst_element_error!( - element, - gst::CoreError::Negotiation, - ["Error getting NDIlib_find_get_current_sources"] + gst_debug!( + cat, + obj: element, + "Total sources found in network {}", + sources.len(), ); - return None; - } - let source = sources - .iter() - .find(|s| s.ndi_name() == stream_name || s.ip_address() == ip); + let source = sources + .iter() + .find(|s| Some(s.ndi_name()) == ndi_name || Some(s.ip_address()) == ip_address); - let source = match source { - None => { - gst_element_error!(element, gst::ResourceError::OpenRead, ["Stream not found"]); + if let Some(source) = source { + break source.to_owned(); + } + + if timeout.elapsed().as_millis() >= connect_timeout as u128 { + gst_element_error!(element, gst::ResourceError::NotFound, ["Stream not found"]); return None; } - Some(source) => source, }; gst_debug!( cat, obj: element, - "Total sources in network {}: Connecting to NDI source with name '{}' and address '{}'", - sources.len(), + "Connecting to NDI source with ndi-name '{}' and ip-address '{}'", source.ndi_name(), source.ip_address(), ); - // FIXME: Property for the name and bandwidth // FIXME: Ideally we would use NDIlib_recv_color_format_fastest here but that seems to be // broken with interlaced content currently - let recv = RecvInstance::builder(&source, "Galicaster NDI Receiver") - .bandwidth(NDIlib_recv_bandwidth_e::NDIlib_recv_bandwidth_highest) + let recv = RecvInstance::builder(&source, receiver_ndi_name) + .bandwidth(bandwidth) .color_format(NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA) .allow_video_fields(true) .build(); @@ -174,12 +180,12 @@ fn connect_ndi( receivers.insert( id_receiver, ReceiverInfo { - stream_name: source.ndi_name().to_owned(), - ip: source.ip_address().to_owned(), + id: id_receiver, + ndi_name: source.ndi_name().to_owned(), + ip_address: source.ip_address().to_owned(), video, audio: !video, ndi_instance: recv, - id: id_receiver, }, ); diff --git a/src/ndi.rs b/src/ndi.rs index 6f7d6724..46465224 100644 --- a/src/ndi.rs +++ b/src/ndi.rs @@ -94,7 +94,7 @@ impl FindInstance { let mut sources = vec![]; for i in 0..no_sources { - sources.push(Source( + sources.push(Source::Borrowed( ptr::NonNull::new(sources_ptr.add(i as usize) as *mut _).unwrap(), self, )); @@ -114,26 +114,76 @@ impl Drop for FindInstance { } #[derive(Debug)] -pub struct Source<'a>(ptr::NonNull, &'a FindInstance); +pub enum Source<'a> { + Borrowed(ptr::NonNull, &'a FindInstance), + Owned(NDIlib_source_t, ffi::CString, ffi::CString), +} unsafe impl<'a> Send for Source<'a> {} impl<'a> Source<'a> { pub fn ndi_name(&self) -> &str { unsafe { - assert!(!self.0.as_ref().p_ndi_name.is_null()); - ffi::CStr::from_ptr(self.0.as_ref().p_ndi_name) - .to_str() - .unwrap() + let ptr = match *self { + Source::Borrowed(ptr, _) => &*ptr.as_ptr(), + Source::Owned(ref source, _, _) => source, + }; + + assert!(!ptr.p_ndi_name.is_null()); + ffi::CStr::from_ptr(ptr.p_ndi_name).to_str().unwrap() } } pub fn ip_address(&self) -> &str { unsafe { - assert!(!self.0.as_ref().p_ip_address.is_null()); - ffi::CStr::from_ptr(self.0.as_ref().p_ip_address) - .to_str() - .unwrap() + let ptr = match *self { + Source::Borrowed(ptr, _) => &*ptr.as_ptr(), + Source::Owned(ref source, _, _) => source, + }; + + assert!(!ptr.p_ip_address.is_null()); + ffi::CStr::from_ptr(ptr.p_ip_address).to_str().unwrap() + } + } + + fn ndi_name_ptr(&self) -> *const ::std::os::raw::c_char { + unsafe { + match *self { + Source::Borrowed(ptr, _) => ptr.as_ref().p_ndi_name, + Source::Owned(_, ref ndi_name, _) => ndi_name.as_ptr(), + } + } + } + + fn ip_address_ptr(&self) -> *const ::std::os::raw::c_char { + unsafe { + match *self { + Source::Borrowed(ptr, _) => ptr.as_ref().p_ip_address, + Source::Owned(_, _, ref ip_address) => ip_address.as_ptr(), + } + } + } + + pub fn to_owned<'b>(&self) -> Source<'b> { + unsafe { + let (ndi_name, ip_address) = match *self { + Source::Borrowed(ptr, _) => (ptr.as_ref().p_ndi_name, ptr.as_ref().p_ip_address), + Source::Owned(_, ref ndi_name, ref ip_address) => { + (ndi_name.as_ptr(), ip_address.as_ptr()) + } + }; + + let ndi_name = ffi::CString::new(ffi::CStr::from_ptr(ndi_name).to_bytes()).unwrap(); + let ip_address = ffi::CString::new(ffi::CStr::from_ptr(ip_address).to_bytes()).unwrap(); + + Source::Owned( + NDIlib_source_t { + p_ndi_name: ndi_name.as_ptr(), + p_ip_address: ip_address.as_ptr(), + }, + ndi_name, + ip_address, + ) } } } @@ -171,8 +221,8 @@ impl<'a> RecvBuilder<'a> { let ndi_name = ffi::CString::new(self.ndi_name).unwrap(); let ptr = NDIlib_recv_create_v3(&NDIlib_recv_create_v3_t { source_to_connect_to: NDIlib_source_t { - p_ndi_name: self.source_to_connect_to.0.as_ref().p_ndi_name, - p_ip_address: self.source_to_connect_to.0.as_ref().p_ip_address, + p_ndi_name: self.source_to_connect_to.ndi_name_ptr(), + p_ip_address: self.source_to_connect_to.ip_address_ptr(), }, allow_video_fields: self.allow_video_fields, bandwidth: self.bandwidth, @@ -198,7 +248,7 @@ impl RecvInstance { RecvBuilder { source_to_connect_to, allow_video_fields: true, - bandwidth: NDIlib_recv_bandwidth_e::NDIlib_recv_bandwidth_highest, + bandwidth: NDIlib_recv_bandwidth_highest, color_format: NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA, ndi_name, } diff --git a/src/ndiaudiosrc.rs b/src/ndiaudiosrc.rs index 897f3342..bfadbcdd 100644 --- a/src/ndiaudiosrc.rs +++ b/src/ndiaudiosrc.rs @@ -10,6 +10,7 @@ use gst_base::prelude::*; use gst_base::subclass::prelude::*; use std::sync::Mutex; +use std::time; use std::{i32, u32}; use connect_ndi; @@ -18,6 +19,7 @@ use ndisys; use stop_ndi; use TimestampMode; +use DEFAULT_RECEIVER_NDI_NAME; use HASHMAP_RECEIVERS; #[cfg(feature = "reference-timestamps")] use TIMECODE_CAPS; @@ -28,50 +30,87 @@ use byte_slice_cast::AsMutSliceOf; #[derive(Debug, Clone)] struct Settings { - stream_name: String, - ip: String, - loss_threshold: u32, + ndi_name: Option, + ip_address: Option, + connect_timeout: u32, + timeout: u32, + receiver_ndi_name: String, + bandwidth: ndisys::NDIlib_recv_bandwidth_e, timestamp_mode: TimestampMode, } impl Default for Settings { fn default() -> Self { Settings { - stream_name: String::from("Fixed ndi stream name"), - ip: String::from(""), - loss_threshold: 5, + ndi_name: None, + ip_address: None, + receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(), + connect_timeout: 10000, + timeout: 5000, + bandwidth: ndisys::NDIlib_recv_bandwidth_highest, timestamp_mode: TimestampMode::ReceiveTime, } } } -static PROPERTIES: [subclass::Property; 4] = [ - subclass::Property("stream-name", |name| { +static PROPERTIES: [subclass::Property; 7] = [ + subclass::Property("ndi-name", |name| { glib::ParamSpec::string( name, - "Stream Name", - "Name of the streaming device", + "NDI Name", + "NDI stream name of the sender", None, glib::ParamFlags::READWRITE, ) }), - subclass::Property("ip", |name| { + subclass::Property("ip-address", |name| { glib::ParamSpec::string( name, - "Stream IP", - "IP of the streaming device. Ex: 127.0.0.1:5961", + "IP Address", + "IP address and port of the sender, e.g. 127.0.0.1:5961", None, glib::ParamFlags::READWRITE, ) }), - subclass::Property("loss-threshold", |name| { + subclass::Property("receiver-ndi-name", |name| { + glib::ParamSpec::string( + name, + "Receiver NDI Name", + "NDI stream name of this receiver", + Some(&*DEFAULT_RECEIVER_NDI_NAME), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("connect-timeout", |name| { glib::ParamSpec::uint( name, - "Loss threshold", - "Loss threshold", + "Connect Timeout", + "Connection timeout in ms", 0, - 60, - 5, + u32::MAX, + 10000, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("timeout", |name| { + glib::ParamSpec::uint( + name, + "Timeout", + "Receive timeout in ms", + 0, + u32::MAX, + 5000, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("bandwidth", |name| { + glib::ParamSpec::int( + name, + "Bandwidth", + "Bandwidth, -10 metadata-only, 10 audio-only, 100 highest", + -10, + 100, + 100, glib::ParamFlags::READWRITE, ) }), @@ -134,7 +173,7 @@ impl ObjectSubclass for NdiAudioSrc { "NewTek NDI Audio Source", "Source", "NewTek NDI audio source", - "Ruben Gonzalez , Daniel Vilar ", + "Ruben Gonzalez , Daniel Vilar , Sebastian Dröge ", ); let caps = gst::Caps::new_simple( @@ -181,41 +220,78 @@ impl ObjectImpl for NdiAudioSrc { let basesrc = obj.downcast_ref::().unwrap(); match *prop { - subclass::Property("stream-name", ..) => { + subclass::Property("ndi-name", ..) => { let mut settings = self.settings.lock().unwrap(); - let stream_name = value.get().unwrap(); + let ndi_name = value.get(); gst_debug!( self.cat, obj: basesrc, - "Changing stream-name from {} to {}", - settings.stream_name, - stream_name + "Changing ndi-name from {:?} to {:?}", + settings.ndi_name, + ndi_name, ); - settings.stream_name = stream_name; + settings.ndi_name = ndi_name; } - subclass::Property("ip", ..) => { + subclass::Property("ip-address", ..) => { let mut settings = self.settings.lock().unwrap(); - let ip = value.get().unwrap(); + let ip_address = value.get(); gst_debug!( self.cat, obj: basesrc, - "Changing ip from {} to {}", - settings.ip, - ip + "Changing ip from {:?} to {:?}", + settings.ip_address, + ip_address, ); - settings.ip = ip; + settings.ip_address = ip_address; } - subclass::Property("loss-threshold", ..) => { + subclass::Property("receiver-ndi-name", ..) => { let mut settings = self.settings.lock().unwrap(); - let loss_threshold = value.get().unwrap(); + let receiver_ndi_name = value.get(); gst_debug!( self.cat, obj: basesrc, - "Changing loss threshold from {} to {}", - settings.loss_threshold, - loss_threshold + "Changing receiver-ndi-name from {:?} to {:?}", + settings.receiver_ndi_name, + receiver_ndi_name, ); - settings.loss_threshold = loss_threshold; + settings.receiver_ndi_name = + receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone()); + } + subclass::Property("connect-timeout", ..) => { + let mut settings = self.settings.lock().unwrap(); + let connect_timeout = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing connect-timeout from {} to {}", + settings.connect_timeout, + connect_timeout, + ); + settings.connect_timeout = connect_timeout; + } + subclass::Property("timeout", ..) => { + let mut settings = self.settings.lock().unwrap(); + let timeout = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing timeout from {} to {}", + settings.timeout, + timeout, + ); + settings.timeout = timeout; + } + subclass::Property("bandwidth", ..) => { + let mut settings = self.settings.lock().unwrap(); + let bandwidth = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing bandwidth from {} to {}", + settings.bandwidth, + bandwidth, + ); + settings.bandwidth = bandwidth; } subclass::Property("timestamp-mode", ..) => { let mut settings = self.settings.lock().unwrap(); @@ -241,17 +317,29 @@ impl ObjectImpl for NdiAudioSrc { let prop = &PROPERTIES[id]; match *prop { - subclass::Property("stream-name", ..) => { + subclass::Property("ndi-name", ..) => { let settings = self.settings.lock().unwrap(); - Ok(settings.stream_name.to_value()) + Ok(settings.ndi_name.to_value()) } - subclass::Property("ip", ..) => { + subclass::Property("ip-address", ..) => { let settings = self.settings.lock().unwrap(); - Ok(settings.ip.to_value()) + Ok(settings.ip_address.to_value()) } - subclass::Property("loss-threshold", ..) => { + subclass::Property("receiver-ndi-name", ..) => { let settings = self.settings.lock().unwrap(); - Ok(settings.loss_threshold.to_value()) + Ok(settings.receiver_ndi_name.to_value()) + } + subclass::Property("connect-timeout", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.connect_timeout.to_value()) + } + subclass::Property("timeout", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.timeout.to_value()) + } + subclass::Property("bandwidth", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.bandwidth.to_value()) } subclass::Property("timestamp-mode", ..) => { let settings = self.settings.lock().unwrap(); @@ -270,11 +358,15 @@ impl BaseSrcImpl for NdiAudioSrc { let settings = self.settings.lock().unwrap().clone(); let mut state = self.state.lock().unwrap(); + state.id_receiver = connect_ndi( self.cat, element, - &settings.ip.clone(), - &settings.stream_name.clone(), + settings.ip_address.as_ref().map(String::as_str), + settings.ndi_name.as_ref().map(String::as_str), + &settings.receiver_ndi_name, + settings.connect_timeout, + settings.bandwidth, ); match state.id_receiver { @@ -356,11 +448,11 @@ impl BaseSrcImpl for NdiAudioSrc { let clock = element.get_clock().unwrap(); - let mut count_frame_none = 0; + let timeout = time::Instant::now(); let audio_frame = loop { // FIXME: make interruptable let res = loop { - match recv.capture(false, true, false, 1000) { + match recv.capture(false, true, false, 50) { Err(_) => break Err(()), Ok(None) => break Ok(None), Ok(Some(Frame::Audio(frame))) => break Ok(Some(frame)), @@ -373,17 +465,11 @@ impl BaseSrcImpl for NdiAudioSrc { gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type error received, assuming that the source closed the stream...."]); return Err(gst::FlowError::Error); } - Ok(None) if settings.loss_threshold != 0 => { - if count_frame_none < settings.loss_threshold { - count_frame_none += 1; - continue; - } - gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]); - return Err(gst::FlowError::Error); + Ok(None) if timeout.elapsed().as_millis() >= settings.timeout as u128 => { + return Err(gst::FlowError::Eos); } Ok(None) => { - gst_debug!(self.cat, obj: element, "No audio frame received, retry"); - count_frame_none += 1; + gst_debug!(self.cat, obj: element, "No audio frame received yet, retry"); continue; } Ok(Some(frame)) => frame, diff --git a/src/ndisys.rs b/src/ndisys.rs index 70bd1450..67399d0f 100644 --- a/src/ndisys.rs +++ b/src/ndisys.rs @@ -88,14 +88,12 @@ pub enum NDIlib_frame_type_e { NDIlib_frame_type_status_change = 100, } -#[repr(i32)] -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub enum NDIlib_recv_bandwidth_e { - NDIlib_recv_bandwidth_metadata_only = -10, - NDIlib_recv_bandwidth_audio_only = 10, - NDIlib_recv_bandwidth_lowest = 0, - NDIlib_recv_bandwidth_highest = 100, -} +pub type NDIlib_recv_bandwidth_e = i32; + +pub const NDIlib_recv_bandwidth_metadata_only: NDIlib_recv_bandwidth_e = -10; +pub const NDIlib_recv_bandwidth_audio_only: NDIlib_recv_bandwidth_e = 10; +pub const NDIlib_recv_bandwidth_lowest: NDIlib_recv_bandwidth_e = 0; +pub const NDIlib_recv_bandwidth_highest: NDIlib_recv_bandwidth_e = 100; #[repr(u32)] #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] diff --git a/src/ndivideosrc.rs b/src/ndivideosrc.rs index 33e991c1..2fda6da5 100644 --- a/src/ndivideosrc.rs +++ b/src/ndivideosrc.rs @@ -12,6 +12,7 @@ use gst_video; use gst_video::prelude::*; use std::sync::Mutex; +use std::time; use std::{i32, u32}; use ndi::*; @@ -21,6 +22,7 @@ use connect_ndi; use stop_ndi; use TimestampMode; +use DEFAULT_RECEIVER_NDI_NAME; use HASHMAP_RECEIVERS; #[cfg(feature = "reference-timestamps")] use TIMECODE_CAPS; @@ -29,50 +31,87 @@ use TIMESTAMP_CAPS; #[derive(Debug, Clone)] struct Settings { - stream_name: String, - ip: String, - loss_threshold: u32, + ndi_name: Option, + ip_address: Option, + connect_timeout: u32, + timeout: u32, + receiver_ndi_name: String, + bandwidth: ndisys::NDIlib_recv_bandwidth_e, timestamp_mode: TimestampMode, } impl Default for Settings { fn default() -> Self { Settings { - stream_name: String::from("Fixed ndi stream name"), - ip: String::from(""), - loss_threshold: 5, + ndi_name: None, + ip_address: None, + receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(), + connect_timeout: 10000, + timeout: 5000, + bandwidth: ndisys::NDIlib_recv_bandwidth_highest, timestamp_mode: TimestampMode::ReceiveTime, } } } -static PROPERTIES: [subclass::Property; 4] = [ - subclass::Property("stream-name", |name| { +static PROPERTIES: [subclass::Property; 7] = [ + subclass::Property("ndi-name", |name| { glib::ParamSpec::string( name, - "Stream Name", - "Name of the streaming device", + "NDI Name", + "NDI stream name of the sender", None, glib::ParamFlags::READWRITE, ) }), - subclass::Property("ip", |name| { + subclass::Property("ip-address", |name| { glib::ParamSpec::string( name, - "Stream IP", - "IP of the streaming device. Ex: 127.0.0.1:5961", + "IP Address", + "IP address and port of the sender, e.g. 127.0.0.1:5961", None, glib::ParamFlags::READWRITE, ) }), - subclass::Property("loss-threshold", |name| { + subclass::Property("receiver-ndi-name", |name| { + glib::ParamSpec::string( + name, + "Receiver NDI Name", + "NDI stream name of this receiver", + Some(&*DEFAULT_RECEIVER_NDI_NAME), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("connect-timeout", |name| { glib::ParamSpec::uint( name, - "Loss threshold", - "Loss threshold", + "Connect Timeout", + "Connection timeout in ms", 0, - 60, - 5, + u32::MAX, + 10000, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("timeout", |name| { + glib::ParamSpec::uint( + name, + "Timeout", + "Receive timeout in ms", + 0, + u32::MAX, + 5000, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("bandwidth", |name| { + glib::ParamSpec::int( + name, + "Bandwidth", + "Bandwidth, -10 metadata-only, 10 audio-only, 100 highest", + -10, + 100, + 100, glib::ParamFlags::READWRITE, ) }), @@ -135,7 +174,7 @@ impl ObjectSubclass for NdiVideoSrc { "NewTek NDI Video Source", "Source", "NewTek NDI video source", - "Ruben Gonzalez , Daniel Vilar ", + "Ruben Gonzalez , Daniel Vilar , Sebastian Dröge ", ); // On the src pad, we can produce F32/F64 with any sample rate @@ -216,41 +255,78 @@ impl ObjectImpl for NdiVideoSrc { let basesrc = obj.downcast_ref::().unwrap(); match *prop { - subclass::Property("stream-name", ..) => { + subclass::Property("ndi-name", ..) => { let mut settings = self.settings.lock().unwrap(); - let stream_name = value.get().unwrap(); + let ndi_name = value.get(); gst_debug!( self.cat, obj: basesrc, - "Changing stream-name from {} to {}", - settings.stream_name, - stream_name + "Changing ndi-name from {:?} to {:?}", + settings.ndi_name, + ndi_name, ); - settings.stream_name = stream_name; + settings.ndi_name = ndi_name; } - subclass::Property("ip", ..) => { + subclass::Property("ip-address", ..) => { let mut settings = self.settings.lock().unwrap(); - let ip = value.get().unwrap(); + let ip_address = value.get(); gst_debug!( self.cat, obj: basesrc, - "Changing ip from {} to {}", - settings.ip, - ip + "Changing ip from {:?} to {:?}", + settings.ip_address, + ip_address, ); - settings.ip = ip; + settings.ip_address = ip_address; } - subclass::Property("loss-threshold", ..) => { + subclass::Property("receiver-ndi-name", ..) => { let mut settings = self.settings.lock().unwrap(); - let loss_threshold = value.get().unwrap(); + let receiver_ndi_name = value.get(); gst_debug!( self.cat, obj: basesrc, - "Changing loss threshold from {} to {}", - settings.loss_threshold, - loss_threshold + "Changing receiver-ndi-name from {:?} to {:?}", + settings.receiver_ndi_name, + receiver_ndi_name, ); - settings.loss_threshold = loss_threshold; + settings.receiver_ndi_name = + receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone()); + } + subclass::Property("connect-timeout", ..) => { + let mut settings = self.settings.lock().unwrap(); + let connect_timeout = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing connect-timeout from {} to {}", + settings.connect_timeout, + connect_timeout, + ); + settings.connect_timeout = connect_timeout; + } + subclass::Property("timeout", ..) => { + let mut settings = self.settings.lock().unwrap(); + let timeout = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing timeout from {} to {}", + settings.timeout, + timeout, + ); + settings.timeout = timeout; + } + subclass::Property("bandwidth", ..) => { + let mut settings = self.settings.lock().unwrap(); + let bandwidth = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing bandwidth from {} to {}", + settings.bandwidth, + bandwidth, + ); + settings.bandwidth = bandwidth; } subclass::Property("timestamp-mode", ..) => { let mut settings = self.settings.lock().unwrap(); @@ -276,17 +352,29 @@ impl ObjectImpl for NdiVideoSrc { let prop = &PROPERTIES[id]; match *prop { - subclass::Property("stream-name", ..) => { + subclass::Property("ndi-name", ..) => { let settings = self.settings.lock().unwrap(); - Ok(settings.stream_name.to_value()) + Ok(settings.ndi_name.to_value()) } - subclass::Property("ip", ..) => { + subclass::Property("ip-address", ..) => { let settings = self.settings.lock().unwrap(); - Ok(settings.ip.to_value()) + Ok(settings.ip_address.to_value()) } - subclass::Property("loss-threshold", ..) => { + subclass::Property("receiver-ndi-name", ..) => { let settings = self.settings.lock().unwrap(); - Ok(settings.loss_threshold.to_value()) + Ok(settings.receiver_ndi_name.to_value()) + } + subclass::Property("connect-timeout", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.connect_timeout.to_value()) + } + subclass::Property("timeout", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.timeout.to_value()) + } + subclass::Property("bandwidth", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.bandwidth.to_value()) } subclass::Property("timestamp-mode", ..) => { let settings = self.settings.lock().unwrap(); @@ -304,7 +392,23 @@ impl BaseSrcImpl for NdiVideoSrc { *self.state.lock().unwrap() = Default::default(); let mut state = self.state.lock().unwrap(); let settings = self.settings.lock().unwrap().clone(); - state.id_receiver = connect_ndi(self.cat, element, &settings.ip, &settings.stream_name); + + if settings.ip_address.is_none() && settings.ndi_name.is_none() { + return Err(gst_error_msg!( + gst::LibraryError::Settings, + ["No IP address or NDI name given"] + )); + } + + state.id_receiver = connect_ndi( + self.cat, + element, + settings.ip_address.as_ref().map(String::as_str), + settings.ndi_name.as_ref().map(String::as_str), + &settings.receiver_ndi_name, + settings.connect_timeout, + settings.bandwidth, + ); // settings.id_receiver exists match state.id_receiver { @@ -390,11 +494,11 @@ impl BaseSrcImpl for NdiVideoSrc { let clock = element.get_clock().unwrap(); - let mut count_frame_none = 0; + let timeout = time::Instant::now(); let video_frame = loop { // FIXME: make interruptable let res = loop { - match recv.capture(true, false, false, 1000) { + match recv.capture(true, false, false, 50) { Err(_) => break Err(()), Ok(None) => break Ok(None), Ok(Some(Frame::Video(frame))) => break Ok(Some(frame)), @@ -407,17 +511,11 @@ impl BaseSrcImpl for NdiVideoSrc { gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type error received, assuming that the source closed the stream...."]); return Err(gst::FlowError::Error); } - Ok(None) if settings.loss_threshold != 0 => { - if count_frame_none < settings.loss_threshold { - count_frame_none += 1; - continue; - } - gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]); - return Err(gst::FlowError::Error); + Ok(None) if timeout.elapsed().as_millis() >= settings.timeout as u128 => { + return Err(gst::FlowError::Eos); } Ok(None) => { - gst_debug!(self.cat, obj: element, "No video frame received, retry"); - count_frame_none += 1; + gst_debug!(self.cat, obj: element, "No video frame received yet, retry"); continue; } Ok(Some(frame)) => frame,