ndisrc: Add a clocked timestamp mode that provides a clock that follows the remote timecodes

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1727>
This commit is contained in:
Sebastian Dröge 2024-08-19 17:07:05 +03:00
parent ab3db748be
commit ee4416ee5f
4 changed files with 295 additions and 55 deletions

View file

@ -4204,6 +4204,11 @@
"desc": "Receive Time", "desc": "Receive Time",
"name": "receive-time", "name": "receive-time",
"value": "5" "value": "5"
},
{
"desc": "Clocked",
"name": "clocked",
"value": "6"
} }
] ]
} }

View file

@ -49,6 +49,8 @@ pub enum TimestampMode {
Timestamp = 4, Timestamp = 4,
#[enum_value(name = "Receive Time", nick = "receive-time")] #[enum_value(name = "Receive Time", nick = "receive-time")]
ReceiveTime = 5, ReceiveTime = 5,
#[enum_value(name = "Clocked", nick = "clocked")]
Clocked = 6,
} }
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]

View file

@ -65,6 +65,10 @@ impl Default for Settings {
} }
} }
const OBSERVATIONS_IDX_AUDIO: usize = 0;
const OBSERVATIONS_IDX_VIDEO: usize = 1;
const OBSERVATIONS_IDX_METADATA: usize = 2;
#[derive(Default)] #[derive(Default)]
struct State { struct State {
receiver: Option<Receiver>, receiver: Option<Receiver>,
@ -73,6 +77,17 @@ struct State {
observations_timestamp: [Observations; 3], observations_timestamp: [Observations; 3],
observations_timecode: [Observations; 3], observations_timecode: [Observations; 3],
current_latency: Option<gst::ClockTime>, current_latency: Option<gst::ClockTime>,
// Clock and other state when in TimestampMode::Clocked
clock_state: Option<ClockState>,
}
struct ClockState {
clock: gst::Clock,
// base timecode and base capture time to convert a timecode to its clock time
base_timecode: Option<gst::ClockTime>,
base_receive_time: Option<gst::ClockTime>,
// last min delta from the timecode observations
last_min_delta: Option<Delta>,
} }
pub struct NdiSrc { pub struct NdiSrc {
@ -165,6 +180,7 @@ impl ObjectImpl for NdiSrc {
let obj = self.obj(); let obj = self.obj();
obj.set_live(true); obj.set_live(true);
obj.set_format(gst::Format::Time); obj.set_format(gst::Format::Time);
obj.set_element_flags(gst::ElementFlags::REQUIRE_CLOCK | gst::ElementFlags::PROVIDE_CLOCK);
} }
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
@ -431,6 +447,24 @@ impl ElementImpl for NdiSrc {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.receiver = Some(receiver); state.receiver = Some(receiver);
state.timestamp_mode = settings.timestamp_mode; state.timestamp_mode = settings.timestamp_mode;
if state.timestamp_mode == TimestampMode::Clocked {
let clock = glib::Object::builder::<gst::SystemClock>()
.property("name", format!("{}-clock", self.obj().name()))
.build()
.upcast::<gst::Clock>();
state.clock_state = Some(ClockState {
clock: clock.clone(),
base_timecode: None,
base_receive_time: None,
last_min_delta: None,
});
drop(state);
let _ = self.obj().post_message(
gst::message::ClockProvide::builder(&clock, true)
.src(&*self.obj())
.build(),
);
}
} }
} }
} }
@ -443,13 +477,28 @@ impl ElementImpl for NdiSrc {
match transition { match transition {
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
*self.receiver_controller.lock().unwrap() = None; *self.receiver_controller.lock().unwrap() = None;
*self.state.lock().unwrap() = State::default(); let mut state = self.state.lock().unwrap();
let clock = state.clock_state.as_ref().map(|s| s.clock.clone());
*state = State::default();
drop(state);
if let Some(clock) = clock {
let _ = self.obj().post_message(
gst::message::ClockLost::builder(&clock)
.src(&*self.obj())
.build(),
);
}
} }
_ => (), _ => (),
} }
Ok(res) Ok(res)
} }
fn provide_clock(&self) -> Option<gst::Clock> {
let state = self.state.lock().unwrap();
state.clock_state.as_ref().map(|s| s.clock.clone())
}
} }
impl BaseSrcImpl for NdiSrc { impl BaseSrcImpl for NdiSrc {
@ -494,6 +543,7 @@ impl BaseSrcImpl for NdiSrc {
TimestampMode::Auto TimestampMode::Auto
| TimestampMode::ReceiveTimeTimecode | TimestampMode::ReceiveTimeTimecode
| TimestampMode::ReceiveTimeTimestamp | TimestampMode::ReceiveTimeTimestamp
| TimestampMode::Clocked
) { ) {
latency latency
} else { } else {
@ -666,7 +716,7 @@ impl NdiSrc {
let res_timecode = if matches!( let res_timecode = if matches!(
state.timestamp_mode, state.timestamp_mode,
TimestampMode::ReceiveTimeTimecode | TimestampMode::Auto TimestampMode::ReceiveTimeTimecode | TimestampMode::Auto | TimestampMode::Clocked
) { ) {
state.observations_timecode[idx].process( state.observations_timecode[idx].process(
self.obj().upcast_ref(), self.obj().upcast_ref(),
@ -719,6 +769,14 @@ impl NdiSrc {
.or(res_timestamp) .or(res_timestamp)
.unwrap_or((receive_time_gst, duration, false)) .unwrap_or((receive_time_gst, duration, false))
} }
TimestampMode::Clocked => self.calculate_timestamp_from_clock(
state.clock_state.as_mut().unwrap(),
&state.observations_timecode[..2],
res_timecode.map(|(_, _, discont)| discont).unwrap_or(true),
receive_time_gst,
timecode,
duration,
),
}; };
gst::log!( gst::log!(
@ -746,7 +804,7 @@ impl NdiSrc {
self.calculate_timestamp( self.calculate_timestamp(
state, state,
0, OBSERVATIONS_IDX_VIDEO,
receive_time_gst, receive_time_gst,
receive_time_real, receive_time_real,
video_frame.timestamp(), video_frame.timestamp(),
@ -769,7 +827,7 @@ impl NdiSrc {
self.calculate_timestamp( self.calculate_timestamp(
state, state,
1, OBSERVATIONS_IDX_AUDIO,
receive_time_gst, receive_time_gst,
receive_time_real, receive_time_real,
audio_frame.timestamp(), audio_frame.timestamp(),
@ -787,7 +845,7 @@ impl NdiSrc {
) -> (gst::ClockTime, Option<gst::ClockTime>, bool) { ) -> (gst::ClockTime, Option<gst::ClockTime>, bool) {
self.calculate_timestamp( self.calculate_timestamp(
state, state,
2, OBSERVATIONS_IDX_METADATA,
receive_time_gst, receive_time_gst,
receive_time_real, receive_time_real,
ndisys::NDIlib_recv_timestamp_undefined, ndisys::NDIlib_recv_timestamp_undefined,
@ -795,18 +853,148 @@ impl NdiSrc {
gst::ClockTime::NONE, gst::ClockTime::NONE,
) )
} }
fn calculate_timestamp_from_clock(
&self,
state: &mut ClockState,
observations: &[Observations],
mut discont: bool,
receive_time: gst::ClockTime,
timecode: gst::ClockTime,
duration: Option<gst::ClockTime>,
) -> (gst::ClockTime, Option<gst::ClockTime>, bool) {
let current_min_delta = observations
.iter()
.filter_map(|o| o.min_delta())
.min_by_key(|delta| delta.delta);
// If the minimum delta was updated then update the clock mapping
if let Some(current_min_delta) = current_min_delta {
if Some(current_min_delta) != state.last_min_delta {
state.last_min_delta = Some(current_min_delta);
if discont || Option::zip(state.base_timecode, state.base_receive_time).is_none() {
// On DISCONT or if we don't have a base timecode / base capture time mapping yet,
// select one and update the clock calibration in a way that this base clock time
// maps to the current time. This is needed so that the clock time is
// continuous all the time.
let (internal, external, num, denom) = state.clock.calibration();
let clock_time = gst::Clock::adjust_with_calibration(
current_min_delta.local_time,
internal,
external,
num,
denom,
);
gst::debug!(
CAT,
imp = self,
"Initializing clock with internal {} external {clock_time} at timecode {}",
current_min_delta.local_time,
current_min_delta.remote_time,
);
state.base_timecode = Some(current_min_delta.remote_time);
state.base_receive_time = Some(current_min_delta.local_time);
discont = true;
} else {
let (base_timecode, base_receive_time) =
Option::zip(state.base_timecode, state.base_receive_time).unwrap();
// Calculate the clock time from the timecode by offsetting accordingly
let clock_time = (current_min_delta.remote_time + base_receive_time)
.saturating_sub(base_timecode);
gst::trace!(
CAT,
imp = self,
"Adding observation internal {} external {clock_time} at timecode {}",
current_min_delta.local_time,
current_min_delta.remote_time,
);
if let Some(r_squared) = state
.clock
.add_observation(current_min_delta.local_time, clock_time)
{
gst::trace!(CAT, imp = self, "R² = {r_squared}");
}
}
}
}
let clock_time = if let Some((base_timecode, base_receive_time)) =
Option::zip(state.base_timecode, state.base_receive_time)
{
// Calculate the clock time from the timecode by offsetting accordingly
(timecode + base_receive_time).saturating_sub(base_timecode)
} else {
// If we have no base yet then simply convert the receive time to the clock
let (internal, external, num, denom) = state.clock.calibration();
gst::Clock::adjust_with_calibration(receive_time, internal, external, num, denom)
};
let external_clock = self.obj().clock().unwrap();
let external_clock_time;
if external_clock == state.clock {
// If the internal and external clock are the same we can just use the
// calculated clock time above verbatim
external_clock_time = clock_time;
} else if external_clock
.downcast_ref::<gst::SystemClock>()
.is_some_and(|external_clock| external_clock.clock_type() == gst::ClockType::Monotonic)
{
// If the external clock is the monotonic system clock then we can use the
// calibration of the internal clock to calculate the corresponding monotonic
// clock time.
//
// While we have the actual monotonic clock time as capture time above this
// would be very jittery.
let (internal, external, num, denom) = external_clock.calibration();
external_clock_time =
gst::Clock::unadjust_with_calibration(clock_time, internal, external, num, denom);
} else {
// Otherwise measure the difference between both clocks and work with that.
let now_internal = state.clock.time().unwrap();
let now_external = external_clock.time().unwrap();
if now_internal > now_external {
let diff = now_internal - now_external;
external_clock_time = clock_time.saturating_sub(diff);
} else {
let diff = now_external - now_internal;
external_clock_time = clock_time + diff;
}
}
let base_time = self.obj().base_time();
let pts = base_time
.map(|base_time| external_clock_time.saturating_sub(base_time))
.unwrap_or(gst::ClockTime::ZERO);
(pts, duration, discont)
}
} }
const WINDOW_LENGTH: u64 = 512; const WINDOW_LENGTH: u64 = 512;
const WINDOW_DURATION: u64 = 2_000_000_000; const WINDOW_DURATION: gst::ClockTime = gst::ClockTime::from_seconds(2);
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct Delta {
delta: i64,
local_time: gst::ClockTime,
remote_time: gst::ClockTime,
}
struct Observations { struct Observations {
base_local_time: Option<u64>, base_local_time: Option<gst::ClockTime>,
base_remote_time: Option<u64>, base_remote_time: Option<gst::ClockTime>,
// Difference between local and remote time // Difference between local and remote time, and local/remote time relative to the base times
deltas: VecDeque<i64>, deltas: VecDeque<Delta>,
// Current minimum difference // Current minimum difference and the corresponding local/remote time
min_delta: i64, min_delta: Delta,
// Running average of the minimum difference // Running average of the minimum difference
skew: i64, skew: i64,
filling: bool, filling: bool,
@ -819,7 +1007,7 @@ impl Default for Observations {
base_local_time: None, base_local_time: None,
base_remote_time: None, base_remote_time: None,
deltas: VecDeque::new(), deltas: VecDeque::new(),
min_delta: 0, min_delta: Delta::default(),
skew: 0, skew: 0,
filling: true, filling: true,
window_size: 0, window_size: 0,
@ -832,7 +1020,7 @@ impl Observations {
self.base_local_time = None; self.base_local_time = None;
self.base_remote_time = None; self.base_remote_time = None;
self.deltas = VecDeque::new(); self.deltas = VecDeque::new();
self.min_delta = 0; self.min_delta = Delta::default();
self.skew = 0; self.skew = 0;
self.filling = true; self.filling = true;
self.window_size = 0; self.window_size = 0;
@ -848,15 +1036,14 @@ impl Observations {
local_time: gst::ClockTime, local_time: gst::ClockTime,
duration: Option<gst::ClockTime>, duration: Option<gst::ClockTime>,
) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> { ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
let remote_time = remote_time?.nseconds(); let remote_time = remote_time?;
let local_time = local_time.nseconds();
gst::trace!( gst::trace!(
CAT, CAT,
obj = element, obj = element,
"Local time {}, remote time {}", "Local time {}, remote time {}",
local_time.nseconds(), local_time,
remote_time.nseconds(), remote_time,
); );
let (base_remote_time, base_local_time) = let (base_remote_time, base_local_time) =
@ -870,31 +1057,29 @@ impl Observations {
local_time.nseconds(), local_time.nseconds(),
remote_time.nseconds(), remote_time.nseconds(),
); );
self.base_remote_time = Some(remote_time);
self.base_local_time = Some(local_time); self.base_local_time = Some(local_time);
self.base_remote_time = Some(remote_time);
return Some((local_time.nseconds(), duration, true)); return Some((local_time, duration, true));
} }
}; };
let remote_diff = remote_time.saturating_sub(base_remote_time);
let local_diff = local_time.saturating_sub(base_local_time); let local_diff = local_time.saturating_sub(base_local_time);
let delta = (local_diff as i64) - (remote_diff as i64); let remote_diff = remote_time.saturating_sub(base_remote_time);
let slope = local_diff as f64 / remote_diff as f64; let delta = (local_diff.nseconds() as i64) - (remote_diff.nseconds() as i64);
let slope = local_diff.nseconds() as f64 / remote_diff.nseconds() as f64;
gst::trace!( gst::trace!(
CAT, CAT,
obj = element, obj = element,
"Local diff {}, remote diff {}, delta {}, slope {}", "Local diff {}, remote diff {}, delta {}, slope {}",
local_diff.nseconds(), local_diff,
remote_diff.nseconds(), remote_diff,
delta, delta,
slope, slope,
); );
if local_diff > gst::ClockTime::from_mseconds(500).nseconds() if local_diff > gst::ClockTime::from_mseconds(500) && !(0.5..1.5).contains(&slope) {
&& !(0.5..1.5).contains(&slope)
{
gst::warning!( gst::warning!(
CAT, CAT,
obj = element, obj = element,
@ -908,15 +1093,15 @@ impl Observations {
CAT, CAT,
obj = element, obj = element,
"Initializing base time: local {}, remote {}", "Initializing base time: local {}, remote {}",
local_time.nseconds(), local_time,
remote_time.nseconds(), remote_time,
); );
self.reset(); self.reset();
self.base_remote_time = Some(remote_time);
self.base_local_time = Some(local_time); self.base_local_time = Some(local_time);
self.base_remote_time = Some(remote_time);
return Some((local_time.nseconds(), duration, discont)); return Some((local_time, duration, discont));
} }
if (delta > self.skew && delta - self.skew > 1_000_000_000) if (delta > self.skew && delta - self.skew > 1_000_000_000)
@ -936,65 +1121,101 @@ impl Observations {
CAT, CAT,
obj = element, obj = element,
"Initializing base time: local {}, remote {}", "Initializing base time: local {}, remote {}",
local_time.nseconds(), local_time,
remote_time.nseconds(), remote_time,
); );
self.reset(); self.reset();
self.base_remote_time = Some(remote_time);
self.base_local_time = Some(local_time); self.base_local_time = Some(local_time);
self.base_remote_time = Some(remote_time);
return Some((local_time.nseconds(), duration, discont)); return Some((local_time, duration, discont));
} }
if self.filling { if self.filling {
if self.deltas.is_empty() || delta < self.min_delta { if self.deltas.is_empty() || delta < self.min_delta.delta {
self.min_delta = delta; self.min_delta = Delta {
delta,
local_time: local_diff,
remote_time: remote_diff,
};
} }
self.deltas.push_back(delta); self.deltas.push_back(Delta {
delta,
local_time: local_diff,
remote_time: remote_diff,
});
if remote_diff > WINDOW_DURATION || self.deltas.len() as u64 == WINDOW_LENGTH { if remote_diff > WINDOW_DURATION || self.deltas.len() as u64 == WINDOW_LENGTH {
self.window_size = self.deltas.len(); self.window_size = self.deltas.len();
self.skew = self.min_delta; self.skew = self.min_delta.delta;
self.filling = false; self.filling = false;
} else { } else {
let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64; let perc_time = remote_diff
.mul_div_floor(*gst::ClockTime::from_nseconds(100), *WINDOW_DURATION)
.unwrap()
.nseconds() as i64;
let perc_window = (self.deltas.len() as u64) let perc_window = (self.deltas.len() as u64)
.mul_div_floor(100, WINDOW_LENGTH) .mul_div_floor(100, WINDOW_LENGTH)
.unwrap() as i64; .unwrap() as i64;
let perc = cmp::max(perc_time, perc_window); let perc = cmp::max(perc_time, perc_window);
self.skew = (perc * self.min_delta + ((10_000 - perc) * self.skew)) / 10_000; self.skew = (perc * self.min_delta.delta + ((10_000 - perc) * self.skew)) / 10_000;
} }
} else { } else {
let old = self.deltas.pop_front().unwrap(); let old = self.deltas.pop_front().unwrap();
self.deltas.push_back(delta); self.deltas.push_back(Delta {
delta,
local_time: local_diff,
remote_time: remote_diff,
});
if delta <= self.min_delta { if delta <= self.min_delta.delta {
self.min_delta = delta; self.min_delta = Delta {
} else if old == self.min_delta { delta,
self.min_delta = self.deltas.iter().copied().min().unwrap(); local_time: local_diff,
remote_time: remote_diff,
};
} else if old.delta == self.min_delta.delta {
self.min_delta = self
.deltas
.iter()
.copied()
.min_by_key(|delta| delta.delta)
.unwrap();
} }
self.skew = (self.min_delta + (124 * self.skew)) / 125; self.skew = (self.min_delta.delta + (124 * self.skew)) / 125;
} }
let out_time = base_local_time + remote_diff; let out_time = base_local_time + remote_diff;
let out_time = if self.skew < 0 { let out_time = if self.skew < 0 {
out_time.saturating_sub((-self.skew) as u64) out_time.saturating_sub(gst::ClockTime::from_nseconds((-self.skew) as u64))
} else { } else {
out_time + (self.skew as u64) out_time + gst::ClockTime::from_nseconds(self.skew as u64)
}; };
gst::trace!( gst::trace!(
CAT, CAT,
obj = element, obj = element,
"Skew {}, min delta {}", "Skew {}, min delta {} at local {} remote {}",
self.skew, self.skew,
self.min_delta self.min_delta.delta,
self.min_delta.local_time,
self.min_delta.remote_time,
); );
gst::trace!(CAT, obj = element, "Outputting {}", out_time.nseconds()); gst::trace!(CAT, obj = element, "Outputting {}", out_time);
Some((out_time.nseconds(), duration, false)) Some((out_time, duration, false))
}
fn min_delta(&self) -> Option<Delta> {
Option::zip(self.base_local_time, self.base_remote_time).map(
|(base_local_time, base_remote_time)| Delta {
delta: self.min_delta.delta,
local_time: base_local_time + self.min_delta.local_time,
remote_time: base_remote_time + self.min_delta.remote_time,
},
)
} }
} }

View file

@ -322,7 +322,19 @@ impl Receiver {
continue; continue;
} }
Ok(Some(frame)) => { Ok(Some(frame)) => {
if let Some(receive_time_gst) = element.current_running_time() { // If TimestampMode::Clocked is used then directly use the clock time here,
// otherwise work with the running time.
let receive_time_gst = if let Some(clock) = element.provide_clock() {
Some(clock.internal_time())
} else if let Some((clock, base_time)) =
Option::zip(element.clock(), element.base_time())
{
clock.time().map(|now| now.saturating_sub(base_time))
} else {
None
};
if let Some(receive_time_gst) = receive_time_gst {
let receive_time_real = (glib::real_time() as u64 * 1000).nseconds(); let receive_time_real = (glib::real_time() as u64 * 1000).nseconds();
if matches!(frame, Frame::Video(_) | Frame::Audio(_)) { if matches!(frame, Frame::Video(_) | Frame::Audio(_)) {