diff --git a/src/ndi.rs b/src/ndi.rs index 060e0a457..516631e17 100644 --- a/src/ndi.rs +++ b/src/ndi.rs @@ -149,24 +149,6 @@ impl<'a> Source<'a> { } } - fn ndi_name_ptr(&self) -> *const ::std::os::raw::c_char { - unsafe { - match *self { - Source::Borrowed(ptr, _) => ptr.as_ref().p_ndi_name, - Source::Owned(_, ref ndi_name, _) => ndi_name.as_ptr(), - } - } - } - - fn url_address_ptr(&self) -> *const ::std::os::raw::c_char { - unsafe { - match *self { - Source::Borrowed(ptr, _) => ptr.as_ref().p_url_address, - Source::Owned(_, _, ref url_address) => url_address.as_ptr(), - } - } - } - pub fn to_owned<'b>(&self) -> Source<'b> { unsafe { let (ndi_name, url_address) = match *self { @@ -200,11 +182,11 @@ impl<'a> PartialEq for Source<'a> { #[derive(Debug)] pub struct RecvBuilder<'a> { - source_to_connect_to: &'a Source<'a>, + source_to_connect_to: (&'a str, Option<&'a str>), allow_video_fields: bool, bandwidth: NDIlib_recv_bandwidth_e, color_format: NDIlib_recv_color_format_e, - ndi_name: &'a str, + ndi_recv_name: &'a str, } impl<'a> RecvBuilder<'a> { @@ -228,16 +210,25 @@ impl<'a> RecvBuilder<'a> { pub fn build(self) -> Option { unsafe { - let ndi_name = ffi::CString::new(self.ndi_name).unwrap(); + let ndi_recv_name = ffi::CString::new(self.ndi_recv_name).unwrap(); + let ndi_name = ffi::CString::new(self.source_to_connect_to.0).unwrap(); + let url_address = self + .source_to_connect_to + .1 + .as_ref() + .map(|s| ffi::CString::new(*s).unwrap()); let ptr = NDIlib_recv_create_v3(&NDIlib_recv_create_v3_t { source_to_connect_to: NDIlib_source_t { - p_ndi_name: self.source_to_connect_to.ndi_name_ptr(), - p_url_address: self.source_to_connect_to.url_address_ptr(), + p_ndi_name: ndi_name.as_ptr(), + p_url_address: url_address + .as_ref() + .map(|s| s.as_ptr()) + .unwrap_or_else(|| ptr::null_mut()), }, allow_video_fields: self.allow_video_fields, bandwidth: self.bandwidth, color_format: self.color_format, - p_ndi_recv_name: ndi_name.as_ptr(), + p_ndi_recv_name: ndi_recv_name.as_ptr(), }); if ptr.is_null() { @@ -266,13 +257,16 @@ unsafe impl Send for RecvInstanceInner {} unsafe impl Sync for RecvInstanceInner {} impl RecvInstance { - pub fn builder<'a>(source_to_connect_to: &'a Source, ndi_name: &'a str) -> RecvBuilder<'a> { + pub fn builder<'a>( + source_to_connect_to: (&'a str, Option<&'a str>), + ndi_recv_name: &'a str, + ) -> RecvBuilder<'a> { RecvBuilder { source_to_connect_to, allow_video_fields: true, bandwidth: NDIlib_recv_bandwidth_highest, color_format: NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA, - ndi_name, + ndi_recv_name, } } diff --git a/src/ndiaudiosrc.rs b/src/ndiaudiosrc.rs index 47a41fdc7..884436d56 100644 --- a/src/ndiaudiosrc.rs +++ b/src/ndiaudiosrc.rs @@ -24,6 +24,7 @@ use crate::DEFAULT_RECEIVER_NDI_NAME; #[derive(Debug, Clone)] struct Settings { ndi_name: Option, + url_address: Option, connect_timeout: u32, timeout: u32, receiver_ndi_name: String, @@ -35,6 +36,7 @@ impl Default for Settings { fn default() -> Self { Settings { ndi_name: None, + url_address: None, receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(), connect_timeout: 10000, timeout: 5000, @@ -44,7 +46,7 @@ impl Default for Settings { } } -static PROPERTIES: [subclass::Property; 6] = [ +static PROPERTIES: [subclass::Property; 7] = [ subclass::Property("ndi-name", |name| { glib::ParamSpec::string( name, @@ -54,6 +56,15 @@ static PROPERTIES: [subclass::Property; 6] = [ glib::ParamFlags::READWRITE, ) }), + subclass::Property("url-address", |name| { + glib::ParamSpec::string( + name, + "URL/Address", + "URL/address and port of the sender, e.g. 127.0.0.1:5961. This is used as an additional filter together with the NDI name.", + None, + glib::ParamFlags::READWRITE, + ) + }), subclass::Property("receiver-ndi-name", |name| { glib::ParamSpec::string( name, @@ -216,6 +227,18 @@ impl ObjectImpl for NdiAudioSrc { ); settings.ndi_name = ndi_name; } + subclass::Property("url-address", ..) => { + let mut settings = self.settings.lock().unwrap(); + let url_address = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing url-address from {:?} to {:?}", + settings.url_address, + url_address, + ); + settings.url_address = url_address; + } subclass::Property("receiver-ndi-name", ..) => { let mut settings = self.settings.lock().unwrap(); let receiver_ndi_name = value.get().unwrap(); @@ -293,6 +316,10 @@ impl ObjectImpl for NdiAudioSrc { let settings = self.settings.lock().unwrap(); Ok(settings.ndi_name.to_value()) } + subclass::Property("url-address", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.url_address.to_value()) + } subclass::Property("receiver-ndi-name", ..) => { let settings = self.settings.lock().unwrap(); Ok(settings.receiver_ndi_name.to_value()) @@ -374,19 +401,18 @@ impl BaseSrcImpl for NdiAudioSrc { *self.state.lock().unwrap() = Default::default(); let settings = self.settings.lock().unwrap().clone(); - let ndi_name = if let Some(ref ndi_name) = settings.ndi_name { - ndi_name - } else { + if settings.ndi_name.is_none() { return Err(gst_error_msg!( gst::LibraryError::Settings, ["No IP address or NDI name given"] )); - }; + } let receiver = connect_ndi( self.cat, element, - ndi_name, + settings.ndi_name.as_ref().unwrap().as_str(), + settings.url_address.as_ref().map(String::as_str), &settings.receiver_ndi_name, settings.connect_timeout, settings.bandwidth, diff --git a/src/ndivideosrc.rs b/src/ndivideosrc.rs index 359ede6f3..c4d048bf1 100644 --- a/src/ndivideosrc.rs +++ b/src/ndivideosrc.rs @@ -25,6 +25,7 @@ use crate::DEFAULT_RECEIVER_NDI_NAME; #[derive(Debug, Clone)] struct Settings { ndi_name: Option, + url_address: Option, connect_timeout: u32, timeout: u32, receiver_ndi_name: String, @@ -36,6 +37,7 @@ impl Default for Settings { fn default() -> Self { Settings { ndi_name: None, + url_address: None, receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(), connect_timeout: 10000, timeout: 5000, @@ -45,7 +47,7 @@ impl Default for Settings { } } -static PROPERTIES: [subclass::Property; 6] = [ +static PROPERTIES: [subclass::Property; 7] = [ subclass::Property("ndi-name", |name| { glib::ParamSpec::string( name, @@ -55,6 +57,15 @@ static PROPERTIES: [subclass::Property; 6] = [ glib::ParamFlags::READWRITE, ) }), + subclass::Property("url-address", |name| { + glib::ParamSpec::string( + name, + "URL/Address", + "URL/address and port of the sender, e.g. 127.0.0.1:5961. This is used as an additional filter together with the NDI name.", + None, + glib::ParamFlags::READWRITE, + ) + }), subclass::Property("receiver-ndi-name", |name| { glib::ParamSpec::string( name, @@ -251,6 +262,18 @@ impl ObjectImpl for NdiVideoSrc { ); settings.ndi_name = ndi_name; } + subclass::Property("url-address", ..) => { + let mut settings = self.settings.lock().unwrap(); + let url_address = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing url-address from {:?} to {:?}", + settings.url_address, + url_address, + ); + settings.url_address = url_address; + } subclass::Property("receiver-ndi-name", ..) => { let mut settings = self.settings.lock().unwrap(); let receiver_ndi_name = value.get().unwrap(); @@ -328,6 +351,10 @@ impl ObjectImpl for NdiVideoSrc { let settings = self.settings.lock().unwrap(); Ok(settings.ndi_name.to_value()) } + subclass::Property("url-address", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.url_address.to_value()) + } subclass::Property("receiver-ndi-name", ..) => { let settings = self.settings.lock().unwrap(); Ok(settings.receiver_ndi_name.to_value()) @@ -409,19 +436,18 @@ impl BaseSrcImpl for NdiVideoSrc { *self.state.lock().unwrap() = Default::default(); let settings = self.settings.lock().unwrap().clone(); - let ndi_name = if let Some(ref ndi_name) = settings.ndi_name { - ndi_name - } else { + if settings.ndi_name.is_none() { return Err(gst_error_msg!( gst::LibraryError::Settings, - ["No IP address or NDI name given"] + ["No NDI name given"] )); - }; + } let receiver = connect_ndi( self.cat, element, - ndi_name, + settings.ndi_name.as_ref().unwrap().as_str(), + settings.url_address.as_ref().map(String::as_str), &settings.receiver_ndi_name, settings.connect_timeout, settings.bandwidth, diff --git a/src/receiver.rs b/src/receiver.rs index 3b8dab30e..371cdf2b4 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -9,28 +9,20 @@ use byte_slice_cast::AsMutSliceOf; use std::cmp; use std::collections::VecDeque; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex, Weak}; use std::thread; use super::*; -enum ReceiverInfo { - Connecting { - id: usize, - ndi_name: String, - video: Option>>, - audio: Option>>, - observations: Observations, - }, - Connected { - id: usize, - ndi_name: String, - recv: RecvInstance, - video: Option>>, - audio: Option>>, - observations: Observations, - }, +pub struct ReceiverInfo { + id: usize, + ndi_name: String, + url_address: Option, + recv: RecvInstance, + video: Option>>, + audio: Option>>, + observations: Observations, } lazy_static! { @@ -81,15 +73,17 @@ pub struct ReceiverInner { queue: ReceiverQueue, - recv: Mutex>, - recv_cond: Condvar, + recv: Mutex, observations: Observations, cat: gst::DebugCategory, element: glib::WeakRef, timestamp_mode: TimestampMode, + + first_frame: AtomicBool, timeout: u32, + connect_timeout: u32, thread: Mutex>>, } @@ -385,32 +379,15 @@ impl Receiver { info: &mut ReceiverInfo, timestamp_mode: TimestampMode, timeout: u32, + connect_timeout: u32, element: &gst_base::BaseSrc, cat: gst::DebugCategory, ) -> Self where Receiver: ReceiverCapture, { - let (id, storage_video, storage_audio, recv, observations) = match info { - ReceiverInfo::Connecting { - id, - ref observations, - ref mut audio, - ref mut video, - .. - } => (*id, video, audio, None, observations), - ReceiverInfo::Connected { - id, - ref mut recv, - ref observations, - ref mut audio, - ref mut video, - .. - } => (*id, video, audio, Some(recv.clone()), observations), - }; - let receiver = Receiver(Arc::new(ReceiverInner { - id, + id: info.id, queue: ReceiverQueue(Arc::new(( Mutex::new(ReceiverQueueInner { capturing: true, @@ -422,13 +399,14 @@ impl Receiver { }), Condvar::new(), ))), - recv: Mutex::new(recv), - recv_cond: Condvar::new(), - observations: observations.clone(), + recv: Mutex::new(info.recv.clone()), + observations: info.observations.clone(), cat, element: element.downgrade(), timestamp_mode, + first_frame: AtomicBool::new(true), timeout, + connect_timeout, thread: Mutex::new(None), })); @@ -459,7 +437,7 @@ impl Receiver { }); let weak = Arc::downgrade(&receiver.0); - Self::store_internal(storage_video, storage_audio, weak); + Self::store_internal(info, weak); *receiver.0.thread.lock().unwrap() = Some(thread); @@ -522,24 +500,12 @@ impl Drop for ReceiverInner { let mut receivers = HASHMAP_RECEIVERS.lock().unwrap(); { - let val = receivers.get_mut(&self.id).unwrap(); - let (audio, video) = match val { - ReceiverInfo::Connecting { - ref mut audio, - ref mut video, - .. - } => (audio, video), - ReceiverInfo::Connected { - ref mut audio, - ref mut video, - .. - } => (audio, video), - }; - if video.is_some() && audio.is_some() { + let receiver = receivers.get_mut(&self.id).unwrap(); + if receiver.audio.is_some() && receiver.video.is_some() { if T::IS_VIDEO { - *video = None; + receiver.video = None; } else { - *audio = None; + receiver.audio = None; } return; } @@ -556,6 +522,7 @@ pub fn connect_ndi( cat: gst::DebugCategory, element: &gst_base::BaseSrc, ndi_name: &str, + url_address: Option<&str>, receiver_ndi_name: &str, connect_timeout: u32, bandwidth: NDIlib_recv_bandwidth_e, @@ -570,227 +537,61 @@ where let mut receivers = HASHMAP_RECEIVERS.lock().unwrap(); // Check if we already have a receiver for this very stream - for val in receivers.values_mut() { - let (val_audio, val_video, val_ndi_name) = match val { - ReceiverInfo::Connecting { - ref mut audio, - ref mut video, - ref ndi_name, - .. - } => (audio, video, ndi_name.as_str()), - ReceiverInfo::Connected { - ref mut audio, - ref mut video, - ref ndi_name, - .. - } => (audio, video, ndi_name.as_str()), - }; - - if val_ndi_name == ndi_name { - if (val_video.is_some() || !T::IS_VIDEO) && (val_audio.is_some() || T::IS_VIDEO) { + for receiver in receivers.values_mut() { + if receiver.ndi_name == ndi_name + && receiver.url_address.as_ref().map(String::as_str) == url_address + { + if (receiver.video.is_some() || !T::IS_VIDEO) + && (receiver.audio.is_some() || T::IS_VIDEO) + { gst_element_error!( element, gst::ResourceError::OpenRead, [ "Source with ndi-name '{}' already in use for {}", - val_ndi_name, + receiver.ndi_name, if T::IS_VIDEO { "video" } else { "audio" } ] ); return None; } else { - return Some(Receiver::new(val, timestamp_mode, timeout, element, cat)); + return Some(Receiver::new( + receiver, + timestamp_mode, + timeout, + connect_timeout, + element, + cat, + )); } } } - // Otherwise asynchronously search for it and return the receiver to the caller - let id_receiver = ID_RECEIVER.fetch_add(1, Ordering::SeqCst); - let mut info = ReceiverInfo::Connecting { - id: id_receiver, - ndi_name: String::from(ndi_name), - video: None, - audio: None, - observations: Observations::new(), - }; - - let receiver = Receiver::new(&mut info, timestamp_mode, timeout, element, cat); - - receivers.insert(id_receiver, info); - - let receiver_ndi_name = String::from(receiver_ndi_name); - let element = element.clone(); - thread::spawn(move || { - use std::panic; - - let res = match panic::catch_unwind(move || { - connect_ndi_async( - cat, - &element, - id_receiver, - receiver_ndi_name, - connect_timeout, - bandwidth, - ) - }) { - Ok(res) => res, - Err(_) => Err(Some(gst_error_msg!( - gst::LibraryError::Failed, - ["Panic while connecting to NDI source"] - ))), - }; - - match res { - Ok(_) => (), - Err(None) => { - gst_debug!(cat, "Shutting down while connecting"); - } - Err(Some(err)) => { - gst_error!(cat, "Error while connecting: {:?}", err); - let mut receivers = HASHMAP_RECEIVERS.lock().unwrap(); - let info = match receivers.get_mut(&id_receiver) { - None => return, - Some(val) => val, - }; - - let (audio, video) = match info { - ReceiverInfo::Connecting { - ref audio, - ref video, - .. - } => (audio, video), - ReceiverInfo::Connected { .. } => unreachable!(), - }; - - assert!(audio.is_some() || video.is_some()); - - if let Some(audio) = audio.as_ref().and_then(|v| v.upgrade()).map(Receiver) { - if let Some(element) = audio.0.element.upgrade() { - element.post_error_message(&err); - } - let audio_recv = audio.0.recv.lock().unwrap(); - let mut queue = (audio.0.queue.0).0.lock().unwrap(); - assert!(audio_recv.is_none()); - queue.error = Some(gst::FlowError::Error); - audio.0.recv_cond.notify_one(); - (audio.0.queue.0).1.notify_one(); - } - - if let Some(video) = video.as_ref().and_then(|v| v.upgrade()).map(Receiver) { - if let Some(element) = video.0.element.upgrade() { - element.post_error_message(&err); - } - let video_recv = video.0.recv.lock().unwrap(); - let mut queue = (video.0.queue.0).0.lock().unwrap(); - assert!(video_recv.is_none()); - queue.error = Some(gst::FlowError::Error); - video.0.recv_cond.notify_one(); - (video.0.queue.0).1.notify_one(); - } - } - } - }); - - Some(receiver) -} - -fn connect_ndi_async( - cat: gst::DebugCategory, - element: &gst_base::BaseSrc, - id_receiver: usize, - receiver_ndi_name: String, - connect_timeout: u32, - bandwidth: NDIlib_recv_bandwidth_e, -) -> Result<(), Option> { - let mut find = match FindInstance::builder().build() { - None => { - return Err(Some(gst_error_msg!( - gst::CoreError::Negotiation, - ["Cannot run NDI: NDIlib_find_create_v2 error"] - ))); - } - Some(find) => find, - }; - - let timer = time::Instant::now(); - let source = loop { - let new_sources = find.wait_for_sources(100); - let sources = find.get_current_sources(); - - gst_debug!( - cat, - obj: element, - "Total sources found in network {}", - sources.len(), - ); - - if new_sources { - for source in &sources { - gst_debug!( - cat, - obj: element, - "Found source '{}' with URL {}", - source.ndi_name(), - source.url_address(), - ); - } - - let receivers = HASHMAP_RECEIVERS.lock().unwrap(); - let info = match receivers.get(&id_receiver) { - None => return Err(None), - Some(val) => val, - }; - - let ndi_name = match info { - ReceiverInfo::Connecting { - ref ndi_name, - ref audio, - ref video, - .. - } => { - assert!(audio.is_some() || video.is_some()); - ndi_name - } - ReceiverInfo::Connected { .. } => unreachable!(), - }; - - let source = sources.iter().find(|s| s.ndi_name() == ndi_name.as_str()); - - if let Some(source) = source { - break source.to_owned(); - } - } - - if timer.elapsed().as_millis() >= connect_timeout as u128 { - return Err(Some(gst_error_msg!( - gst::ResourceError::NotFound, - ["Stream not found"] - ))); - } - }; - + // Otherwise create a new one and return it to the caller gst_debug!( cat, obj: element, - "Connecting to NDI source with ndi-name '{}' and URL {}", - source.ndi_name(), - source.url_address(), + "Connecting to NDI source with ndi-name '{}' and URL/Address {:?}", + ndi_name, + url_address, ); // FIXME: Ideally we would use NDIlib_recv_color_format_fastest here but that seems to be // broken with interlaced content currently - let recv = RecvInstance::builder(&source, &receiver_ndi_name) + let recv = RecvInstance::builder((ndi_name, url_address), &receiver_ndi_name) .bandwidth(bandwidth) .color_format(NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA) .allow_video_fields(true) .build(); let recv = match recv { None => { - return Err(Some(gst_error_msg!( + gst_element_error!( + element, gst::CoreError::Negotiation, ["Failed to connect to source"] - ))); + ); + return None; } Some(recv) => recv, }; @@ -800,96 +601,36 @@ fn connect_ndi_async( let enable_hw_accel = MetadataFrame::new(0, Some("")); recv.send_metadata(&enable_hw_accel); - let mut receivers = HASHMAP_RECEIVERS.lock().unwrap(); - let info = match receivers.get_mut(&id_receiver) { - None => return Err(None), - Some(val) => val, - }; - - let (audio, video, observations) = match info { - ReceiverInfo::Connecting { - ref audio, - ref video, - ref observations, - .. - } => (audio.clone(), video.clone(), observations), - ReceiverInfo::Connected { .. } => unreachable!(), - }; - - assert!(audio.is_some() || video.is_some()); - - *info = ReceiverInfo::Connected { + let id_receiver = ID_RECEIVER.fetch_add(1, Ordering::SeqCst); + let mut info = ReceiverInfo { id: id_receiver, - ndi_name: source.ndi_name().to_owned(), - recv: recv.clone(), - video: video.clone(), - audio: audio.clone(), - observations: observations.clone(), + ndi_name: String::from(ndi_name), + url_address: url_address.map(String::from), + recv, + video: None, + audio: None, + observations: Observations::new(), }; - gst_debug!(cat, obj: element, "Started NDI connection"); + // This will set info.audio/video accordingly + let receiver = Receiver::new( + &mut info, + timestamp_mode, + timeout, + connect_timeout, + element, + cat, + ); - if let Some(audio) = audio.and_then(|v| v.upgrade()).map(Receiver) { - let mut audio_recv = audio.0.recv.lock().unwrap(); - assert!(audio_recv.is_none()); - *audio_recv = Some(recv.clone()); - audio.0.recv_cond.notify_one(); - } + receivers.insert(id_receiver, info); - if let Some(video) = video.and_then(|v| v.upgrade()).map(Receiver) { - let mut video_recv = video.0.recv.lock().unwrap(); - assert!(video_recv.is_none()); - *video_recv = Some(recv); - video.0.recv_cond.notify_one(); - } - - Ok(()) + Some(receiver) } fn receive_thread(receiver: &Weak>) where Receiver: ReceiverCapture, { - // First loop until we actually are connected, or an error happened - let recv = { - let receiver = match receiver.upgrade().map(Receiver) { - None => return, - Some(receiver) => receiver, - }; - - let element = match receiver.0.element.upgrade() { - None => return, - Some(element) => element, - }; - - let mut recv = receiver.0.recv.lock().unwrap(); - loop { - { - let queue = (receiver.0.queue.0).0.lock().unwrap(); - if !queue.capturing { - gst_debug!(receiver.0.cat, obj: &element, "Shutting down"); - return; - } - - // If an error happened in the meantime, just go out of here - if queue.error.is_some() { - gst_error!( - receiver.0.cat, - obj: &element, - "Error while waiting for connection" - ); - return; - } - } - - if let Some(ref recv) = *recv { - break recv.clone(); - } - - recv = receiver.0.recv_cond.wait(recv).unwrap(); - } - }; - // Now first capture frames until the queues are empty so that we're sure that we output only // the very latest frame that is available now loop { @@ -921,6 +662,8 @@ where } } + let recv = receiver.0.recv.lock().unwrap(); + let queue = recv.get_queue(); if (!T::IS_VIDEO && queue.audio_frames() <= 1) || (T::IS_VIDEO && queue.video_frames() <= 1) { @@ -950,6 +693,8 @@ where } } + let recv = receiver.0.recv.lock().unwrap(); + let res = receiver.capture_internal(&element, &recv); match res { @@ -999,11 +744,7 @@ pub trait ReceiverCapture { recv: &RecvInstance, ) -> Result<(gst::Buffer, T::InfoType), gst::FlowError>; - fn store_internal( - storage_video: &mut Option>>, - storage_audio: &mut Option>>, - weak: Weak>, - ); + fn store_internal(info: &mut ReceiverInfo, weak: Weak>); } impl ReceiverCapture for Receiver { @@ -1015,13 +756,9 @@ impl ReceiverCapture for Receiver { self.capture_video(element, recv) } - fn store_internal( - storage_video: &mut Option>>, - _storage_audio: &mut Option>>, - weak: Weak>, - ) { - assert!(storage_video.is_none()); - *storage_video = Some(weak); + fn store_internal(info: &mut ReceiverInfo, weak: Weak>) { + assert!(info.video.is_none()); + info.video = Some(weak); } } @@ -1109,13 +846,9 @@ impl ReceiverCapture for Receiver { self.capture_audio(element, recv) } - fn store_internal( - _storage_video: &mut Option>>, - storage_audio: &mut Option>>, - weak: Weak>, - ) { - assert!(storage_audio.is_none()); - *storage_audio = Some(weak); + fn store_internal(info: &mut ReceiverInfo, weak: Weak>) { + assert!(info.audio.is_none()); + info.audio = Some(weak); } } @@ -1125,7 +858,12 @@ impl Receiver { element: &gst_base::BaseSrc, recv: &RecvInstance, ) -> Result<(gst::Buffer, gst_video::VideoInfo), gst::FlowError> { - let timeout = time::Instant::now(); + let timer = time::Instant::now(); + let timeout = if self.0.first_frame.load(Ordering::SeqCst) { + self.0.connect_timeout + } else { + self.0.timeout + }; let mut flushing; let mut playing; @@ -1156,7 +894,7 @@ impl Receiver { ); return Err(gst::FlowError::Error); } - Ok(None) if timeout.elapsed().as_millis() >= self.0.timeout as u128 => { + Ok(None) if timer.elapsed().as_millis() >= timeout as u128 => { gst_debug!(self.0.cat, obj: element, "Timed out -- assuming EOS",); return Err(gst::FlowError::Eos); } @@ -1174,6 +912,8 @@ impl Receiver { break video_frame; }; + self.0.first_frame.store(false, Ordering::SeqCst); + gst_debug!( self.0.cat, obj: element, @@ -1546,7 +1286,12 @@ impl Receiver { element: &gst_base::BaseSrc, recv: &RecvInstance, ) -> Result<(gst::Buffer, gst_audio::AudioInfo), gst::FlowError> { - let timeout = time::Instant::now(); + let timer = time::Instant::now(); + let timeout = if self.0.first_frame.load(Ordering::SeqCst) { + self.0.connect_timeout + } else { + self.0.timeout + }; let mut flushing; let mut playing; @@ -1577,7 +1322,7 @@ impl Receiver { ); return Err(gst::FlowError::Error); } - Ok(None) if timeout.elapsed().as_millis() >= self.0.timeout as u128 => { + Ok(None) if timer.elapsed().as_millis() >= timeout as u128 => { gst_debug!(self.0.cat, obj: element, "Timed out -- assuming EOS",); return Err(gst::FlowError::Eos); } @@ -1595,6 +1340,8 @@ impl Receiver { break audio_frame; }; + self.0.first_frame.store(false, Ordering::SeqCst); + gst_debug!( self.0.cat, obj: element,