Implement cancellation of connection attempts

This commit is contained in:
Sebastian Dröge 2019-07-17 12:12:58 +03:00
parent f27c2507c5
commit 19d25d20a7
3 changed files with 101 additions and 11 deletions

View file

@ -21,7 +21,7 @@ use ndi::*;
use ndisys::*;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use std::sync::Mutex;
use std::time;
@ -83,6 +83,7 @@ fn connect_ndi(
receiver_ndi_name: &str,
connect_timeout: u32,
bandwidth: NDIlib_recv_bandwidth_e,
cancel: &AtomicBool,
) -> Option<usize> {
gst_debug!(cat, obj: element, "Starting NDI connection...");
@ -119,8 +120,18 @@ fn connect_ndi(
let timeout = time::Instant::now();
let source = loop {
if cancel.load(Ordering::SeqCst) {
gst_debug!(cat, obj: element, "Cancelled");
return None;
}
find.wait_for_sources(50);
if cancel.load(Ordering::SeqCst) {
gst_debug!(cat, obj: element, "Cancelled");
return None;
}
let sources = find.get_current_sources();
gst_debug!(

View file

@ -10,6 +10,7 @@ use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time;
use std::{i32, u32};
@ -146,6 +147,7 @@ pub(crate) struct NdiAudioSrc {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
unlock: AtomicBool,
}
impl ObjectSubclass for NdiAudioSrc {
@ -165,6 +167,7 @@ impl ObjectSubclass for NdiAudioSrc {
),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
unlock: AtomicBool::new(false),
}
}
@ -353,13 +356,38 @@ impl ObjectImpl for NdiAudioSrc {
impl ElementImpl for NdiAudioSrc {}
impl BaseSrcImpl for NdiAudioSrc {
fn unlock(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(
self.cat,
obj: element,
"Unlocking",
);
self.unlock.store(true, Ordering::SeqCst);
Ok(())
}
fn unlock_stop(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(
self.cat,
obj: element,
"Stop unlocking",
);
self.unlock.store(false, Ordering::SeqCst);
Ok(())
}
fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
*self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap().clone();
let mut state = self.state.lock().unwrap();
state.id_receiver = connect_ndi(
if settings.ip_address.is_none() && settings.ndi_name.is_none() {
return Err(gst_error_msg!(
gst::LibraryError::Settings,
["No IP address or NDI name given"]
));
}
let id_receiver = connect_ndi(
self.cat,
element,
settings.ip_address.as_ref().map(String::as_str),
@ -367,14 +395,22 @@ impl BaseSrcImpl for NdiAudioSrc {
&settings.receiver_ndi_name,
settings.connect_timeout,
settings.bandwidth,
&self.unlock,
);
match state.id_receiver {
// settings.id_receiver exists
match id_receiver {
None if self.unlock.load(Ordering::SeqCst) => Ok(()),
None => Err(gst_error_msg!(
gst::ResourceError::NotFound,
["Could not connect to this source"]
)),
_ => Ok(()),
Some(id_receiver) => {
let mut state = self.state.lock().unwrap();
state.id_receiver = Some(id_receiver);
Ok(())
}
}
}

View file

@ -12,6 +12,7 @@ use gst_video;
use gst_video::prelude::*;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time;
use std::{i32, u32};
@ -147,6 +148,7 @@ pub(crate) struct NdiVideoSrc {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
unlock: AtomicBool,
}
impl ObjectSubclass for NdiVideoSrc {
@ -166,6 +168,7 @@ impl ObjectSubclass for NdiVideoSrc {
),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
unlock: AtomicBool::new(false),
}
}
@ -385,12 +388,43 @@ impl ObjectImpl for NdiVideoSrc {
}
}
impl ElementImpl for NdiVideoSrc {}
impl ElementImpl for NdiVideoSrc {
fn change_state(&self, element: &gst::Element, transition: gst::StateChange) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::ReadyToPaused => self.unlock.store(true, Ordering::SeqCst),
gst::StateChange::PausedToReady => self.unlock.store(false, Ordering::SeqCst),
_ => (),
}
self.parent_change_state(element, transition)
}
}
impl BaseSrcImpl for NdiVideoSrc {
fn unlock(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(
self.cat,
obj: element,
"Unlocking",
);
self.unlock.store(true, Ordering::SeqCst);
Ok(())
}
fn unlock_stop(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(
self.cat,
obj: element,
"Stop unlocking",
);
self.unlock.store(false, Ordering::SeqCst);
Ok(())
}
fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
self.unlock.store(false, Ordering::SeqCst);
*self.state.lock().unwrap() = Default::default();
let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap().clone();
if settings.ip_address.is_none() && settings.ndi_name.is_none() {
@ -400,7 +434,7 @@ impl BaseSrcImpl for NdiVideoSrc {
));
}
state.id_receiver = connect_ndi(
let id_receiver = connect_ndi(
self.cat,
element,
settings.ip_address.as_ref().map(String::as_str),
@ -408,19 +442,28 @@ impl BaseSrcImpl for NdiVideoSrc {
&settings.receiver_ndi_name,
settings.connect_timeout,
settings.bandwidth,
&self.unlock,
);
// settings.id_receiver exists
match state.id_receiver {
match id_receiver {
None if self.unlock.load(Ordering::SeqCst) => Ok(()),
None => Err(gst_error_msg!(
gst::ResourceError::NotFound,
["Could not connect to this source"]
)),
_ => Ok(()),
Some(id_receiver) => {
let mut state = self.state.lock().unwrap();
state.id_receiver = Some(id_receiver);
Ok(())
}
}
}
fn stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
self.unlock.store(true, Ordering::SeqCst);
*self.state.lock().unwrap() = Default::default();
let mut state = self.state.lock().unwrap();