diff --git a/net/ndi/src/ndisrc/imp.rs b/net/ndi/src/ndisrc/imp.rs index 14ef8015..45a25591 100644 --- a/net/ndi/src/ndisrc/imp.rs +++ b/net/ndi/src/ndisrc/imp.rs @@ -1,11 +1,14 @@ // SPDX-License-Identifier: MPL-2.0 +use atomic_refcell::AtomicRefCell; use gst::prelude::*; use gst::subclass::prelude::*; use gst_base::prelude::*; use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; +use std::cmp; +use std::collections::VecDeque; use std::sync::Mutex; use once_cell::sync::Lazy; @@ -66,7 +69,10 @@ impl Default for Settings { #[derive(Default)] struct State { receiver: Option, + // Audio/video/metadata time observations timestamp_mode: TimestampMode, + observations_timestamp: [Observations; 3], + observations_timecode: [Observations; 3], current_latency: Option, } @@ -536,7 +542,58 @@ impl BaseSrcImpl for NdiSrc { let mut gst_buffer = gst::Buffer::new(); { let buffer_ref = gst_buffer.get_mut().unwrap(); - NdiSrcMeta::add(buffer_ref, ndi_buffer, state.timestamp_mode); + let ((pts, duration, resync), discont) = match ndi_buffer { + Buffer::Audio { + ref frame, + discont, + receive_time_gst, + receive_time_real, + } => ( + self.calculate_audio_timestamp( + &mut state, + receive_time_gst, + receive_time_real, + frame, + ), + discont, + ), + Buffer::Video { + ref frame, + discont, + receive_time_gst, + receive_time_real, + } => ( + self.calculate_video_timestamp( + &mut state, + receive_time_gst, + receive_time_real, + frame, + ), + discont, + ), + Buffer::Metadata { + ref frame, + receive_time_gst, + receive_time_real, + } => ( + self.calculate_metadata_timestamp( + &mut state, + receive_time_gst, + receive_time_real, + frame, + ), + false, + ), + }; + buffer_ref.set_pts(pts); + buffer_ref.set_duration(duration); + if resync { + buffer_ref.set_flags(gst::BufferFlags::RESYNC); + } + if discont { + buffer_ref.set_flags(gst::BufferFlags::DISCONT); + } + NdiSrcMeta::add(buffer_ref, ndi_buffer); } drop(state); @@ -555,3 +612,465 @@ impl BaseSrcImpl for NdiSrc { } } } + +impl NdiSrc { + #[allow(clippy::too_many_arguments)] + fn calculate_timestamp( + &self, + state: &mut State, + idx: usize, + receive_time_gst: gst::ClockTime, + receive_time_real: gst::ClockTime, + timestamp: i64, + timecode: i64, + duration: Option, + ) -> (gst::ClockTime, Option, bool) { + let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined { + gst::ClockTime::NONE + } else { + Some((timestamp as u64 * 100).nseconds()) + }; + let timecode = (timecode as u64 * 100).nseconds(); + + gst::log!( + CAT, + imp = self, + "Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}", + timecode, + timestamp.display(), + duration.display(), + receive_time_gst.display(), + receive_time_real, + ); + + let res_timestamp = state.observations_timestamp[idx].process( + self.obj().upcast_ref(), + timestamp, + receive_time_gst, + duration, + ); + + let res_timecode = state.observations_timecode[idx].process( + self.obj().upcast_ref(), + Some(timecode), + receive_time_gst, + duration, + ); + + let (pts, duration, discont) = match state.timestamp_mode { + TimestampMode::ReceiveTimeTimecode => match res_timecode { + Some((pts, duration, discont)) => (pts, duration, discont), + None => { + gst::warning!(CAT, imp = self, "Can't calculate timestamp"); + (receive_time_gst, duration, false) + } + }, + TimestampMode::ReceiveTimeTimestamp => match res_timestamp { + Some((pts, duration, discont)) => (pts, duration, discont), + None => { + if timestamp.is_some() { + gst::warning!(CAT, imp = self, "Can't calculate timestamp"); + } + + (receive_time_gst, duration, false) + } + }, + TimestampMode::Timecode => (timecode, duration, false), + TimestampMode::Timestamp if timestamp.is_none() => (receive_time_gst, duration, false), + TimestampMode::Timestamp => { + // Timestamps are relative to the UNIX epoch + let timestamp = timestamp.unwrap(); + if receive_time_real > timestamp { + let diff = receive_time_real - timestamp; + if diff > receive_time_gst { + (gst::ClockTime::ZERO, duration, false) + } else { + (receive_time_gst - diff, duration, false) + } + } else { + let diff = timestamp - receive_time_real; + (receive_time_gst + diff, duration, false) + } + } + TimestampMode::ReceiveTime => (receive_time_gst, duration, false), + TimestampMode::Auto => { + res_timecode + .or(res_timestamp) + .unwrap_or((receive_time_gst, duration, false)) + } + }; + + gst::log!( + CAT, + imp = self, + "Calculated PTS {}, duration {}", + pts.display(), + duration.display(), + ); + + (pts, duration, discont) + } + + fn calculate_video_timestamp( + &self, + state: &mut State, + receive_time_gst: gst::ClockTime, + receive_time_real: gst::ClockTime, + video_frame: &crate::ndi::VideoFrame, + ) -> (gst::ClockTime, Option, bool) { + let duration = gst::ClockTime::SECOND.mul_div_floor( + video_frame.frame_rate().1 as u64, + video_frame.frame_rate().0 as u64, + ); + + self.calculate_timestamp( + state, + 0, + receive_time_gst, + receive_time_real, + video_frame.timestamp(), + video_frame.timecode(), + duration, + ) + } + + fn calculate_audio_timestamp( + &self, + state: &mut State, + receive_time_gst: gst::ClockTime, + receive_time_real: gst::ClockTime, + audio_frame: &crate::ndi::AudioFrame, + ) -> (gst::ClockTime, Option, bool) { + let duration = gst::ClockTime::SECOND.mul_div_floor( + audio_frame.no_samples() as u64, + audio_frame.sample_rate() as u64, + ); + + self.calculate_timestamp( + state, + 1, + receive_time_gst, + receive_time_real, + audio_frame.timestamp(), + audio_frame.timecode(), + duration, + ) + } + + fn calculate_metadata_timestamp( + &self, + state: &mut State, + receive_time_gst: gst::ClockTime, + receive_time_real: gst::ClockTime, + metadata_frame: &crate::ndi::MetadataFrame, + ) -> (gst::ClockTime, Option, bool) { + self.calculate_timestamp( + state, + 2, + receive_time_gst, + receive_time_real, + ndisys::NDIlib_recv_timestamp_undefined, + metadata_frame.timecode(), + gst::ClockTime::NONE, + ) + } +} + +const PREFILL_WINDOW_LENGTH: usize = 12; +const WINDOW_LENGTH: u64 = 512; +const WINDOW_DURATION: u64 = 2_000_000_000; + +#[derive(Default)] +struct Observations(AtomicRefCell); + +struct ObservationsInner { + base_remote_time: Option, + base_local_time: Option, + deltas: VecDeque, + min_delta: i64, + skew: i64, + filling: bool, + window_size: usize, + + // Remote/local times for workaround around fundamentally wrong slopes + // This is not reset below and has a bigger window. + times: VecDeque<(u64, u64)>, + slope_correction: (u64, u64), +} + +impl Default for ObservationsInner { + fn default() -> ObservationsInner { + ObservationsInner { + base_local_time: None, + base_remote_time: None, + deltas: VecDeque::new(), + min_delta: 0, + skew: 0, + filling: true, + window_size: 0, + times: VecDeque::new(), + slope_correction: (1, 1), + } + } +} + +impl ObservationsInner { + fn reset(&mut self) { + self.base_local_time = None; + self.base_remote_time = None; + self.deltas = VecDeque::new(); + self.min_delta = 0; + self.skew = 0; + self.filling = true; + self.window_size = 0; + } +} + +impl Observations { + // Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from + // Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays": + // http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546 + fn process( + &self, + element: &gst::Element, + remote_time: Option, + local_time: gst::ClockTime, + duration: Option, + ) -> Option<(gst::ClockTime, Option, bool)> { + let remote_time = remote_time?.nseconds(); + let local_time = local_time.nseconds(); + + let mut inner = self.0.borrow_mut(); + + gst::trace!( + CAT, + obj = element, + "Local time {}, remote time {}, slope correct {}/{}", + local_time.nseconds(), + remote_time.nseconds(), + inner.slope_correction.0, + inner.slope_correction.1, + ); + + inner.times.push_back((remote_time, local_time)); + while inner + .times + .back() + .unwrap() + .1 + .saturating_sub(inner.times.front().unwrap().1) + > WINDOW_DURATION + { + let _ = inner.times.pop_front(); + } + + // Static remote times + if inner.slope_correction.1 == 0 { + return None; + } + + let remote_time = + remote_time.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?; + + let (base_remote_time, base_local_time) = + match (inner.base_remote_time, inner.base_local_time) { + (Some(remote), Some(local)) => (remote, local), + _ => { + gst::debug!( + CAT, + obj = element, + "Initializing base time: local {}, remote {}", + local_time.nseconds(), + remote_time.nseconds(), + ); + inner.base_remote_time = Some(remote_time); + inner.base_local_time = Some(local_time); + + return Some((local_time.nseconds(), duration, true)); + } + }; + + if inner.times.len() < PREFILL_WINDOW_LENGTH { + return Some((local_time.nseconds(), duration, false)); + } + + // Check if the slope is simply wrong and try correcting + { + let local_diff = inner + .times + .back() + .unwrap() + .1 + .saturating_sub(inner.times.front().unwrap().1); + let remote_diff = inner + .times + .back() + .unwrap() + .0 + .saturating_sub(inner.times.front().unwrap().0); + + if remote_diff == 0 { + inner.reset(); + inner.base_remote_time = Some(remote_time); + inner.base_local_time = Some(local_time); + + // Static remote times + inner.slope_correction = (0, 0); + return None; + } else { + let slope = local_diff as f64 / remote_diff as f64; + let scaled_slope = + slope * (inner.slope_correction.1 as f64) / (inner.slope_correction.0 as f64); + + // Check for some obviously wrong slopes and try to correct for that + if !(0.5..1.5).contains(&scaled_slope) { + gst::warning!( + CAT, + obj = element, + "Too small/big slope {}, resetting", + scaled_slope + ); + + let discont = !inner.deltas.is_empty(); + inner.reset(); + + if (0.0005..0.0015).contains(&slope) { + // Remote unit was actually 0.1ns + inner.slope_correction = (1, 1000); + } else if (0.005..0.015).contains(&slope) { + // Remote unit was actually 1ns + inner.slope_correction = (1, 100); + } else if (0.05..0.15).contains(&slope) { + // Remote unit was actually 10ns + inner.slope_correction = (1, 10); + } else if (5.0..15.0).contains(&slope) { + // Remote unit was actually 1us + inner.slope_correction = (10, 1); + } else if (50.0..150.0).contains(&slope) { + // Remote unit was actually 10us + inner.slope_correction = (100, 1); + } else if (50.0..150.0).contains(&slope) { + // Remote unit was actually 100us + inner.slope_correction = (1000, 1); + } else if (50.0..150.0).contains(&slope) { + // Remote unit was actually 1ms + inner.slope_correction = (10000, 1); + } else { + inner.slope_correction = (1, 1); + } + + let remote_time = inner + .times + .back() + .unwrap() + .0 + .mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?; + gst::debug!( + CAT, + obj = element, + "Initializing base time: local {}, remote {}, slope correction {}/{}", + local_time.nseconds(), + remote_time.nseconds(), + inner.slope_correction.0, + inner.slope_correction.1, + ); + inner.base_remote_time = Some(remote_time); + inner.base_local_time = Some(local_time); + + return Some((local_time.nseconds(), duration, discont)); + } + } + } + + let remote_diff = remote_time.saturating_sub(base_remote_time); + let local_diff = local_time.saturating_sub(base_local_time); + let delta = (local_diff as i64) - (remote_diff as i64); + + gst::trace!( + CAT, + obj = element, + "Local diff {}, remote diff {}, delta {}", + local_diff.nseconds(), + remote_diff.nseconds(), + delta, + ); + + if (delta > inner.skew && delta - inner.skew > 1_000_000_000) + || (delta < inner.skew && inner.skew - delta > 1_000_000_000) + { + gst::warning!( + CAT, + obj = element, + "Delta {} too far from skew {}, resetting", + delta, + inner.skew + ); + + let discont = !inner.deltas.is_empty(); + + gst::debug!( + CAT, + obj = element, + "Initializing base time: local {}, remote {}", + local_time.nseconds(), + remote_time.nseconds(), + ); + + inner.reset(); + inner.base_remote_time = Some(remote_time); + inner.base_local_time = Some(local_time); + + return Some((local_time.nseconds(), duration, discont)); + } + + if inner.filling { + if inner.deltas.is_empty() || delta < inner.min_delta { + inner.min_delta = delta; + } + inner.deltas.push_back(delta); + + if remote_diff > WINDOW_DURATION || inner.deltas.len() as u64 == WINDOW_LENGTH { + inner.window_size = inner.deltas.len(); + inner.skew = inner.min_delta; + inner.filling = false; + } else { + let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64; + let perc_window = (inner.deltas.len() as u64) + .mul_div_floor(100, WINDOW_LENGTH) + .unwrap() as i64; + let perc = cmp::max(perc_time, perc_window); + + inner.skew = (perc * inner.min_delta + ((10_000 - perc) * inner.skew)) / 10_000; + } + } else { + let old = inner.deltas.pop_front().unwrap(); + inner.deltas.push_back(delta); + + if delta <= inner.min_delta { + inner.min_delta = delta; + } else if old == inner.min_delta { + inner.min_delta = inner.deltas.iter().copied().min().unwrap(); + } + + inner.skew = (inner.min_delta + (124 * inner.skew)) / 125; + } + + let out_time = base_local_time + remote_diff; + let out_time = if inner.skew < 0 { + out_time.saturating_sub((-inner.skew) as u64) + } else { + out_time + (inner.skew as u64) + }; + + gst::trace!( + CAT, + obj = element, + "Skew {}, min delta {}", + inner.skew, + inner.min_delta + ); + gst::trace!(CAT, obj = element, "Outputting {}", out_time.nseconds()); + + Some((out_time.nseconds(), duration, false)) + } +} diff --git a/net/ndi/src/ndisrcdemux/imp.rs b/net/ndi/src/ndisrcdemux/imp.rs index b629b3ca..2a645d3c 100644 --- a/net/ndi/src/ndisrcdemux/imp.rs +++ b/net/ndi/src/ndisrcdemux/imp.rs @@ -1,19 +1,18 @@ // SPDX-License-Identifier: MPL-2.0 -use atomic_refcell::AtomicRefCell; use gst::prelude::*; use gst::subclass::prelude::*; use gst_video::prelude::*; use once_cell::sync::Lazy; -use std::{cmp, collections::VecDeque, sync::Mutex}; +use std::sync::Mutex; use byte_slice_cast::*; use crate::{ ndi_cc_meta::NDICCMetaDecoder, ndisrcmeta::{self, Buffer}, - ndisys, TimestampMode, + ndisys, }; static CAT: Lazy = Lazy::new(|| { @@ -41,11 +40,6 @@ struct State { ndi_cc_decoder: Option, pending_metadata: Vec, - - // Audio/video time observations - timestamp_mode: TimestampMode, - observations_timestamp: [Observations; 2], - observations_timecode: [Observations; 2], } impl Default for State { @@ -67,10 +61,6 @@ impl Default for State { ndi_cc_decoder: None, pending_metadata: Vec::new(), - - timestamp_mode: TimestampMode::Auto, - observations_timestamp: [Observations::default(), Observations::default()], - observations_timecode: [Observations::default(), Observations::default()], } } } @@ -212,6 +202,11 @@ impl NdiSrcDemux { ) -> Result { gst::log!(CAT, imp = self, "Handling buffer {:?}", buffer); + let pts = buffer.pts(); + let duration = buffer.duration(); + let resync = buffer.flags().contains(gst::BufferFlags::RESYNC); + let discont = buffer.flags().contains(gst::BufferFlags::DISCONT); + let mut meta = buffer .make_mut() .meta_mut::() @@ -490,51 +485,15 @@ impl NdiSrcDemux { let srcpad; let buffer; match ndi_buffer { - Buffer::Audio { - frame, - discont, - receive_time_gst, - receive_time_real, - } => { + Buffer::Audio { frame, .. } => { srcpad = state.audio_pad.clone().unwrap(); - let (pts, duration, resync) = self - .calculate_audio_timestamp( - &mut state, - receive_time_gst, - receive_time_real, - &frame, - ) - .ok_or_else(|| { - gst::debug!(CAT, imp = self, "Flushing, dropping buffer"); - gst::FlowError::Flushing - })?; - buffer = self.create_audio_buffer(&state, pts, duration, discont, resync, frame)?; - gst::log!(CAT, imp = self, "Produced audio buffer {:?}", buffer); } - Buffer::Video { - frame, - discont, - receive_time_gst, - receive_time_real, - } => { + Buffer::Video { frame, .. } => { srcpad = state.video_pad.clone().unwrap(); - let (pts, duration, resync) = self - .calculate_video_timestamp( - &mut state, - receive_time_gst, - receive_time_real, - &frame, - ) - .ok_or_else(|| { - gst::debug!(CAT, imp = self, "Flushing, dropping buffer"); - gst::FlowError::Flushing - })?; - buffer = self.create_video_buffer(&mut state, pts, duration, discont, resync, frame)?; - gst::log!(CAT, imp = self, "Produced video buffer {:?}", buffer); } Buffer::Metadata { frame, .. } => { @@ -610,126 +569,6 @@ impl NdiSrcDemux { } impl NdiSrcDemux { - #[allow(clippy::too_many_arguments)] - fn calculate_timestamp( - &self, - state: &mut State, - is_audio: bool, - receive_time_gst: gst::ClockTime, - receive_time_real: gst::ClockTime, - timestamp: i64, - timecode: i64, - duration: Option, - ) -> Option<(gst::ClockTime, Option, bool)> { - let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined { - gst::ClockTime::NONE - } else { - Some((timestamp as u64 * 100).nseconds()) - }; - let timecode = (timecode as u64 * 100).nseconds(); - - gst::log!( - CAT, - imp = self, - "Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}", - timecode, - timestamp.display(), - duration.display(), - receive_time_gst.display(), - receive_time_real, - ); - - let res_timestamp = state.observations_timestamp[usize::from(!is_audio)].process( - self.obj().upcast_ref(), - timestamp, - receive_time_gst, - duration, - ); - - let res_timecode = state.observations_timecode[usize::from(!is_audio)].process( - self.obj().upcast_ref(), - Some(timecode), - receive_time_gst, - duration, - ); - - let (pts, duration, discont) = match state.timestamp_mode { - TimestampMode::ReceiveTimeTimecode => match res_timecode { - Some((pts, duration, discont)) => (pts, duration, discont), - None => { - gst::warning!(CAT, imp = self, "Can't calculate timestamp"); - (receive_time_gst, duration, false) - } - }, - TimestampMode::ReceiveTimeTimestamp => match res_timestamp { - Some((pts, duration, discont)) => (pts, duration, discont), - None => { - if timestamp.is_some() { - gst::warning!(CAT, imp = self, "Can't calculate timestamp"); - } - - (receive_time_gst, duration, false) - } - }, - TimestampMode::Timecode => (timecode, duration, false), - TimestampMode::Timestamp if timestamp.is_none() => (receive_time_gst, duration, false), - TimestampMode::Timestamp => { - // Timestamps are relative to the UNIX epoch - let timestamp = timestamp?; - if receive_time_real > timestamp { - let diff = receive_time_real - timestamp; - if diff > receive_time_gst { - (gst::ClockTime::ZERO, duration, false) - } else { - (receive_time_gst - diff, duration, false) - } - } else { - let diff = timestamp - receive_time_real; - (receive_time_gst + diff, duration, false) - } - } - TimestampMode::ReceiveTime => (receive_time_gst, duration, false), - TimestampMode::Auto => { - res_timecode - .or(res_timestamp) - .unwrap_or((receive_time_gst, duration, false)) - } - }; - - gst::log!( - CAT, - imp = self, - "Calculated PTS {}, duration {}", - pts.display(), - duration.display(), - ); - - Some((pts, duration, discont)) - } - - fn calculate_video_timestamp( - &self, - state: &mut State, - receive_time_gst: gst::ClockTime, - receive_time_real: gst::ClockTime, - video_frame: &crate::ndi::VideoFrame, - ) -> Option<(gst::ClockTime, Option, bool)> { - let duration = gst::ClockTime::SECOND.mul_div_floor( - video_frame.frame_rate().1 as u64, - video_frame.frame_rate().0 as u64, - ); - - self.calculate_timestamp( - state, - false, - receive_time_gst, - receive_time_real, - video_frame.timestamp(), - video_frame.timecode(), - duration, - ) - } - fn create_video_buffer_pool(&self, video_info: &gst_video::VideoInfo) -> gst::BufferPool { let pool = gst_video::VideoBufferPool::new(); let mut config = pool.config(); @@ -958,7 +797,7 @@ impl NdiSrcDemux { fn create_video_buffer( &self, state: &mut State, - pts: gst::ClockTime, + pts: Option, duration: Option, discont: bool, resync: bool, @@ -1356,29 +1195,6 @@ impl NdiSrcDemux { } } - fn calculate_audio_timestamp( - &self, - state: &mut State, - receive_time_gst: gst::ClockTime, - receive_time_real: gst::ClockTime, - audio_frame: &crate::ndi::AudioFrame, - ) -> Option<(gst::ClockTime, Option, bool)> { - let duration = gst::ClockTime::SECOND.mul_div_floor( - audio_frame.no_samples() as u64, - audio_frame.sample_rate() as u64, - ); - - self.calculate_timestamp( - state, - true, - receive_time_gst, - receive_time_real, - audio_frame.timestamp(), - audio_frame.timecode(), - duration, - ) - } - fn create_audio_info( &self, audio_frame: &crate::ndi::AudioFrame, @@ -1461,7 +1277,7 @@ impl NdiSrcDemux { fn create_audio_buffer( &self, state: &State, - pts: gst::ClockTime, + pts: Option, duration: Option, discont: bool, resync: bool, @@ -1761,302 +1577,3 @@ impl VideoInfo { } } } - -const PREFILL_WINDOW_LENGTH: usize = 12; -const WINDOW_LENGTH: u64 = 512; -const WINDOW_DURATION: u64 = 2_000_000_000; - -#[derive(Default)] -struct Observations(AtomicRefCell); - -struct ObservationsInner { - base_remote_time: Option, - base_local_time: Option, - deltas: VecDeque, - min_delta: i64, - skew: i64, - filling: bool, - window_size: usize, - - // Remote/local times for workaround around fundamentally wrong slopes - // This is not reset below and has a bigger window. - times: VecDeque<(u64, u64)>, - slope_correction: (u64, u64), -} - -impl Default for ObservationsInner { - fn default() -> ObservationsInner { - ObservationsInner { - base_local_time: None, - base_remote_time: None, - deltas: VecDeque::new(), - min_delta: 0, - skew: 0, - filling: true, - window_size: 0, - times: VecDeque::new(), - slope_correction: (1, 1), - } - } -} - -impl ObservationsInner { - fn reset(&mut self) { - self.base_local_time = None; - self.base_remote_time = None; - self.deltas = VecDeque::new(); - self.min_delta = 0; - self.skew = 0; - self.filling = true; - self.window_size = 0; - } -} - -impl Observations { - // Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from - // Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays": - // http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546 - fn process( - &self, - element: &gst::Element, - remote_time: Option, - local_time: gst::ClockTime, - duration: Option, - ) -> Option<(gst::ClockTime, Option, bool)> { - let remote_time = remote_time?.nseconds(); - let local_time = local_time.nseconds(); - - let mut inner = self.0.borrow_mut(); - - gst::trace!( - CAT, - obj = element, - "Local time {}, remote time {}, slope correct {}/{}", - local_time.nseconds(), - remote_time.nseconds(), - inner.slope_correction.0, - inner.slope_correction.1, - ); - - inner.times.push_back((remote_time, local_time)); - while inner - .times - .back() - .unwrap() - .1 - .saturating_sub(inner.times.front().unwrap().1) - > WINDOW_DURATION - { - let _ = inner.times.pop_front(); - } - - // Static remote times - if inner.slope_correction.1 == 0 { - return None; - } - - let remote_time = - remote_time.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?; - - let (base_remote_time, base_local_time) = - match (inner.base_remote_time, inner.base_local_time) { - (Some(remote), Some(local)) => (remote, local), - _ => { - gst::debug!( - CAT, - obj = element, - "Initializing base time: local {}, remote {}", - local_time.nseconds(), - remote_time.nseconds(), - ); - inner.base_remote_time = Some(remote_time); - inner.base_local_time = Some(local_time); - - return Some((local_time.nseconds(), duration, true)); - } - }; - - if inner.times.len() < PREFILL_WINDOW_LENGTH { - return Some((local_time.nseconds(), duration, false)); - } - - // Check if the slope is simply wrong and try correcting - { - let local_diff = inner - .times - .back() - .unwrap() - .1 - .saturating_sub(inner.times.front().unwrap().1); - let remote_diff = inner - .times - .back() - .unwrap() - .0 - .saturating_sub(inner.times.front().unwrap().0); - - if remote_diff == 0 { - inner.reset(); - inner.base_remote_time = Some(remote_time); - inner.base_local_time = Some(local_time); - - // Static remote times - inner.slope_correction = (0, 0); - return None; - } else { - let slope = local_diff as f64 / remote_diff as f64; - let scaled_slope = - slope * (inner.slope_correction.1 as f64) / (inner.slope_correction.0 as f64); - - // Check for some obviously wrong slopes and try to correct for that - if !(0.5..1.5).contains(&scaled_slope) { - gst::warning!( - CAT, - obj = element, - "Too small/big slope {}, resetting", - scaled_slope - ); - - let discont = !inner.deltas.is_empty(); - inner.reset(); - - if (0.0005..0.0015).contains(&slope) { - // Remote unit was actually 0.1ns - inner.slope_correction = (1, 1000); - } else if (0.005..0.015).contains(&slope) { - // Remote unit was actually 1ns - inner.slope_correction = (1, 100); - } else if (0.05..0.15).contains(&slope) { - // Remote unit was actually 10ns - inner.slope_correction = (1, 10); - } else if (5.0..15.0).contains(&slope) { - // Remote unit was actually 1us - inner.slope_correction = (10, 1); - } else if (50.0..150.0).contains(&slope) { - // Remote unit was actually 10us - inner.slope_correction = (100, 1); - } else if (50.0..150.0).contains(&slope) { - // Remote unit was actually 100us - inner.slope_correction = (1000, 1); - } else if (50.0..150.0).contains(&slope) { - // Remote unit was actually 1ms - inner.slope_correction = (10000, 1); - } else { - inner.slope_correction = (1, 1); - } - - let remote_time = inner - .times - .back() - .unwrap() - .0 - .mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?; - gst::debug!( - CAT, - obj = element, - "Initializing base time: local {}, remote {}, slope correction {}/{}", - local_time.nseconds(), - remote_time.nseconds(), - inner.slope_correction.0, - inner.slope_correction.1, - ); - inner.base_remote_time = Some(remote_time); - inner.base_local_time = Some(local_time); - - return Some((local_time.nseconds(), duration, discont)); - } - } - } - - let remote_diff = remote_time.saturating_sub(base_remote_time); - let local_diff = local_time.saturating_sub(base_local_time); - let delta = (local_diff as i64) - (remote_diff as i64); - - gst::trace!( - CAT, - obj = element, - "Local diff {}, remote diff {}, delta {}", - local_diff.nseconds(), - remote_diff.nseconds(), - delta, - ); - - if (delta > inner.skew && delta - inner.skew > 1_000_000_000) - || (delta < inner.skew && inner.skew - delta > 1_000_000_000) - { - gst::warning!( - CAT, - obj = element, - "Delta {} too far from skew {}, resetting", - delta, - inner.skew - ); - - let discont = !inner.deltas.is_empty(); - - gst::debug!( - CAT, - obj = element, - "Initializing base time: local {}, remote {}", - local_time.nseconds(), - remote_time.nseconds(), - ); - - inner.reset(); - inner.base_remote_time = Some(remote_time); - inner.base_local_time = Some(local_time); - - return Some((local_time.nseconds(), duration, discont)); - } - - if inner.filling { - if inner.deltas.is_empty() || delta < inner.min_delta { - inner.min_delta = delta; - } - inner.deltas.push_back(delta); - - if remote_diff > WINDOW_DURATION || inner.deltas.len() as u64 == WINDOW_LENGTH { - inner.window_size = inner.deltas.len(); - inner.skew = inner.min_delta; - inner.filling = false; - } else { - let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64; - let perc_window = (inner.deltas.len() as u64) - .mul_div_floor(100, WINDOW_LENGTH) - .unwrap() as i64; - let perc = cmp::max(perc_time, perc_window); - - inner.skew = (perc * inner.min_delta + ((10_000 - perc) * inner.skew)) / 10_000; - } - } else { - let old = inner.deltas.pop_front().unwrap(); - inner.deltas.push_back(delta); - - if delta <= inner.min_delta { - inner.min_delta = delta; - } else if old == inner.min_delta { - inner.min_delta = inner.deltas.iter().copied().min().unwrap(); - } - - inner.skew = (inner.min_delta + (124 * inner.skew)) / 125; - } - - let out_time = base_local_time + remote_diff; - let out_time = if inner.skew < 0 { - out_time.saturating_sub((-inner.skew) as u64) - } else { - out_time + (inner.skew as u64) - }; - - gst::trace!( - CAT, - obj = element, - "Skew {}, min delta {}", - inner.skew, - inner.min_delta - ); - gst::trace!(CAT, obj = element, "Outputting {}", out_time.nseconds()); - - Some((out_time.nseconds(), duration, false)) - } -} diff --git a/net/ndi/src/ndisrcmeta.rs b/net/ndi/src/ndisrcmeta.rs index 0b712d4d..86b83ce4 100644 --- a/net/ndi/src/ndisrcmeta.rs +++ b/net/ndi/src/ndisrcmeta.rs @@ -5,7 +5,6 @@ use std::fmt; use std::mem; use crate::ndi::{AudioFrame, MetadataFrame, VideoFrame}; -use crate::TimestampMode; #[repr(transparent)] pub struct NdiSrcMeta(imp::NdiSrcMeta); @@ -27,9 +26,7 @@ pub enum Buffer { }, Metadata { frame: MetadataFrame, - #[allow(unused)] receive_time_gst: gst::ClockTime, - #[allow(unused)] receive_time_real: gst::ClockTime, }, } @@ -41,15 +38,11 @@ impl NdiSrcMeta { pub fn add( buffer: &mut gst::BufferRef, ndi_buffer: Buffer, - timestamp_mode: TimestampMode, ) -> gst::MetaRefMut { unsafe { // Manually dropping because gst_buffer_add_meta() takes ownership of the // content of the struct - let mut params = mem::ManuallyDrop::new(imp::NdiSrcMetaParams { - ndi_buffer, - timestamp_mode, - }); + let mut params = mem::ManuallyDrop::new(imp::NdiSrcMetaParams { ndi_buffer }); let meta = gst::ffi::gst_buffer_add_meta( buffer.as_mut_ptr(), @@ -83,8 +76,6 @@ impl fmt::Debug for NdiSrcMeta { } mod imp { - use crate::TimestampMode; - use super::Buffer; use glib::translate::*; use once_cell::sync::Lazy; @@ -93,14 +84,12 @@ mod imp { pub(super) struct NdiSrcMetaParams { pub ndi_buffer: Buffer, - pub timestamp_mode: TimestampMode, } #[repr(C)] pub struct NdiSrcMeta { parent: gst::ffi::GstMeta, pub(super) ndi_buffer: Option, - pub(super) timestamp_mode: TimestampMode, } pub(super) fn ndi_src_meta_api_get_type() -> glib::Type { @@ -129,7 +118,6 @@ mod imp { let params = ptr::read(params as *const NdiSrcMetaParams); ptr::write(&mut meta.ndi_buffer, Some(params.ndi_buffer)); - ptr::write(&mut meta.timestamp_mode, params.timestamp_mode); true.into_glib() }