From febb2fb03594dc8d6cdef07fd8df5939715f8d4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 29 Sep 2021 16:44:16 +0300 Subject: [PATCH 1/5] Add receive-time timestamping mode This directly uses the receive times of each packet. --- src/lib.rs | 2 ++ src/ndisrc/imp.rs | 5 ++++- src/receiver.rs | 1 + 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 790c5bb1..ab2ff161 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,8 @@ pub enum TimestampMode { Timecode = 2, #[genum(name = "NDI Timestamp", nick = "timestamp")] Timestamp = 3, + #[genum(name = "Receive Time", nick = "receive-time")] + ReceiveTime = 4, } fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { diff --git a/src/ndisrc/imp.rs b/src/ndisrc/imp.rs index 4ab3e28d..6467596d 100644 --- a/src/ndisrc/imp.rs +++ b/src/ndisrc/imp.rs @@ -475,7 +475,10 @@ impl BaseSrcImpl for NdiSrc { let settings = self.settings.lock().unwrap(); if let Some(latency) = state.current_latency { - let min = if settings.timestamp_mode != TimestampMode::Timecode { + let min = if matches!( + settings.timestamp_mode, + TimestampMode::ReceiveTimeTimecode | TimestampMode::ReceiveTimeTimestamp + ) { latency } else { gst::ClockTime::ZERO diff --git a/src/receiver.rs b/src/receiver.rs index 9154fbc7..88ada59b 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -679,6 +679,7 @@ impl Receiver { (receive_time + diff, duration) } } + TimestampMode::ReceiveTime => (receive_time, duration), }; gst_log!( From 0c89e0819f2e9345adc4c2f242f8f936ee5d2eca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 29 Sep 2021 16:44:37 +0300 Subject: [PATCH 2/5] Use gst::Element::current_running_time() instead of manually implementing it --- src/receiver.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/receiver.rs b/src/receiver.rs index 88ada59b..2054e8f6 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -624,13 +624,7 @@ impl Receiver { timecode: i64, duration: Option, ) -> Option<(gst::ClockTime, Option)> { - let clock = element.clock()?; - - // For now take the current running time as PTS. At a later time we - // will want to work with the timestamp given by the NDI SDK if available - let now = clock.time()?; - let base_time = element.base_time()?; - let receive_time = now - base_time; + let receive_time = element.current_running_time()?; let real_time_now = gst::ClockTime::from_nseconds(glib::real_time() as u64 * 1000); let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined { From 9a53bcd4054167389b6be02a43859dbf9fe30ed5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 30 Sep 2021 13:24:46 +0300 Subject: [PATCH 3/5] Implement remote/local clock estimation with the same algorithm as the RTP jitterbuffer This gives fewer jumps and generally leads to smoother and more correct results, while at the same time also being faster. --- src/receiver.rs | 346 ++++++++++++++++++++++++------------------------ 1 file changed, 173 insertions(+), 173 deletions(-) diff --git a/src/receiver.rs b/src/receiver.rs index 2054e8f6..dd901edb 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -1,6 +1,6 @@ use glib::prelude::*; use gst::prelude::*; -use gst::{gst_debug, gst_error, gst_log, gst_warning}; +use gst::{gst_debug, gst_error, gst_log, gst_trace, gst_warning}; use gst_video::prelude::*; use byte_slice_cast::AsMutSliceOf; @@ -74,29 +74,34 @@ struct ReceiverQueueInner { timeout: bool, } -// 100 frames observations window over which we calculate the timestamp drift -// between sender and receiver. A bigger window allows more smoothing out of -// network effects -const WINDOW_LENGTH: usize = 100; +const WINDOW_LENGTH: u64 = 512; +const WINDOW_DURATION: u64 = 2_000_000_000; + #[derive(Clone)] struct Observations(Arc>); -struct ObservationsInner { - // NDI timestamp - GStreamer clock time tuples - values: Vec<(u64, u64)>, - values_tmp: [(u64, u64); WINDOW_LENGTH], - current_mapping: TimeMapping, - next_mapping: TimeMapping, - time_mapping_pending: bool, - // How many frames we skipped since last observation - // we took - skip_count: usize, - // How many frames we skip in this period. once skip_count - // reaches this, we take another observation - skip_period: usize, - // How many observations are left until we update the skip_period - // again. This is always initialized to WINDOW_LENGTH - skip_period_update_in: usize, +struct ObservationsInner { + base_remote_time: Option, + base_local_time: Option, + deltas: VecDeque, + min_delta: i64, + skew: i64, + filling: bool, + window_size: usize, +} + +impl Default for ObservationsInner { + fn default() -> ObservationsInner { + ObservationsInner { + base_local_time: None, + base_remote_time: None, + deltas: VecDeque::new(), + min_delta: 0, + skew: 0, + filling: true, + window_size: 0, + } + } } #[derive(Clone, Copy, Debug)] @@ -109,18 +114,12 @@ struct TimeMapping { impl Observations { fn new() -> Self { - Self(Arc::new(Mutex::new(ObservationsInner { - values: Vec::with_capacity(WINDOW_LENGTH), - values_tmp: [(0, 0); WINDOW_LENGTH], - current_mapping: TimeMapping::default(), - next_mapping: TimeMapping::default(), - time_mapping_pending: false, - skip_count: 0, - skip_period: 1, - skip_period_update_in: WINDOW_LENGTH, - }))) + Self(Arc::new(Mutex::new(ObservationsInner::default()))) } + // Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from + // Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays": + // http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546 fn process( &self, element: &gst_base::BaseSrc, @@ -132,151 +131,152 @@ impl Observations { } let time = (time.0.unwrap(), time.1); + let remote_time = time.0.nseconds(); + let local_time = time.1.nseconds(); - let mut inner = self.0.lock().unwrap(); - let ObservationsInner { - ref mut values, - ref mut values_tmp, - ref mut current_mapping, - ref mut next_mapping, - ref mut time_mapping_pending, - ref mut skip_count, - ref mut skip_period, - ref mut skip_period_update_in, - } = *inner; - - if values.is_empty() { - current_mapping.xbase = time.0.nseconds(); - current_mapping.b = time.1.nseconds(); - current_mapping.num = 1; - current_mapping.den = 1; - } - - if *skip_count == 0 { - *skip_count += 1; - if *skip_count >= *skip_period { - *skip_count = 0; - } - *skip_period_update_in -= 1; - if *skip_period_update_in == 0 { - *skip_period_update_in = WINDOW_LENGTH; - - // Start by first updating every frame, then every second frame, then every third - // frame, etc. until we update once every quarter second - let framerate = gst::ClockTime::SECOND - .checked_div(duration.unwrap_or(40 * gst::ClockTime::MSECOND).nseconds()) - .unwrap_or(25) as usize; - - if *skip_period < framerate / 4 + 1 { - *skip_period += 1; - } else { - *skip_period = framerate / 4 + 1; - } - } - - assert!(values.len() <= WINDOW_LENGTH); - - if values.len() == WINDOW_LENGTH { - values.remove(0); - } - values.push((time.0.nseconds(), time.1.nseconds())); - - if let Some((num, den, b, xbase, r_squared)) = - gst::calculate_linear_regression(values, Some(values_tmp)) - { - next_mapping.xbase = xbase; - next_mapping.b = b; - next_mapping.num = num; - next_mapping.den = den; - *time_mapping_pending = true; - gst_debug!( - CAT, - obj: element, - "Calculated new time mapping: GStreamer time = {} * (NDI time - {}) + {} ({})", - next_mapping.num as f64 / next_mapping.den as f64, - gst::ClockTime::from_nseconds(next_mapping.xbase), - gst::ClockTime::from_nseconds(next_mapping.b), - r_squared, - ); - } - } else { - *skip_count += 1; - if *skip_count >= *skip_period { - *skip_count = 0; - } - } - - if *time_mapping_pending { - let expected = gst::Clock::adjust_with_calibration( - time.0, - gst::ClockTime::from_nseconds(current_mapping.xbase), - gst::ClockTime::from_nseconds(current_mapping.b), - gst::ClockTime::from_nseconds(current_mapping.num), - gst::ClockTime::from_nseconds(current_mapping.den), - ); - let new_calculated = gst::Clock::adjust_with_calibration( - time.0, - gst::ClockTime::from_nseconds(next_mapping.xbase), - gst::ClockTime::from_nseconds(next_mapping.b), - gst::ClockTime::from_nseconds(next_mapping.num), - gst::ClockTime::from_nseconds(next_mapping.den), - ); - - let diff = if new_calculated > expected { - new_calculated - expected - } else { - expected - new_calculated - }; - - // Allow at most 5% frame duration or 2ms difference per frame - let max_diff = cmp::max( - (duration.map(|d| d / 10)).unwrap_or(2 * gst::ClockTime::MSECOND), - 2 * gst::ClockTime::MSECOND, - ); - - if diff > max_diff { - gst_debug!( - CAT, - obj: element, - "New time mapping causes difference {} but only {} allowed", - diff, - max_diff, - ); - - if new_calculated > expected { - current_mapping.b = (expected + max_diff).nseconds(); - current_mapping.xbase = time.0.nseconds(); - } else { - current_mapping.b = (expected - max_diff).nseconds(); - current_mapping.xbase = time.0.nseconds(); - } - } else { - *current_mapping = *next_mapping; - } - } - - let converted_timestamp = gst::Clock::adjust_with_calibration( - time.0, - gst::ClockTime::from_nseconds(current_mapping.xbase), - gst::ClockTime::from_nseconds(current_mapping.b), - gst::ClockTime::from_nseconds(current_mapping.num), - gst::ClockTime::from_nseconds(current_mapping.den), - ); - let converted_duration = - duration.and_then(|d| d.mul_div_floor(current_mapping.num, current_mapping.den)); - - gst_debug!( + gst_trace!( CAT, obj: element, - "Converted timestamp {}/{} to {}, duration {} to {}", - time.0, - time.1, - converted_timestamp.display(), - duration.display(), - converted_duration.display(), + "Local time {}, remote time {}", + gst::ClockTime::from_nseconds(local_time), + gst::ClockTime::from_nseconds(remote_time), ); - (converted_timestamp, converted_duration) + let mut inner = self.0.lock().unwrap(); + + let (base_remote_time, base_local_time) = + match (inner.base_remote_time, inner.base_local_time) { + (Some(remote), Some(local)) => (remote, local), + _ => { + gst_debug!( + CAT, + obj: element, + "Initializing base time: local {}, remote {}", + gst::ClockTime::from_nseconds(local_time), + gst::ClockTime::from_nseconds(remote_time), + ); + inner.base_remote_time = Some(remote_time); + inner.base_local_time = Some(local_time); + + return (gst::ClockTime::from_nseconds(local_time), duration); + } + }; + + let remote_diff = remote_time.saturating_sub(base_remote_time); + let local_diff = local_time.saturating_sub(base_local_time); + let delta = (local_diff as i64) - (remote_diff as i64); + + gst_trace!( + CAT, + obj: element, + "Local diff {}, remote diff {}, delta {}", + gst::ClockTime::from_nseconds(local_diff), + gst::ClockTime::from_nseconds(remote_diff), + delta, + ); + + if remote_diff > 0 && local_diff > 0 { + let slope = (local_diff as f64) / (remote_diff as f64); + if slope < 0.8 || slope > 1.2 { + gst_warning!( + CAT, + obj: element, + "Too small/big slope {}, resetting", + slope + ); + *inner = ObservationsInner::default(); + gst_debug!( + CAT, + obj: element, + "Initializing base time: local {}, remote {}", + gst::ClockTime::from_nseconds(local_time), + gst::ClockTime::from_nseconds(remote_time), + ); + inner.base_remote_time = Some(remote_time); + inner.base_local_time = Some(local_time); + + return (gst::ClockTime::from_nseconds(local_time), duration); + } + } + + if (delta > inner.skew && delta - inner.skew > 1_000_000_000) + || (delta < inner.skew && inner.skew - delta > 1_000_000_000) + { + gst_warning!( + CAT, + obj: element, + "Delta {} too far from skew {}, resetting", + delta, + inner.skew + ); + *inner = ObservationsInner::default(); + gst_debug!( + CAT, + obj: element, + "Initializing base time: local {}, remote {}", + gst::ClockTime::from_nseconds(local_time), + gst::ClockTime::from_nseconds(remote_time), + ); + inner.base_remote_time = Some(remote_time); + inner.base_local_time = Some(local_time); + + return (gst::ClockTime::from_nseconds(local_time), duration); + } + + if inner.filling { + if inner.deltas.is_empty() || delta < inner.min_delta { + inner.min_delta = delta; + } + inner.deltas.push_back(delta); + + if remote_diff > WINDOW_DURATION || inner.deltas.len() as u64 == WINDOW_LENGTH { + inner.window_size = inner.deltas.len(); + inner.skew = inner.min_delta; + inner.filling = false; + } else { + let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64; + let perc_window = (inner.deltas.len() as u64) + .mul_div_floor(100, WINDOW_LENGTH) + .unwrap() as i64; + let perc = cmp::max(perc_time, perc_window); + + inner.skew = (perc * inner.min_delta + ((10_000 - perc) * inner.skew)) / 10_000; + } + } else { + let old = inner.deltas.pop_front().unwrap(); + inner.deltas.push_back(delta); + + if delta <= inner.min_delta { + inner.min_delta = delta; + } else if old == inner.min_delta { + inner.min_delta = inner.deltas.iter().copied().min().unwrap(); + } + + inner.skew = (inner.min_delta + (124 * inner.skew)) / 125; + } + + let out_time = base_local_time + remote_diff; + let out_time = if inner.skew < 0 { + out_time.saturating_sub((-inner.skew) as u64) + } else { + out_time + (inner.skew as u64) + }; + + gst_trace!( + CAT, + obj: element, + "Skew {}, min delta {}", + inner.skew, + inner.min_delta + ); + gst_trace!( + CAT, + obj: element, + "Outputting {}", + gst::ClockTime::from_nseconds(out_time) + ); + + (gst::ClockTime::from_nseconds(out_time), duration) } } @@ -285,8 +285,8 @@ impl Default for TimeMapping { Self { xbase: 0, b: 0, - num: 1, - den: 1, + num: 0, + den: 0, } } } From 0911775142c5886d7bfb557facc0ce1e3a7dbba5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 30 Sep 2021 13:33:18 +0300 Subject: [PATCH 4/5] Set the discont flag on the first audio/video buffer --- src/receiver.rs | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/receiver.rs b/src/receiver.rs index dd901edb..43cb7741 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -493,6 +493,8 @@ impl Receiver { } fn receive_thread(receiver: &Weak, recv: RecvInstance) { + let mut first_video_frame = true; + let mut first_audio_frame = true; let mut first_frame = true; let mut timer = time::Instant::now(); @@ -553,11 +555,31 @@ impl Receiver { } Ok(Some(Frame::Video(frame))) => { first_frame = false; - receiver.create_video_buffer_and_info(&element, frame) + let mut buffer = receiver.create_video_buffer_and_info(&element, frame); + if first_video_frame { + if let Ok(Buffer::Video(ref mut buffer, _)) = buffer { + buffer + .get_mut() + .unwrap() + .set_flags(gst::BufferFlags::DISCONT); + first_video_frame = false; + } + } + buffer } Ok(Some(Frame::Audio(frame))) => { first_frame = false; - receiver.create_audio_buffer_and_info(&element, frame) + let mut buffer = receiver.create_audio_buffer_and_info(&element, frame); + if first_audio_frame { + if let Ok(Buffer::Video(ref mut buffer, _)) = buffer { + buffer + .get_mut() + .unwrap() + .set_flags(gst::BufferFlags::DISCONT); + first_audio_frame = false; + } + } + buffer } Ok(Some(Frame::Metadata(frame))) => { if let Some(metadata) = frame.metadata() { From e642d6a4c1417a6a79b63f4830c83071458a1af1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 30 Sep 2021 13:37:30 +0300 Subject: [PATCH 5/5] Set the RESYNC flag on buffers after time tracking state was reset --- src/receiver.rs | 60 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 21 deletions(-) diff --git a/src/receiver.rs b/src/receiver.rs index 43cb7741..1cc413b4 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -125,9 +125,9 @@ impl Observations { element: &gst_base::BaseSrc, time: (Option, gst::ClockTime), duration: Option, - ) -> (gst::ClockTime, Option) { + ) -> (gst::ClockTime, Option, bool) { if time.0.is_none() { - return (time.1, duration); + return (time.1, duration, false); } let time = (time.0.unwrap(), time.1); @@ -158,7 +158,7 @@ impl Observations { inner.base_remote_time = Some(remote_time); inner.base_local_time = Some(local_time); - return (gst::ClockTime::from_nseconds(local_time), duration); + return (gst::ClockTime::from_nseconds(local_time), duration, true); } }; @@ -184,7 +184,10 @@ impl Observations { "Too small/big slope {}, resetting", slope ); + + let discont = !inner.deltas.is_empty(); *inner = ObservationsInner::default(); + gst_debug!( CAT, obj: element, @@ -195,7 +198,7 @@ impl Observations { inner.base_remote_time = Some(remote_time); inner.base_local_time = Some(local_time); - return (gst::ClockTime::from_nseconds(local_time), duration); + return (gst::ClockTime::from_nseconds(local_time), duration, discont); } } @@ -209,7 +212,10 @@ impl Observations { delta, inner.skew ); + + let discont = !inner.deltas.is_empty(); *inner = ObservationsInner::default(); + gst_debug!( CAT, obj: element, @@ -220,7 +226,7 @@ impl Observations { inner.base_remote_time = Some(remote_time); inner.base_local_time = Some(local_time); - return (gst::ClockTime::from_nseconds(local_time), duration); + return (gst::ClockTime::from_nseconds(local_time), duration, discont); } if inner.filling { @@ -276,7 +282,7 @@ impl Observations { gst::ClockTime::from_nseconds(out_time) ); - (gst::ClockTime::from_nseconds(out_time), duration) + (gst::ClockTime::from_nseconds(out_time), duration, false) } } @@ -645,7 +651,7 @@ impl Receiver { timestamp: i64, timecode: i64, duration: Option, - ) -> Option<(gst::ClockTime, Option)> { + ) -> Option<(gst::ClockTime, Option, bool)> { let receive_time = element.current_running_time()?; let real_time_now = gst::ClockTime::from_nseconds(glib::real_time() as u64 * 1000); @@ -667,7 +673,7 @@ impl Receiver { real_time_now, ); - let (pts, duration) = match self.0.timestamp_mode { + let (pts, duration, discont) = match self.0.timestamp_mode { TimestampMode::ReceiveTimeTimecode => { self.0 .observations @@ -678,24 +684,24 @@ impl Receiver { .observations .process(element, (timestamp, receive_time), duration) } - TimestampMode::Timecode => (timecode, duration), - TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration), + TimestampMode::Timecode => (timecode, duration, false), + TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration, false), TimestampMode::Timestamp => { // Timestamps are relative to the UNIX epoch let timestamp = timestamp?; if real_time_now > timestamp { let diff = real_time_now - timestamp; if diff > receive_time { - (gst::ClockTime::ZERO, duration) + (gst::ClockTime::ZERO, duration, false) } else { - (receive_time - diff, duration) + (receive_time - diff, duration, false) } } else { let diff = timestamp - real_time_now; - (receive_time + diff, duration) + (receive_time + diff, duration, false) } } - TimestampMode::ReceiveTime => (receive_time, duration), + TimestampMode::ReceiveTime => (receive_time, duration, false), }; gst_log!( @@ -706,7 +712,7 @@ impl Receiver { duration.display(), ); - Some((pts, duration)) + Some((pts, duration, discont)) } fn create_video_buffer_and_info( @@ -716,7 +722,7 @@ impl Receiver { ) -> Result { gst_debug!(CAT, obj: element, "Received video frame {:?}", video_frame); - let (pts, duration) = self + let (pts, duration, discont) = self .calculate_video_timestamp(element, &video_frame) .ok_or_else(|| { gst_debug!(CAT, obj: element, "Flushing, dropping buffer"); @@ -725,7 +731,13 @@ impl Receiver { let info = self.create_video_info(element, &video_frame)?; - let buffer = self.create_video_buffer(element, pts, duration, &info, &video_frame); + let mut buffer = self.create_video_buffer(element, pts, duration, &info, &video_frame); + if discont { + buffer + .get_mut() + .unwrap() + .set_flags(gst::BufferFlags::RESYNC); + } gst_log!(CAT, obj: element, "Produced video buffer {:?}", buffer); @@ -736,7 +748,7 @@ impl Receiver { &self, element: &gst_base::BaseSrc, video_frame: &VideoFrame, - ) -> Option<(gst::ClockTime, Option)> { + ) -> Option<(gst::ClockTime, Option, bool)> { let duration = gst::ClockTime::SECOND.mul_div_floor( video_frame.frame_rate().1 as u64, video_frame.frame_rate().0 as u64, @@ -1072,7 +1084,7 @@ impl Receiver { ) -> Result { gst_debug!(CAT, obj: element, "Received audio frame {:?}", audio_frame); - let (pts, duration) = self + let (pts, duration, discont) = self .calculate_audio_timestamp(element, &audio_frame) .ok_or_else(|| { gst_debug!(CAT, obj: element, "Flushing, dropping buffer"); @@ -1081,7 +1093,13 @@ impl Receiver { let info = self.create_audio_info(element, &audio_frame)?; - let buffer = self.create_audio_buffer(element, pts, duration, &info, &audio_frame); + let mut buffer = self.create_audio_buffer(element, pts, duration, &info, &audio_frame); + if discont { + buffer + .get_mut() + .unwrap() + .set_flags(gst::BufferFlags::RESYNC); + } gst_log!(CAT, obj: element, "Produced audio buffer {:?}", buffer); @@ -1092,7 +1110,7 @@ impl Receiver { &self, element: &gst_base::BaseSrc, audio_frame: &AudioFrame, - ) -> Option<(gst::ClockTime, Option)> { + ) -> Option<(gst::ClockTime, Option, bool)> { let duration = gst::ClockTime::SECOND.mul_div_floor( audio_frame.no_samples() as u64, audio_frame.sample_rate() as u64,