diff --git a/src/lib.rs b/src/lib.rs index 790c5bb1..ab2ff161 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,8 @@ pub enum TimestampMode { Timecode = 2, #[genum(name = "NDI Timestamp", nick = "timestamp")] Timestamp = 3, + #[genum(name = "Receive Time", nick = "receive-time")] + ReceiveTime = 4, } fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { diff --git a/src/ndisrc/imp.rs b/src/ndisrc/imp.rs index 4ab3e28d..6467596d 100644 --- a/src/ndisrc/imp.rs +++ b/src/ndisrc/imp.rs @@ -475,7 +475,10 @@ impl BaseSrcImpl for NdiSrc { let settings = self.settings.lock().unwrap(); if let Some(latency) = state.current_latency { - let min = if settings.timestamp_mode != TimestampMode::Timecode { + let min = if matches!( + settings.timestamp_mode, + TimestampMode::ReceiveTimeTimecode | TimestampMode::ReceiveTimeTimestamp + ) { latency } else { gst::ClockTime::ZERO diff --git a/src/receiver.rs b/src/receiver.rs index 9154fbc7..1cc413b4 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -1,6 +1,6 @@ use glib::prelude::*; use gst::prelude::*; -use gst::{gst_debug, gst_error, gst_log, gst_warning}; +use gst::{gst_debug, gst_error, gst_log, gst_trace, gst_warning}; use gst_video::prelude::*; use byte_slice_cast::AsMutSliceOf; @@ -74,29 +74,34 @@ struct ReceiverQueueInner { timeout: bool, } -// 100 frames observations window over which we calculate the timestamp drift -// between sender and receiver. A bigger window allows more smoothing out of -// network effects -const WINDOW_LENGTH: usize = 100; +const WINDOW_LENGTH: u64 = 512; +const WINDOW_DURATION: u64 = 2_000_000_000; + #[derive(Clone)] struct Observations(Arc>); -struct ObservationsInner { - // NDI timestamp - GStreamer clock time tuples - values: Vec<(u64, u64)>, - values_tmp: [(u64, u64); WINDOW_LENGTH], - current_mapping: TimeMapping, - next_mapping: TimeMapping, - time_mapping_pending: bool, - // How many frames we skipped since last observation - // we took - skip_count: usize, - // How many frames we skip in this period. once skip_count - // reaches this, we take another observation - skip_period: usize, - // How many observations are left until we update the skip_period - // again. This is always initialized to WINDOW_LENGTH - skip_period_update_in: usize, +struct ObservationsInner { + base_remote_time: Option, + base_local_time: Option, + deltas: VecDeque, + min_delta: i64, + skew: i64, + filling: bool, + window_size: usize, +} + +impl Default for ObservationsInner { + fn default() -> ObservationsInner { + ObservationsInner { + base_local_time: None, + base_remote_time: None, + deltas: VecDeque::new(), + min_delta: 0, + skew: 0, + filling: true, + window_size: 0, + } + } } #[derive(Clone, Copy, Debug)] @@ -109,174 +114,175 @@ struct TimeMapping { impl Observations { fn new() -> Self { - Self(Arc::new(Mutex::new(ObservationsInner { - values: Vec::with_capacity(WINDOW_LENGTH), - values_tmp: [(0, 0); WINDOW_LENGTH], - current_mapping: TimeMapping::default(), - next_mapping: TimeMapping::default(), - time_mapping_pending: false, - skip_count: 0, - skip_period: 1, - skip_period_update_in: WINDOW_LENGTH, - }))) + Self(Arc::new(Mutex::new(ObservationsInner::default()))) } + // Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from + // Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays": + // http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546 fn process( &self, element: &gst_base::BaseSrc, time: (Option, gst::ClockTime), duration: Option, - ) -> (gst::ClockTime, Option) { + ) -> (gst::ClockTime, Option, bool) { if time.0.is_none() { - return (time.1, duration); + return (time.1, duration, false); } let time = (time.0.unwrap(), time.1); + let remote_time = time.0.nseconds(); + let local_time = time.1.nseconds(); - let mut inner = self.0.lock().unwrap(); - let ObservationsInner { - ref mut values, - ref mut values_tmp, - ref mut current_mapping, - ref mut next_mapping, - ref mut time_mapping_pending, - ref mut skip_count, - ref mut skip_period, - ref mut skip_period_update_in, - } = *inner; - - if values.is_empty() { - current_mapping.xbase = time.0.nseconds(); - current_mapping.b = time.1.nseconds(); - current_mapping.num = 1; - current_mapping.den = 1; - } - - if *skip_count == 0 { - *skip_count += 1; - if *skip_count >= *skip_period { - *skip_count = 0; - } - *skip_period_update_in -= 1; - if *skip_period_update_in == 0 { - *skip_period_update_in = WINDOW_LENGTH; - - // Start by first updating every frame, then every second frame, then every third - // frame, etc. until we update once every quarter second - let framerate = gst::ClockTime::SECOND - .checked_div(duration.unwrap_or(40 * gst::ClockTime::MSECOND).nseconds()) - .unwrap_or(25) as usize; - - if *skip_period < framerate / 4 + 1 { - *skip_period += 1; - } else { - *skip_period = framerate / 4 + 1; - } - } - - assert!(values.len() <= WINDOW_LENGTH); - - if values.len() == WINDOW_LENGTH { - values.remove(0); - } - values.push((time.0.nseconds(), time.1.nseconds())); - - if let Some((num, den, b, xbase, r_squared)) = - gst::calculate_linear_regression(values, Some(values_tmp)) - { - next_mapping.xbase = xbase; - next_mapping.b = b; - next_mapping.num = num; - next_mapping.den = den; - *time_mapping_pending = true; - gst_debug!( - CAT, - obj: element, - "Calculated new time mapping: GStreamer time = {} * (NDI time - {}) + {} ({})", - next_mapping.num as f64 / next_mapping.den as f64, - gst::ClockTime::from_nseconds(next_mapping.xbase), - gst::ClockTime::from_nseconds(next_mapping.b), - r_squared, - ); - } - } else { - *skip_count += 1; - if *skip_count >= *skip_period { - *skip_count = 0; - } - } - - if *time_mapping_pending { - let expected = gst::Clock::adjust_with_calibration( - time.0, - gst::ClockTime::from_nseconds(current_mapping.xbase), - gst::ClockTime::from_nseconds(current_mapping.b), - gst::ClockTime::from_nseconds(current_mapping.num), - gst::ClockTime::from_nseconds(current_mapping.den), - ); - let new_calculated = gst::Clock::adjust_with_calibration( - time.0, - gst::ClockTime::from_nseconds(next_mapping.xbase), - gst::ClockTime::from_nseconds(next_mapping.b), - gst::ClockTime::from_nseconds(next_mapping.num), - gst::ClockTime::from_nseconds(next_mapping.den), - ); - - let diff = if new_calculated > expected { - new_calculated - expected - } else { - expected - new_calculated - }; - - // Allow at most 5% frame duration or 2ms difference per frame - let max_diff = cmp::max( - (duration.map(|d| d / 10)).unwrap_or(2 * gst::ClockTime::MSECOND), - 2 * gst::ClockTime::MSECOND, - ); - - if diff > max_diff { - gst_debug!( - CAT, - obj: element, - "New time mapping causes difference {} but only {} allowed", - diff, - max_diff, - ); - - if new_calculated > expected { - current_mapping.b = (expected + max_diff).nseconds(); - current_mapping.xbase = time.0.nseconds(); - } else { - current_mapping.b = (expected - max_diff).nseconds(); - current_mapping.xbase = time.0.nseconds(); - } - } else { - *current_mapping = *next_mapping; - } - } - - let converted_timestamp = gst::Clock::adjust_with_calibration( - time.0, - gst::ClockTime::from_nseconds(current_mapping.xbase), - gst::ClockTime::from_nseconds(current_mapping.b), - gst::ClockTime::from_nseconds(current_mapping.num), - gst::ClockTime::from_nseconds(current_mapping.den), - ); - let converted_duration = - duration.and_then(|d| d.mul_div_floor(current_mapping.num, current_mapping.den)); - - gst_debug!( + gst_trace!( CAT, obj: element, - "Converted timestamp {}/{} to {}, duration {} to {}", - time.0, - time.1, - converted_timestamp.display(), - duration.display(), - converted_duration.display(), + "Local time {}, remote time {}", + gst::ClockTime::from_nseconds(local_time), + gst::ClockTime::from_nseconds(remote_time), ); - (converted_timestamp, converted_duration) + let mut inner = self.0.lock().unwrap(); + + let (base_remote_time, base_local_time) = + match (inner.base_remote_time, inner.base_local_time) { + (Some(remote), Some(local)) => (remote, local), + _ => { + gst_debug!( + CAT, + obj: element, + "Initializing base time: local {}, remote {}", + gst::ClockTime::from_nseconds(local_time), + gst::ClockTime::from_nseconds(remote_time), + ); + inner.base_remote_time = Some(remote_time); + inner.base_local_time = Some(local_time); + + return (gst::ClockTime::from_nseconds(local_time), duration, true); + } + }; + + let remote_diff = remote_time.saturating_sub(base_remote_time); + let local_diff = local_time.saturating_sub(base_local_time); + let delta = (local_diff as i64) - (remote_diff as i64); + + gst_trace!( + CAT, + obj: element, + "Local diff {}, remote diff {}, delta {}", + gst::ClockTime::from_nseconds(local_diff), + gst::ClockTime::from_nseconds(remote_diff), + delta, + ); + + if remote_diff > 0 && local_diff > 0 { + let slope = (local_diff as f64) / (remote_diff as f64); + if slope < 0.8 || slope > 1.2 { + gst_warning!( + CAT, + obj: element, + "Too small/big slope {}, resetting", + slope + ); + + let discont = !inner.deltas.is_empty(); + *inner = ObservationsInner::default(); + + gst_debug!( + CAT, + obj: element, + "Initializing base time: local {}, remote {}", + gst::ClockTime::from_nseconds(local_time), + gst::ClockTime::from_nseconds(remote_time), + ); + inner.base_remote_time = Some(remote_time); + inner.base_local_time = Some(local_time); + + return (gst::ClockTime::from_nseconds(local_time), duration, discont); + } + } + + if (delta > inner.skew && delta - inner.skew > 1_000_000_000) + || (delta < inner.skew && inner.skew - delta > 1_000_000_000) + { + gst_warning!( + CAT, + obj: element, + "Delta {} too far from skew {}, resetting", + delta, + inner.skew + ); + + let discont = !inner.deltas.is_empty(); + *inner = ObservationsInner::default(); + + gst_debug!( + CAT, + obj: element, + "Initializing base time: local {}, remote {}", + gst::ClockTime::from_nseconds(local_time), + gst::ClockTime::from_nseconds(remote_time), + ); + inner.base_remote_time = Some(remote_time); + inner.base_local_time = Some(local_time); + + return (gst::ClockTime::from_nseconds(local_time), duration, discont); + } + + if inner.filling { + if inner.deltas.is_empty() || delta < inner.min_delta { + inner.min_delta = delta; + } + inner.deltas.push_back(delta); + + if remote_diff > WINDOW_DURATION || inner.deltas.len() as u64 == WINDOW_LENGTH { + inner.window_size = inner.deltas.len(); + inner.skew = inner.min_delta; + inner.filling = false; + } else { + let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64; + let perc_window = (inner.deltas.len() as u64) + .mul_div_floor(100, WINDOW_LENGTH) + .unwrap() as i64; + let perc = cmp::max(perc_time, perc_window); + + inner.skew = (perc * inner.min_delta + ((10_000 - perc) * inner.skew)) / 10_000; + } + } else { + let old = inner.deltas.pop_front().unwrap(); + inner.deltas.push_back(delta); + + if delta <= inner.min_delta { + inner.min_delta = delta; + } else if old == inner.min_delta { + inner.min_delta = inner.deltas.iter().copied().min().unwrap(); + } + + inner.skew = (inner.min_delta + (124 * inner.skew)) / 125; + } + + let out_time = base_local_time + remote_diff; + let out_time = if inner.skew < 0 { + out_time.saturating_sub((-inner.skew) as u64) + } else { + out_time + (inner.skew as u64) + }; + + gst_trace!( + CAT, + obj: element, + "Skew {}, min delta {}", + inner.skew, + inner.min_delta + ); + gst_trace!( + CAT, + obj: element, + "Outputting {}", + gst::ClockTime::from_nseconds(out_time) + ); + + (gst::ClockTime::from_nseconds(out_time), duration, false) } } @@ -285,8 +291,8 @@ impl Default for TimeMapping { Self { xbase: 0, b: 0, - num: 1, - den: 1, + num: 0, + den: 0, } } } @@ -493,6 +499,8 @@ impl Receiver { } fn receive_thread(receiver: &Weak, recv: RecvInstance) { + let mut first_video_frame = true; + let mut first_audio_frame = true; let mut first_frame = true; let mut timer = time::Instant::now(); @@ -553,11 +561,31 @@ impl Receiver { } Ok(Some(Frame::Video(frame))) => { first_frame = false; - receiver.create_video_buffer_and_info(&element, frame) + let mut buffer = receiver.create_video_buffer_and_info(&element, frame); + if first_video_frame { + if let Ok(Buffer::Video(ref mut buffer, _)) = buffer { + buffer + .get_mut() + .unwrap() + .set_flags(gst::BufferFlags::DISCONT); + first_video_frame = false; + } + } + buffer } Ok(Some(Frame::Audio(frame))) => { first_frame = false; - receiver.create_audio_buffer_and_info(&element, frame) + let mut buffer = receiver.create_audio_buffer_and_info(&element, frame); + if first_audio_frame { + if let Ok(Buffer::Video(ref mut buffer, _)) = buffer { + buffer + .get_mut() + .unwrap() + .set_flags(gst::BufferFlags::DISCONT); + first_audio_frame = false; + } + } + buffer } Ok(Some(Frame::Metadata(frame))) => { if let Some(metadata) = frame.metadata() { @@ -623,14 +651,8 @@ impl Receiver { timestamp: i64, timecode: i64, duration: Option, - ) -> Option<(gst::ClockTime, Option)> { - let clock = element.clock()?; - - // For now take the current running time as PTS. At a later time we - // will want to work with the timestamp given by the NDI SDK if available - let now = clock.time()?; - let base_time = element.base_time()?; - let receive_time = now - base_time; + ) -> Option<(gst::ClockTime, Option, bool)> { + let receive_time = element.current_running_time()?; let real_time_now = gst::ClockTime::from_nseconds(glib::real_time() as u64 * 1000); let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined { @@ -651,7 +673,7 @@ impl Receiver { real_time_now, ); - let (pts, duration) = match self.0.timestamp_mode { + let (pts, duration, discont) = match self.0.timestamp_mode { TimestampMode::ReceiveTimeTimecode => { self.0 .observations @@ -662,23 +684,24 @@ impl Receiver { .observations .process(element, (timestamp, receive_time), duration) } - TimestampMode::Timecode => (timecode, duration), - TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration), + TimestampMode::Timecode => (timecode, duration, false), + TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration, false), TimestampMode::Timestamp => { // Timestamps are relative to the UNIX epoch let timestamp = timestamp?; if real_time_now > timestamp { let diff = real_time_now - timestamp; if diff > receive_time { - (gst::ClockTime::ZERO, duration) + (gst::ClockTime::ZERO, duration, false) } else { - (receive_time - diff, duration) + (receive_time - diff, duration, false) } } else { let diff = timestamp - real_time_now; - (receive_time + diff, duration) + (receive_time + diff, duration, false) } } + TimestampMode::ReceiveTime => (receive_time, duration, false), }; gst_log!( @@ -689,7 +712,7 @@ impl Receiver { duration.display(), ); - Some((pts, duration)) + Some((pts, duration, discont)) } fn create_video_buffer_and_info( @@ -699,7 +722,7 @@ impl Receiver { ) -> Result { gst_debug!(CAT, obj: element, "Received video frame {:?}", video_frame); - let (pts, duration) = self + let (pts, duration, discont) = self .calculate_video_timestamp(element, &video_frame) .ok_or_else(|| { gst_debug!(CAT, obj: element, "Flushing, dropping buffer"); @@ -708,7 +731,13 @@ impl Receiver { let info = self.create_video_info(element, &video_frame)?; - let buffer = self.create_video_buffer(element, pts, duration, &info, &video_frame); + let mut buffer = self.create_video_buffer(element, pts, duration, &info, &video_frame); + if discont { + buffer + .get_mut() + .unwrap() + .set_flags(gst::BufferFlags::RESYNC); + } gst_log!(CAT, obj: element, "Produced video buffer {:?}", buffer); @@ -719,7 +748,7 @@ impl Receiver { &self, element: &gst_base::BaseSrc, video_frame: &VideoFrame, - ) -> Option<(gst::ClockTime, Option)> { + ) -> Option<(gst::ClockTime, Option, bool)> { let duration = gst::ClockTime::SECOND.mul_div_floor( video_frame.frame_rate().1 as u64, video_frame.frame_rate().0 as u64, @@ -1055,7 +1084,7 @@ impl Receiver { ) -> Result { gst_debug!(CAT, obj: element, "Received audio frame {:?}", audio_frame); - let (pts, duration) = self + let (pts, duration, discont) = self .calculate_audio_timestamp(element, &audio_frame) .ok_or_else(|| { gst_debug!(CAT, obj: element, "Flushing, dropping buffer"); @@ -1064,7 +1093,13 @@ impl Receiver { let info = self.create_audio_info(element, &audio_frame)?; - let buffer = self.create_audio_buffer(element, pts, duration, &info, &audio_frame); + let mut buffer = self.create_audio_buffer(element, pts, duration, &info, &audio_frame); + if discont { + buffer + .get_mut() + .unwrap() + .set_flags(gst::BufferFlags::RESYNC); + } gst_log!(CAT, obj: element, "Produced audio buffer {:?}", buffer); @@ -1075,7 +1110,7 @@ impl Receiver { &self, element: &gst_base::BaseSrc, audio_frame: &AudioFrame, - ) -> Option<(gst::ClockTime, Option)> { + ) -> Option<(gst::ClockTime, Option, bool)> { let duration = gst::ClockTime::SECOND.mul_div_floor( audio_frame.no_samples() as u64, audio_frame.sample_rate() as u64,