diff --git a/net/ndi/src/lib.rs b/net/ndi/src/lib.rs index dfdabf29..d1287ec4 100644 --- a/net/ndi/src/lib.rs +++ b/net/ndi/src/lib.rs @@ -24,16 +24,18 @@ use once_cell::sync::Lazy; #[repr(u32)] #[enum_type(name = "GstNdiTimestampMode")] pub enum TimestampMode { + #[enum_value(name = "Auto", nick = "auto")] + Auto = 0, #[enum_value(name = "Receive Time / Timecode", nick = "receive-time-vs-timecode")] - ReceiveTimeTimecode = 0, + ReceiveTimeTimecode = 1, #[enum_value(name = "Receive Time / Timestamp", nick = "receive-time-vs-timestamp")] - ReceiveTimeTimestamp = 1, + ReceiveTimeTimestamp = 2, #[enum_value(name = "NDI Timecode", nick = "timecode")] - Timecode = 2, + Timecode = 3, #[enum_value(name = "NDI Timestamp", nick = "timestamp")] - Timestamp = 3, + Timestamp = 4, #[enum_value(name = "Receive Time", nick = "receive-time")] - ReceiveTime = 4, + ReceiveTime = 5, } #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] diff --git a/net/ndi/src/ndisrc/imp.rs b/net/ndi/src/ndisrc/imp.rs index 8bed052a..bc19331c 100644 --- a/net/ndi/src/ndisrc/imp.rs +++ b/net/ndi/src/ndisrc/imp.rs @@ -53,7 +53,7 @@ impl Default for Settings { max_queue_length: 10, bandwidth: ndisys::NDIlib_recv_bandwidth_highest, color_format: RecvColorFormat::UyvyBgra, - timestamp_mode: TimestampMode::ReceiveTimeTimecode, + timestamp_mode: TimestampMode::Auto, } } } diff --git a/net/ndi/src/receiver.rs b/net/ndi/src/receiver.rs index c01b8403..e2228873 100644 --- a/net/ndi/src/receiver.rs +++ b/net/ndi/src/receiver.rs @@ -22,7 +22,6 @@ static CAT: Lazy = Lazy::new(|| { ) }); -#[derive(Clone)] pub struct Receiver(Arc); #[derive(Debug, PartialEq, Eq)] @@ -226,6 +225,7 @@ struct ReceiverQueueInner { timeout: bool, } +const PREFILL_WINDOW_LENGTH: usize = 12; const WINDOW_LENGTH: u64 = 512; const WINDOW_DURATION: u64 = 2_000_000_000; @@ -240,6 +240,11 @@ struct ObservationsInner { skew: i64, filling: bool, window_size: usize, + + // Remote/local times for workaround around fundamentally wrong slopes + // This is not reset below and has a bigger window. + times: VecDeque<(u64, u64)>, + slope_correction: (u64, u64), } impl Default for ObservationsInner { @@ -252,10 +257,24 @@ impl Default for ObservationsInner { skew: 0, filling: true, window_size: 0, + times: VecDeque::new(), + slope_correction: (1, 1), } } } +impl ObservationsInner { + fn reset(&mut self) { + self.base_local_time = None; + self.base_remote_time = None; + self.deltas = VecDeque::new(); + self.min_delta = 0; + self.skew = 0; + self.filling = true; + self.window_size = 0; + } +} + impl Observations { // Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from // Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays": @@ -266,22 +285,41 @@ impl Observations { remote_time: Option, local_time: gst::ClockTime, duration: Option, - ) -> (gst::ClockTime, Option, bool) { - let remote_time = match remote_time { - None => return (local_time, duration, false), - Some(remote_time) => remote_time.nseconds(), - }; + ) -> Option<(gst::ClockTime, Option, bool)> { + let remote_time = remote_time?.nseconds(); let local_time = local_time.nseconds(); + let mut inner = self.0.borrow_mut(); + gst_trace!( CAT, obj: element, - "Local time {}, remote time {}", + "Local time {}, remote time {}, slope correct {}/{}", gst::ClockTime::from_nseconds(local_time), gst::ClockTime::from_nseconds(remote_time), + inner.slope_correction.0, + inner.slope_correction.1, ); - let mut inner = self.0.borrow_mut(); + inner.times.push_back((remote_time, local_time)); + while inner + .times + .back() + .unwrap() + .1 + .saturating_sub(inner.times.front().unwrap().1) + > WINDOW_DURATION + { + let _ = inner.times.pop_front(); + } + + // Static remote times + if inner.slope_correction.1 == 0 { + return None; + } + + let remote_time = + remote_time.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?; let (base_remote_time, base_local_time) = match (inner.base_remote_time, inner.base_local_time) { @@ -297,10 +335,102 @@ impl Observations { inner.base_remote_time = Some(remote_time); inner.base_local_time = Some(local_time); - return (gst::ClockTime::from_nseconds(local_time), duration, true); + return Some((gst::ClockTime::from_nseconds(local_time), duration, true)); } }; + if inner.times.len() < PREFILL_WINDOW_LENGTH { + return Some((gst::ClockTime::from_nseconds(local_time), duration, false)); + } + + // Check if the slope is simply wrong and try correcting + { + let local_diff = inner + .times + .back() + .unwrap() + .1 + .saturating_sub(inner.times.front().unwrap().1); + let remote_diff = inner + .times + .back() + .unwrap() + .0 + .saturating_sub(inner.times.front().unwrap().0); + + if remote_diff == 0 { + inner.reset(); + inner.base_remote_time = Some(remote_time); + inner.base_local_time = Some(local_time); + + // Static remote times + inner.slope_correction = (0, 0); + return None; + } else { + let slope = local_diff as f64 / remote_diff as f64; + let scaled_slope = + slope * (inner.slope_correction.1 as f64) / (inner.slope_correction.0 as f64); + + // Check for some obviously wrong slopes and try to correct for that + if !(0.5..1.5).contains(&scaled_slope) { + gst_warning!( + CAT, + obj: element, + "Too small/big slope {}, resetting", + scaled_slope + ); + + let discont = !inner.deltas.is_empty(); + inner.reset(); + + if (0.0005..0.0015).contains(&slope) { + // Remote unit was actually 0.1ns + inner.slope_correction = (1, 1000); + } else if (0.005..0.015).contains(&slope) { + // Remote unit was actually 1ns + inner.slope_correction = (1, 100); + } else if (0.05..0.15).contains(&slope) { + // Remote unit was actually 10ns + inner.slope_correction = (1, 10); + } else if (5.0..15.0).contains(&slope) { + // Remote unit was actually 1us + inner.slope_correction = (10, 1); + } else if (50.0..150.0).contains(&slope) { + // Remote unit was actually 10us + inner.slope_correction = (100, 1); + } else if (50.0..150.0).contains(&slope) { + // Remote unit was actually 100us + inner.slope_correction = (1000, 1); + } else if (50.0..150.0).contains(&slope) { + // Remote unit was actually 1ms + inner.slope_correction = (10000, 1); + } else { + inner.slope_correction = (1, 1); + } + + let remote_time = inner + .times + .back() + .unwrap() + .0 + .mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?; + gst_debug!( + CAT, + obj: element, + "Initializing base time: local {}, remote {}, slope correction {}/{}", + gst::ClockTime::from_nseconds(local_time), + gst::ClockTime::from_nseconds(remote_time), + inner.slope_correction.0, + inner.slope_correction.1, + ); + inner.base_remote_time = Some(remote_time); + inner.base_local_time = Some(local_time); + + return Some((gst::ClockTime::from_nseconds(local_time), duration, discont)); + } + } + } + let remote_diff = remote_time.saturating_sub(base_remote_time); let local_diff = local_time.saturating_sub(base_local_time); let delta = (local_diff as i64) - (remote_diff as i64); @@ -314,33 +444,6 @@ impl Observations { delta, ); - if remote_diff > 0 && local_diff > 0 { - let slope = (local_diff as f64) / (remote_diff as f64); - if !(0.8..1.2).contains(&slope) { - gst_warning!( - CAT, - obj: element, - "Too small/big slope {}, resetting", - slope - ); - - let discont = !inner.deltas.is_empty(); - *inner = ObservationsInner::default(); - - gst_debug!( - CAT, - obj: element, - "Initializing base time: local {}, remote {}", - gst::ClockTime::from_nseconds(local_time), - gst::ClockTime::from_nseconds(remote_time), - ); - inner.base_remote_time = Some(remote_time); - inner.base_local_time = Some(local_time); - - return (gst::ClockTime::from_nseconds(local_time), duration, discont); - } - } - if (delta > inner.skew && delta - inner.skew > 1_000_000_000) || (delta < inner.skew && inner.skew - delta > 1_000_000_000) { @@ -353,7 +456,6 @@ impl Observations { ); let discont = !inner.deltas.is_empty(); - *inner = ObservationsInner::default(); gst_debug!( CAT, @@ -362,10 +464,12 @@ impl Observations { gst::ClockTime::from_nseconds(local_time), gst::ClockTime::from_nseconds(remote_time), ); + + inner.reset(); inner.base_remote_time = Some(remote_time); inner.base_local_time = Some(local_time); - return (gst::ClockTime::from_nseconds(local_time), duration, discont); + return Some((gst::ClockTime::from_nseconds(local_time), duration, discont)); } if inner.filling { @@ -421,7 +525,7 @@ impl Observations { gst::ClockTime::from_nseconds(out_time) ); - (gst::ClockTime::from_nseconds(out_time), duration, false) + Some((gst::ClockTime::from_nseconds(out_time), duration, false)) } } @@ -820,8 +924,23 @@ impl Receiver { ); let (pts, duration, discont) = match self.0.timestamp_mode { - TimestampMode::ReceiveTimeTimecode => res_timecode, - TimestampMode::ReceiveTimeTimestamp => res_timestamp, + TimestampMode::ReceiveTimeTimecode => match res_timecode { + Some((pts, duration, discont)) => (pts, duration, discont), + None => { + gst_warning!(CAT, obj: element, "Can't calculate timestamp"); + (receive_time, duration, false) + } + }, + TimestampMode::ReceiveTimeTimestamp => match res_timestamp { + Some((pts, duration, discont)) => (pts, duration, discont), + None => { + if timestamp.is_some() { + gst_warning!(CAT, obj: element, "Can't calculate timestamp"); + } + + (receive_time, duration, false) + } + }, TimestampMode::Timecode => (timecode, duration, false), TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration, false), TimestampMode::Timestamp => { @@ -840,6 +959,11 @@ impl Receiver { } } TimestampMode::ReceiveTime => (receive_time, duration, false), + TimestampMode::Auto => { + res_timecode + .or(res_timestamp) + .unwrap_or((receive_time, duration, false)) + } }; gst_log!(