From 33370e42adc715b8d20361198d6f49633cd3c976 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= <sebastian@centricular.com>
Date: Fri, 19 Jul 2019 00:05:31 +0300
Subject: [PATCH] Collect observations for the timestamp/receive time mappings
 and smooth them

This allows keeping audio/video more in sync with how the sender was
sending it, while also handling network jitter and clock drift in a
reasonable way.
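In outline, the receiver now works like the following sketch (illustrative
names and an open-coded least-squares fit; the patch itself calls
GStreamer's gst_calculate_linear_regression() and additionally slews the
mapping, see src/receiver.rs below):

    // Sketch: keep a sliding window of (sender time, receiver time) pairs
    // and fit a line receiver_time ~= slope * sender_time + intercept over
    // it; buffers are then timestamped from the fitted line instead of the
    // jittery raw receive times.
    struct Window {
        values: Vec<(u64, u64)>, // (sender time, receiver time) in ns
        cap: usize,              // e.g. 100 observations
    }

    impl Window {
        fn observe(&mut self, sender: u64, receiver: u64) {
            if self.values.len() == self.cap {
                self.values.remove(0);
            }
            self.values.push((sender, receiver));
        }

        // Ordinary least squares over the window. The patch uses
        // gst_calculate_linear_regression() instead of open-coding this.
        fn fit(&self) -> Option<(f64, f64)> {
            let n = self.values.len() as f64;
            if n < 2.0 {
                return None;
            }
            let (sx, sy) = self.values.iter().fold((0.0, 0.0), |(sx, sy), &(x, y)| {
                (sx + x as f64, sy + y as f64)
            });
            let (mx, my) = (sx / n, sy / n);
            let (num, den) = self.values.iter().fold((0.0, 0.0), |(num, den), &(x, y)| {
                let dx = x as f64 - mx;
                (num + dx * (y as f64 - my), den + dx * dx)
            });
            if den == 0.0 {
                None
            } else {
                let slope = num / den;
                Some((slope, my - slope * mx)) // receiver ~= slope * sender + intercept
            }
        }
    }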
---
 Cargo.toml      |   3 +-
 src/lib.rs      |   1 +
 src/receiver.rs | 408 ++++++++++++++++++++++++++++++++++++++++++------
 3 files changed, 361 insertions(+), 51 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index d81e64e2..44b5b993 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,10 +9,11 @@ description = "NewTek NDI Plugin"
 
 [dependencies]
 glib = { version = "0.8.0", features = ["subclassing"] }
 gobject-sys = "0.9"
-gstreamer = { version = "0.14.3", features = ["subclassing"] }
+gstreamer = { version = "0.14.3", features = ["subclassing", "v1_12"] }
 gstreamer-base = { version = "0.14.0", features = ["subclassing"] }
 gstreamer-audio = "0.14.0"
 gstreamer-video = "0.14.3"
+gstreamer-sys = "0.8"
 lazy_static = "1.1.0"
 byte-slice-cast = "0.2.0"

diff --git a/src/lib.rs b/src/lib.rs
index a1e2d0e3..b3d8190d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,6 +6,7 @@ use glib::subclass::prelude::*;
 extern crate gstreamer as gst;
 extern crate gstreamer_audio as gst_audio;
 extern crate gstreamer_base as gst_base;
+extern crate gstreamer_sys as gst_sys;
 extern crate gstreamer_video as gst_video;
 
 #[macro_use]

diff --git a/src/receiver.rs b/src/receiver.rs
index 8981f886..e9b98d7f 100644
--- a/src/receiver.rs
+++ b/src/receiver.rs
@@ -7,6 +7,7 @@ use gst_video::prelude::*;
 
 use byte_slice_cast::AsMutSliceOf;
 
+use std::cmp;
 use std::collections::VecDeque;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Condvar, Mutex, Weak};
@@ -21,6 +22,7 @@ enum ReceiverInfo {
         ip_address: Option<String>,
         video: Option<Weak<ReceiverInner>>,
         audio: Option<Weak<ReceiverInner>>,
+        observations: Observations,
     },
     Connected {
         id: usize,
@@ -29,6 +31,7 @@ enum ReceiverInfo {
         recv: RecvInstance,
         video: Option<Weak<ReceiverInner>>,
         audio: Option<Weak<ReceiverInner>>,
+        observations: Observations,
     },
 }
 
@@ -63,6 +66,8 @@ struct ReceiverInner {
     recv: Mutex<Option<RecvInstance>>,
     recv_cond: Condvar,
 
+    observations: Observations,
+
     cat: gst::DebugCategory,
     element: glib::WeakRef<gst_base::BaseSrc>,
     timestamp_mode: TimestampMode,
@@ -95,6 +100,226 @@ struct ReceiverQueueInner {
     timeout: bool,
 }
 
+// 100-frame observation window over which we calculate the timestamp drift
+// between sender and receiver. A bigger window allows for more smoothing of
+// network effects
+const WINDOW_LENGTH: usize = 100;
+
+#[derive(Clone)]
+struct Observations(Arc<Mutex<ObservationsInner>>);
+
+struct ObservationsInner {
+    // (NDI timestamp, GStreamer clock time) tuples
+    values: Vec<(u64, u64)>,
+    values_tmp: [(u64, u64); WINDOW_LENGTH],
+    current_mapping: TimeMapping,
+    next_mapping: TimeMapping,
+    time_mapping_pending: bool,
+
+    // How many frames we skipped since the last observation we took
+    skip_count: usize,
+    // How many frames we skip in this period. Once skip_count
+    // reaches this, we take another observation
+    skip_period: usize,
+    // How many observations are left until we update the skip_period
+    // again. This is always initialized to WINDOW_LENGTH
+    skip_period_update_in: usize,
+}
+
+#[derive(Clone, Copy, Debug)]
+struct TimeMapping {
+    xbase: u64,
+    b: u64,
+    num: u64,
+    den: u64,
+}
+
+impl Observations {
+    fn new() -> Self {
+        Self(Arc::new(Mutex::new(ObservationsInner {
+            values: Vec::with_capacity(WINDOW_LENGTH),
+            values_tmp: [(0, 0); WINDOW_LENGTH],
+            current_mapping: TimeMapping::default(),
+            next_mapping: TimeMapping::default(),
+            time_mapping_pending: false,
+            skip_count: 0,
+            skip_period: 1,
+            skip_period_update_in: WINDOW_LENGTH,
+        })))
+    }
+
+    fn process(
+        &self,
+        cat: gst::DebugCategory,
+        element: &gst_base::BaseSrc,
+        time: (gst::ClockTime, gst::ClockTime),
+        duration: gst::ClockTime,
+    ) -> (gst::ClockTime, gst::ClockTime) {
+        assert!(time.1.is_some());
+        if time.0.is_none() {
+            return (time.1, duration);
+        }
+
+        let time = (time.0.unwrap(), time.1.unwrap());
+
+        let mut inner = self.0.lock().unwrap();
+        let ObservationsInner {
+            ref mut values,
+            ref mut values_tmp,
+            ref mut current_mapping,
+            ref mut next_mapping,
+            ref mut time_mapping_pending,
+            ref mut skip_count,
+            ref mut skip_period,
+            ref mut skip_period_update_in,
+        } = *inner;
+
+        if values.is_empty() {
+            current_mapping.xbase = time.0;
+            current_mapping.b = time.1;
+            current_mapping.num = 1;
+            current_mapping.den = 1;
+        }
+
+        if *skip_count == 0 {
+            *skip_count += 1;
+            if *skip_count >= *skip_period {
+                *skip_count = 0;
+            }
+            *skip_period_update_in -= 1;
+            if *skip_period_update_in == 0 {
+                *skip_period_update_in = WINDOW_LENGTH;
+
+                // Start by first updating every frame, then every second frame, then every third
+                // frame, etc. until we update once every quarter second
+                let framerate = (gst::SECOND / duration).unwrap_or(25) as usize;
+
+                if *skip_period < framerate / 4 + 1 {
+                    *skip_period += 1;
+                } else {
+                    *skip_period = framerate / 4 + 1;
+                }
+            }
+
+            assert!(values.len() <= WINDOW_LENGTH);
+
+            if values.len() == WINDOW_LENGTH {
+                values.remove(0);
+            }
+            values.push(time);
+
+            if let Some((num, den, b, xbase, r_squared)) =
+                calculate_linear_regression(values, Some(values_tmp))
+            {
+                next_mapping.xbase = xbase;
+                next_mapping.b = b;
+                next_mapping.num = num;
+                next_mapping.den = den;
+                *time_mapping_pending = true;
+                gst_debug!(
+                    cat,
+                    obj: element,
+                    "Calculated new time mapping: GStreamer time = {} * (NDI time - {}) + {} ({})",
+                    next_mapping.num as f64 / next_mapping.den as f64,
+                    gst::ClockTime::from(next_mapping.xbase),
+                    gst::ClockTime::from(next_mapping.b),
+                    r_squared,
+                );
+            }
+        } else {
+            *skip_count += 1;
+            if *skip_count >= *skip_period {
+                *skip_count = 0;
+            }
+        }
+
+        if *time_mapping_pending {
+            let expected = gst::Clock::adjust_with_calibration(
+                time.0.into(),
+                current_mapping.xbase.into(),
+                current_mapping.b.into(),
+                current_mapping.num.into(),
+                current_mapping.den.into(),
+            )
+            .unwrap();
+            let new_calculated = gst::Clock::adjust_with_calibration(
+                time.0.into(),
+                next_mapping.xbase.into(),
+                next_mapping.b.into(),
+                next_mapping.num.into(),
+                next_mapping.den.into(),
+            )
+            .unwrap();
+
+            let diff = if new_calculated > expected {
+                new_calculated - expected
+            } else {
+                expected - new_calculated
+            };
+
+            // Allow at most 10% of the frame duration or 2ms difference per frame
+            let max_diff = cmp::max(
+                (duration / 10).unwrap_or(2 * gst::MSECOND_VAL),
+                2 * gst::MSECOND_VAL,
+            );
+
+            if diff > max_diff {
+                gst_debug!(
+                    cat,
+                    obj: element,
+                    "New time mapping causes difference {} but only {} allowed",
+                    gst::ClockTime::from(diff),
+                    gst::ClockTime::from(max_diff),
+                );
+
+                if new_calculated > expected {
+                    current_mapping.b = expected + max_diff;
+                    current_mapping.xbase = time.0;
+                } else {
+                    current_mapping.b = expected - max_diff;
+                    current_mapping.xbase = time.0;
+                }
+            } else {
+                *current_mapping = *next_mapping;
+            }
+        }
+
+        let converted_timestamp = gst::Clock::adjust_with_calibration(
+            time.0.into(),
+            current_mapping.xbase.into(),
+            current_mapping.b.into(),
+            current_mapping.num.into(),
+            current_mapping.den.into(),
+        );
+        let converted_duration = duration
+            .mul_div_floor(current_mapping.num, current_mapping.den)
+            .unwrap_or(gst::CLOCK_TIME_NONE);
+
+        gst_debug!(
+            cat,
+            obj: element,
+            "Converted timestamp {}/{} to {}, duration {} to {}",
+            gst::ClockTime::from(time.0),
+            gst::ClockTime::from(time.1),
+            converted_timestamp,
+            duration,
+            converted_duration,
+        );
+
+        (converted_timestamp, converted_duration)
+    }
+}
+
+impl Default for TimeMapping {
+    fn default() -> Self {
+        Self {
+            xbase: 0,
+            b: 0,
+            num: 1,
+            den: 1,
+        }
+    }
+}
+
 #[derive(Clone)]
 pub struct ReceiverControlHandle {
     queue: ReceiverQueue,
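To make the slewing in the hunk above concrete, here is a standalone sketch of
what gst::Clock::adjust_with_calibration() computes and how the max_diff clamp
bounds the per-frame correction. Plain u64 nanoseconds stand in for
gst::ClockTime and the numbers are made up:

    // The calibration maps internal (NDI) time to external (GStreamer) time
    // along external = (internal - xbase) * num / den + b. A new mapping may
    // only move a frame's timestamp by max_diff per frame.
    fn apply(xbase: u64, b: u64, num: u64, den: u64, internal: u64) -> u64 {
        // Assumes internal >= xbase, as in the patch, where xbase is an
        // earlier observation from the same stream.
        (internal - xbase) * num / den + b
    }

    fn main() {
        // 40ms frames (25 fps): max_diff = max(40ms / 10, 2ms) = 4ms
        let duration = 40_000_000u64;
        let max_diff = std::cmp::max(duration / 10, 2_000_000);

        let expected = apply(0, 1_000_000_000, 1, 1, 80_000_000);
        let new_calculated = apply(0, 1_010_000_000, 1, 1, 80_000_000);

        // The new mapping would jump 10ms at once; clamp to expected + 4ms
        // and re-anchor there, like the patch does.
        let diff = new_calculated.max(expected) - new_calculated.min(expected);
        let pts = if diff > max_diff {
            expected + max_diff
        } else {
            new_calculated
        };
        assert_eq!(pts, expected + max_diff);
    }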
@@ -128,29 +353,37 @@ impl Receiver {
         element: &gst_base::BaseSrc,
         cat: gst::DebugCategory,
     ) -> Self {
-        let (id, storage, recv) = if video {
+        let (id, storage, recv, observations) = if video {
             match info {
                 ReceiverInfo::Connecting {
-                    id, ref mut video, ..
-                } => (*id, video, None),
+                    id,
+                    ref mut video,
+                    ref observations,
+                    ..
+                } => (*id, video, None, observations),
                 ReceiverInfo::Connected {
                     id,
                     ref mut video,
                     ref mut recv,
+                    ref observations,
                     ..
-                } => (*id, video, Some(recv.clone())),
+                } => (*id, video, Some(recv.clone()), observations),
             }
         } else {
             match info {
                 ReceiverInfo::Connecting {
-                    id, ref mut audio, ..
-                } => (*id, audio, None),
+                    id,
+                    ref mut audio,
+                    ref observations,
+                    ..
+                } => (*id, audio, None, observations),
                 ReceiverInfo::Connected {
                     id,
                     ref mut audio,
                     ref mut recv,
+                    ref observations,
                     ..
-                } => (*id, audio, Some(recv.clone())),
+                } => (*id, audio, Some(recv.clone()), observations),
             }
         };
         assert!(storage.is_none());
@@ -171,6 +404,7 @@ impl Receiver {
             video,
             recv: Mutex::new(recv),
             recv_cond: Condvar::new(),
+            observations: observations.clone(),
             cat,
             element: element.downgrade(),
             timestamp_mode,
@@ -377,6 +611,7 @@ pub fn connect_ndi(
             ip_address: ip_address.map(String::from),
             video: None,
             audio: None,
+            observations: Observations::new(),
         };
 
         let receiver = Receiver::new(&mut info, video, timestamp_mode, timeout, element, cat);
@@ -564,12 +799,13 @@ fn connect_ndi_async(
         Some(val) => val,
     };
 
-    let (audio, video) = match info {
+    let (audio, video, observations) = match info {
         ReceiverInfo::Connecting {
             ref audio,
             ref video,
+            ref observations,
             ..
-        } => (audio.clone(), video.clone()),
+        } => (audio.clone(), video.clone(), observations),
         ReceiverInfo::Connected { .. } => unreachable!(),
     };
 
@@ -582,6 +818,7 @@ fn connect_ndi_async(
         recv: recv.clone(),
         video: video.clone(),
        audio: audio.clone(),
+        observations: observations.clone(),
     };
 
     gst_debug!(cat, obj: element, "Started NDI connection");
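Note that each ReceiverInfo owns a single Observations, and the clone() calls
above copy only an Arc handle, so the audio and the video receiver of one NDI
source feed and read the same observation window and therefore share one time
mapping. A minimal sketch of that sharing (a Vec stands in for the real
ObservationsInner):

    use std::sync::{Arc, Mutex};

    #[derive(Clone)]
    struct Observations(Arc<Mutex<Vec<(u64, u64)>>>);

    fn main() {
        let observations = Observations(Arc::new(Mutex::new(Vec::new())));
        let for_video = observations.clone(); // clones the Arc handle only
        let for_audio = observations.clone();

        for_video.0.lock().unwrap().push((0, 0));
        // Both handles see the same window of observations.
        assert_eq!(for_audio.0.lock().unwrap().len(), 1);
    }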
@@ -796,6 +1033,8 @@ impl Receiver {
             break video_frame;
         };
 
+        let (pts, duration) = self.calculate_video_timestamp(element, &video_frame);
+
         // Simply read all video frames while flushing but don't copy them or anything to
         // make sure that we're not accumulating anything here
         if !playing || flushing {
@@ -803,10 +1042,9 @@ impl Receiver {
             return Err(gst::FlowError::CustomError);
         }
 
-        let pts = self.calculate_video_timestamp(element, &video_frame);
         let info = self.create_video_info(element, &video_frame)?;
 
-        let buffer = self.create_video_buffer(element, pts, &info, &video_frame)?;
+        let buffer = self.create_video_buffer(element, pts, duration, &info, &video_frame)?;
 
         gst_log!(self.0.cat, obj: element, "Produced buffer {:?}", buffer);
 
@@ -817,7 +1055,7 @@ impl Receiver {
         &self,
         element: &gst_base::BaseSrc,
         video_frame: &VideoFrame,
-    ) -> gst::ClockTime {
+    ) -> (gst::ClockTime, gst::ClockTime) {
         let clock = element.get_clock().unwrap();
 
         // For now take the current running time as PTS. At a later time we
@@ -834,33 +1072,46 @@ impl Receiver {
         };
 
         let timecode = gst::ClockTime::from(video_frame.timecode() as u64 * 100);
+        let duration = gst::SECOND
+            .mul_div_floor(
+                video_frame.frame_rate().1 as u64,
+                video_frame.frame_rate().0 as u64,
+            )
+            .unwrap_or(gst::CLOCK_TIME_NONE);
+
         gst_log!(
             self.0.cat,
             obj: element,
-            "NDI video frame received: {:?} with timecode {} and timestamp {}, receive time {}, local time now {}",
+            "NDI video frame received: {:?} with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
             video_frame,
             timecode,
             timestamp,
+            duration,
             receive_time,
             real_time_now,
         );
 
-        let pts = match self.0.timestamp_mode {
-            TimestampMode::ReceiveTime => receive_time,
-            TimestampMode::Timecode => timecode,
-            TimestampMode::Timestamp if timestamp.is_none() => receive_time,
+        let (pts, duration) = match self.0.timestamp_mode {
+            TimestampMode::ReceiveTime => self.0.observations.process(
+                self.0.cat,
+                element,
+                (timestamp, receive_time),
+                duration,
+            ),
+            TimestampMode::Timecode => (timecode, duration),
+            TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration),
             TimestampMode::Timestamp => {
                 // Timestamps are relative to the UNIX epoch
                 if real_time_now > timestamp {
                     let diff = real_time_now - timestamp;
                     if diff > receive_time {
-                        0.into()
+                        (0.into(), duration)
                     } else {
-                        receive_time - diff
+                        (receive_time - diff, duration)
                     }
                 } else {
                     let diff = timestamp - real_time_now;
-                    receive_time + diff
+                    (receive_time + diff, duration)
                 }
             }
         };
@@ -868,11 +1119,12 @@ impl Receiver {
         gst_log!(
             self.0.cat,
             obj: element,
-            "Calculated pts for video frame: {:?}",
-            pts
+            "Calculated PTS for video frame {}, duration {}",
+            pts,
+            duration,
        );
 
-        pts
+        (pts, duration)
     }
 
     fn create_video_info(
@@ -915,7 +1167,6 @@ impl Receiver {
             _ => gst_video::VideoInterlaceMode::Alternate,
         });
 
-        /* Requires GStreamer 1.12 at least */
         if video_frame.frame_format_type()
             == ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
         {
@@ -956,6 +1207,13 @@ impl Receiver {
                 gst_video::VideoInterlaceMode::Interleaved
             },
         );
+
+        if video_frame.frame_format_type()
+            == ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
+        {
+            builder = builder.field_order(gst_video::VideoFieldOrder::TopFieldFirst);
+        }
+
         Ok(builder.build().unwrap())
     }
 }
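For reference, the TimestampMode::Timestamp arm in calculate_video_timestamp
above reduces to the following arithmetic. Again a standalone sketch, with
plain u64 nanoseconds in place of gst::ClockTime and made-up values:

    // NDI timestamps are UNIX-epoch based; the offset between "now" (epoch)
    // and the frame's timestamp is applied to the local running time,
    // clamped at 0 as in the patch.
    fn epoch_to_running_time(real_time_now: u64, timestamp: u64, receive_time: u64) -> u64 {
        if real_time_now > timestamp {
            // Frame was stamped `diff` ago; shift the running time back.
            let diff = real_time_now - timestamp;
            receive_time.saturating_sub(diff)
        } else {
            // Timestamp lies ahead of our clock (clock skew); shift forward.
            receive_time + (timestamp - real_time_now)
        }
    }

    fn main() {
        // Stamped 5ms before "now": PTS is 5ms before the receive running time.
        assert_eq!(
            epoch_to_running_time(1_000_000_000, 995_000_000, 50_000_000),
            45_000_000
        );
        // An offset larger than the running time clamps to 0.
        assert_eq!(epoch_to_running_time(1_000_000_000, 900_000_000, 50_000_000), 0);
    }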
@@ -964,17 +1222,12 @@ impl Receiver {
         &self,
         element: &gst_base::BaseSrc,
         pts: gst::ClockTime,
+        duration: gst::ClockTime,
         info: &gst_video::VideoInfo,
         video_frame: &VideoFrame,
     ) -> Result<gst::Buffer, gst::FlowError> {
         let mut buffer = gst::Buffer::with_size(info.size()).unwrap();
         {
-            let duration = gst::SECOND
-                .mul_div_floor(
-                    video_frame.frame_rate().1 as u64,
-                    video_frame.frame_rate().0 as u64,
-                )
-                .unwrap_or(gst::CLOCK_TIME_NONE);
             let buffer = buffer.get_mut().unwrap();
             buffer.set_pts(pts);
             buffer.set_duration(duration);
@@ -1211,6 +1464,8 @@ impl Receiver {
             break audio_frame;
         };
 
+        let (pts, duration) = self.calculate_audio_timestamp(element, &audio_frame);
+
         // Simply read all audio frames while flushing but don't copy them or anything to
         // make sure that we're not accumulating anything here
         if !playing || flushing {
@@ -1218,10 +1473,9 @@ impl Receiver {
             return Err(gst::FlowError::CustomError);
         }
 
-        let pts = self.calculate_audio_timestamp(element, &audio_frame);
         let info = self.create_audio_info(element, &audio_frame)?;
 
-        let buffer = self.create_audio_buffer(element, pts, &info, &audio_frame)?;
+        let buffer = self.create_audio_buffer(element, pts, duration, &info, &audio_frame)?;
 
         gst_log!(self.0.cat, obj: element, "Produced buffer {:?}", buffer);
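The audio duration computed in the hunks below follows the same mul_div_floor
pattern as the video path, derived from the NDI frame's sample count and
sample rate instead of the frame rate. A small sketch of the arithmetic with
assumed sample counts:

    // duration = SECOND * no_samples / sample_rate, matching the patch's
    // gst::SECOND.mul_div_floor(no_samples, sample_rate).
    fn audio_duration_ns(no_samples: u64, sample_rate: u64) -> u64 {
        1_000_000_000 * no_samples / sample_rate
    }

    fn main() {
        assert_eq!(audio_duration_ns(1600, 48_000), 33_333_333); // ~33.3ms chunk
        assert_eq!(audio_duration_ns(1024, 44_100), 23_219_954);
    }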
@@ -1232,7 +1486,7 @@ impl Receiver {
         &self,
         element: &gst_base::BaseSrc,
         audio_frame: &AudioFrame,
-    ) -> gst::ClockTime {
+    ) -> (gst::ClockTime, gst::ClockTime) {
         let clock = element.get_clock().unwrap();
 
         // For now take the current running time as PTS. At a later time we
@@ -1249,33 +1503,46 @@ impl Receiver {
         };
 
         let timecode = gst::ClockTime::from(audio_frame.timecode() as u64 * 100);
+        let duration = gst::SECOND
+            .mul_div_floor(
+                audio_frame.no_samples() as u64,
+                audio_frame.sample_rate() as u64,
+            )
+            .unwrap_or(gst::CLOCK_TIME_NONE);
+
         gst_log!(
             self.0.cat,
             obj: element,
-            "NDI audio frame received: {:?} with timecode {} and timestamp {}, receive time {}, local time now {}",
+            "NDI audio frame received: {:?} with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
             audio_frame,
             timecode,
             timestamp,
+            duration,
             receive_time,
             real_time_now,
         );
 
-        let pts = match self.0.timestamp_mode {
-            TimestampMode::ReceiveTime => receive_time,
-            TimestampMode::Timecode => timecode,
-            TimestampMode::Timestamp if timestamp.is_none() => receive_time,
+        let (pts, duration) = match self.0.timestamp_mode {
+            TimestampMode::ReceiveTime => self.0.observations.process(
+                self.0.cat,
+                element,
+                (timestamp, receive_time),
+                duration,
+            ),
+            TimestampMode::Timecode => (timecode, duration),
+            TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration),
             TimestampMode::Timestamp => {
                 // Timestamps are relative to the UNIX epoch
                 if real_time_now > timestamp {
                     let diff = real_time_now - timestamp;
                     if diff > receive_time {
-                        0.into()
+                        (0.into(), duration)
                     } else {
-                        receive_time - diff
+                        (receive_time - diff, duration)
                    }
                 } else {
                     let diff = timestamp - real_time_now;
-                    receive_time + diff
+                    (receive_time + diff, duration)
                 }
             }
         };
@@ -1283,11 +1550,12 @@ impl Receiver {
         gst_log!(
             self.0.cat,
             obj: element,
-            "Calculated pts for audio frame: {:?}",
-            pts
+            "Calculated PTS for audio frame {}, duration {}",
+            pts,
+            duration,
         );
 
-        pts
+        (pts, duration)
     }
 
     fn create_audio_info(
@@ -1308,6 +1576,7 @@ impl Receiver {
         &self,
         _element: &gst_base::BaseSrc,
         pts: gst::ClockTime,
+        duration: gst::ClockTime,
         info: &gst_audio::AudioInfo,
         audio_frame: &AudioFrame,
     ) -> Result<gst::Buffer, gst::FlowError> {
@@ -1315,12 +1584,6 @@ impl Receiver {
         let buff_size = (audio_frame.no_samples() as u32 * info.bpf()) as usize;
         let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
         {
-            let duration = gst::SECOND
-                .mul_div_floor(
-                    audio_frame.no_samples() as u64,
-                    audio_frame.sample_rate() as u64,
-                )
-                .unwrap_or(gst::CLOCK_TIME_NONE);
             let buffer = buffer.get_mut().unwrap();
 
             buffer.set_pts(pts);
@@ -1356,3 +1619,48 @@ impl Receiver {
         Ok(buffer)
     }
 }
+
+// FIXME: Requires https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/307
+pub fn calculate_linear_regression(
+    xy: &[(u64, u64)],
+    temp: Option<&mut [(u64, u64)]>,
+) -> Option<(u64, u64, u64, u64, f64)> {
+    unsafe {
+        use glib::translate::from_glib;
+        use std::mem;
+        use std::ptr;
+
+        assert_eq!(mem::size_of::<u64>() * 2, mem::size_of::<(u64, u64)>());
+        assert_eq!(mem::align_of::<u64>(), mem::align_of::<(u64, u64)>());
+        assert!(temp.as_ref().map(|temp| temp.len()).unwrap_or(xy.len()) >= xy.len());
+
+        let mut m_num = mem::MaybeUninit::uninit();
+        let mut m_denom = mem::MaybeUninit::uninit();
+        let mut b = mem::MaybeUninit::uninit();
+        let mut xbase = mem::MaybeUninit::uninit();
+        let mut r_squared = mem::MaybeUninit::uninit();
+
+        let res = from_glib(gst_sys::gst_calculate_linear_regression(
+            xy.as_ptr() as *const u64,
+            temp.map(|temp| temp.as_mut_ptr() as *mut u64)
+                .unwrap_or(ptr::null_mut()),
+            xy.len() as u32,
+            m_num.as_mut_ptr(),
+            m_denom.as_mut_ptr(),
+            b.as_mut_ptr(),
+            xbase.as_mut_ptr(),
+            r_squared.as_mut_ptr(),
+        ));
+        if res {
+            Some((
+                m_num.assume_init(),
+                m_denom.assume_init(),
+                b.assume_init(),
+                xbase.assume_init(),
+                r_squared.assume_init(),
+            ))
+        } else {
+            None
+        }
+    }
+}
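Finally, a rough usage sketch for the calculate_linear_regression() wrapper
above (hypothetical values; assumes gst::init() has been called and the
wrapper is in scope). Perfectly linear input should recover the slope as
num/den with an r squared close to 1:

    fn main() {
        gst::init().unwrap();

        // y = 2x + 1_000 over x = 0, 100, ..., 900
        let xy: Vec<(u64, u64)> = (0..10u64).map(|i| (i * 100, i * 200 + 1_000)).collect();
        let mut tmp = vec![(0u64, 0u64); xy.len()];

        if let Some((num, den, b, xbase, r_squared)) =
            calculate_linear_regression(&xy, Some(&mut tmp))
        {
            let slope = num as f64 / den as f64;
            println!("slope = {}, xbase = {}, b = {}, r2 = {}", slope, xbase, b, r_squared);
            assert!((slope - 2.0).abs() < 1e-3);
        }
    }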