From f2a4699d1301fd6c4b97524b91594d0ec076cad6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 16 Jan 2020 10:27:50 +0200 Subject: [PATCH] Add a device provider for listing available sources Fixes https://github.com/teltek/gst-plugin-ndi/issues/36 --- Cargo.toml | 1 + src/device_provider.rs | 315 +++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 + src/ndi.rs | 7 + 4 files changed, 325 insertions(+) create mode 100644 src/device_provider.rs diff --git a/Cargo.toml b/Cargo.toml index 388e4363..7437f0f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ gstreamer-audio = "0.15" gstreamer-video = { version = "0.15", features = ["v1_12"] } lazy_static = "1.1.0" byte-slice-cast = "0.3.0" +once_cell = "1.0" [build-dependencies] gst-plugin-version-helper = "0.1" diff --git a/src/device_provider.rs b/src/device_provider.rs new file mode 100644 index 00000000..3059e10d --- /dev/null +++ b/src/device_provider.rs @@ -0,0 +1,315 @@ +use glib; +use glib::subclass; +use gst; +use gst::prelude::*; +use gst::subclass::prelude::*; + +use once_cell::sync::OnceCell; + +use std::sync::atomic; +use std::sync::Mutex; +use std::thread; + +use crate::ndi; + +#[derive(Debug)] +struct DeviceProvider { + cat: gst::DebugCategory, + thread: Mutex>>, + current_devices: Mutex>, + find: Mutex>, + is_running: atomic::AtomicBool, +} + +impl ObjectSubclass for DeviceProvider { + const NAME: &'static str = "NdiDeviceProvider"; + type ParentType = gst::DeviceProvider; + type Instance = subclass::simple::InstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib_object_subclass!(); + + fn new() -> Self { + Self { + cat: gst::DebugCategory::new( + "ndideviceprovider", + gst::DebugColorFlags::empty(), + Some("NewTek NDI Device Provider"), + ), + thread: Mutex::new(None), + current_devices: Mutex::new(vec![]), + find: Mutex::new(None), + is_running: atomic::AtomicBool::new(false), + } + } + + fn class_init(klass: &mut subclass::simple::ClassStruct) { + klass.set_metadata( + "NewTek NDI Device Provider", + "Source/Audio/Video/Network", + "NewTek NDI Device Provider", + "Ruben Gonzalez , Daniel Vilar , Sebastian Dröge ", + ); + } +} + +impl ObjectImpl for DeviceProvider { + glib_object_impl!(); +} + +impl DeviceProviderImpl for DeviceProvider { + fn probe(&self, _device_provider: &gst::DeviceProvider) -> Vec { + self.current_devices.lock().unwrap().clone() + } + fn start(&self, device_provider: &gst::DeviceProvider) -> Result<(), gst::LoggableError> { + let mut thread_guard = self.thread.lock().unwrap(); + if thread_guard.is_some() { + gst_log!( + self.cat, + obj: device_provider, + "Device provider already started" + ); + return Ok(()); + } + + self.is_running.store(true, atomic::Ordering::SeqCst); + + let device_provider_weak = device_provider.downgrade(); + let mut first = true; + *thread_guard = Some(thread::spawn(move || { + let device_provider = match device_provider_weak.upgrade() { + None => return, + Some(device_provider) => device_provider, + }; + + let imp = DeviceProvider::from_instance(&device_provider); + { + let mut find_guard = imp.find.lock().unwrap(); + if find_guard.is_some() { + gst_log!(imp.cat, obj: &device_provider, "Already started"); + return; + } + + let find = match ndi::FindInstance::builder().build() { + None => { + gst_error!( + imp.cat, + obj: &device_provider, + "Failed to create Find instance" + ); + return; + } + Some(find) => find, + }; + *find_guard = Some(find); + } + + loop { + let device_provider = match device_provider_weak.upgrade() { + None => break, + Some(device_provider) => device_provider, + }; + + let imp = DeviceProvider::from_instance(&device_provider); + if !imp.is_running.load(atomic::Ordering::SeqCst) { + break; + } + + imp.poll(&device_provider, first); + first = false; + } + })); + + Ok(()) + } + fn stop(&self, _device_provider: &gst::DeviceProvider) { + if let Some(_thread) = self.thread.lock().unwrap().take() { + self.is_running.store(false, atomic::Ordering::SeqCst); + // Don't actually join because that might take a while + } + } +} + +impl DeviceProvider { + fn poll(&self, device_provider: &gst::DeviceProvider, first: bool) { + let mut find_guard = self.find.lock().unwrap(); + let find = match *find_guard { + None => return, + Some(ref mut find) => find, + }; + + if !find.wait_for_sources(if first { 1000 } else { 5000 }) { + gst_trace!(self.cat, obj: device_provider, "No new sources found"); + return; + } + + let sources = find.get_current_sources(); + let mut sources = sources.iter().map(|s| s.to_owned()).collect::>(); + + let mut current_devices_guard = self.current_devices.lock().unwrap(); + let mut expired_devices = vec![]; + let mut remaining_sources = vec![]; + + // First check for each device we previously knew if it's still available + for old_device in &*current_devices_guard { + let old_device_imp = Device::from_instance(old_device); + let old_source = old_device_imp.source.get().unwrap(); + + if !sources.contains(&old_source.0) { + gst_log!( + self.cat, + obj: device_provider, + "Source {:?} disappeared", + old_source + ); + expired_devices.push(old_device.clone()); + } else { + // Otherwise remember that we had it before already and don't have to announce it + // again. After the loop we're going to remove these all from the sources vec. + remaining_sources.push(old_source.0.to_owned()); + } + } + + for remaining_source in remaining_sources { + sources.retain(|s| s != &remaining_source); + } + + // Remove all expired devices from the list of cached devices + current_devices_guard.retain(|d| !expired_devices.contains(d)); + // And also notify the device provider of them having disappeared + for old_device in expired_devices { + device_provider.device_remove(&old_device); + } + + // Now go through all new devices and announce them + for source in sources { + gst_log!( + self.cat, + obj: device_provider, + "Source {:?} appeared", + source + ); + // Add once for audio, another time for video + let device = Device::new(&source, true); + device_provider.device_add(&device); + current_devices_guard.push(device); + + let device = Device::new(&source, false); + device_provider.device_add(&device); + current_devices_guard.push(device); + } + } +} + +#[derive(Debug)] +struct Device { + cat: gst::DebugCategory, + source: OnceCell<(ndi::Source<'static>, glib::Type)>, +} + +impl ObjectSubclass for Device { + const NAME: &'static str = "NdiDevice"; + type ParentType = gst::Device; + type Instance = subclass::simple::InstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib_object_subclass!(); + + fn new() -> Self { + Self { + cat: gst::DebugCategory::new( + "ndidevice", + gst::DebugColorFlags::empty(), + Some("NewTek NDI Device"), + ), + source: OnceCell::new(), + } + } +} + +impl ObjectImpl for Device { + glib_object_impl!(); +} + +impl DeviceImpl for Device { + fn create_element( + &self, + _device: &gst::Device, + name: Option<&str>, + ) -> Result { + let source_info = self.source.get().unwrap(); + let element = glib::Object::new( + source_info.1, + &[ + ("name", &name), + ("ndi-name", &source_info.0.ndi_name()), + ("url-address", &source_info.0.url_address()), + ], + ) + .unwrap() + .dynamic_cast::() + .unwrap(); + + Ok(element) + } +} + +impl Device { + fn new(source: &ndi::Source<'_>, is_audio: bool) -> gst::Device { + let display_name = format!( + "{} ({})", + source.ndi_name(), + if is_audio { "Audio" } else { "Video" } + ); + let device_class = format!( + "Source/{}/Network", + if is_audio { "Audio" } else { "Video" } + ); + + // Get the caps from the template caps of the corresponding source element + let element_type = if is_audio { + crate::ndiaudiosrc::NdiAudioSrc::get_type() + } else { + crate::ndivideosrc::NdiVideoSrc::get_type() + }; + let element_class = gst::ElementClass::from_type(element_type).unwrap(); + let templ = element_class.get_pad_template("src").unwrap(); + let caps = templ.get_caps().unwrap(); + + // Put the url-address into the extra properties + let extra_properties = gst::Structure::builder("properties") + .field("ndi-name", &source.ndi_name()) + .field("url-address", &source.url_address()) + .build(); + + let device = glib::Object::new( + Device::get_type(), + &[ + ("caps", &caps), + ("display-name", &display_name), + ("device-class", &device_class), + ("properties", &extra_properties), + ], + ) + .unwrap() + .dynamic_cast::() + .unwrap(); + let device_impl = Device::from_instance(&device); + + device_impl + .source + .set((source.to_owned(), element_type)) + .unwrap(); + + device + } +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::DeviceProvider::register( + Some(plugin), + "ndideviceprovider", + gst::Rank::Primary, + DeviceProvider::get_type(), + ) +} diff --git a/src/lib.rs b/src/lib.rs index 190b4ec5..5f608ad5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ extern crate gstreamer_video as gst_video; extern crate lazy_static; extern crate byte_slice_cast; +mod device_provider; pub mod ndi; mod ndiaudiosrc; pub mod ndisys; @@ -39,6 +40,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { ndivideosrc::register(plugin)?; ndiaudiosrc::register(plugin)?; + device_provider::register(plugin)?; Ok(()) } diff --git a/src/ndi.rs b/src/ndi.rs index f17170b8..060e0a45 100644 --- a/src/ndi.rs +++ b/src/ndi.rs @@ -122,6 +122,7 @@ pub enum Source<'a> { } unsafe impl<'a> Send for Source<'a> {} +unsafe impl<'a> Sync for Source<'a> {} impl<'a> Source<'a> { pub fn ndi_name(&self) -> &str { @@ -191,6 +192,12 @@ impl<'a> Source<'a> { } } +impl<'a> PartialEq for Source<'a> { + fn eq(&self, other: &Source<'a>) -> bool { + self.ndi_name() == other.ndi_name() && self.url_address() == other.url_address() + } +} + #[derive(Debug)] pub struct RecvBuilder<'a> { source_to_connect_to: &'a Source<'a>,