diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index cca33829..82bcfdcd 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -4204,6 +4204,11 @@ "desc": "Receive Time", "name": "receive-time", "value": "5" + }, + { + "desc": "Clocked", + "name": "clocked", + "value": "6" } ] } diff --git a/net/ndi/src/lib.rs b/net/ndi/src/lib.rs index 4e87faab..7072ee9c 100644 --- a/net/ndi/src/lib.rs +++ b/net/ndi/src/lib.rs @@ -49,6 +49,8 @@ pub enum TimestampMode { Timestamp = 4, #[enum_value(name = "Receive Time", nick = "receive-time")] ReceiveTime = 5, + #[enum_value(name = "Clocked", nick = "clocked")] + Clocked = 6, } #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] diff --git a/net/ndi/src/ndisrc/imp.rs b/net/ndi/src/ndisrc/imp.rs index 28a0da8d..7da3761c 100644 --- a/net/ndi/src/ndisrc/imp.rs +++ b/net/ndi/src/ndisrc/imp.rs @@ -65,6 +65,10 @@ impl Default for Settings { } } +const OBSERVATIONS_IDX_AUDIO: usize = 0; +const OBSERVATIONS_IDX_VIDEO: usize = 1; +const OBSERVATIONS_IDX_METADATA: usize = 2; + #[derive(Default)] struct State { receiver: Option, @@ -73,6 +77,17 @@ struct State { observations_timestamp: [Observations; 3], observations_timecode: [Observations; 3], current_latency: Option, + // Clock and other state when in TimestampMode::Clocked + clock_state: Option, +} + +struct ClockState { + clock: gst::Clock, + // base timecode and base capture time to convert a timecode to its clock time + base_timecode: Option, + base_receive_time: Option, + // last min delta from the timecode observations + last_min_delta: Option, } pub struct NdiSrc { @@ -165,6 +180,7 @@ impl ObjectImpl for NdiSrc { let obj = self.obj(); obj.set_live(true); obj.set_format(gst::Format::Time); + obj.set_element_flags(gst::ElementFlags::REQUIRE_CLOCK | gst::ElementFlags::PROVIDE_CLOCK); } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { @@ -431,6 +447,24 @@ impl ElementImpl for NdiSrc { let mut state = self.state.lock().unwrap(); state.receiver = Some(receiver); state.timestamp_mode = settings.timestamp_mode; + if state.timestamp_mode == TimestampMode::Clocked { + let clock = glib::Object::builder::() + .property("name", format!("{}-clock", self.obj().name())) + .build() + .upcast::(); + state.clock_state = Some(ClockState { + clock: clock.clone(), + base_timecode: None, + base_receive_time: None, + last_min_delta: None, + }); + drop(state); + let _ = self.obj().post_message( + gst::message::ClockProvide::builder(&clock, true) + .src(&*self.obj()) + .build(), + ); + } } } } @@ -443,13 +477,28 @@ impl ElementImpl for NdiSrc { match transition { gst::StateChange::PausedToReady => { *self.receiver_controller.lock().unwrap() = None; - *self.state.lock().unwrap() = State::default(); + let mut state = self.state.lock().unwrap(); + let clock = state.clock_state.as_ref().map(|s| s.clock.clone()); + *state = State::default(); + drop(state); + if let Some(clock) = clock { + let _ = self.obj().post_message( + gst::message::ClockLost::builder(&clock) + .src(&*self.obj()) + .build(), + ); + } } _ => (), } Ok(res) } + + fn provide_clock(&self) -> Option { + let state = self.state.lock().unwrap(); + state.clock_state.as_ref().map(|s| s.clock.clone()) + } } impl BaseSrcImpl for NdiSrc { @@ -494,6 +543,7 @@ impl BaseSrcImpl for NdiSrc { TimestampMode::Auto | TimestampMode::ReceiveTimeTimecode | TimestampMode::ReceiveTimeTimestamp + | TimestampMode::Clocked ) { latency } else { @@ -666,7 +716,7 @@ impl NdiSrc { let res_timecode = if matches!( state.timestamp_mode, - TimestampMode::ReceiveTimeTimecode | TimestampMode::Auto + TimestampMode::ReceiveTimeTimecode | TimestampMode::Auto | TimestampMode::Clocked ) { state.observations_timecode[idx].process( self.obj().upcast_ref(), @@ -719,6 +769,14 @@ impl NdiSrc { .or(res_timestamp) .unwrap_or((receive_time_gst, duration, false)) } + TimestampMode::Clocked => self.calculate_timestamp_from_clock( + state.clock_state.as_mut().unwrap(), + &state.observations_timecode[..2], + res_timecode.map(|(_, _, discont)| discont).unwrap_or(true), + receive_time_gst, + timecode, + duration, + ), }; gst::log!( @@ -746,7 +804,7 @@ impl NdiSrc { self.calculate_timestamp( state, - 0, + OBSERVATIONS_IDX_VIDEO, receive_time_gst, receive_time_real, video_frame.timestamp(), @@ -769,7 +827,7 @@ impl NdiSrc { self.calculate_timestamp( state, - 1, + OBSERVATIONS_IDX_AUDIO, receive_time_gst, receive_time_real, audio_frame.timestamp(), @@ -787,7 +845,7 @@ impl NdiSrc { ) -> (gst::ClockTime, Option, bool) { self.calculate_timestamp( state, - 2, + OBSERVATIONS_IDX_METADATA, receive_time_gst, receive_time_real, ndisys::NDIlib_recv_timestamp_undefined, @@ -795,18 +853,148 @@ impl NdiSrc { gst::ClockTime::NONE, ) } + + fn calculate_timestamp_from_clock( + &self, + state: &mut ClockState, + observations: &[Observations], + mut discont: bool, + receive_time: gst::ClockTime, + timecode: gst::ClockTime, + duration: Option, + ) -> (gst::ClockTime, Option, bool) { + let current_min_delta = observations + .iter() + .filter_map(|o| o.min_delta()) + .min_by_key(|delta| delta.delta); + + // If the minimum delta was updated then update the clock mapping + if let Some(current_min_delta) = current_min_delta { + if Some(current_min_delta) != state.last_min_delta { + state.last_min_delta = Some(current_min_delta); + + if discont || Option::zip(state.base_timecode, state.base_receive_time).is_none() { + // On DISCONT or if we don't have a base timecode / base capture time mapping yet, + // select one and update the clock calibration in a way that this base clock time + // maps to the current time. This is needed so that the clock time is + // continuous all the time. + let (internal, external, num, denom) = state.clock.calibration(); + + let clock_time = gst::Clock::adjust_with_calibration( + current_min_delta.local_time, + internal, + external, + num, + denom, + ); + + gst::debug!( + CAT, + imp = self, + "Initializing clock with internal {} external {clock_time} at timecode {}", + current_min_delta.local_time, + current_min_delta.remote_time, + ); + + state.base_timecode = Some(current_min_delta.remote_time); + state.base_receive_time = Some(current_min_delta.local_time); + discont = true; + } else { + let (base_timecode, base_receive_time) = + Option::zip(state.base_timecode, state.base_receive_time).unwrap(); + // Calculate the clock time from the timecode by offsetting accordingly + let clock_time = (current_min_delta.remote_time + base_receive_time) + .saturating_sub(base_timecode); + + gst::trace!( + CAT, + imp = self, + "Adding observation internal {} external {clock_time} at timecode {}", + current_min_delta.local_time, + current_min_delta.remote_time, + ); + + if let Some(r_squared) = state + .clock + .add_observation(current_min_delta.local_time, clock_time) + { + gst::trace!(CAT, imp = self, "R² = {r_squared}"); + } + } + } + } + + let clock_time = if let Some((base_timecode, base_receive_time)) = + Option::zip(state.base_timecode, state.base_receive_time) + { + // Calculate the clock time from the timecode by offsetting accordingly + (timecode + base_receive_time).saturating_sub(base_timecode) + } else { + // If we have no base yet then simply convert the receive time to the clock + let (internal, external, num, denom) = state.clock.calibration(); + gst::Clock::adjust_with_calibration(receive_time, internal, external, num, denom) + }; + + let external_clock = self.obj().clock().unwrap(); + let external_clock_time; + + if external_clock == state.clock { + // If the internal and external clock are the same we can just use the + // calculated clock time above verbatim + external_clock_time = clock_time; + } else if external_clock + .downcast_ref::() + .is_some_and(|external_clock| external_clock.clock_type() == gst::ClockType::Monotonic) + { + // If the external clock is the monotonic system clock then we can use the + // calibration of the internal clock to calculate the corresponding monotonic + // clock time. + // + // While we have the actual monotonic clock time as capture time above this + // would be very jittery. + let (internal, external, num, denom) = external_clock.calibration(); + external_clock_time = + gst::Clock::unadjust_with_calibration(clock_time, internal, external, num, denom); + } else { + // Otherwise measure the difference between both clocks and work with that. + let now_internal = state.clock.time().unwrap(); + let now_external = external_clock.time().unwrap(); + + if now_internal > now_external { + let diff = now_internal - now_external; + external_clock_time = clock_time.saturating_sub(diff); + } else { + let diff = now_external - now_internal; + external_clock_time = clock_time + diff; + } + } + + let base_time = self.obj().base_time(); + let pts = base_time + .map(|base_time| external_clock_time.saturating_sub(base_time)) + .unwrap_or(gst::ClockTime::ZERO); + + (pts, duration, discont) + } } const WINDOW_LENGTH: u64 = 512; -const WINDOW_DURATION: u64 = 2_000_000_000; +const WINDOW_DURATION: gst::ClockTime = gst::ClockTime::from_seconds(2); + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +struct Delta { + delta: i64, + local_time: gst::ClockTime, + remote_time: gst::ClockTime, +} struct Observations { - base_local_time: Option, - base_remote_time: Option, - // Difference between local and remote time - deltas: VecDeque, - // Current minimum difference - min_delta: i64, + base_local_time: Option, + base_remote_time: Option, + // Difference between local and remote time, and local/remote time relative to the base times + deltas: VecDeque, + // Current minimum difference and the corresponding local/remote time + min_delta: Delta, // Running average of the minimum difference skew: i64, filling: bool, @@ -819,7 +1007,7 @@ impl Default for Observations { base_local_time: None, base_remote_time: None, deltas: VecDeque::new(), - min_delta: 0, + min_delta: Delta::default(), skew: 0, filling: true, window_size: 0, @@ -832,7 +1020,7 @@ impl Observations { self.base_local_time = None; self.base_remote_time = None; self.deltas = VecDeque::new(); - self.min_delta = 0; + self.min_delta = Delta::default(); self.skew = 0; self.filling = true; self.window_size = 0; @@ -848,15 +1036,14 @@ impl Observations { local_time: gst::ClockTime, duration: Option, ) -> Option<(gst::ClockTime, Option, bool)> { - let remote_time = remote_time?.nseconds(); - let local_time = local_time.nseconds(); + let remote_time = remote_time?; gst::trace!( CAT, obj = element, "Local time {}, remote time {}", - local_time.nseconds(), - remote_time.nseconds(), + local_time, + remote_time, ); let (base_remote_time, base_local_time) = @@ -870,31 +1057,29 @@ impl Observations { local_time.nseconds(), remote_time.nseconds(), ); - self.base_remote_time = Some(remote_time); self.base_local_time = Some(local_time); + self.base_remote_time = Some(remote_time); - return Some((local_time.nseconds(), duration, true)); + return Some((local_time, duration, true)); } }; - let remote_diff = remote_time.saturating_sub(base_remote_time); let local_diff = local_time.saturating_sub(base_local_time); - let delta = (local_diff as i64) - (remote_diff as i64); - let slope = local_diff as f64 / remote_diff as f64; + let remote_diff = remote_time.saturating_sub(base_remote_time); + let delta = (local_diff.nseconds() as i64) - (remote_diff.nseconds() as i64); + let slope = local_diff.nseconds() as f64 / remote_diff.nseconds() as f64; gst::trace!( CAT, obj = element, "Local diff {}, remote diff {}, delta {}, slope {}", - local_diff.nseconds(), - remote_diff.nseconds(), + local_diff, + remote_diff, delta, slope, ); - if local_diff > gst::ClockTime::from_mseconds(500).nseconds() - && !(0.5..1.5).contains(&slope) - { + if local_diff > gst::ClockTime::from_mseconds(500) && !(0.5..1.5).contains(&slope) { gst::warning!( CAT, obj = element, @@ -908,15 +1093,15 @@ impl Observations { CAT, obj = element, "Initializing base time: local {}, remote {}", - local_time.nseconds(), - remote_time.nseconds(), + local_time, + remote_time, ); self.reset(); - self.base_remote_time = Some(remote_time); self.base_local_time = Some(local_time); + self.base_remote_time = Some(remote_time); - return Some((local_time.nseconds(), duration, discont)); + return Some((local_time, duration, discont)); } if (delta > self.skew && delta - self.skew > 1_000_000_000) @@ -936,65 +1121,101 @@ impl Observations { CAT, obj = element, "Initializing base time: local {}, remote {}", - local_time.nseconds(), - remote_time.nseconds(), + local_time, + remote_time, ); self.reset(); - self.base_remote_time = Some(remote_time); self.base_local_time = Some(local_time); + self.base_remote_time = Some(remote_time); - return Some((local_time.nseconds(), duration, discont)); + return Some((local_time, duration, discont)); } if self.filling { - if self.deltas.is_empty() || delta < self.min_delta { - self.min_delta = delta; + if self.deltas.is_empty() || delta < self.min_delta.delta { + self.min_delta = Delta { + delta, + local_time: local_diff, + remote_time: remote_diff, + }; } - self.deltas.push_back(delta); + self.deltas.push_back(Delta { + delta, + local_time: local_diff, + remote_time: remote_diff, + }); if remote_diff > WINDOW_DURATION || self.deltas.len() as u64 == WINDOW_LENGTH { self.window_size = self.deltas.len(); - self.skew = self.min_delta; + self.skew = self.min_delta.delta; self.filling = false; } else { - let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64; + let perc_time = remote_diff + .mul_div_floor(*gst::ClockTime::from_nseconds(100), *WINDOW_DURATION) + .unwrap() + .nseconds() as i64; let perc_window = (self.deltas.len() as u64) .mul_div_floor(100, WINDOW_LENGTH) .unwrap() as i64; let perc = cmp::max(perc_time, perc_window); - self.skew = (perc * self.min_delta + ((10_000 - perc) * self.skew)) / 10_000; + self.skew = (perc * self.min_delta.delta + ((10_000 - perc) * self.skew)) / 10_000; } } else { let old = self.deltas.pop_front().unwrap(); - self.deltas.push_back(delta); + self.deltas.push_back(Delta { + delta, + local_time: local_diff, + remote_time: remote_diff, + }); - if delta <= self.min_delta { - self.min_delta = delta; - } else if old == self.min_delta { - self.min_delta = self.deltas.iter().copied().min().unwrap(); + if delta <= self.min_delta.delta { + self.min_delta = Delta { + delta, + local_time: local_diff, + remote_time: remote_diff, + }; + } else if old.delta == self.min_delta.delta { + self.min_delta = self + .deltas + .iter() + .copied() + .min_by_key(|delta| delta.delta) + .unwrap(); } - self.skew = (self.min_delta + (124 * self.skew)) / 125; + self.skew = (self.min_delta.delta + (124 * self.skew)) / 125; } let out_time = base_local_time + remote_diff; let out_time = if self.skew < 0 { - out_time.saturating_sub((-self.skew) as u64) + out_time.saturating_sub(gst::ClockTime::from_nseconds((-self.skew) as u64)) } else { - out_time + (self.skew as u64) + out_time + gst::ClockTime::from_nseconds(self.skew as u64) }; gst::trace!( CAT, obj = element, - "Skew {}, min delta {}", + "Skew {}, min delta {} at local {} remote {}", self.skew, - self.min_delta + self.min_delta.delta, + self.min_delta.local_time, + self.min_delta.remote_time, ); - gst::trace!(CAT, obj = element, "Outputting {}", out_time.nseconds()); + gst::trace!(CAT, obj = element, "Outputting {}", out_time); - Some((out_time.nseconds(), duration, false)) + Some((out_time, duration, false)) + } + + fn min_delta(&self) -> Option { + Option::zip(self.base_local_time, self.base_remote_time).map( + |(base_local_time, base_remote_time)| Delta { + delta: self.min_delta.delta, + local_time: base_local_time + self.min_delta.local_time, + remote_time: base_remote_time + self.min_delta.remote_time, + }, + ) } } diff --git a/net/ndi/src/ndisrc/receiver.rs b/net/ndi/src/ndisrc/receiver.rs index 03681d0f..dae70c99 100644 --- a/net/ndi/src/ndisrc/receiver.rs +++ b/net/ndi/src/ndisrc/receiver.rs @@ -322,7 +322,19 @@ impl Receiver { continue; } Ok(Some(frame)) => { - if let Some(receive_time_gst) = element.current_running_time() { + // If TimestampMode::Clocked is used then directly use the clock time here, + // otherwise work with the running time. + let receive_time_gst = if let Some(clock) = element.provide_clock() { + Some(clock.internal_time()) + } else if let Some((clock, base_time)) = + Option::zip(element.clock(), element.base_time()) + { + clock.time().map(|now| now.saturating_sub(base_time)) + } else { + None + }; + + if let Some(receive_time_gst) = receive_time_gst { let receive_time_real = (glib::real_time() as u64 * 1000).nseconds(); if matches!(frame, Frame::Video(_) | Frame::Audio(_)) {