diff --git a/net/ndi/src/receiver.rs b/net/ndi/src/receiver.rs index b0d987767..c01b84032 100644 --- a/net/ndi/src/receiver.rs +++ b/net/ndi/src/receiver.rs @@ -191,7 +191,9 @@ pub struct ReceiverInner { queue: ReceiverQueue, max_queue_length: usize, - observations: Observations, + // Audio/video time observations + observations_timestamp: [Observations; 2], + observations_timecode: [Observations; 2], element: glib::WeakRef, timestamp_mode: TimestampMode, @@ -227,6 +229,7 @@ struct ReceiverQueueInner { const WINDOW_LENGTH: u64 = 512; const WINDOW_DURATION: u64 = 2_000_000_000; +#[derive(Default)] struct Observations(AtomicRefCell); struct ObservationsInner { @@ -254,26 +257,21 @@ impl Default for ObservationsInner { } impl Observations { - fn new() -> Self { - Self(AtomicRefCell::new(ObservationsInner::default())) - } - // Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from // Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays": // http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546 fn process( &self, element: &gst_base::BaseSrc, - time: (Option, gst::ClockTime), + remote_time: Option, + local_time: gst::ClockTime, duration: Option, ) -> (gst::ClockTime, Option, bool) { - if time.0.is_none() { - return (time.1, duration, false); - } - - let time = (time.0.unwrap(), time.1); - let remote_time = time.0.nseconds(); - let local_time = time.1.nseconds(); + let remote_time = match remote_time { + None => return (local_time, duration, false), + Some(remote_time) => remote_time.nseconds(), + }; + let local_time = local_time.nseconds(); gst_trace!( CAT, @@ -488,7 +486,8 @@ impl Receiver { Condvar::new(), ))), max_queue_length, - observations: Observations::new(), + observations_timestamp: Default::default(), + observations_timecode: Default::default(), element: element.downgrade(), timestamp_mode, timeout, @@ -780,6 +779,7 @@ impl Receiver { fn calculate_timestamp( &self, element: &gst_base::BaseSrc, + is_audio: bool, timestamp: i64, timecode: i64, duration: Option, @@ -805,17 +805,23 @@ impl Receiver { real_time_now, ); + let res_timestamp = self.0.observations_timestamp[if is_audio { 0 } else { 1 }].process( + element, + timestamp, + receive_time, + duration, + ); + + let res_timecode = self.0.observations_timecode[if is_audio { 0 } else { 1 }].process( + element, + Some(timecode), + receive_time, + duration, + ); + let (pts, duration, discont) = match self.0.timestamp_mode { - TimestampMode::ReceiveTimeTimecode => { - self.0 - .observations - .process(element, (Some(timecode), receive_time), duration) - } - TimestampMode::ReceiveTimeTimestamp => { - self.0 - .observations - .process(element, (timestamp, receive_time), duration) - } + TimestampMode::ReceiveTimeTimecode => res_timecode, + TimestampMode::ReceiveTimeTimestamp => res_timestamp, TimestampMode::Timecode => (timecode, duration, false), TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration, false), TimestampMode::Timestamp => { @@ -888,6 +894,7 @@ impl Receiver { self.calculate_timestamp( element, + false, video_frame.timestamp(), video_frame.timecode(), duration, @@ -1438,6 +1445,7 @@ impl Receiver { self.calculate_timestamp( element, + true, audio_frame.timestamp(), audio_frame.timecode(), duration,