From 291d951b01bfc79b88b286855862cea471396bb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 13 Sep 2021 12:26:56 +0300 Subject: [PATCH] Update to gstreamer-rs 0.17 --- Cargo.toml | 12 +- build.rs | 4 +- .../imp.rs} | 114 ++--- src/device_provider/mod.rs | 26 + src/lib.rs | 6 +- src/ndi.rs | 8 +- src/{ndiaudiosrc.rs => ndiaudiosrc/imp.rs} | 410 ++++++++------- src/ndiaudiosrc/mod.rs | 19 + src/{ndisink.rs => ndisink/imp.rs} | 260 +++++----- src/ndisink/mod.rs | 19 + .../imp.rs} | 340 +++++++------ src/ndisinkcombiner/mod.rs | 19 + src/ndisinkmeta.rs | 53 +- src/{ndivideosrc.rs => ndivideosrc/imp.rs} | 478 +++++++++--------- src/ndivideosrc/mod.rs | 19 + src/receiver.rs | 241 +++++---- 16 files changed, 1036 insertions(+), 992 deletions(-) rename src/{device_provider.rs => device_provider/imp.rs} (78%) create mode 100644 src/device_provider/mod.rs rename src/{ndiaudiosrc.rs => ndiaudiosrc/imp.rs} (62%) create mode 100644 src/ndiaudiosrc/mod.rs rename src/{ndisink.rs => ndisink/imp.rs} (52%) create mode 100644 src/ndisink/mod.rs rename src/{ndisinkcombiner.rs => ndisinkcombiner/imp.rs} (69%) create mode 100644 src/ndisinkcombiner/mod.rs rename src/{ndivideosrc.rs => ndivideosrc/imp.rs} (59%) create mode 100644 src/ndivideosrc/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 379583f3..8b587426 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,16 +8,16 @@ description = "NewTek NDI Plugin" edition = "2018" [dependencies] -glib = "0.10" -gst = { package = "gstreamer", version = "0.16", features = ["v1_12"] } -gst-base = { package = "gstreamer-base", version = "0.16" } -gst-audio = { package = "gstreamer-audio", version = "0.16" } -gst-video = { package = "gstreamer-video", version = "0.16", features = ["v1_12"] } +glib = "0.14" +gst = { package = "gstreamer", version = "0.17", features = ["v1_12"] } +gst-base = { package = "gstreamer-base", version = "0.17" } +gst-audio = { package = "gstreamer-audio", version = "0.17" } +gst-video = { package = "gstreamer-video", version = "0.17", features = ["v1_12"] } byte-slice-cast = "1" once_cell = "1.0" [build-dependencies] -gst-plugin-version-helper = "0.2" +gst-plugin-version-helper = "0.7" [features] default = ["interlaced-fields", "reference-timestamps", "sink"] diff --git a/build.rs b/build.rs index 0d1ddb61..cda12e57 100644 --- a/build.rs +++ b/build.rs @@ -1,5 +1,3 @@ -extern crate gst_plugin_version_helper; - fn main() { - gst_plugin_version_helper::get_info() + gst_plugin_version_helper::info() } diff --git a/src/device_provider.rs b/src/device_provider/imp.rs similarity index 78% rename from src/device_provider.rs rename to src/device_provider/imp.rs index e2155740..5933741d 100644 --- a/src/device_provider.rs +++ b/src/device_provider/imp.rs @@ -1,4 +1,3 @@ -use glib::subclass; use gst::prelude::*; use gst::subclass::prelude::*; use gst::{gst_error, gst_log, gst_trace}; @@ -9,24 +8,24 @@ use std::sync::atomic; use std::sync::Mutex; use std::thread; +use once_cell::sync::Lazy; + use crate::ndi; #[derive(Debug)] -struct DeviceProvider { +pub struct DeviceProvider { cat: gst::DebugCategory, thread: Mutex>>, - current_devices: Mutex>, + current_devices: Mutex>, find: Mutex>, is_running: atomic::AtomicBool, } +#[glib::object_subclass] impl ObjectSubclass for DeviceProvider { const NAME: &'static str = "NdiDeviceProvider"; + type Type = super::DeviceProvider; type ParentType = gst::DeviceProvider; - type Instance = subclass::simple::InstanceStruct; - type Class = subclass::simple::ClassStruct; - - glib::glib_object_subclass!(); fn new() -> Self { Self { @@ -41,26 +40,32 @@ impl ObjectSubclass for DeviceProvider { is_running: atomic::AtomicBool::new(false), } } - - fn class_init(klass: &mut subclass::simple::ClassStruct) { - klass.set_metadata( - "NewTek NDI Device Provider", - "Source/Audio/Video/Network", - "NewTek NDI Device Provider", - "Ruben Gonzalez , Daniel Vilar , Sebastian Dröge ", - ); - } } -impl ObjectImpl for DeviceProvider { - glib::glib_object_impl!(); -} +impl ObjectImpl for DeviceProvider {} impl DeviceProviderImpl for DeviceProvider { - fn probe(&self, _device_provider: &gst::DeviceProvider) -> Vec { - self.current_devices.lock().unwrap().clone() + fn metadata() -> Option<&'static gst::subclass::DeviceProviderMetadata> { + static METADATA: Lazy = Lazy::new(|| { + gst::subclass::DeviceProviderMetadata::new("NewTek NDI Device Provider", + "Source/Audio/Video/Network", + "NewTek NDI Device Provider", + "Ruben Gonzalez , Daniel Vilar , Sebastian Dröge ") + }); + + Some(&*METADATA) } - fn start(&self, device_provider: &gst::DeviceProvider) -> Result<(), gst::LoggableError> { + + fn probe(&self, _device_provider: &Self::Type) -> Vec { + self.current_devices + .lock() + .unwrap() + .iter() + .map(|d| d.clone().upcast()) + .collect() + } + + fn start(&self, device_provider: &Self::Type) -> Result<(), gst::LoggableError> { let mut thread_guard = self.thread.lock().unwrap(); if thread_guard.is_some() { gst_log!( @@ -121,7 +126,8 @@ impl DeviceProviderImpl for DeviceProvider { Ok(()) } - fn stop(&self, _device_provider: &gst::DeviceProvider) { + + fn stop(&self, _device_provider: &Self::Type) { if let Some(_thread) = self.thread.lock().unwrap().take() { self.is_running.store(false, atomic::Ordering::SeqCst); // Don't actually join because that might take a while @@ -130,7 +136,7 @@ impl DeviceProviderImpl for DeviceProvider { } impl DeviceProvider { - fn poll(&self, device_provider: &gst::DeviceProvider, first: bool) { + fn poll(&self, device_provider: &super::DeviceProvider, first: bool) { let mut find_guard = self.find.lock().unwrap(); let find = match *find_guard { None => return, @@ -189,11 +195,11 @@ impl DeviceProvider { source ); // Add once for audio, another time for video - let device = Device::new(&source, true); + let device = super::Device::new(&source, true); device_provider.device_add(&device); current_devices_guard.push(device); - let device = Device::new(&source, false); + let device = super::Device::new(&source, false); device_provider.device_add(&device); current_devices_guard.push(device); } @@ -201,18 +207,16 @@ impl DeviceProvider { } #[derive(Debug)] -struct Device { +pub struct Device { cat: gst::DebugCategory, source: OnceCell<(ndi::Source<'static>, glib::Type)>, } +#[glib::object_subclass] impl ObjectSubclass for Device { const NAME: &'static str = "NdiDevice"; + type Type = super::Device; type ParentType = gst::Device; - type Instance = subclass::simple::InstanceStruct; - type Class = subclass::simple::ClassStruct; - - glib::glib_object_subclass!(); fn new() -> Self { Self { @@ -226,18 +230,16 @@ impl ObjectSubclass for Device { } } -impl ObjectImpl for Device { - glib::glib_object_impl!(); -} +impl ObjectImpl for Device {} impl DeviceImpl for Device { fn create_element( &self, - _device: &gst::Device, + _device: &Self::Type, name: Option<&str>, ) -> Result { let source_info = self.source.get().unwrap(); - let element = glib::Object::new( + let element = glib::Object::with_type( source_info.1, &[ ("name", &name), @@ -253,8 +255,8 @@ impl DeviceImpl for Device { } } -impl Device { - fn new(source: &ndi::Source<'_>, is_audio: bool) -> gst::Device { +impl super::Device { + fn new(source: &ndi::Source<'_>, is_audio: bool) -> super::Device { let display_name = format!( "{} ({})", source.ndi_name(), @@ -267,13 +269,13 @@ impl Device { // Get the caps from the template caps of the corresponding source element let element_type = if is_audio { - crate::ndiaudiosrc::NdiAudioSrc::get_type() + crate::ndiaudiosrc::NdiAudioSrc::static_type() } else { - crate::ndivideosrc::NdiVideoSrc::get_type() + crate::ndivideosrc::NdiVideoSrc::static_type() }; - let element_class = gst::ElementClass::from_type(element_type).unwrap(); - let templ = element_class.get_pad_template("src").unwrap(); - let caps = templ.get_caps().unwrap(); + let element_class = glib::Class::::from_type(element_type).unwrap(); + let templ = element_class.pad_template("src").unwrap(); + let caps = templ.caps(); // Put the url-address into the extra properties let extra_properties = gst::Structure::builder("properties") @@ -281,17 +283,12 @@ impl Device { .field("url-address", &source.url_address()) .build(); - let device = glib::Object::new( - Device::get_type(), - &[ - ("caps", &caps), - ("display-name", &display_name), - ("device-class", &device_class), - ("properties", &extra_properties), - ], - ) - .unwrap() - .dynamic_cast::() + let device = glib::Object::new::(&[ + ("caps", &caps), + ("display-name", &display_name), + ("device-class", &device_class), + ("properties", &extra_properties), + ]) .unwrap(); let device_impl = Device::from_instance(&device); @@ -303,12 +300,3 @@ impl Device { device } } - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::DeviceProvider::register( - Some(plugin), - "ndideviceprovider", - gst::Rank::Primary, - DeviceProvider::get_type(), - ) -} diff --git a/src/device_provider/mod.rs b/src/device_provider/mod.rs new file mode 100644 index 00000000..cda26ece --- /dev/null +++ b/src/device_provider/mod.rs @@ -0,0 +1,26 @@ +use glib::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct DeviceProvider(ObjectSubclass) @extends gst::DeviceProvider, gst::Object; +} + +unsafe impl Send for DeviceProvider {} +unsafe impl Sync for DeviceProvider {} + +glib::wrapper! { + pub struct Device(ObjectSubclass) @extends gst::Device, gst::Object; +} + +unsafe impl Send for Device {} +unsafe impl Sync for Device {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::DeviceProvider::register( + Some(plugin), + "ndideviceprovider", + gst::Rank::Primary, + DeviceProvider::static_type(), + ) +} diff --git a/src/lib.rs b/src/lib.rs index 5c0366b9..752ea686 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,3 @@ -use glib::prelude::*; - mod device_provider; pub mod ndi; mod ndiaudiosrc; @@ -38,7 +36,7 @@ pub enum TimestampMode { fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { if !ndi::initialize() { - return Err(glib::glib_bool_error!("Cannot initialize NDI")); + return Err(glib::bool_error!("Cannot initialize NDI")); } device_provider::register(plugin)?; @@ -68,7 +66,7 @@ static TIMECODE_CAPS: Lazy = static TIMESTAMP_CAPS: Lazy = Lazy::new(|| gst::Caps::new_simple("timestamp/x-ndi-timestamp", &[])); -gst::gst_plugin_define!( +gst::plugin_define!( ndi, env!("CARGO_PKG_DESCRIPTION"), plugin_init, diff --git a/src/ndi.rs b/src/ndi.rs index 6c963efc..b4043b92 100644 --- a/src/ndi.rs +++ b/src/ndi.rs @@ -420,7 +420,7 @@ pub struct SendInstance(ptr::NonNull<::std::os::raw::c_void>); unsafe impl Send for SendInstance {} impl SendInstance { - pub fn builder<'a>(ndi_name: &'a str) -> SendBuilder<'a> { + pub fn builder(ndi_name: &str) -> SendBuilder { SendBuilder { ndi_name, clock_video: false, @@ -749,7 +749,7 @@ impl<'a> VideoFrame<'a> { impl<'a> Drop for VideoFrame<'a> { #[allow(irrefutable_let_patterns)] fn drop(&mut self) { - if let VideoFrame::BorrowedRecv(ref mut frame, ref recv) = *self { + if let VideoFrame::BorrowedRecv(ref mut frame, recv) = *self { unsafe { NDIlib_recv_free_video_v2(((recv.0).0).0.as_ptr() as *mut _, frame); } @@ -918,7 +918,7 @@ impl<'a> AudioFrame<'a> { impl<'a> Drop for AudioFrame<'a> { #[allow(irrefutable_let_patterns)] fn drop(&mut self) { - if let AudioFrame::BorrowedRecv(ref mut frame, ref recv) = *self { + if let AudioFrame::BorrowedRecv(ref mut frame, recv) = *self { unsafe { NDIlib_recv_free_audio_v2(((recv.0).0).0.as_ptr() as *mut _, frame); } @@ -1010,7 +1010,7 @@ impl<'a> Default for MetadataFrame<'a> { impl<'a> Drop for MetadataFrame<'a> { fn drop(&mut self) { - if let MetadataFrame::Borrowed(ref mut frame, ref recv) = *self { + if let MetadataFrame::Borrowed(ref mut frame, recv) = *self { unsafe { NDIlib_recv_free_metadata(((recv.0).0).0.as_ptr() as *mut _, frame); } diff --git a/src/ndiaudiosrc.rs b/src/ndiaudiosrc/imp.rs similarity index 62% rename from src/ndiaudiosrc.rs rename to src/ndiaudiosrc/imp.rs index b4cff6fa..3891a41d 100644 --- a/src/ndiaudiosrc.rs +++ b/src/ndiaudiosrc/imp.rs @@ -1,7 +1,6 @@ -use glib::subclass; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg}; +use gst::{gst_debug, gst_error}; use gst_base::prelude::*; use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; @@ -9,6 +8,8 @@ use gst_base::subclass::prelude::*; use std::sync::Mutex; use std::{i32, u32}; +use once_cell::sync::Lazy; + use crate::connect_ndi; use crate::ndisys; @@ -46,94 +47,10 @@ impl Default for Settings { } } -static PROPERTIES: [subclass::Property; 8] = [ - subclass::Property("ndi-name", |name| { - glib::ParamSpec::string( - name, - "NDI Name", - "NDI stream name of the sender", - None, - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("url-address", |name| { - glib::ParamSpec::string( - name, - "URL/Address", - "URL/address and port of the sender, e.g. 127.0.0.1:5961", - None, - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("receiver-ndi-name", |name| { - glib::ParamSpec::string( - name, - "Receiver NDI Name", - "NDI stream name of this receiver", - Some(&*DEFAULT_RECEIVER_NDI_NAME), - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("connect-timeout", |name| { - glib::ParamSpec::uint( - name, - "Connect Timeout", - "Connection timeout in ms", - 0, - u32::MAX, - 10000, - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("timeout", |name| { - glib::ParamSpec::uint( - name, - "Timeout", - "Receive timeout in ms", - 0, - u32::MAX, - 5000, - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("max-queue-length", |name| { - glib::ParamSpec::uint( - name, - "Max Queue Length", - "Maximum receive queue length", - 0, - u32::MAX, - 5, - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("bandwidth", |name| { - glib::ParamSpec::int( - name, - "Bandwidth", - "Bandwidth, -10 metadata-only, 10 audio-only, 100 highest", - -10, - 100, - 100, - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("timestamp-mode", |name| { - glib::ParamSpec::enum_( - name, - "Timestamp Mode", - "Timestamp information to use for outgoing PTS", - TimestampMode::static_type(), - TimestampMode::ReceiveTimeTimecode as i32, - glib::ParamFlags::READWRITE, - ) - }), -]; - struct State { info: Option, receiver: Option>, - current_latency: gst::ClockTime, + current_latency: Option, } impl Default for State { @@ -141,25 +58,23 @@ impl Default for State { State { info: None, receiver: None, - current_latency: gst::CLOCK_TIME_NONE, + current_latency: gst::ClockTime::NONE, } } } -pub(crate) struct NdiAudioSrc { +pub struct NdiAudioSrc { cat: gst::DebugCategory, settings: Mutex, state: Mutex, receiver_controller: Mutex>>, } +#[glib::object_subclass] impl ObjectSubclass for NdiAudioSrc { const NAME: &'static str = "NdiAudioSrc"; + type Type = super::NdiAudioSrc; type ParentType = gst_base::BaseSrc; - type Instance = gst::subclass::ElementInstanceStruct; - type Class = subclass::simple::ClassStruct; - - glib::glib_object_subclass!(); fn new() -> Self { Self { @@ -173,89 +88,130 @@ impl ObjectSubclass for NdiAudioSrc { receiver_controller: Mutex::new(None), } } - - fn class_init(klass: &mut subclass::simple::ClassStruct) { - klass.set_metadata( - "NewTek NDI Audio Source", - "Source", - "NewTek NDI audio source", - "Ruben Gonzalez , Daniel Vilar , Sebastian Dröge ", - ); - - let caps = gst::Caps::new_simple( - "audio/x-raw", - &[ - ( - "format", - &gst::List::new(&[&gst_audio::AUDIO_FORMAT_S16.to_string()]), - ), - ("rate", &gst::IntRange::::new(1, i32::MAX)), - ("channels", &gst::IntRange::::new(1, i32::MAX)), - ("layout", &"interleaved"), - ], - ); - - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &caps, - ) - .unwrap(); - klass.add_pad_template(src_pad_template); - - klass.install_properties(&PROPERTIES); - } } impl ObjectImpl for NdiAudioSrc { - glib::glib_object_impl!(); + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpec::new_string( + "ndi-name", + "NDI Name", + "NDI stream name of the sender", + None, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_string( + "url-address", + "URL/Address", + "URL/address and port of the sender, e.g. 127.0.0.1:5961", + None, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_string( + "receiver-ndi-name", + "Receiver NDI Name", + "NDI stream name of this receiver", + Some(&*DEFAULT_RECEIVER_NDI_NAME), + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_uint( + "connect-timeout", + "Connect Timeout", + "Connection timeout in ms", + 0, + u32::MAX, + 10000, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_uint( + "timeout", + "Timeout", + "Receive timeout in ms", + 0, + u32::MAX, + 5000, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_uint( + "max-queue-length", + "Max Queue Length", + "Maximum receive queue length", + 0, + u32::MAX, + 5, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_int( + "bandwidth", + "Bandwidth", + "Bandwidth, -10 metadata-only, 10 audio-only, 100 highest", + -10, + 100, + 100, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_enum( + "timestamp-mode", + "Timestamp Mode", + "Timestamp information to use for outgoing PTS", + TimestampMode::static_type(), + TimestampMode::ReceiveTimeTimecode as i32, + glib::ParamFlags::READWRITE, + ), + ] + }); - fn constructed(&self, obj: &glib::Object) { - self.parent_constructed(obj); - - let basesrc = obj.downcast_ref::().unwrap(); - // Initialize live-ness and notify the base class that - // we'd like to operate in Time format - basesrc.set_live(true); - basesrc.set_format(gst::Format::Time); + PROPERTIES.as_ref() } - fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { - let prop = &PROPERTIES[id]; - let basesrc = obj.downcast_ref::().unwrap(); + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); - match *prop { - subclass::Property("ndi-name", ..) => { + // Initialize live-ness and notify the base class that + // we'd like to operate in Time format + obj.set_live(true); + obj.set_format(gst::Format::Time); + } + + fn set_property( + &self, + obj: &Self::Type, + _id: usize, + value: &glib::Value, + pspec: &glib::ParamSpec, + ) { + match pspec.name() { + "ndi-name" => { let mut settings = self.settings.lock().unwrap(); let ndi_name = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing ndi-name from {:?} to {:?}", settings.ndi_name, ndi_name, ); settings.ndi_name = ndi_name; } - subclass::Property("url-address", ..) => { + "url-address" => { let mut settings = self.settings.lock().unwrap(); let url_address = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing url-address from {:?} to {:?}", settings.url_address, url_address, ); settings.url_address = url_address; } - subclass::Property("receiver-ndi-name", ..) => { + "receiver-ndi-name" => { let mut settings = self.settings.lock().unwrap(); - let receiver_ndi_name = value.get().unwrap(); + let receiver_ndi_name = value.get::>().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing receiver-ndi-name from {:?} to {:?}", settings.receiver_ndi_name, receiver_ndi_name, @@ -263,67 +219,66 @@ impl ObjectImpl for NdiAudioSrc { settings.receiver_ndi_name = receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone()); } - subclass::Property("connect-timeout", ..) => { + "connect-timeout" => { let mut settings = self.settings.lock().unwrap(); - let connect_timeout = value.get_some().unwrap(); + let connect_timeout = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing connect-timeout from {} to {}", settings.connect_timeout, connect_timeout, ); settings.connect_timeout = connect_timeout; } - subclass::Property("timeout", ..) => { + "timeout" => { let mut settings = self.settings.lock().unwrap(); - let timeout = value.get_some().unwrap(); + let timeout = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing timeout from {} to {}", settings.timeout, timeout, ); settings.timeout = timeout; } - subclass::Property("max-queue-length", ..) => { + "max-queue-length" => { let mut settings = self.settings.lock().unwrap(); - let max_queue_length = value.get_some().unwrap(); + let max_queue_length = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing max-queue-length from {} to {}", settings.max_queue_length, max_queue_length, ); settings.max_queue_length = max_queue_length; } - subclass::Property("bandwidth", ..) => { + "bandwidth" => { let mut settings = self.settings.lock().unwrap(); - let bandwidth = value.get_some().unwrap(); + let bandwidth = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing bandwidth from {} to {}", settings.bandwidth, bandwidth, ); settings.bandwidth = bandwidth; } - subclass::Property("timestamp-mode", ..) => { + "timestamp-mode" => { let mut settings = self.settings.lock().unwrap(); - let timestamp_mode = value.get_some().unwrap(); + let timestamp_mode = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing timestamp mode from {:?} to {:?}", settings.timestamp_mode, timestamp_mode ); if settings.timestamp_mode != timestamp_mode { - let _ = - basesrc.post_message(gst::message::Latency::builder().src(basesrc).build()); + let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); } settings.timestamp_mode = timestamp_mode; } @@ -331,41 +286,39 @@ impl ObjectImpl for NdiAudioSrc { } } - fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { - let prop = &PROPERTIES[id]; - - match *prop { - subclass::Property("ndi-name", ..) => { + fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "ndi-name" => { let settings = self.settings.lock().unwrap(); - Ok(settings.ndi_name.to_value()) + settings.ndi_name.to_value() } - subclass::Property("url-address", ..) => { + "url-address" => { let settings = self.settings.lock().unwrap(); - Ok(settings.url_address.to_value()) + settings.url_address.to_value() } - subclass::Property("receiver-ndi-name", ..) => { + "receiver-ndi-name" => { let settings = self.settings.lock().unwrap(); - Ok(settings.receiver_ndi_name.to_value()) + settings.receiver_ndi_name.to_value() } - subclass::Property("connect-timeout", ..) => { + "connect-timeout" => { let settings = self.settings.lock().unwrap(); - Ok(settings.connect_timeout.to_value()) + settings.connect_timeout.to_value() } - subclass::Property("timeout", ..) => { + "timeout" => { let settings = self.settings.lock().unwrap(); - Ok(settings.timeout.to_value()) + settings.timeout.to_value() } - subclass::Property("max-queue-length", ..) => { + "max-queue-length" => { let settings = self.settings.lock().unwrap(); - Ok(settings.max_queue_length.to_value()) + settings.max_queue_length.to_value() } - subclass::Property("bandwidth", ..) => { + "bandwidth" => { let settings = self.settings.lock().unwrap(); - Ok(settings.bandwidth.to_value()) + settings.bandwidth.to_value() } - subclass::Property("timestamp-mode", ..) => { + "timestamp-mode" => { let settings = self.settings.lock().unwrap(); - Ok(settings.timestamp_mode.to_value()) + settings.timestamp_mode.to_value() } _ => unimplemented!(), } @@ -373,9 +326,51 @@ impl ObjectImpl for NdiAudioSrc { } impl ElementImpl for NdiAudioSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "NewTek NDI Audio Source", + "Source", + "NewTek NDI audio source", + "Ruben Gonzalez , Daniel Vilar , Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::new_simple( + "audio/x-raw", + &[ + ( + "format", + &gst::List::new(&[&gst_audio::AUDIO_FORMAT_S16.to_string()]), + ), + ("rate", &gst::IntRange::::new(1, i32::MAX)), + ("channels", &gst::IntRange::::new(1, i32::MAX)), + ("layout", &"interleaved"), + ], + ); + + let audio_src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &caps, + ) + .unwrap(); + + vec![audio_src_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + fn change_state( &self, - element: &gst::Element, + element: &Self::Type, transition: gst::StateChange, ) -> Result { match transition { @@ -402,13 +397,13 @@ impl ElementImpl for NdiAudioSrc { } impl BaseSrcImpl for NdiAudioSrc { - fn negotiate(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::LoggableError> { + fn negotiate(&self, _element: &Self::Type) -> Result<(), gst::LoggableError> { // Always succeed here without doing anything: we will set the caps once we received a // buffer, there's nothing we can negotiate Ok(()) } - fn unlock(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn unlock(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { gst_debug!(self.cat, obj: element, "Unlocking",); if let Some(ref controller) = *self.receiver_controller.lock().unwrap() { controller.set_flushing(true); @@ -416,7 +411,7 @@ impl BaseSrcImpl for NdiAudioSrc { Ok(()) } - fn unlock_stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn unlock_stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { gst_debug!(self.cat, obj: element, "Stop unlocking",); if let Some(ref controller) = *self.receiver_controller.lock().unwrap() { controller.set_flushing(false); @@ -424,12 +419,12 @@ impl BaseSrcImpl for NdiAudioSrc { Ok(()) } - fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { *self.state.lock().unwrap() = Default::default(); let settings = self.settings.lock().unwrap().clone(); if settings.ndi_name.is_none() && settings.url_address.is_none() { - return Err(gst_error_msg!( + return Err(gst::error_msg!( gst::LibraryError::Settings, ["No NDI name or URL/address given"] )); @@ -437,7 +432,7 @@ impl BaseSrcImpl for NdiAudioSrc { let receiver = connect_ndi( self.cat, - element, + element.upcast_ref(), settings.ndi_name.as_deref(), settings.url_address.as_deref(), &settings.receiver_ndi_name, @@ -450,7 +445,7 @@ impl BaseSrcImpl for NdiAudioSrc { // settings.id_receiver exists match receiver { - None => Err(gst_error_msg!( + None => Err(gst::error_msg!( gst::ResourceError::NotFound, ["Could not connect to this source"] )), @@ -465,7 +460,7 @@ impl BaseSrcImpl for NdiAudioSrc { } } - fn stop(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> { if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() { controller.shutdown(); } @@ -473,7 +468,7 @@ impl BaseSrcImpl for NdiAudioSrc { Ok(()) } - fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool { + fn query(&self, element: &Self::Type, query: &mut gst::QueryRef) -> bool { use gst::QueryView; match query.view_mut() { @@ -486,14 +481,14 @@ impl BaseSrcImpl for NdiAudioSrc { let state = self.state.lock().unwrap(); let settings = self.settings.lock().unwrap(); - if state.current_latency.is_some() { + if let Some(latency) = state.current_latency { let min = if settings.timestamp_mode != TimestampMode::Timecode { - state.current_latency + latency } else { - 0.into() + gst::ClockTime::ZERO }; - let max = 5 * state.current_latency; + let max = 5 * latency; gst_debug!( self.cat, @@ -512,11 +507,11 @@ impl BaseSrcImpl for NdiAudioSrc { } } - fn fixate(&self, element: &gst_base::BaseSrc, mut caps: gst::Caps) -> gst::Caps { + fn fixate(&self, element: &Self::Type, mut caps: gst::Caps) -> gst::Caps { caps.truncate(); { let caps = caps.make_mut(); - let s = caps.get_mut_structure(0).unwrap(); + let s = caps.structure_mut(0).unwrap(); s.fixate_field_nearest_int("rate", 48_000); s.fixate_field_nearest_int("channels", 2); } @@ -526,7 +521,7 @@ impl BaseSrcImpl for NdiAudioSrc { fn create( &self, - element: &gst_base::BaseSrc, + element: &Self::Type, _offset: u64, _buffer: Option<&mut gst::BufferRef>, _length: u32, @@ -548,7 +543,7 @@ impl BaseSrcImpl for NdiAudioSrc { state.receiver = Some(recv); if state.info.as_ref() != Some(&info) { let caps = info.to_caps().map_err(|_| { - gst_element_error!( + gst::element_error!( element, gst::ResourceError::Settings, ["Invalid audio info received: {:?}", info] @@ -556,11 +551,11 @@ impl BaseSrcImpl for NdiAudioSrc { gst::FlowError::NotNegotiated })?; state.info = Some(info); - state.current_latency = buffer.get_duration(); + state.current_latency = buffer.duration(); drop(state); gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); element.set_caps(&caps).map_err(|_| { - gst_element_error!( + gst::element_error!( element, gst::CoreError::Negotiation, ["Failed to negotiate caps: {:?}", caps] @@ -580,12 +575,3 @@ impl BaseSrcImpl for NdiAudioSrc { } } } - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register( - Some(plugin), - "ndiaudiosrc", - gst::Rank::None, - NdiAudioSrc::get_type(), - ) -} diff --git a/src/ndiaudiosrc/mod.rs b/src/ndiaudiosrc/mod.rs new file mode 100644 index 00000000..11b94849 --- /dev/null +++ b/src/ndiaudiosrc/mod.rs @@ -0,0 +1,19 @@ +use glib::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct NdiAudioSrc(ObjectSubclass) @extends gst_base::BaseSrc, gst::Element, gst::Object; +} + +unsafe impl Send for NdiAudioSrc {} +unsafe impl Sync for NdiAudioSrc {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "ndiaudiosrc", + gst::Rank::None, + NdiAudioSrc::static_type(), + ) +} diff --git a/src/ndisink.rs b/src/ndisink/imp.rs similarity index 52% rename from src/ndisink.rs rename to src/ndisink/imp.rs index b4f5a0bb..fc23ff01 100644 --- a/src/ndisink.rs +++ b/src/ndisink/imp.rs @@ -1,15 +1,15 @@ -use glib::subclass; use glib::subclass::prelude::*; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{gst_debug, gst_error, gst_error_msg, gst_info, gst_loggable_error, gst_trace}; -use gst_base::{subclass::prelude::*, BaseSinkExtManual}; +use gst::{gst_debug, gst_error, gst_info, gst_trace}; +use gst_base::prelude::*; +use gst_base::subclass::prelude::*; use std::sync::Mutex; use once_cell::sync::Lazy; -use super::ndi::SendInstance; +use crate::ndi::SendInstance; static DEFAULT_SENDER_NDI_NAME: Lazy = Lazy::new(|| { format!( @@ -32,16 +32,6 @@ impl Default for Settings { } } -static PROPERTIES: [subclass::Property; 1] = [subclass::Property("ndi-name", |name| { - glib::ParamSpec::string( - name, - "NDI Name", - "NDI Name to use", - Some(DEFAULT_SENDER_NDI_NAME.as_ref()), - glib::ParamFlags::READWRITE, - ) -})]; - struct State { send: SendInstance, video_info: Option, @@ -57,13 +47,11 @@ static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new("ndisink", gst::DebugColorFlags::empty(), Some("NDI Sink")) }); +#[glib::object_subclass] impl ObjectSubclass for NdiSink { const NAME: &'static str = "NdiSink"; + type Type = super::NdiSink; type ParentType = gst_base::BaseSink; - type Instance = gst::subclass::ElementInstanceStruct; - type Class = subclass::simple::ClassStruct; - - glib::glib_object_subclass!(); fn new() -> Self { Self { @@ -71,106 +59,129 @@ impl ObjectSubclass for NdiSink { state: Mutex::new(Default::default()), } } - - fn class_init(klass: &mut subclass::simple::ClassStruct) { - klass.set_metadata( - "NDI Sink", - "Sink/Audio/Video", - "Render as an NDI stream", - "Sebastian Dröge ", - ); - - let caps = gst::Caps::builder_full() - .structure( - gst::Structure::builder("video/x-raw") - .field( - "format", - &gst::List::new(&[ - &gst_video::VideoFormat::Uyvy.to_str(), - &gst_video::VideoFormat::I420.to_str(), - &gst_video::VideoFormat::Nv12.to_str(), - &gst_video::VideoFormat::Nv21.to_str(), - &gst_video::VideoFormat::Yv12.to_str(), - &gst_video::VideoFormat::Bgra.to_str(), - &gst_video::VideoFormat::Bgrx.to_str(), - &gst_video::VideoFormat::Rgba.to_str(), - &gst_video::VideoFormat::Rgbx.to_str(), - ]), - ) - .field("width", &gst::IntRange::::new(1, std::i32::MAX)) - .field("height", &gst::IntRange::::new(1, std::i32::MAX)) - .field( - "framerate", - &gst::FractionRange::new( - gst::Fraction::new(0, 1), - gst::Fraction::new(std::i32::MAX, 1), - ), - ) - .build(), - ) - .structure( - gst::Structure::builder("audio/x-raw") - .field("format", &gst_audio::AUDIO_FORMAT_S16.to_str()) - .field("rate", &gst::IntRange::::new(1, i32::MAX)) - .field("channels", &gst::IntRange::::new(1, i32::MAX)) - .field("layout", &"interleaved") - .build(), - ) - .build(); - - let sink_pad_template = gst::PadTemplate::new( - "sink", - gst::PadDirection::Sink, - gst::PadPresence::Always, - &caps, - ) - .unwrap(); - klass.add_pad_template(sink_pad_template); - - klass.install_properties(&PROPERTIES); - } } impl ObjectImpl for NdiSink { - glib::glib_object_impl!(); + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![glib::ParamSpec::new_string( + "ndi-name", + "NDI Name", + "NDI Name to use", + Some(DEFAULT_SENDER_NDI_NAME.as_ref()), + glib::ParamFlags::READWRITE, + )] + }); - fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { - let prop = &PROPERTIES[id]; - match *prop { - subclass::Property("ndi-name", ..) => { + PROPERTIES.as_ref() + } + + fn set_property( + &self, + _obj: &Self::Type, + _id: usize, + value: &glib::Value, + pspec: &glib::ParamSpec, + ) { + match pspec.name() { + "ndi-name" => { let mut settings = self.settings.lock().unwrap(); settings.ndi_name = value .get::() - .unwrap() - .unwrap_or_else(|| DEFAULT_SENDER_NDI_NAME.clone()); + .unwrap_or_else(|_| DEFAULT_SENDER_NDI_NAME.clone()); } _ => unimplemented!(), }; } - fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { - let prop = &PROPERTIES[id]; - match *prop { - subclass::Property("ndi-name", ..) => { + fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "ndi-name" => { let settings = self.settings.lock().unwrap(); - Ok(settings.ndi_name.to_value()) + settings.ndi_name.to_value() } _ => unimplemented!(), } } } -impl ElementImpl for NdiSink {} +impl ElementImpl for NdiSink { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "NDI Sink", + "Sink/Audio/Video", + "Render as an NDI stream", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::builder_full() + .structure( + gst::Structure::builder("video/x-raw") + .field( + "format", + &gst::List::new(&[ + &gst_video::VideoFormat::Uyvy.to_str(), + &gst_video::VideoFormat::I420.to_str(), + &gst_video::VideoFormat::Nv12.to_str(), + &gst_video::VideoFormat::Nv21.to_str(), + &gst_video::VideoFormat::Yv12.to_str(), + &gst_video::VideoFormat::Bgra.to_str(), + &gst_video::VideoFormat::Bgrx.to_str(), + &gst_video::VideoFormat::Rgba.to_str(), + &gst_video::VideoFormat::Rgbx.to_str(), + ]), + ) + .field("width", &gst::IntRange::::new(1, std::i32::MAX)) + .field("height", &gst::IntRange::::new(1, std::i32::MAX)) + .field( + "framerate", + &gst::FractionRange::new( + gst::Fraction::new(0, 1), + gst::Fraction::new(std::i32::MAX, 1), + ), + ) + .build(), + ) + .structure( + gst::Structure::builder("audio/x-raw") + .field("format", &gst_audio::AUDIO_FORMAT_S16.to_str()) + .field("rate", &gst::IntRange::::new(1, i32::MAX)) + .field("channels", &gst::IntRange::::new(1, i32::MAX)) + .field("layout", &"interleaved") + .build(), + ) + .build(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + vec![sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} impl BaseSinkImpl for NdiSink { - fn start(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { + fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { let mut state_storage = self.state.lock().unwrap(); let settings = self.settings.lock().unwrap(); let send = SendInstance::builder(&settings.ndi_name) .build() .ok_or_else(|| { - gst_error_msg!( + gst::error_msg!( gst::ResourceError::OpenWrite, ["Could not create send instance"] ) @@ -187,7 +198,7 @@ impl BaseSinkImpl for NdiSink { Ok(()) } - fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { + fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { let mut state_storage = self.state.lock().unwrap(); *state_storage = None; @@ -196,37 +207,33 @@ impl BaseSinkImpl for NdiSink { Ok(()) } - fn unlock(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { + fn unlock(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> { Ok(()) } - fn unlock_stop(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { + fn unlock_stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> { Ok(()) } - fn set_caps( - &self, - element: &gst_base::BaseSink, - caps: &gst::Caps, - ) -> Result<(), gst::LoggableError> { + fn set_caps(&self, element: &Self::Type, caps: &gst::Caps) -> Result<(), gst::LoggableError> { gst_debug!(CAT, obj: element, "Setting caps {}", caps); let mut state_storage = self.state.lock().unwrap(); let state = match &mut *state_storage { - None => return Err(gst_loggable_error!(CAT, "Sink not started yet")), + None => return Err(gst::loggable_error!(CAT, "Sink not started yet")), Some(ref mut state) => state, }; - let s = caps.get_structure(0).unwrap(); - if s.get_name() == "video/x-raw" { + let s = caps.structure(0).unwrap(); + if s.name() == "video/x-raw" { let info = gst_video::VideoInfo::from_caps(caps) - .map_err(|_| gst_loggable_error!(CAT, "Couldn't parse caps {}", caps))?; + .map_err(|_| gst::loggable_error!(CAT, "Couldn't parse caps {}", caps))?; state.video_info = Some(info); state.audio_info = None; } else { let info = gst_audio::AudioInfo::from_caps(caps) - .map_err(|_| gst_loggable_error!(CAT, "Couldn't parse caps {}", caps))?; + .map_err(|_| gst::loggable_error!(CAT, "Couldn't parse caps {}", caps))?; state.audio_info = Some(info); state.video_info = None; @@ -237,7 +244,7 @@ impl BaseSinkImpl for NdiSink { fn render( &self, - element: &gst_base::BaseSink, + element: &Self::Type, buffer: &gst::Buffer, ) -> Result { let mut state_storage = self.state.lock().unwrap(); @@ -247,7 +254,7 @@ impl BaseSinkImpl for NdiSink { }; if let Some(ref info) = state.video_info { - if let Some(audio_meta) = buffer.get_meta::() { + if let Some(audio_meta) = buffer.meta::() { for (buffer, info, timecode) in audio_meta.buffers() { let frame = crate::ndi::AudioFrame::try_from_interleaved_16s(info, buffer, *timecode) @@ -262,9 +269,9 @@ impl BaseSinkImpl for NdiSink { "Sending audio buffer {:?} with timecode {} and format {:?}", buffer, if *timecode < 0 { - gst::CLOCK_TIME_NONE + gst::ClockTime::NONE.display() } else { - gst::ClockTime::from(*timecode as u64 * 100) + Some(gst::ClockTime::from_nseconds(*timecode as u64 * 100)).display() }, info, ); @@ -273,15 +280,18 @@ impl BaseSinkImpl for NdiSink { } // Skip empty/gap buffers from ndisinkcombiner - if buffer.get_size() != 0 { + if buffer.size() != 0 { let timecode = element - .get_segment() + .segment() .downcast::() .ok() .and_then(|segment| { - *(segment.to_running_time(buffer.get_pts()) + element.get_base_time()) + segment + .to_running_time(buffer.pts()) + .zip(element.base_time()) }) - .map(|time| (time / 100) as i64) + .and_then(|(running_time, base_time)| running_time.checked_add(base_time)) + .map(|time| (time.nseconds() / 100) as i64) .unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize); let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, info) @@ -302,9 +312,9 @@ impl BaseSinkImpl for NdiSink { "Sending video buffer {:?} with timecode {} and format {:?}", buffer, if timecode < 0 { - gst::CLOCK_TIME_NONE + gst::ClockTime::NONE.display() } else { - gst::ClockTime::from(timecode as u64 * 100) + Some(gst::ClockTime::from_nseconds(timecode as u64 * 100)).display() }, info ); @@ -312,13 +322,16 @@ impl BaseSinkImpl for NdiSink { } } else if let Some(ref info) = state.audio_info { let timecode = element - .get_segment() + .segment() .downcast::() .ok() .and_then(|segment| { - *(segment.to_running_time(buffer.get_pts()) + element.get_base_time()) + segment + .to_running_time(buffer.pts()) + .zip(element.base_time()) }) - .map(|time| (time / 100) as i64) + .and_then(|(running_time, base_time)| running_time.checked_add(base_time)) + .map(|time| (time.nseconds() / 100) as i64) .unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize); let frame = crate::ndi::AudioFrame::try_from_interleaved_16s(info, buffer, timecode) @@ -333,9 +346,9 @@ impl BaseSinkImpl for NdiSink { "Sending audio buffer {:?} with timecode {} and format {:?}", buffer, if timecode < 0 { - gst::CLOCK_TIME_NONE + gst::ClockTime::NONE.display() } else { - gst::ClockTime::from(timecode as u64 * 100) + Some(gst::ClockTime::from_nseconds(timecode as u64 * 100)).display() }, info, ); @@ -347,12 +360,3 @@ impl BaseSinkImpl for NdiSink { Ok(gst::FlowSuccess::Ok) } } - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register( - Some(plugin), - "ndisink", - gst::Rank::None, - NdiSink::get_type(), - ) -} diff --git a/src/ndisink/mod.rs b/src/ndisink/mod.rs new file mode 100644 index 00000000..8d6d955a --- /dev/null +++ b/src/ndisink/mod.rs @@ -0,0 +1,19 @@ +use glib::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct NdiSink(ObjectSubclass) @extends gst_base::BaseSink, gst::Element, gst::Object; +} + +unsafe impl Send for NdiSink {} +unsafe impl Sync for NdiSink {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "ndisink", + gst::Rank::None, + NdiSink::static_type(), + ) +} diff --git a/src/ndisinkcombiner.rs b/src/ndisinkcombiner/imp.rs similarity index 69% rename from src/ndisinkcombiner.rs rename to src/ndisinkcombiner/imp.rs index ddc91798..9ba0e28a 100644 --- a/src/ndisinkcombiner.rs +++ b/src/ndisinkcombiner/imp.rs @@ -1,5 +1,4 @@ use glib::prelude::*; -use glib::subclass; use glib::subclass::prelude::*; use gst::prelude::*; use gst::subclass::prelude::*; @@ -7,6 +6,8 @@ use gst::{gst_debug, gst_error, gst_trace, gst_warning}; use gst_base::prelude::*; use gst_base::subclass::prelude::*; +use once_cell::sync::Lazy; + use std::mem; use std::sync::Mutex; @@ -27,92 +28,20 @@ struct State { current_audio_buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>, } -struct NdiSinkCombiner { +pub struct NdiSinkCombiner { video_pad: gst_base::AggregatorPad, audio_pad: Mutex>, state: Mutex>, } +#[glib::object_subclass] impl ObjectSubclass for NdiSinkCombiner { const NAME: &'static str = "NdiSinkCombiner"; + type Type = super::NdiSinkCombiner; type ParentType = gst_base::Aggregator; - type Instance = gst::subclass::ElementInstanceStruct; - type Class = subclass::simple::ClassStruct; - - glib::glib_object_subclass!(); - - fn class_init(klass: &mut subclass::simple::ClassStruct) { - klass.set_metadata( - "NDI Sink Combiner", - "Combiner/Audio/Video", - "NDI sink audio/video combiner", - "Sebastian Dröge ", - ); - - let caps = gst::Caps::builder("video/x-raw") - .field( - "format", - &gst::List::new(&[ - &gst_video::VideoFormat::Uyvy.to_str(), - &gst_video::VideoFormat::I420.to_str(), - &gst_video::VideoFormat::Nv12.to_str(), - &gst_video::VideoFormat::Nv21.to_str(), - &gst_video::VideoFormat::Yv12.to_str(), - &gst_video::VideoFormat::Bgra.to_str(), - &gst_video::VideoFormat::Bgrx.to_str(), - &gst_video::VideoFormat::Rgba.to_str(), - &gst_video::VideoFormat::Rgbx.to_str(), - ]), - ) - .field("width", &gst::IntRange::::new(1, i32::MAX)) - .field("height", &gst::IntRange::::new(1, i32::MAX)) - .field( - "framerate", - &gst::FractionRange::new( - gst::Fraction::new(1, i32::MAX), - gst::Fraction::new(i32::MAX, 1), - ), - ) - .build(); - let src_pad_template = gst::PadTemplate::with_gtype( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &caps, - gst_base::AggregatorPad::static_type(), - ) - .unwrap(); - klass.add_pad_template(src_pad_template); - - let sink_pad_template = gst::PadTemplate::with_gtype( - "video", - gst::PadDirection::Sink, - gst::PadPresence::Always, - &caps, - gst_base::AggregatorPad::static_type(), - ) - .unwrap(); - klass.add_pad_template(sink_pad_template); - - let caps = gst::Caps::builder("audio/x-raw") - .field("format", &gst_audio::AUDIO_FORMAT_S16.to_str()) - .field("rate", &gst::IntRange::::new(1, i32::MAX)) - .field("channels", &gst::IntRange::::new(1, i32::MAX)) - .field("layout", &"interleaved") - .build(); - let sink_pad_template = gst::PadTemplate::with_gtype( - "audio", - gst::PadDirection::Sink, - gst::PadPresence::Request, - &caps, - gst_base::AggregatorPad::static_type(), - ) - .unwrap(); - klass.add_pad_template(sink_pad_template); - } fn with_class(klass: &Self::Class) -> Self { - let templ = klass.get_pad_template("video").unwrap(); + let templ = klass.pad_template("video").unwrap(); let video_pad = gst::PadBuilder::::from_template(&templ, Some("video")) .build(); @@ -126,18 +55,97 @@ impl ObjectSubclass for NdiSinkCombiner { } impl ObjectImpl for NdiSinkCombiner { - glib::glib_object_impl!(); - - fn constructed(&self, obj: &glib::Object) { - let element = obj.downcast_ref::().unwrap(); - element.add_pad(&self.video_pad).unwrap(); + fn constructed(&self, obj: &Self::Type) { + obj.add_pad(&self.video_pad).unwrap(); self.parent_constructed(obj); } } impl ElementImpl for NdiSinkCombiner { - fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "NDI Sink Combiner", + "Combiner/Audio/Video", + "NDI sink audio/video combiner", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::builder("video/x-raw") + .field( + "format", + &gst::List::new(&[ + &gst_video::VideoFormat::Uyvy.to_str(), + &gst_video::VideoFormat::I420.to_str(), + &gst_video::VideoFormat::Nv12.to_str(), + &gst_video::VideoFormat::Nv21.to_str(), + &gst_video::VideoFormat::Yv12.to_str(), + &gst_video::VideoFormat::Bgra.to_str(), + &gst_video::VideoFormat::Bgrx.to_str(), + &gst_video::VideoFormat::Rgba.to_str(), + &gst_video::VideoFormat::Rgbx.to_str(), + ]), + ) + .field("width", &gst::IntRange::::new(1, i32::MAX)) + .field("height", &gst::IntRange::::new(1, i32::MAX)) + .field( + "framerate", + &gst::FractionRange::new( + gst::Fraction::new(1, i32::MAX), + gst::Fraction::new(i32::MAX, 1), + ), + ) + .build(); + let src_pad_template = gst::PadTemplate::with_gtype( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + gst_base::AggregatorPad::static_type(), + ) + .unwrap(); + + let video_sink_pad_template = gst::PadTemplate::with_gtype( + "video", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + gst_base::AggregatorPad::static_type(), + ) + .unwrap(); + + let caps = gst::Caps::builder("audio/x-raw") + .field("format", &gst_audio::AUDIO_FORMAT_S16.to_str()) + .field("rate", &gst::IntRange::::new(1, i32::MAX)) + .field("channels", &gst::IntRange::::new(1, i32::MAX)) + .field("layout", &"interleaved") + .build(); + let audio_sink_pad_template = gst::PadTemplate::with_gtype( + "audio", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &caps, + gst_base::AggregatorPad::static_type(), + ) + .unwrap(); + vec![ + src_pad_template, + video_sink_pad_template, + audio_sink_pad_template, + ] + }); + + PAD_TEMPLATES.as_ref() + } + + fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) { let mut audio_pad_storage = self.audio_pad.lock().unwrap(); if audio_pad_storage.as_ref().map(|p| p.upcast_ref()) == Some(pad) { @@ -151,7 +159,7 @@ impl ElementImpl for NdiSinkCombiner { impl AggregatorImpl for NdiSinkCombiner { fn create_new_pad( &self, - agg: &gst_base::Aggregator, + agg: &Self::Type, templ: &gst::PadTemplate, _req_name: Option<&str>, _caps: Option<&gst::Caps>, @@ -163,7 +171,7 @@ impl AggregatorImpl for NdiSinkCombiner { return None; } - let sink_templ = agg.get_pad_template("audio").unwrap(); + let sink_templ = agg.pad_template("audio").unwrap(); if templ != &sink_templ { gst_error!(CAT, obj: agg, "Wrong pad template"); return None; @@ -178,7 +186,7 @@ impl AggregatorImpl for NdiSinkCombiner { Some(pad) } - fn start(&self, agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> { + fn start(&self, agg: &Self::Type) -> Result<(), gst::ErrorMessage> { let mut state_storage = self.state.lock().unwrap(); *state_storage = Some(State { audio_info: None, @@ -192,7 +200,7 @@ impl AggregatorImpl for NdiSinkCombiner { Ok(()) } - fn stop(&self, agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> { + fn stop(&self, agg: &Self::Type) -> Result<(), gst::ErrorMessage> { // Drop our state now let _ = self.state.lock().unwrap().take(); @@ -201,18 +209,18 @@ impl AggregatorImpl for NdiSinkCombiner { Ok(()) } - fn get_next_time(&self, _agg: &gst_base::Aggregator) -> gst::ClockTime { + fn next_time(&self, _agg: &Self::Type) -> Option { // FIXME: What to do here? We don't really know when the next buffer is expected - gst::CLOCK_TIME_NONE + gst::ClockTime::NONE } fn clip( &self, - agg: &gst_base::Aggregator, + agg: &Self::Type, agg_pad: &gst_base::AggregatorPad, mut buffer: gst::Buffer, ) -> Option { - let segment = match agg_pad.get_segment().downcast::() { + let segment = match agg_pad.segment().downcast::() { Ok(segment) => segment, Err(_) => { gst_error!(CAT, obj: agg, "Only TIME segments supported"); @@ -220,25 +228,21 @@ impl AggregatorImpl for NdiSinkCombiner { } }; - let pts = buffer.get_pts(); + let pts = buffer.pts(); if pts.is_none() { gst_error!(CAT, obj: agg, "Only buffers with PTS supported"); return Some(buffer); } - let duration = if buffer.get_duration().is_some() { - buffer.get_duration() - } else { - gst::CLOCK_TIME_NONE - }; + let duration = buffer.duration(); gst_trace!( CAT, obj: agg_pad, "Clipping buffer {:?} with PTS {} and duration {}", buffer, - pts, - duration + pts.display(), + duration.display(), ); let state_storage = self.state.lock().unwrap(); @@ -247,25 +251,21 @@ impl AggregatorImpl for NdiSinkCombiner { None => return None, }; - let duration = if buffer.get_duration().is_some() { - buffer.get_duration() + let duration = if duration.is_some() { + duration } else if let Some(ref audio_info) = state.audio_info { - gst::SECOND - .mul_div_floor( - buffer.get_size() as u64, - audio_info.rate() as u64 * audio_info.bpf() as u64, - ) - .unwrap() + gst::ClockTime::SECOND.mul_div_floor( + buffer.size() as u64, + audio_info.rate() as u64 * audio_info.bpf() as u64, + ) } else if let Some(ref video_info) = state.video_info { if *video_info.fps().numer() > 0 { - gst::SECOND - .mul_div_floor( - *video_info.fps().denom() as u64, - *video_info.fps().numer() as u64, - ) - .unwrap() + gst::ClockTime::SECOND.mul_div_floor( + *video_info.fps().denom() as u64, + *video_info.fps().numer() as u64, + ) } else { - gst::CLOCK_TIME_NONE + gst::ClockTime::NONE } } else { unreachable!() @@ -276,18 +276,23 @@ impl AggregatorImpl for NdiSinkCombiner { obj: agg_pad, "Clipping buffer {:?} with PTS {} and duration {}", buffer, - pts, - duration + pts.display(), + duration.display(), ); if agg_pad == &self.video_pad { - segment.clip(pts, pts + duration).map(|(start, stop)| { + let end_pts = pts + .zip(duration) + .and_then(|(pts, duration)| pts.checked_add(duration)); + + segment.clip(pts, end_pts).map(|(start, stop)| { { let buffer = buffer.make_mut(); buffer.set_pts(start); - if duration.is_some() { - buffer.set_duration(stop - start); - } + buffer.set_duration( + stop.zip(start) + .and_then(|(stop, start)| stop.checked_sub(start)), + ); } buffer @@ -307,7 +312,7 @@ impl AggregatorImpl for NdiSinkCombiner { fn aggregate( &self, - agg: &gst_base::Aggregator, + agg: &Self::Type, timeout: bool, ) -> Result { // FIXME: Can't really happen because we always return NONE from get_next_time() but that @@ -318,7 +323,7 @@ impl AggregatorImpl for NdiSinkCombiner { // first try getting buffers from both pads here let video_buffer_and_segment = match self.video_pad.peek_buffer() { Some(video_buffer) => { - let video_segment = self.video_pad.get_segment(); + let video_segment = self.video_pad.segment(); let video_segment = match video_segment.downcast::() { Ok(video_segment) => video_segment, Err(video_segment) => { @@ -326,7 +331,7 @@ impl AggregatorImpl for NdiSinkCombiner { CAT, obj: agg, "Video segment of wrong format {:?}", - video_segment.get_format() + video_segment.format() ); return Err(gst::FlowError::Error); } @@ -344,14 +349,14 @@ impl AggregatorImpl for NdiSinkCombiner { let audio_buffer_segment_and_pad; if let Some(audio_pad) = self.audio_pad.lock().unwrap().clone() { audio_buffer_segment_and_pad = match audio_pad.peek_buffer() { - Some(audio_buffer) if audio_buffer.get_size() == 0 => { + Some(audio_buffer) if audio_buffer.size() == 0 => { // Skip empty/gap audio buffer audio_pad.drop_buffer(); gst_trace!(CAT, obj: agg, "Empty audio buffer, waiting for next"); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); } Some(audio_buffer) => { - let audio_segment = audio_pad.get_segment(); + let audio_segment = audio_pad.segment(); let audio_segment = match audio_segment.downcast::() { Ok(audio_segment) => audio_segment, Err(audio_segment) => { @@ -359,7 +364,7 @@ impl AggregatorImpl for NdiSinkCombiner { CAT, obj: agg, "Audio segment of wrong format {:?}", - audio_segment.get_format() + audio_segment.format() ); return Err(gst::FlowError::Error); } @@ -385,8 +390,7 @@ impl AggregatorImpl for NdiSinkCombiner { let (mut current_video_buffer, current_video_running_time_end, next_video_buffer) = if let Some((video_buffer, video_segment)) = video_buffer_and_segment { - let video_running_time = video_segment.to_running_time(video_buffer.get_pts()); - assert!(video_running_time.is_some()); + let video_running_time = video_segment.to_running_time(video_buffer.pts()).unwrap(); match state.current_video_buffer { None => { @@ -398,7 +402,7 @@ impl AggregatorImpl for NdiSinkCombiner { } Some((ref buffer, _)) => ( buffer.clone(), - video_running_time, + Some(video_running_time), Some((video_buffer, video_running_time)), ), } @@ -416,10 +420,9 @@ impl AggregatorImpl for NdiSinkCombiner { // Create an empty dummy buffer for attaching the audio. This is going to // be dropped by the sink later. let audio_running_time = - audio_segment.to_running_time(audio_buffer.get_pts()); - assert!(audio_running_time.is_some()); + audio_segment.to_running_time(audio_buffer.pts()).unwrap(); - let video_segment = self.video_pad.get_segment(); + let video_segment = self.video_pad.segment(); let video_segment = match video_segment.downcast::() { Ok(video_segment) => video_segment, Err(video_segment) => { @@ -427,7 +430,7 @@ impl AggregatorImpl for NdiSinkCombiner { CAT, obj: agg, "Video segment of wrong format {:?}", - video_segment.get_format() + video_segment.format() ); return Err(gst::FlowError::Error); } @@ -445,9 +448,9 @@ impl AggregatorImpl for NdiSinkCombiner { buffer.set_pts(video_pts); } - (buffer, gst::CLOCK_TIME_NONE, None) + (buffer, gst::ClockTime::NONE, None) } - (Some((ref buffer, _)), _) => (buffer.clone(), gst::CLOCK_TIME_NONE, None), + (Some((ref buffer, _)), _) => (buffer.clone(), gst::ClockTime::NONE, None), } }; @@ -460,22 +463,26 @@ impl AggregatorImpl for NdiSinkCombiner { } }; - let audio_running_time = audio_segment.to_running_time(audio_buffer.get_pts()); - assert!(audio_running_time.is_some()); - let duration = gst::SECOND - .mul_div_floor( - audio_buffer.get_size() as u64 / audio_info.bpf() as u64, - audio_info.rate() as u64, - ) - .unwrap_or(gst::CLOCK_TIME_NONE); - let audio_running_time_end = audio_running_time + duration; - assert!(audio_running_time_end.is_some()); + let audio_running_time = audio_segment.to_running_time(audio_buffer.pts()); + let duration = gst::ClockTime::SECOND.mul_div_floor( + audio_buffer.size() as u64 / audio_info.bpf() as u64, + audio_info.rate() as u64, + ); + let audio_running_time_end = audio_running_time + .zip(duration) + .and_then(|(running_time, duration)| running_time.checked_add(duration)); - if audio_running_time_end <= current_video_running_time_end - || current_video_running_time_end.is_none() + if audio_running_time_end + .zip(current_video_running_time_end) + .map(|(audio, video)| audio <= video) + .unwrap_or(true) { - let timecode = (audio_running_time + agg.get_base_time()) - .map(|t| (t / 100) as i64) + let timecode = agg + .base_time() + .zip(audio_running_time) + .map(|(base_time, audio_running_time)| { + ((base_time.nseconds() + audio_running_time.nseconds()) / 100) as i64 + }) .unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize); gst_trace!( @@ -484,8 +491,8 @@ impl AggregatorImpl for NdiSinkCombiner { "Including audio buffer {:?} with timecode {}: {} <= {}", audio_buffer, timecode, - audio_running_time_end, - current_video_running_time_end, + audio_running_time_end.display(), + current_video_running_time_end.display(), ); state .current_audio_buffers @@ -503,7 +510,7 @@ impl AggregatorImpl for NdiSinkCombiner { // far } - let audio_buffers = mem::replace(&mut state.current_audio_buffers, Vec::new()); + let audio_buffers = mem::take(&mut state.current_audio_buffers); if !audio_buffers.is_empty() { let current_video_buffer = current_video_buffer.make_mut(); @@ -530,7 +537,7 @@ impl AggregatorImpl for NdiSinkCombiner { fn sink_event( &self, - agg: &gst_base::Aggregator, + agg: &Self::Type, pad: &gst_base::AggregatorPad, event: gst::Event, ) -> bool { @@ -538,7 +545,7 @@ impl AggregatorImpl for NdiSinkCombiner { match event.view() { EventView::Caps(caps) => { - let caps = caps.get_caps_owned(); + let caps = caps.caps_owned(); let mut state_storage = self.state.lock().unwrap(); let state = match &mut *state_storage { @@ -558,22 +565,22 @@ impl AggregatorImpl for NdiSinkCombiner { // 2 frames latency because we queue 1 frame and wait until audio // up to the end of that frame has arrived. let latency = if *info.fps().numer() > 0 { - gst::SECOND + gst::ClockTime::SECOND .mul_div_floor( 2 * *info.fps().denom() as u64, *info.fps().numer() as u64, ) - .unwrap_or(80 * gst::MSECOND) + .unwrap_or(80 * gst::ClockTime::MSECOND) } else { // let's assume 25fps and 2 frames latency - 80 * gst::MSECOND + 80 * gst::ClockTime::MSECOND }; state.video_info = Some(info); drop(state_storage); - agg.set_latency(latency, gst::CLOCK_TIME_NONE); + agg.set_latency(latency, gst::ClockTime::NONE); // The video caps are passed through as the audio is included only in a meta agg.set_src_caps(&caps); @@ -591,7 +598,7 @@ impl AggregatorImpl for NdiSinkCombiner { } // The video segment is passed through as-is and the video timestamps are preserved EventView::Segment(segment) if pad == &self.video_pad => { - let segment = segment.get_segment(); + let segment = segment.segment(); gst_debug!(CAT, obj: agg, "Updating segment {:?}", segment); agg.update_segment(segment); } @@ -603,7 +610,7 @@ impl AggregatorImpl for NdiSinkCombiner { fn sink_query( &self, - agg: &gst_base::Aggregator, + agg: &Self::Type, pad: &gst_base::AggregatorPad, query: &mut gst::QueryRef, ) -> bool { @@ -612,7 +619,7 @@ impl AggregatorImpl for NdiSinkCombiner { match query.view_mut() { QueryView::Caps(_) if pad == &self.video_pad => { // Directly forward caps queries - let srcpad = agg.get_static_pad("src").unwrap(); + let srcpad = agg.static_pad("src").unwrap(); return srcpad.peer_query(query); } _ => (), @@ -621,17 +628,8 @@ impl AggregatorImpl for NdiSinkCombiner { self.parent_sink_query(agg, pad, query) } - fn negotiate(&self, _agg: &gst_base::Aggregator) -> bool { + fn negotiate(&self, _agg: &Self::Type) -> bool { // No negotiation needed as the video caps are just passed through true } } - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register( - Some(plugin), - "ndisinkcombiner", - gst::Rank::None, - NdiSinkCombiner::get_type(), - ) -} diff --git a/src/ndisinkcombiner/mod.rs b/src/ndisinkcombiner/mod.rs new file mode 100644 index 00000000..b86c4cab --- /dev/null +++ b/src/ndisinkcombiner/mod.rs @@ -0,0 +1,19 @@ +use glib::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct NdiSinkCombiner(ObjectSubclass) @extends gst_base::Aggregator, gst::Element, gst::Object; +} + +unsafe impl Send for NdiSinkCombiner {} +unsafe impl Sync for NdiSinkCombiner {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "ndisinkcombiner", + gst::Rank::None, + NdiSinkCombiner::static_type(), + ) +} diff --git a/src/ndisinkmeta.rs b/src/ndisinkmeta.rs index f53449ec..14aee926 100644 --- a/src/ndisinkmeta.rs +++ b/src/ndisinkmeta.rs @@ -1,4 +1,3 @@ -use gst::gst_sys; use gst::prelude::*; use std::fmt; use std::mem; @@ -19,10 +18,10 @@ impl NdiSinkAudioMeta { // content of the struct let mut params = mem::ManuallyDrop::new(imp::NdiSinkAudioMetaParams { buffers }); - let meta = gst_sys::gst_buffer_add_meta( + let meta = gst::ffi::gst_buffer_add_meta( buffer.as_mut_ptr(), imp::ndi_sink_audio_meta_get_info(), - &mut *params as *mut imp::NdiSinkAudioMetaParams as glib::glib_sys::gpointer, + &mut *params as *mut imp::NdiSinkAudioMetaParams as glib::ffi::gpointer, ) as *mut imp::NdiSinkAudioMeta; Self::from_mut_ptr(buffer, meta) @@ -37,7 +36,7 @@ impl NdiSinkAudioMeta { unsafe impl MetaAPI for NdiSinkAudioMeta { type GstType = imp::NdiSinkAudioMeta; - fn get_meta_api() -> glib::Type { + fn meta_api() -> glib::Type { imp::ndi_sink_audio_meta_api_get_type() } } @@ -51,9 +50,7 @@ impl fmt::Debug for NdiSinkAudioMeta { } mod imp { - use glib::glib_sys; use glib::translate::*; - use gst::gst_sys; use once_cell::sync::Lazy; use std::mem; use std::ptr; @@ -64,18 +61,18 @@ mod imp { #[repr(C)] pub struct NdiSinkAudioMeta { - parent: gst_sys::GstMeta, + parent: gst::ffi::GstMeta, pub(super) buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>, } pub(super) fn ndi_sink_audio_meta_api_get_type() -> glib::Type { static TYPE: Lazy = Lazy::new(|| unsafe { - let t = from_glib(gst_sys::gst_meta_api_type_register( + let t = from_glib(gst::ffi::gst_meta_api_type_register( b"GstNdiSinkAudioMetaAPI\0".as_ptr() as *const _, [ptr::null::()].as_ptr() as *mut *const _, )); - assert_ne!(t, glib::Type::Invalid); + assert_ne!(t, glib::Type::INVALID); t }); @@ -84,10 +81,10 @@ mod imp { } unsafe extern "C" fn ndi_sink_audio_meta_init( - meta: *mut gst_sys::GstMeta, - params: glib_sys::gpointer, - _buffer: *mut gst_sys::GstBuffer, - ) -> glib_sys::gboolean { + meta: *mut gst::ffi::GstMeta, + params: glib::ffi::gpointer, + _buffer: *mut gst::ffi::GstBuffer, + ) -> glib::ffi::gboolean { assert!(!params.is_null()); let meta = &mut *(meta as *mut NdiSinkAudioMeta); @@ -95,12 +92,12 @@ mod imp { ptr::write(&mut meta.buffers, params.buffers); - true.to_glib() + true.into_glib() } unsafe extern "C" fn ndi_sink_audio_meta_free( - meta: *mut gst_sys::GstMeta, - _buffer: *mut gst_sys::GstBuffer, + meta: *mut gst::ffi::GstMeta, + _buffer: *mut gst::ffi::GstBuffer, ) { let meta = &mut *(meta as *mut NdiSinkAudioMeta); @@ -108,34 +105,34 @@ mod imp { } unsafe extern "C" fn ndi_sink_audio_meta_transform( - dest: *mut gst_sys::GstBuffer, - meta: *mut gst_sys::GstMeta, - _buffer: *mut gst_sys::GstBuffer, - _type_: glib_sys::GQuark, - _data: glib_sys::gpointer, - ) -> glib_sys::gboolean { + dest: *mut gst::ffi::GstBuffer, + meta: *mut gst::ffi::GstMeta, + _buffer: *mut gst::ffi::GstBuffer, + _type_: glib::ffi::GQuark, + _data: glib::ffi::gpointer, + ) -> glib::ffi::gboolean { let meta = &*(meta as *mut NdiSinkAudioMeta); super::NdiSinkAudioMeta::add(gst::BufferRef::from_mut_ptr(dest), meta.buffers.clone()); - true.to_glib() + true.into_glib() } - pub(super) fn ndi_sink_audio_meta_get_info() -> *const gst_sys::GstMetaInfo { - struct MetaInfo(ptr::NonNull); + pub(super) fn ndi_sink_audio_meta_get_info() -> *const gst::ffi::GstMetaInfo { + struct MetaInfo(ptr::NonNull); unsafe impl Send for MetaInfo {} unsafe impl Sync for MetaInfo {} static META_INFO: Lazy = Lazy::new(|| unsafe { MetaInfo( - ptr::NonNull::new(gst_sys::gst_meta_register( - ndi_sink_audio_meta_api_get_type().to_glib(), + ptr::NonNull::new(gst::ffi::gst_meta_register( + ndi_sink_audio_meta_api_get_type().into_glib(), b"GstNdiSinkAudioMeta\0".as_ptr() as *const _, mem::size_of::(), Some(ndi_sink_audio_meta_init), Some(ndi_sink_audio_meta_free), Some(ndi_sink_audio_meta_transform), - ) as *mut gst_sys::GstMetaInfo) + ) as *mut gst::ffi::GstMetaInfo) .expect("Failed to register meta API"), ) }); diff --git a/src/ndivideosrc.rs b/src/ndivideosrc/imp.rs similarity index 59% rename from src/ndivideosrc.rs rename to src/ndivideosrc/imp.rs index 7e41d231..d30ee802 100644 --- a/src/ndivideosrc.rs +++ b/src/ndivideosrc/imp.rs @@ -1,7 +1,6 @@ -use glib::subclass; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg}; +use gst::{gst_debug, gst_error}; use gst_base::prelude::*; use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; @@ -9,6 +8,8 @@ use gst_base::subclass::prelude::*; use std::sync::Mutex; use std::{i32, u32}; +use once_cell::sync::Lazy; + use crate::ndisys; use crate::connect_ndi; @@ -47,93 +48,9 @@ impl Default for Settings { } } -static PROPERTIES: [subclass::Property; 8] = [ - subclass::Property("ndi-name", |name| { - glib::ParamSpec::string( - name, - "NDI Name", - "NDI stream name of the sender", - None, - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("url-address", |name| { - glib::ParamSpec::string( - name, - "URL/Address", - "URL/address and port of the sender, e.g. 127.0.0.1:5961", - None, - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("receiver-ndi-name", |name| { - glib::ParamSpec::string( - name, - "Receiver NDI Name", - "NDI stream name of this receiver", - Some(&*DEFAULT_RECEIVER_NDI_NAME), - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("connect-timeout", |name| { - glib::ParamSpec::uint( - name, - "Connect Timeout", - "Connection timeout in ms", - 0, - u32::MAX, - 10000, - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("timeout", |name| { - glib::ParamSpec::uint( - name, - "Timeout", - "Receive timeout in ms", - 0, - u32::MAX, - 5000, - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("max-queue-length", |name| { - glib::ParamSpec::uint( - name, - "Max Queue Length", - "Maximum receive queue length", - 0, - u32::MAX, - 5, - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("bandwidth", |name| { - glib::ParamSpec::int( - name, - "Bandwidth", - "Bandwidth, -10 metadata-only, 10 audio-only, 100 highest", - -10, - 100, - 100, - glib::ParamFlags::READWRITE, - ) - }), - subclass::Property("timestamp-mode", |name| { - glib::ParamSpec::enum_( - name, - "Timestamp Mode", - "Timestamp information to use for outgoing PTS", - TimestampMode::static_type(), - TimestampMode::ReceiveTimeTimecode as i32, - glib::ParamFlags::READWRITE, - ) - }), -]; - struct State { info: Option, - current_latency: gst::ClockTime, + current_latency: Option, receiver: Option>, } @@ -141,26 +58,24 @@ impl Default for State { fn default() -> State { State { info: None, - current_latency: gst::CLOCK_TIME_NONE, + current_latency: gst::ClockTime::NONE, receiver: None, } } } -pub(crate) struct NdiVideoSrc { +pub struct NdiVideoSrc { cat: gst::DebugCategory, settings: Mutex, state: Mutex, receiver_controller: Mutex>>, } +#[glib::object_subclass] impl ObjectSubclass for NdiVideoSrc { const NAME: &'static str = "NdiVideoSrc"; + type Type = super::NdiVideoSrc; type ParentType = gst_base::BaseSrc; - type Instance = gst::subclass::ElementInstanceStruct; - type Class = subclass::simple::ClassStruct; - - glib::glib_object_subclass!(); fn new() -> Self { Self { @@ -174,123 +89,130 @@ impl ObjectSubclass for NdiVideoSrc { receiver_controller: Mutex::new(None), } } - - fn class_init(klass: &mut subclass::simple::ClassStruct) { - klass.set_metadata( - "NewTek NDI Video Source", - "Source", - "NewTek NDI video source", - "Ruben Gonzalez , Daniel Vilar , Sebastian Dröge ", - ); - - // On the src pad, we can produce F32/F64 with any sample rate - // and any number of channels - let caps = gst::Caps::new_simple( - "video/x-raw", - &[ - ( - "format", - &gst::List::new(&[ - &gst_video::VideoFormat::Uyvy.to_string(), - &gst_video::VideoFormat::Yv12.to_string(), - &gst_video::VideoFormat::Nv12.to_string(), - &gst_video::VideoFormat::I420.to_string(), - &gst_video::VideoFormat::Bgra.to_string(), - &gst_video::VideoFormat::Bgrx.to_string(), - &gst_video::VideoFormat::Rgba.to_string(), - &gst_video::VideoFormat::Rgbx.to_string(), - ]), - ), - ("width", &gst::IntRange::::new(0, i32::MAX)), - ("height", &gst::IntRange::::new(0, i32::MAX)), - ( - "framerate", - &gst::FractionRange::new( - gst::Fraction::new(0, 1), - gst::Fraction::new(i32::MAX, 1), - ), - ), - ], - ); - - #[cfg(feature = "interlaced-fields")] - let caps = { - let mut tmp = caps.copy(); - { - let tmp = tmp.get_mut().unwrap(); - tmp.set_features_simple(Some(gst::CapsFeatures::new(&["format:Interlaced"]))); - } - - let mut caps = caps; - { - let caps = caps.get_mut().unwrap(); - caps.append(tmp); - } - - caps - }; - - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &caps, - ) - .unwrap(); - klass.add_pad_template(src_pad_template); - - klass.install_properties(&PROPERTIES); - } } impl ObjectImpl for NdiVideoSrc { - glib::glib_object_impl!(); + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpec::new_string( + "ndi-name", + "NDI Name", + "NDI stream name of the sender", + None, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_string( + "url-address", + "URL/Address", + "URL/address and port of the sender, e.g. 127.0.0.1:5961", + None, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_string( + "receiver-ndi-name", + "Receiver NDI Name", + "NDI stream name of this receiver", + Some(&*DEFAULT_RECEIVER_NDI_NAME), + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_uint( + "connect-timeout", + "Connect Timeout", + "Connection timeout in ms", + 0, + u32::MAX, + 10000, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_uint( + "timeout", + "Timeout", + "Receive timeout in ms", + 0, + u32::MAX, + 5000, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_uint( + "max-queue-length", + "Max Queue Length", + "Maximum receive queue length", + 0, + u32::MAX, + 5, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_int( + "bandwidth", + "Bandwidth", + "Bandwidth, -10 metadata-only, 10 audio-only, 100 highest", + -10, + 100, + 100, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpec::new_enum( + "timestamp-mode", + "Timestamp Mode", + "Timestamp information to use for outgoing PTS", + TimestampMode::static_type(), + TimestampMode::ReceiveTimeTimecode as i32, + glib::ParamFlags::READWRITE, + ), + ] + }); - fn constructed(&self, obj: &glib::Object) { - self.parent_constructed(obj); - - let basesrc = obj.downcast_ref::().unwrap(); - // Initialize live-ness and notify the base class that - // we'd like to operate in Time format - basesrc.set_live(true); - basesrc.set_format(gst::Format::Time); + PROPERTIES.as_ref() } - fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { - let prop = &PROPERTIES[id]; - let basesrc = obj.downcast_ref::().unwrap(); + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); - match *prop { - subclass::Property("ndi-name", ..) => { + // Initialize live-ness and notify the base class that + // we'd like to operate in Time format + obj.set_live(true); + obj.set_format(gst::Format::Time); + } + + fn set_property( + &self, + obj: &Self::Type, + _id: usize, + value: &glib::Value, + pspec: &glib::ParamSpec, + ) { + match pspec.name() { + "ndi-name" => { let mut settings = self.settings.lock().unwrap(); let ndi_name = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing ndi-name from {:?} to {:?}", settings.ndi_name, ndi_name, ); settings.ndi_name = ndi_name; } - subclass::Property("url-address", ..) => { + "url-address" => { let mut settings = self.settings.lock().unwrap(); let url_address = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing url-address from {:?} to {:?}", settings.url_address, url_address, ); settings.url_address = url_address; } - subclass::Property("receiver-ndi-name", ..) => { + "receiver-ndi-name" => { let mut settings = self.settings.lock().unwrap(); - let receiver_ndi_name = value.get().unwrap(); + let receiver_ndi_name = value.get::>().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing receiver-ndi-name from {:?} to {:?}", settings.receiver_ndi_name, receiver_ndi_name, @@ -298,67 +220,66 @@ impl ObjectImpl for NdiVideoSrc { settings.receiver_ndi_name = receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone()); } - subclass::Property("connect-timeout", ..) => { + "connect-timeout" => { let mut settings = self.settings.lock().unwrap(); - let connect_timeout = value.get_some().unwrap(); + let connect_timeout = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing connect-timeout from {} to {}", settings.connect_timeout, connect_timeout, ); settings.connect_timeout = connect_timeout; } - subclass::Property("timeout", ..) => { + "timeout" => { let mut settings = self.settings.lock().unwrap(); - let timeout = value.get_some().unwrap(); + let timeout = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing timeout from {} to {}", settings.timeout, timeout, ); settings.timeout = timeout; } - subclass::Property("max-queue-length", ..) => { + "max-queue-length" => { let mut settings = self.settings.lock().unwrap(); - let max_queue_length = value.get_some().unwrap(); + let max_queue_length = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing max-queue-length from {} to {}", settings.max_queue_length, max_queue_length, ); settings.max_queue_length = max_queue_length; } - subclass::Property("bandwidth", ..) => { + "bandwidth" => { let mut settings = self.settings.lock().unwrap(); - let bandwidth = value.get_some().unwrap(); + let bandwidth = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing bandwidth from {} to {}", settings.bandwidth, bandwidth, ); settings.bandwidth = bandwidth; } - subclass::Property("timestamp-mode", ..) => { + "timestamp-mode" => { let mut settings = self.settings.lock().unwrap(); - let timestamp_mode = value.get_some().unwrap(); + let timestamp_mode = value.get().unwrap(); gst_debug!( self.cat, - obj: basesrc, + obj: obj, "Changing timestamp mode from {:?} to {:?}", settings.timestamp_mode, timestamp_mode ); if settings.timestamp_mode != timestamp_mode { - let _ = - basesrc.post_message(gst::message::Latency::builder().src(basesrc).build()); + let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); } settings.timestamp_mode = timestamp_mode; } @@ -366,41 +287,39 @@ impl ObjectImpl for NdiVideoSrc { } } - fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { - let prop = &PROPERTIES[id]; - - match *prop { - subclass::Property("ndi-name", ..) => { + fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "ndi-name" => { let settings = self.settings.lock().unwrap(); - Ok(settings.ndi_name.to_value()) + settings.ndi_name.to_value() } - subclass::Property("url-address", ..) => { + "url-address" => { let settings = self.settings.lock().unwrap(); - Ok(settings.url_address.to_value()) + settings.url_address.to_value() } - subclass::Property("receiver-ndi-name", ..) => { + "receiver-ndi-name" => { let settings = self.settings.lock().unwrap(); - Ok(settings.receiver_ndi_name.to_value()) + settings.receiver_ndi_name.to_value() } - subclass::Property("connect-timeout", ..) => { + "connect-timeout" => { let settings = self.settings.lock().unwrap(); - Ok(settings.connect_timeout.to_value()) + settings.connect_timeout.to_value() } - subclass::Property("timeout", ..) => { + "timeout" => { let settings = self.settings.lock().unwrap(); - Ok(settings.timeout.to_value()) + settings.timeout.to_value() } - subclass::Property("max-queue-length", ..) => { + "max-queue-length" => { let settings = self.settings.lock().unwrap(); - Ok(settings.max_queue_length.to_value()) + settings.max_queue_length.to_value() } - subclass::Property("bandwidth", ..) => { + "bandwidth" => { let settings = self.settings.lock().unwrap(); - Ok(settings.bandwidth.to_value()) + settings.bandwidth.to_value() } - subclass::Property("timestamp-mode", ..) => { + "timestamp-mode" => { let settings = self.settings.lock().unwrap(); - Ok(settings.timestamp_mode.to_value()) + settings.timestamp_mode.to_value() } _ => unimplemented!(), } @@ -408,9 +327,85 @@ impl ObjectImpl for NdiVideoSrc { } impl ElementImpl for NdiVideoSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "NewTek NDI Video Source", + "Source", + "NewTek NDI video source", + "Ruben Gonzalez , Daniel Vilar , Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + // On the src pad, we can produce F32/F64 with any sample rate + // and any number of channels + let caps = gst::Caps::new_simple( + "video/x-raw", + &[ + ( + "format", + &gst::List::new(&[ + &gst_video::VideoFormat::Uyvy.to_string(), + &gst_video::VideoFormat::Yv12.to_string(), + &gst_video::VideoFormat::Nv12.to_string(), + &gst_video::VideoFormat::I420.to_string(), + &gst_video::VideoFormat::Bgra.to_string(), + &gst_video::VideoFormat::Bgrx.to_string(), + &gst_video::VideoFormat::Rgba.to_string(), + &gst_video::VideoFormat::Rgbx.to_string(), + ]), + ), + ("width", &gst::IntRange::::new(0, i32::MAX)), + ("height", &gst::IntRange::::new(0, i32::MAX)), + ( + "framerate", + &gst::FractionRange::new( + gst::Fraction::new(0, 1), + gst::Fraction::new(i32::MAX, 1), + ), + ), + ], + ); + + #[cfg(feature = "interlaced-fields")] + let caps = { + let mut tmp = caps.copy(); + { + let tmp = tmp.get_mut().unwrap(); + tmp.set_features_simple(Some(gst::CapsFeatures::new(&["format:Interlaced"]))); + } + + let mut caps = caps; + { + let caps = caps.get_mut().unwrap(); + caps.append(tmp); + } + + caps + }; + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + vec![src_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + fn change_state( &self, - element: &gst::Element, + element: &Self::Type, transition: gst::StateChange, ) -> Result { match transition { @@ -437,13 +432,13 @@ impl ElementImpl for NdiVideoSrc { } impl BaseSrcImpl for NdiVideoSrc { - fn negotiate(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::LoggableError> { + fn negotiate(&self, _element: &Self::Type) -> Result<(), gst::LoggableError> { // Always succeed here without doing anything: we will set the caps once we received a // buffer, there's nothing we can negotiate Ok(()) } - fn unlock(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn unlock(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { gst_debug!(self.cat, obj: element, "Unlocking",); if let Some(ref controller) = *self.receiver_controller.lock().unwrap() { controller.set_flushing(true); @@ -451,7 +446,7 @@ impl BaseSrcImpl for NdiVideoSrc { Ok(()) } - fn unlock_stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn unlock_stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { gst_debug!(self.cat, obj: element, "Stop unlocking",); if let Some(ref controller) = *self.receiver_controller.lock().unwrap() { controller.set_flushing(false); @@ -459,12 +454,12 @@ impl BaseSrcImpl for NdiVideoSrc { Ok(()) } - fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> { *self.state.lock().unwrap() = Default::default(); let settings = self.settings.lock().unwrap().clone(); if settings.ndi_name.is_none() && settings.url_address.is_none() { - return Err(gst_error_msg!( + return Err(gst::error_msg!( gst::LibraryError::Settings, ["No NDI name or URL/address given"] )); @@ -472,7 +467,7 @@ impl BaseSrcImpl for NdiVideoSrc { let receiver = connect_ndi( self.cat, - element, + element.upcast_ref(), settings.ndi_name.as_deref(), settings.url_address.as_deref(), &settings.receiver_ndi_name, @@ -485,7 +480,7 @@ impl BaseSrcImpl for NdiVideoSrc { // settings.id_receiver exists match receiver { - None => Err(gst_error_msg!( + None => Err(gst::error_msg!( gst::ResourceError::NotFound, ["Could not connect to this source"] )), @@ -500,7 +495,7 @@ impl BaseSrcImpl for NdiVideoSrc { } } - fn stop(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + fn stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> { if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() { controller.shutdown(); } @@ -508,7 +503,7 @@ impl BaseSrcImpl for NdiVideoSrc { Ok(()) } - fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool { + fn query(&self, element: &Self::Type, query: &mut gst::QueryRef) -> bool { use gst::QueryView; match query.view_mut() { @@ -521,14 +516,14 @@ impl BaseSrcImpl for NdiVideoSrc { let state = self.state.lock().unwrap(); let settings = self.settings.lock().unwrap(); - if state.current_latency.is_some() { + if let Some(latency) = state.current_latency { let min = if settings.timestamp_mode != TimestampMode::Timecode { - state.current_latency + latency } else { - 0.into() + gst::ClockTime::ZERO }; - let max = 5 * state.current_latency; + let max = 5 * latency; println!("Returning latency min {} max {}", min, max,); @@ -549,11 +544,11 @@ impl BaseSrcImpl for NdiVideoSrc { } } - fn fixate(&self, element: &gst_base::BaseSrc, mut caps: gst::Caps) -> gst::Caps { + fn fixate(&self, element: &Self::Type, mut caps: gst::Caps) -> gst::Caps { caps.truncate(); { let caps = caps.make_mut(); - let s = caps.get_mut_structure(0).unwrap(); + let s = caps.structure_mut(0).unwrap(); s.fixate_field_nearest_int("width", 1920); s.fixate_field_nearest_int("height", 1080); if s.has_field("pixel-aspect-ratio") { @@ -567,7 +562,7 @@ impl BaseSrcImpl for NdiVideoSrc { //Creates the video buffers fn create( &self, - element: &gst_base::BaseSrc, + element: &Self::Type, _offset: u64, _buffer: Option<&mut gst::BufferRef>, _length: u32, @@ -589,7 +584,7 @@ impl BaseSrcImpl for NdiVideoSrc { state.receiver = Some(recv); if state.info.as_ref() != Some(&info) { let caps = info.to_caps().map_err(|_| { - gst_element_error!( + gst::element_error!( element, gst::ResourceError::Settings, ["Invalid audio info received: {:?}", info] @@ -597,11 +592,11 @@ impl BaseSrcImpl for NdiVideoSrc { gst::FlowError::NotNegotiated })?; state.info = Some(info); - state.current_latency = buffer.get_duration(); + state.current_latency = buffer.duration(); drop(state); gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); element.set_caps(&caps).map_err(|_| { - gst_element_error!( + gst::element_error!( element, gst::CoreError::Negotiation, ["Failed to negotiate caps: {:?}", caps] @@ -621,12 +616,3 @@ impl BaseSrcImpl for NdiVideoSrc { } } } - -pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register( - Some(plugin), - "ndivideosrc", - gst::Rank::None, - NdiVideoSrc::get_type(), - ) -} diff --git a/src/ndivideosrc/mod.rs b/src/ndivideosrc/mod.rs new file mode 100644 index 00000000..ba9a7552 --- /dev/null +++ b/src/ndivideosrc/mod.rs @@ -0,0 +1,19 @@ +use glib::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct NdiVideoSrc(ObjectSubclass) @extends gst_base::BaseSrc, gst::Element, gst::Object; +} + +unsafe impl Send for NdiVideoSrc {} +unsafe impl Sync for NdiVideoSrc {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "ndivideosrc", + gst::Rank::None, + NdiVideoSrc::static_type(), + ) +} diff --git a/src/receiver.rs b/src/receiver.rs index dccb1907..8f7d12a0 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -1,6 +1,6 @@ use glib::prelude::*; use gst::prelude::*; -use gst::{gst_debug, gst_element_error, gst_error, gst_log, gst_warning}; +use gst::{gst_debug, gst_error, gst_log, gst_warning}; use gst_video::prelude::*; use byte_slice_cast::AsMutSliceOf; @@ -167,15 +167,14 @@ impl Observations { &self, cat: gst::DebugCategory, element: &gst_base::BaseSrc, - time: (gst::ClockTime, gst::ClockTime), - duration: gst::ClockTime, - ) -> (gst::ClockTime, gst::ClockTime) { - assert!(time.1.is_some()); + time: (Option, gst::ClockTime), + duration: Option, + ) -> (gst::ClockTime, Option) { if time.0.is_none() { return (time.1, duration); } - let time = (time.0.unwrap(), time.1.unwrap()); + let time = (time.0.unwrap(), time.1); let mut inner = self.0.lock().unwrap(); let ObservationsInner { @@ -190,8 +189,8 @@ impl Observations { } = *inner; if values.is_empty() { - current_mapping.xbase = time.0; - current_mapping.b = time.1; + current_mapping.xbase = time.0.nseconds(); + current_mapping.b = time.1.nseconds(); current_mapping.num = 1; current_mapping.den = 1; } @@ -207,7 +206,9 @@ impl Observations { // Start by first updating every frame, then every second frame, then every third // frame, etc. until we update once every quarter second - let framerate = (gst::SECOND / duration).unwrap_or(25) as usize; + let framerate = gst::ClockTime::SECOND + .checked_div(duration.unwrap_or(40 * gst::ClockTime::MSECOND).nseconds()) + .unwrap_or(25) as usize; if *skip_period < framerate / 4 + 1 { *skip_period += 1; @@ -221,7 +222,7 @@ impl Observations { if values.len() == WINDOW_LENGTH { values.remove(0); } - values.push(time); + values.push((time.0.nseconds(), time.1.nseconds())); if let Some((num, den, b, xbase, r_squared)) = gst::calculate_linear_regression(values, Some(values_tmp)) @@ -236,8 +237,8 @@ impl Observations { obj: element, "Calculated new time mapping: GStreamer time = {} * (NDI time - {}) + {} ({})", next_mapping.num as f64 / next_mapping.den as f64, - gst::ClockTime::from(next_mapping.xbase), - gst::ClockTime::from(next_mapping.b), + gst::ClockTime::from_nseconds(next_mapping.xbase), + gst::ClockTime::from_nseconds(next_mapping.b), r_squared, ); } @@ -250,81 +251,72 @@ impl Observations { if *time_mapping_pending { let expected = gst::Clock::adjust_with_calibration( - time.0.into(), - current_mapping.xbase.into(), - current_mapping.b.into(), - current_mapping.num.into(), - current_mapping.den.into(), + time.0, + gst::ClockTime::from_nseconds(current_mapping.xbase), + gst::ClockTime::from_nseconds(current_mapping.b), + gst::ClockTime::from_nseconds(current_mapping.num), + gst::ClockTime::from_nseconds(current_mapping.den), ); let new_calculated = gst::Clock::adjust_with_calibration( - time.0.into(), - next_mapping.xbase.into(), - next_mapping.b.into(), - next_mapping.num.into(), - next_mapping.den.into(), + time.0, + gst::ClockTime::from_nseconds(next_mapping.xbase), + gst::ClockTime::from_nseconds(next_mapping.b), + gst::ClockTime::from_nseconds(next_mapping.num), + gst::ClockTime::from_nseconds(next_mapping.den), ); - if let (Some(expected), Some(new_calculated)) = (*expected, *new_calculated) { - let diff = if new_calculated > expected { - new_calculated - expected - } else { - expected - new_calculated - }; - - // Allow at most 5% frame duration or 2ms difference per frame - let max_diff = cmp::max( - (duration / 10).unwrap_or(2 * gst::MSECOND_VAL), - 2 * gst::MSECOND_VAL, - ); - - if diff > max_diff { - gst_debug!( - cat, - obj: element, - "New time mapping causes difference {} but only {} allowed", - gst::ClockTime::from(diff), - gst::ClockTime::from(max_diff), - ); - - if new_calculated > expected { - current_mapping.b = expected + max_diff; - current_mapping.xbase = time.0; - } else { - current_mapping.b = expected - max_diff; - current_mapping.xbase = time.0; - } - } else { - *current_mapping = *next_mapping; - } + let diff = if new_calculated > expected { + new_calculated - expected } else { - gst_warning!( + expected - new_calculated + }; + + // Allow at most 5% frame duration or 2ms difference per frame + let max_diff = cmp::max( + (duration.map(|d| d / 10)).unwrap_or(2 * gst::ClockTime::MSECOND), + 2 * gst::ClockTime::MSECOND, + ); + + if diff > max_diff { + gst_debug!( cat, obj: element, - "Failed to calculate timestamps based on new mapping", + "New time mapping causes difference {} but only {} allowed", + diff, + max_diff, ); + + if new_calculated > expected { + current_mapping.b = (expected + max_diff).nseconds(); + current_mapping.xbase = time.0.nseconds(); + } else { + current_mapping.b = (expected - max_diff).nseconds(); + current_mapping.xbase = time.0.nseconds(); + } + } else { + *current_mapping = *next_mapping; } } let converted_timestamp = gst::Clock::adjust_with_calibration( - time.0.into(), - current_mapping.xbase.into(), - current_mapping.b.into(), - current_mapping.num.into(), - current_mapping.den.into(), + time.0, + gst::ClockTime::from_nseconds(current_mapping.xbase), + gst::ClockTime::from_nseconds(current_mapping.b), + gst::ClockTime::from_nseconds(current_mapping.num), + gst::ClockTime::from_nseconds(current_mapping.den), ); - let converted_duration = duration - .mul_div_floor(current_mapping.num, current_mapping.den) - .unwrap_or(gst::CLOCK_TIME_NONE); + let converted_duration = + duration.and_then(|d| d.mul_div_floor(current_mapping.num, current_mapping.den)); gst_debug!( cat, obj: element, "Converted timestamp {}/{} to {}, duration {} to {}", - gst::ClockTime::from(time.0), - gst::ClockTime::from(time.1), - converted_timestamp, - duration, - converted_duration, + time.0, + time.1, + converted_timestamp.display(), + duration.display(), + converted_duration.display(), ); (converted_timestamp, converted_duration) @@ -422,7 +414,7 @@ impl Receiver { Err(_) => { if let Some(receiver) = weak.upgrade().map(Receiver) { if let Some(element) = receiver.0.element.upgrade() { - gst_element_error!( + gst::element_error!( element, gst::LibraryError::Failed, ["Panic while connecting to NDI source"] @@ -558,7 +550,7 @@ where if (receiver.video.is_some() || !T::IS_VIDEO) && (receiver.audio.is_some() || T::IS_VIDEO) { - gst_element_error!( + gst::element_error!( element, gst::ResourceError::OpenRead, [ @@ -595,14 +587,14 @@ where // FIXME: Ideally we would use NDIlib_recv_color_format_fastest here but that seems to be // broken with interlaced content currently - let recv = RecvInstance::builder(ndi_name, url_address, &receiver_ndi_name) + let recv = RecvInstance::builder(ndi_name, url_address, receiver_ndi_name) .bandwidth(bandwidth) .color_format(NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA) .allow_video_fields(true) .build(); let recv = match recv { None => { - gst_element_error!( + gst::element_error!( element, gst::CoreError::Negotiation, ["Failed to connect to source"] @@ -785,44 +777,42 @@ impl Receiver { element: &gst_base::BaseSrc, timestamp: i64, timecode: i64, - duration: gst::ClockTime, - ) -> Option<(gst::ClockTime, gst::ClockTime)> { - let clock = match element.get_clock() { - None => return None, - Some(clock) => clock, - }; + duration: Option, + ) -> Option<(gst::ClockTime, Option)> { + let clock = element.clock()?; // For now take the current running time as PTS. At a later time we // will want to work with the timestamp given by the NDI SDK if available - let now = clock.get_time(); - let base_time = element.get_base_time(); + let now = clock.time()?; + let base_time = element.base_time()?; let receive_time = now - base_time; - let real_time_now = gst::ClockTime::from(glib::get_real_time() as u64 * 1000); + let real_time_now = gst::ClockTime::from_nseconds(glib::real_time() as u64 * 1000); let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined { - gst::CLOCK_TIME_NONE + gst::ClockTime::NONE } else { - gst::ClockTime::from(timestamp as u64 * 100) + Some(gst::ClockTime::from_nseconds(timestamp as u64 * 100)) }; - let timecode = gst::ClockTime::from(timecode as u64 * 100); + let timecode = gst::ClockTime::from_nseconds(timecode as u64 * 100); gst_log!( self.0.cat, obj: element, "Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}", timecode, - timestamp, - duration, - receive_time, + timestamp.display(), + duration.display(), + receive_time.display(), real_time_now, ); let (pts, duration) = match self.0.timestamp_mode { - TimestampMode::ReceiveTimeTimecode => { - self.0 - .observations - .process(self.0.cat, element, (timecode, receive_time), duration) - } + TimestampMode::ReceiveTimeTimecode => self.0.observations.process( + self.0.cat, + element, + (Some(timecode), receive_time), + duration, + ), TimestampMode::ReceiveTimeTimestamp => self.0.observations.process( self.0.cat, element, @@ -833,10 +823,11 @@ impl Receiver { TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration), TimestampMode::Timestamp => { // Timestamps are relative to the UNIX epoch + let timestamp = timestamp?; if real_time_now > timestamp { let diff = real_time_now - timestamp; if diff > receive_time { - (0.into(), duration) + (gst::ClockTime::ZERO, duration) } else { (receive_time - diff, duration) } @@ -851,8 +842,8 @@ impl Receiver { self.0.cat, obj: element, "Calculated PTS {}, duration {}", - pts, - duration, + pts.display(), + duration.display(), ); Some((pts, duration)) @@ -909,7 +900,7 @@ impl Receiver { let video_frame = match res { Err(_) => { - gst_element_error!( + gst::element_error!( element, gst::ResourceError::Read, ["Error receiving frame"] @@ -970,13 +961,11 @@ impl Receiver { &self, element: &gst_base::BaseSrc, video_frame: &VideoFrame, - ) -> Option<(gst::ClockTime, gst::ClockTime)> { - let duration = gst::SECOND - .mul_div_floor( - video_frame.frame_rate().1 as u64, - video_frame.frame_rate().0 as u64, - ) - .unwrap_or(gst::CLOCK_TIME_NONE); + ) -> Option<(gst::ClockTime, Option)> { + let duration = gst::ClockTime::SECOND.mul_div_floor( + video_frame.frame_rate().1 as u64, + video_frame.frame_rate().0 as u64, + ); self.calculate_timestamp( element, @@ -1004,7 +993,7 @@ impl Receiver { ndisys::NDIlib_FourCC_video_type_RGBA => gst_video::VideoFormat::Rgba, ndisys::NDIlib_FourCC_video_type_RGBX => gst_video::VideoFormat::Rgbx, _ => { - gst_element_error!( + gst::element_error!( element, gst::StreamError::Format, ["Unsupported video fourcc {:08x}", video_frame.fourcc()] @@ -1045,7 +1034,7 @@ impl Receiver { } builder.build().map_err(|_| { - gst_element_error!( + gst::element_error!( element, gst::StreamError::Format, ["Invalid video format configuration"] @@ -1062,7 +1051,7 @@ impl Receiver { && video_frame.frame_format_type() != ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved { - gst_element_error!( + gst::element_error!( element, gst::StreamError::Format, ["Separate field interlacing not supported"] @@ -1094,7 +1083,7 @@ impl Receiver { } builder.build().map_err(|_| { - gst_element_error!( + gst::element_error!( element, gst::StreamError::Format, ["Invalid video format configuration"] @@ -1109,7 +1098,7 @@ impl Receiver { &self, element: &gst_base::BaseSrc, pts: gst::ClockTime, - duration: gst::ClockTime, + duration: Option, info: &gst_video::VideoInfo, video_frame: &VideoFrame, ) -> gst::Buffer { @@ -1124,15 +1113,15 @@ impl Receiver { gst::ReferenceTimestampMeta::add( buffer, &*TIMECODE_CAPS, - gst::ClockTime::from(video_frame.timecode() as u64 * 100), - gst::CLOCK_TIME_NONE, + gst::ClockTime::from_nseconds(video_frame.timecode() as u64 * 100), + gst::ClockTime::NONE, ); if video_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined { gst::ReferenceTimestampMeta::add( buffer, &*TIMESTAMP_CAPS, - gst::ClockTime::from(video_frame.timestamp() as u64 * 100), - gst::CLOCK_TIME_NONE, + gst::ClockTime::from_nseconds(video_frame.timestamp() as u64 * 100), + gst::ClockTime::NONE, ); } } @@ -1337,7 +1326,7 @@ impl Receiver { let audio_frame = match res { Err(_) => { - gst_element_error!( + gst::element_error!( element, gst::ResourceError::Read, ["Error receiving frame"] @@ -1398,13 +1387,11 @@ impl Receiver { &self, element: &gst_base::BaseSrc, audio_frame: &AudioFrame, - ) -> Option<(gst::ClockTime, gst::ClockTime)> { - let duration = gst::SECOND - .mul_div_floor( - audio_frame.no_samples() as u64, - audio_frame.sample_rate() as u64, - ) - .unwrap_or(gst::CLOCK_TIME_NONE); + ) -> Option<(gst::ClockTime, Option)> { + let duration = gst::ClockTime::SECOND.mul_div_floor( + audio_frame.no_samples() as u64, + audio_frame.sample_rate() as u64, + ); self.calculate_timestamp( element, @@ -1426,7 +1413,7 @@ impl Receiver { ); builder.build().map_err(|_| { - gst_element_error!( + gst::element_error!( element, gst::StreamError::Format, ["Invalid audio format configuration"] @@ -1440,7 +1427,7 @@ impl Receiver { &self, _element: &gst_base::BaseSrc, pts: gst::ClockTime, - duration: gst::ClockTime, + duration: Option, info: &gst_audio::AudioInfo, audio_frame: &AudioFrame, ) -> gst::Buffer { @@ -1458,15 +1445,15 @@ impl Receiver { gst::ReferenceTimestampMeta::add( buffer, &*TIMECODE_CAPS, - gst::ClockTime::from(audio_frame.timecode() as u64 * 100), - gst::CLOCK_TIME_NONE, + gst::ClockTime::from_nseconds(audio_frame.timecode() as u64 * 100), + gst::ClockTime::NONE, ); if audio_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined { gst::ReferenceTimestampMeta::add( buffer, &*TIMESTAMP_CAPS, - gst::ClockTime::from(audio_frame.timestamp() as u64 * 100), - gst::CLOCK_TIME_NONE, + gst::ClockTime::from_nseconds(audio_frame.timestamp() as u64 * 100), + gst::ClockTime::NONE, ); } }