diff --git a/src/lib.rs b/src/lib.rs index 3ef1ee65..d0b78d90 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,7 @@ use ndi::*; use ndisys::*; use std::collections::HashMap; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; use std::sync::Mutex; use std::time; @@ -83,6 +83,7 @@ fn connect_ndi( receiver_ndi_name: &str, connect_timeout: u32, bandwidth: NDIlib_recv_bandwidth_e, + cancel: &AtomicBool, ) -> Option { gst_debug!(cat, obj: element, "Starting NDI connection..."); @@ -119,8 +120,18 @@ fn connect_ndi( let timeout = time::Instant::now(); let source = loop { + if cancel.load(Ordering::SeqCst) { + gst_debug!(cat, obj: element, "Cancelled"); + return None; + } + find.wait_for_sources(50); + if cancel.load(Ordering::SeqCst) { + gst_debug!(cat, obj: element, "Cancelled"); + return None; + } + let sources = find.get_current_sources(); gst_debug!( diff --git a/src/ndiaudiosrc.rs b/src/ndiaudiosrc.rs index 8e01dead..51a6837a 100644 --- a/src/ndiaudiosrc.rs +++ b/src/ndiaudiosrc.rs @@ -10,6 +10,7 @@ use gst_base::prelude::*; use gst_base::subclass::prelude::*; use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time; use std::{i32, u32}; @@ -146,6 +147,7 @@ pub(crate) struct NdiAudioSrc { cat: gst::DebugCategory, settings: Mutex, state: Mutex, + unlock: AtomicBool, } impl ObjectSubclass for NdiAudioSrc { @@ -165,6 +167,7 @@ impl ObjectSubclass for NdiAudioSrc { ), settings: Mutex::new(Default::default()), state: Mutex::new(Default::default()), + unlock: AtomicBool::new(false), } } @@ -353,13 +356,38 @@ impl ObjectImpl for NdiAudioSrc { impl ElementImpl for NdiAudioSrc {} impl BaseSrcImpl for NdiAudioSrc { + fn unlock(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> { + gst_debug!( + self.cat, + obj: element, + "Unlocking", + ); + self.unlock.store(true, Ordering::SeqCst); + Ok(()) + } + + fn unlock_stop(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> { + gst_debug!( + self.cat, + obj: element, + "Stop unlocking", + ); + self.unlock.store(false, Ordering::SeqCst); + Ok(()) + } + fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { *self.state.lock().unwrap() = Default::default(); - let settings = self.settings.lock().unwrap().clone(); - let mut state = self.state.lock().unwrap(); - state.id_receiver = connect_ndi( + if settings.ip_address.is_none() && settings.ndi_name.is_none() { + return Err(gst_error_msg!( + gst::LibraryError::Settings, + ["No IP address or NDI name given"] + )); + } + + let id_receiver = connect_ndi( self.cat, element, settings.ip_address.as_ref().map(String::as_str), @@ -367,14 +395,22 @@ impl BaseSrcImpl for NdiAudioSrc { &settings.receiver_ndi_name, settings.connect_timeout, settings.bandwidth, + &self.unlock, ); - match state.id_receiver { + // settings.id_receiver exists + match id_receiver { + None if self.unlock.load(Ordering::SeqCst) => Ok(()), None => Err(gst_error_msg!( gst::ResourceError::NotFound, ["Could not connect to this source"] )), - _ => Ok(()), + Some(id_receiver) => { + let mut state = self.state.lock().unwrap(); + state.id_receiver = Some(id_receiver); + + Ok(()) + } } } diff --git a/src/ndivideosrc.rs b/src/ndivideosrc.rs index 97c4b816..02e353c3 100644 --- a/src/ndivideosrc.rs +++ b/src/ndivideosrc.rs @@ -12,6 +12,7 @@ use gst_video; use gst_video::prelude::*; use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time; use std::{i32, u32}; @@ -147,6 +148,7 @@ pub(crate) struct NdiVideoSrc { cat: gst::DebugCategory, settings: Mutex, state: Mutex, + unlock: AtomicBool, } impl ObjectSubclass for NdiVideoSrc { @@ -166,6 +168,7 @@ impl ObjectSubclass for NdiVideoSrc { ), settings: Mutex::new(Default::default()), state: Mutex::new(Default::default()), + unlock: AtomicBool::new(false), } } @@ -385,12 +388,43 @@ impl ObjectImpl for NdiVideoSrc { } } -impl ElementImpl for NdiVideoSrc {} +impl ElementImpl for NdiVideoSrc { + fn change_state(&self, element: &gst::Element, transition: gst::StateChange) -> Result { + match transition { + gst::StateChange::ReadyToPaused => self.unlock.store(true, Ordering::SeqCst), + gst::StateChange::PausedToReady => self.unlock.store(false, Ordering::SeqCst), + _ => (), + } + + self.parent_change_state(element, transition) + } +} impl BaseSrcImpl for NdiVideoSrc { + fn unlock(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> { + gst_debug!( + self.cat, + obj: element, + "Unlocking", + ); + self.unlock.store(true, Ordering::SeqCst); + Ok(()) + } + + fn unlock_stop(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> { + gst_debug!( + self.cat, + obj: element, + "Stop unlocking", + ); + self.unlock.store(false, Ordering::SeqCst); + Ok(()) + } + fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + self.unlock.store(false, Ordering::SeqCst); + *self.state.lock().unwrap() = Default::default(); - let mut state = self.state.lock().unwrap(); let settings = self.settings.lock().unwrap().clone(); if settings.ip_address.is_none() && settings.ndi_name.is_none() { @@ -400,7 +434,7 @@ impl BaseSrcImpl for NdiVideoSrc { )); } - state.id_receiver = connect_ndi( + let id_receiver = connect_ndi( self.cat, element, settings.ip_address.as_ref().map(String::as_str), @@ -408,19 +442,28 @@ impl BaseSrcImpl for NdiVideoSrc { &settings.receiver_ndi_name, settings.connect_timeout, settings.bandwidth, + &self.unlock, ); // settings.id_receiver exists - match state.id_receiver { + match id_receiver { + None if self.unlock.load(Ordering::SeqCst) => Ok(()), None => Err(gst_error_msg!( gst::ResourceError::NotFound, ["Could not connect to this source"] )), - _ => Ok(()), + Some(id_receiver) => { + let mut state = self.state.lock().unwrap(); + state.id_receiver = Some(id_receiver); + + Ok(()) + } } } fn stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + self.unlock.store(true, Ordering::SeqCst); + *self.state.lock().unwrap() = Default::default(); let mut state = self.state.lock().unwrap();