diff --git a/Cargo.toml b/Cargo.toml index c97cccf9..d81e64e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ description = "NewTek NDI Plugin" [dependencies] glib = { version = "0.8.0", features = ["subclassing"] } +gobject-sys = "0.9" gstreamer = { version = "0.14.3", features = ["subclassing"] } gstreamer-base = { version = "0.14.0", features = ["subclassing"] } gstreamer-audio = "0.14.0" diff --git a/src/lib.rs b/src/lib.rs index c7ab76e2..4bef1639 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,6 +24,14 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)] +#[repr(u32)] +pub enum TimestampMode { + ReceiveTime = 0, + Timecode = 1, + Timestamp = 2, +} + fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { if !ndi::initialize() { return Err(glib_bool_error!("Cannot initialize NDI")); @@ -199,6 +207,98 @@ fn stop_ndi(cat: gst::DebugCategory, element: &gst_base::BaseSrc, id: usize) -> true } +impl glib::translate::ToGlib for TimestampMode { + type GlibType = i32; + + fn to_glib(&self) -> i32 { + *self as i32 + } +} + +impl glib::translate::FromGlib for TimestampMode { + fn from_glib(value: i32) -> Self { + match value { + 0 => TimestampMode::ReceiveTime, + 1 => TimestampMode::Timecode, + 2 => TimestampMode::Timestamp, + _ => unreachable!(), + } + } +} + +impl StaticType for TimestampMode { + fn static_type() -> glib::Type { + timestamp_mode_get_type() + } +} + +impl<'a> glib::value::FromValueOptional<'a> for TimestampMode { + unsafe fn from_value_optional(value: &glib::Value) -> Option { + Some(glib::value::FromValue::from_value(value)) + } +} + +impl<'a> glib::value::FromValue<'a> for TimestampMode { + unsafe fn from_value(value: &glib::Value) -> Self { + use glib::translate::ToGlibPtr; + + glib::translate::from_glib(gobject_sys::g_value_get_enum(value.to_glib_none().0)) + } +} + +impl glib::value::SetValue for TimestampMode { + unsafe fn set_value(value: &mut glib::Value, this: &Self) { + use glib::translate::{ToGlib, ToGlibPtrMut}; + + gobject_sys::g_value_set_enum(value.to_glib_none_mut().0, this.to_glib()) + } +} + +fn timestamp_mode_get_type() -> glib::Type { + use std::sync::Once; + static ONCE: Once = Once::new(); + static mut TYPE: glib::Type = glib::Type::Invalid; + + ONCE.call_once(|| { + use std::ffi; + use std::ptr; + + static mut VALUES: [gobject_sys::GEnumValue; 4] = [ + gobject_sys::GEnumValue { + value: TimestampMode::ReceiveTime as i32, + value_name: b"Receive Time\0" as *const _ as *const _, + value_nick: b"receive-time\0" as *const _ as *const _, + }, + gobject_sys::GEnumValue { + value: TimestampMode::Timecode as i32, + value_name: b"NDI Timecode\0" as *const _ as *const _, + value_nick: b"timecode\0" as *const _ as *const _, + }, + gobject_sys::GEnumValue { + value: TimestampMode::Timestamp as i32, + value_name: b"NDI Timestamp\0" as *const _ as *const _, + value_nick: b"timestamp\0" as *const _ as *const _, + }, + gobject_sys::GEnumValue { + value: 0, + value_name: ptr::null(), + value_nick: ptr::null(), + }, + ]; + + let name = ffi::CString::new("GstNdiTimestampMode").unwrap(); + unsafe { + let type_ = gobject_sys::g_enum_register_static(name.as_ptr(), VALUES.as_ptr()); + TYPE = glib::translate::from_glib(type_); + } + }); + + unsafe { + assert_ne!(TYPE, glib::Type::Invalid); + TYPE + } +} + gst_plugin_define!( ndi, env!("CARGO_PKG_DESCRIPTION"), diff --git a/src/ndiaudiosrc.rs b/src/ndiaudiosrc.rs index cb879911..897f3342 100644 --- a/src/ndiaudiosrc.rs +++ b/src/ndiaudiosrc.rs @@ -17,6 +17,7 @@ use ndi::*; use ndisys; use stop_ndi; +use TimestampMode; use HASHMAP_RECEIVERS; #[cfg(feature = "reference-timestamps")] use TIMECODE_CAPS; @@ -30,6 +31,7 @@ struct Settings { stream_name: String, ip: String, loss_threshold: u32, + timestamp_mode: TimestampMode, } impl Default for Settings { @@ -38,32 +40,33 @@ impl Default for Settings { stream_name: String::from("Fixed ndi stream name"), ip: String::from(""), loss_threshold: 5, + timestamp_mode: TimestampMode::ReceiveTime, } } } -static PROPERTIES: [subclass::Property; 3] = [ - subclass::Property("stream-name", |_| { +static PROPERTIES: [subclass::Property; 4] = [ + subclass::Property("stream-name", |name| { glib::ParamSpec::string( - "stream-name", - "Sream Name", + name, + "Stream Name", "Name of the streaming device", None, glib::ParamFlags::READWRITE, ) }), - subclass::Property("ip", |_| { + subclass::Property("ip", |name| { glib::ParamSpec::string( - "ip", + name, "Stream IP", "IP of the streaming device. Ex: 127.0.0.1:5961", None, glib::ParamFlags::READWRITE, ) }), - subclass::Property("loss-threshold", |_| { + subclass::Property("loss-threshold", |name| { glib::ParamSpec::uint( - "loss-threshold", + name, "Loss threshold", "Loss threshold", 0, @@ -72,11 +75,22 @@ static PROPERTIES: [subclass::Property; 3] = [ glib::ParamFlags::READWRITE, ) }), + subclass::Property("timestamp-mode", |name| { + glib::ParamSpec::enum_( + name, + "Timestamp Mode", + "Timestamp information to use for outgoing PTS", + TimestampMode::static_type(), + TimestampMode::ReceiveTime as i32, + glib::ParamFlags::READWRITE, + ) + }), ]; struct State { info: Option, id_receiver: Option, + current_latency: gst::ClockTime, } impl Default for State { @@ -84,6 +98,7 @@ impl Default for State { State { info: None, id_receiver: None, + current_latency: gst::CLOCK_TIME_NONE, } } } @@ -177,7 +192,6 @@ impl ObjectImpl for NdiAudioSrc { stream_name ); settings.stream_name = stream_name; - drop(settings); } subclass::Property("ip", ..) => { let mut settings = self.settings.lock().unwrap(); @@ -190,7 +204,6 @@ impl ObjectImpl for NdiAudioSrc { ip ); settings.ip = ip; - drop(settings); } subclass::Property("loss-threshold", ..) => { let mut settings = self.settings.lock().unwrap(); @@ -203,7 +216,22 @@ impl ObjectImpl for NdiAudioSrc { loss_threshold ); settings.loss_threshold = loss_threshold; - drop(settings); + } + subclass::Property("timestamp-mode", ..) => { + let mut settings = self.settings.lock().unwrap(); + let timestamp_mode = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing timestamp mode from {:?} to {:?}", + settings.timestamp_mode, + timestamp_mode + ); + if settings.timestamp_mode != timestamp_mode { + let _ = basesrc + .post_message(&gst::Message::new_latency().src(Some(basesrc)).build()); + } + settings.timestamp_mode = timestamp_mode; } _ => unimplemented!(), } @@ -225,6 +253,10 @@ impl ObjectImpl for NdiAudioSrc { let settings = self.settings.lock().unwrap(); Ok(settings.loss_threshold.to_value()) } + subclass::Property("timestamp-mode", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.timestamp_mode.to_value()) + } _ => unimplemented!(), } } @@ -274,6 +306,24 @@ impl BaseSrcImpl for NdiAudioSrc { q.add_scheduling_modes(&[gst::PadMode::Push]); true } + QueryView::Latency(ref mut q) => { + let state = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if state.current_latency.is_some() { + let latency = if settings.timestamp_mode == TimestampMode::Timestamp { + state.current_latency + } else { + 0.into() + }; + + gst_debug!(self.cat, obj: element, "Returning latency {}", latency); + q.set(true, latency, gst::CLOCK_TIME_NONE); + true + } else { + false + } + } _ => BaseSrcImplExt::parent_query(self, element, query), } } @@ -346,23 +396,52 @@ impl BaseSrcImpl for NdiAudioSrc { // will want to work with the timestamp given by the NDI SDK if available let now = clock.get_time(); let base_time = element.get_base_time(); - let pts = now - base_time; + let receive_time = now - base_time; + + let real_time_now = gst::ClockTime::from(glib::get_real_time() as u64 * 1000); + let timestamp = if audio_frame.timestamp() == ndisys::NDIlib_recv_timestamp_undefined { + gst::CLOCK_TIME_NONE + } else { + gst::ClockTime::from(audio_frame.timestamp() as u64 * 100) + }; + let timecode = gst::ClockTime::from(audio_frame.timecode() as u64 * 100); gst_log!( self.cat, obj: element, - "NDI audio frame received: {:?} with timecode {} and timestamp {}", + "NDI audio frame received: {:?} with timecode {} and timestamp {}, receive time {}, local time now {}", audio_frame, - if audio_frame.timecode() == ndisys::NDIlib_send_timecode_synthesize { - gst::CLOCK_TIME_NONE - } else { - gst::ClockTime::from(audio_frame.timecode() as u64 * 100) - }, - if audio_frame.timestamp() == ndisys::NDIlib_recv_timestamp_undefined { - gst::CLOCK_TIME_NONE - } else { - gst::ClockTime::from(audio_frame.timestamp() as u64 * 100) - }, + timecode, + timestamp, + receive_time, + real_time_now, + ); + + let pts = match settings.timestamp_mode { + TimestampMode::ReceiveTime => receive_time, + TimestampMode::Timecode => timecode, + TimestampMode::Timestamp if timestamp.is_none() => receive_time, + TimestampMode::Timestamp => { + // Timestamps are relative to the UNIX epoch + if real_time_now > timestamp { + let diff = real_time_now - timestamp; + if diff > receive_time { + 0.into() + } else { + receive_time - diff + } + } else { + let diff = timestamp - real_time_now; + receive_time + diff + } + } + }; + + gst_log!( + self.cat, + obj: element, + "Calculated pts for audio frame: {:?}", + pts ); let info = gst_audio::AudioInfo::new( @@ -376,18 +455,20 @@ impl BaseSrcImpl for NdiAudioSrc { if state.info.as_ref() != Some(&info) { let caps = info.to_caps().unwrap(); state.info = Some(info); + state.current_latency = gst::SECOND + .mul_div_ceil( + audio_frame.no_samples() as u64, + audio_frame.sample_rate() as u64, + ) + .unwrap_or(gst::CLOCK_TIME_NONE); + gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); element .set_caps(&caps) .map_err(|_| gst::FlowError::NotNegotiated)?; - } - gst_log!( - self.cat, - obj: element, - "Calculated pts for audio frame: {:?}", - pts - ); + let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); + } // We multiply by 2 because is the size in bytes of an i16 variable let buff_size = (audio_frame.no_samples() * 2 * audio_frame.no_channels()) as usize; diff --git a/src/ndivideosrc.rs b/src/ndivideosrc.rs index daf2192d..33e991c1 100644 --- a/src/ndivideosrc.rs +++ b/src/ndivideosrc.rs @@ -20,6 +20,7 @@ use ndisys; use connect_ndi; use stop_ndi; +use TimestampMode; use HASHMAP_RECEIVERS; #[cfg(feature = "reference-timestamps")] use TIMECODE_CAPS; @@ -31,6 +32,7 @@ struct Settings { stream_name: String, ip: String, loss_threshold: u32, + timestamp_mode: TimestampMode, } impl Default for Settings { @@ -39,32 +41,33 @@ impl Default for Settings { stream_name: String::from("Fixed ndi stream name"), ip: String::from(""), loss_threshold: 5, + timestamp_mode: TimestampMode::ReceiveTime, } } } -static PROPERTIES: [subclass::Property; 3] = [ - subclass::Property("stream-name", |_| { +static PROPERTIES: [subclass::Property; 4] = [ + subclass::Property("stream-name", |name| { glib::ParamSpec::string( - "stream-name", + name, "Stream Name", "Name of the streaming device", None, glib::ParamFlags::READWRITE, ) }), - subclass::Property("ip", |_| { + subclass::Property("ip", |name| { glib::ParamSpec::string( - "ip", + name, "Stream IP", "IP of the streaming device. Ex: 127.0.0.1:5961", None, glib::ParamFlags::READWRITE, ) }), - subclass::Property("loss-threshold", |_| { + subclass::Property("loss-threshold", |name| { glib::ParamSpec::uint( - "loss-threshold", + name, "Loss threshold", "Loss threshold", 0, @@ -73,11 +76,22 @@ static PROPERTIES: [subclass::Property; 3] = [ glib::ParamFlags::READWRITE, ) }), + subclass::Property("timestamp-mode", |name| { + glib::ParamSpec::enum_( + name, + "Timestamp Mode", + "Timestamp information to use for outgoing PTS", + TimestampMode::static_type(), + TimestampMode::ReceiveTime as i32, + glib::ParamFlags::READWRITE, + ) + }), ]; struct State { info: Option, id_receiver: Option, + current_latency: gst::ClockTime, } impl Default for State { @@ -85,6 +99,7 @@ impl Default for State { State { info: None, id_receiver: None, + current_latency: gst::CLOCK_TIME_NONE, } } } @@ -212,7 +227,6 @@ impl ObjectImpl for NdiVideoSrc { stream_name ); settings.stream_name = stream_name; - drop(settings); } subclass::Property("ip", ..) => { let mut settings = self.settings.lock().unwrap(); @@ -225,7 +239,6 @@ impl ObjectImpl for NdiVideoSrc { ip ); settings.ip = ip; - drop(settings); } subclass::Property("loss-threshold", ..) => { let mut settings = self.settings.lock().unwrap(); @@ -238,7 +251,22 @@ impl ObjectImpl for NdiVideoSrc { loss_threshold ); settings.loss_threshold = loss_threshold; - drop(settings); + } + subclass::Property("timestamp-mode", ..) => { + let mut settings = self.settings.lock().unwrap(); + let timestamp_mode = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing timestamp mode from {:?} to {:?}", + settings.timestamp_mode, + timestamp_mode + ); + if settings.timestamp_mode != timestamp_mode { + let _ = basesrc + .post_message(&gst::Message::new_latency().src(Some(basesrc)).build()); + } + settings.timestamp_mode = timestamp_mode; } _ => unimplemented!(), } @@ -260,6 +288,10 @@ impl ObjectImpl for NdiVideoSrc { let settings = self.settings.lock().unwrap(); Ok(settings.loss_threshold.to_value()) } + subclass::Property("timestamp-mode", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.timestamp_mode.to_value()) + } _ => unimplemented!(), } } @@ -304,6 +336,24 @@ impl BaseSrcImpl for NdiVideoSrc { q.add_scheduling_modes(&[gst::PadMode::Push]); true } + QueryView::Latency(ref mut q) => { + let state = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if state.current_latency.is_some() { + let latency = if settings.timestamp_mode == TimestampMode::Timestamp { + state.current_latency + } else { + 0.into() + }; + + gst_debug!(self.cat, obj: element, "Returning latency {}", latency); + q.set(true, latency, gst::CLOCK_TIME_NONE); + true + } else { + false + } + } _ => BaseSrcImplExt::parent_query(self, element, query), } } @@ -380,23 +430,52 @@ impl BaseSrcImpl for NdiVideoSrc { // will want to work with the timestamp given by the NDI SDK if available let now = clock.get_time(); let base_time = element.get_base_time(); - let pts = now - base_time; + let receive_time = now - base_time; + + let real_time_now = gst::ClockTime::from(glib::get_real_time() as u64 * 1000); + let timestamp = if video_frame.timestamp() == ndisys::NDIlib_recv_timestamp_undefined { + gst::CLOCK_TIME_NONE + } else { + gst::ClockTime::from(video_frame.timestamp() as u64 * 100) + }; + let timecode = gst::ClockTime::from(video_frame.timecode() as u64 * 100); gst_log!( self.cat, obj: element, - "NDI video frame received: {:?} with timecode {} and timestamp {}", + "NDI video frame received: {:?} with timecode {} and timestamp {}, receive time {}, local time now {}", video_frame, - if video_frame.timecode() == ndisys::NDIlib_send_timecode_synthesize { - gst::CLOCK_TIME_NONE - } else { - gst::ClockTime::from(video_frame.timecode() as u64 * 100) - }, - if video_frame.timestamp() == ndisys::NDIlib_recv_timestamp_undefined { - gst::CLOCK_TIME_NONE - } else { - gst::ClockTime::from(video_frame.timestamp() as u64 * 100) - }, + timecode, + timestamp, + receive_time, + real_time_now, + ); + + let pts = match settings.timestamp_mode { + TimestampMode::ReceiveTime => receive_time, + TimestampMode::Timecode => timecode, + TimestampMode::Timestamp if timestamp.is_none() => receive_time, + TimestampMode::Timestamp => { + // Timestamps are relative to the UNIX epoch + if real_time_now > timestamp { + let diff = real_time_now - timestamp; + if diff > receive_time { + 0.into() + } else { + receive_time - diff + } + } else { + let diff = timestamp - real_time_now; + receive_time + diff + } + } + }; + + gst_log!( + self.cat, + obj: element, + "Calculated pts for video frame: {:?}", + pts ); // YV12 and I420 are swapped in the NDI SDK compared to GStreamer @@ -476,18 +555,19 @@ impl BaseSrcImpl for NdiVideoSrc { if state.info.as_ref() != Some(&info) { let caps = info.to_caps().unwrap(); state.info = Some(info); + state.current_latency = gst::SECOND + .mul_div_ceil( + video_frame.frame_rate().1 as u64, + video_frame.frame_rate().0 as u64, + ) + .unwrap_or(gst::CLOCK_TIME_NONE); gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); element .set_caps(&caps) .map_err(|_| gst::FlowError::NotNegotiated)?; - } - gst_log!( - self.cat, - obj: element, - "Calculated pts for video frame: {:?}", - pts - ); + let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); + } let mut buffer = gst::Buffer::with_size(state.info.as_ref().unwrap().size()).unwrap(); {