diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 93be0a11..06b5b0a5 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -7297,6 +7297,18 @@ "readable": true, "type": "guint", "writable": false + }, + "timestamping-mode": { + "blurb": "Govern how to pick presentation timestamps for packets", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "skew (2)", + "mutable": "ready", + "readable": true, + "type": "GstRtpBin2TimestampingMode", + "writable": true } }, "rank": "none" @@ -8522,6 +8534,26 @@ } } }, + "GstRtpBin2TimestampingMode": { + "kind": "enum", + "values": [ + { + "desc": "Use arrival time as timestamp", + "name": "arrival", + "value": "0" + }, + { + "desc": "Use RTP timestamps as is", + "name": "rtp", + "value": "1" + }, + { + "desc": "Correct skew to synchronize sender and receiver clocks", + "name": "skew", + "value": "2" + } + ] + }, "GstRtpGCCBwEEstimator": { "kind": "enum", "values": [ diff --git a/net/rtp/src/rtpbin2/imp.rs b/net/rtp/src/rtpbin2/imp.rs index 576b3eeb..4d983cd6 100644 --- a/net/rtp/src/rtpbin2/imp.rs +++ b/net/rtp/src/rtpbin2/imp.rs @@ -19,6 +19,7 @@ use super::session::{ Session, RTCP_MIN_REPORT_INTERVAL, }; use super::source::{ReceivedRb, SourceState}; +use super::sync; use crate::rtpbin2::RUNTIME; @@ -69,6 +70,7 @@ struct Settings { min_rtcp_interval: Duration, profile: Profile, reduced_size_rtcp: bool, + timestamping_mode: sync::TimestampingMode, } impl Default for Settings { @@ -78,6 +80,7 @@ impl Default for Settings { min_rtcp_interval: DEFAULT_MIN_RTCP_INTERVAL, profile: Profile::default(), reduced_size_rtcp: DEFAULT_REDUCED_SIZE_RTCP, + timestamping_mode: sync::TimestampingMode::default(), } } } @@ -390,7 +393,9 @@ impl BinSessionInner { }; let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap(); - let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, pad.push(buffer)); + let flow = pad.push(buffer); + gst::trace!(CAT, obj: pad, "Pushed buffer, flow ret {:?}", flow); + let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow); // TODO: store flow, return only on session pads? })?; @@ -496,6 +501,7 @@ struct State { rtcp_waker: Option, max_session_id: usize, pads_session_id_map: HashMap, + sync_context: Option, } impl State { @@ -854,11 +860,11 @@ impl RtpBin2 { fn rtp_recv_sink_chain( &self, - _pad: &gst::Pad, + pad: &gst::Pad, id: usize, mut buffer: gst::Buffer, ) -> Result { - let state = self.state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); let Some(session) = state.session_by_id(id) else { return Err(gst::FlowError::Error); }; @@ -869,10 +875,29 @@ impl RtpBin2 { // // Check if this makes sense or if this leads to issue with eg interleaved // TCP. - if buffer.dts().is_none() { - let buf_mut = buffer.make_mut(); - buf_mut.set_dts(self.obj().current_running_time()); - } + let arrival_time = match buffer.dts() { + Some(dts) => { + let session_inner = session.inner.lock().unwrap(); + let segment = session_inner.rtp_recv_sink_segment.as_ref().unwrap(); + // TODO: use running_time_full if we care to support that + match segment.to_running_time(dts) { + Some(time) => time, + None => { + gst::error!(CAT, obj: pad, "out of segment DTS are not supported"); + return Err(gst::FlowError::Error); + } + } + } + None => match self.obj().current_running_time() { + Some(time) => time, + None => { + gst::error!(CAT, obj: pad, "Failed to get current time"); + return Err(gst::FlowError::Error); + } + }, + }; + + gst::trace!(CAT, obj: pad, "using arrival time {}", arrival_time); let addr: Option = buffer @@ -906,7 +931,45 @@ impl RtpBin2 { }; let session = session.clone(); + let mut session_inner = session.inner.lock().unwrap(); + + let current_caps = session_inner.rtp_recv_sink_caps.clone(); + let ssrc_map = session_inner + .caps_map + .entry(rtp.payload_type()) + .or_default(); + if ssrc_map.get(&rtp.ssrc()).is_none() { + if let Some(mut caps) = + current_caps.filter(|caps| Self::clock_rate_from_caps(caps).is_some()) + { + state + .sync_context + .as_mut() + .unwrap() + .set_clock_rate(rtp.ssrc(), Self::clock_rate_from_caps(&caps).unwrap()); + { + // Ensure the caps we send out hold a payload field + let caps = caps.make_mut(); + let s = caps.structure_mut(0).unwrap(); + s.set("payload", rtp.payload_type() as i32); + } + ssrc_map.insert(rtp.ssrc(), caps); + } + } + + // TODO: Put NTP time as `gst::ReferenceTimeStampMeta` on the buffers if selected via property + let (pts, _ntp_time) = state.sync_context.as_mut().unwrap().calculate_pts( + rtp.ssrc(), + rtp.timestamp(), + arrival_time.nseconds(), + ); + let segment = session_inner.rtp_recv_sink_segment.as_ref().unwrap(); + let pts = segment + .position_from_running_time(gst::ClockTime::from_nseconds(pts)) + .unwrap(); + gst::debug!(CAT, "Calculated PTS: {}", pts); + drop(state); // Start jitterbuffer task now if not started yet @@ -922,6 +985,10 @@ impl RtpBin2 { let pt = rtp.payload_type(); let ssrc = rtp.ssrc(); drop(mapped); + { + let buf_mut = buffer.make_mut(); + buf_mut.set_pts(pts); + } let (pad, new_pad) = session_inner.get_or_create_rtp_recv_src(self, pt, ssrc); session_inner.recv_store.push(HeldRecvBuffer { hold_id: Some(hold_id), @@ -956,6 +1023,10 @@ impl RtpBin2 { let pt = rtp.payload_type(); let ssrc = rtp.ssrc(); drop(mapped); + { + let buf_mut = buffer.make_mut(); + buf_mut.set_pts(pts); + } let (pad, new_pad) = session_inner.get_or_create_rtp_recv_src(self, pt, ssrc); buffers_to_push.push(HeldRecvBuffer { hold_id: None, @@ -998,7 +1069,7 @@ impl RtpBin2 { match jitterbuffer_store.jitterbuffer.queue( &rtp, - held.buffer.dts().unwrap().nseconds(), + held.buffer.pts().unwrap().nseconds(), now, ) { jitterbuffer::QueueResult::Queued(id) => { @@ -1136,6 +1207,20 @@ impl RtpBin2 { gst::debug!(CAT, imp: self, "Can't send force-keyunit event because of missing sinkpad"); } } + RtcpRecvReply::NewCName((cname, ssrc)) => { + let mut state = self.state.lock().unwrap(); + + state.sync_context.as_mut().unwrap().associate(ssrc, &cname); + } + RtcpRecvReply::NewRtpNtp((ssrc, rtp, ntp)) => { + let mut state = self.state.lock().unwrap(); + + state + .sync_context + .as_mut() + .unwrap() + .add_sender_report(ssrc, rtp, ntp); + } } } drop(mapped); @@ -1304,6 +1389,20 @@ impl RtpBin2 { } } + fn clock_rate_from_caps(caps: &gst::CapsRef) -> Option { + let Some(s) = caps.structure(0) else { + return None; + }; + let Some(clock_rate) = s.get::("clock-rate").ok() else { + return None; + }; + if clock_rate > 0 { + Some(clock_rate as u32) + } else { + None + } + } + fn pt_clock_rate_from_caps(caps: &gst::CapsRef) -> Option<(u8, u32)> { let Some(s) = caps.structure(0) else { return None; @@ -1430,6 +1529,12 @@ impl ObjectImpl for RtpBin2 { .default_value(DEFAULT_REDUCED_SIZE_RTCP) .mutable_ready() .build(), + glib::ParamSpecEnum::builder::("timestamping-mode") + .nick("Timestamping Mode") + .blurb("Govern how to pick presentation timestamps for packets") + .default_value(sync::TimestampingMode::default()) + .mutable_ready() + .build(), ] }); @@ -1465,6 +1570,12 @@ impl ObjectImpl for RtpBin2 { let mut settings = self.settings.lock().unwrap(); settings.reduced_size_rtcp = value.get::().expect("Type checked upstream"); } + "timestamping-mode" => { + let mut settings = self.settings.lock().unwrap(); + settings.timestamping_mode = value + .get::() + .expect("Type checked upstream"); + } _ => unimplemented!(), } } @@ -1491,6 +1602,10 @@ impl ObjectImpl for RtpBin2 { let settings = self.settings.lock().unwrap(); settings.reduced_size_rtcp.to_value() } + "timestamping-mode" => { + let settings = self.settings.lock().unwrap(); + settings.timestamping_mode.to_value() + } _ => unimplemented!(), } } @@ -1576,7 +1691,6 @@ impl ElementImpl for RtpBin2 { name: Option<&str>, _caps: Option<&gst::Caps>, // XXX: do something with caps? ) -> Option { - let this = self.obj(); let settings = self.settings.lock().unwrap().clone(); let mut state = self.state.lock().unwrap(); let max_session_id = state.max_session_id; @@ -1599,7 +1713,12 @@ impl ElementImpl for RtpBin2 { match templ.name_template() { "rtp_send_sink_%u" => { sess_parse(name, "rtp_send_sink_", max_session_id).and_then(|id| { - let new_pad = move |session: &mut BinSessionInner| -> Option<(gst::Pad, Option, usize)> { + let new_pad = move |session: &mut BinSessionInner| -> Option<( + gst::Pad, + Option, + usize, + Vec, + )> { let sinkpad = gst::Pad::builder_from_template(templ) .chain_function(move |_pad, parent, buffer| { RtpBin2::catch_panic_pad_function( @@ -1609,28 +1728,36 @@ impl ElementImpl for RtpBin2 { ) }) .iterate_internal_links_function(|pad, parent| { - RtpBin2::catch_panic_pad_function(parent, || gst::Iterator::from_vec(vec![]), |this| this.iterate_internal_links(pad)) + RtpBin2::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this| this.iterate_internal_links(pad), + ) + }) + .event_function(move |pad, parent, event| { + RtpBin2::catch_panic_pad_function( + parent, + || false, + |this| this.rtp_send_sink_event(pad, event, id), + ) }) - .event_function(move |pad, parent, event| - RtpBin2::catch_panic_pad_function(parent, || false, |this| this.rtp_send_sink_event(pad, event, id)) - ) .flags(gst::PadFlags::PROXY_CAPS) .name(format!("rtp_send_sink_{}", id)) .build(); - sinkpad.set_active(true).unwrap(); - this.add_pad(&sinkpad).unwrap(); let src_templ = self.obj().pad_template("rtp_send_src_%u").unwrap(); let srcpad = gst::Pad::builder_from_template(&src_templ) .iterate_internal_links_function(|pad, parent| { - RtpBin2::catch_panic_pad_function(parent, || gst::Iterator::from_vec(vec![]), |this| this.iterate_internal_links(pad)) + RtpBin2::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this| this.iterate_internal_links(pad), + ) }) .name(format!("rtp_send_src_{}", id)) .build(); - srcpad.set_active(true).unwrap(); - this.add_pad(&srcpad).unwrap(); session.rtp_send_sinkpad = Some(sinkpad.clone()); session.rtp_send_srcpad = Some(srcpad.clone()); - Some((sinkpad, Some(srcpad), id)) + Some((sinkpad, Some(srcpad), id, vec![])) }; let session = state.session_by_id(id); @@ -1653,7 +1780,12 @@ impl ElementImpl for RtpBin2 { } "rtp_recv_sink_%u" => { sess_parse(name, "rtp_recv_sink_", max_session_id).and_then(|id| { - let new_pad = move |session: &mut BinSessionInner| -> Option<(gst::Pad, Option, usize)> { + let new_pad = move |session: &mut BinSessionInner| -> Option<( + gst::Pad, + Option, + usize, + Vec, + )> { let sinkpad = gst::Pad::builder_from_template(templ) .chain_function(move |pad, parent, buffer| { RtpBin2::catch_panic_pad_function( @@ -1663,17 +1795,23 @@ impl ElementImpl for RtpBin2 { ) }) .iterate_internal_links_function(|pad, parent| { - RtpBin2::catch_panic_pad_function(parent, || gst::Iterator::from_vec(vec![]), |this| this.iterate_internal_links(pad)) + RtpBin2::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this| this.iterate_internal_links(pad), + ) + }) + .event_function(move |pad, parent, event| { + RtpBin2::catch_panic_pad_function( + parent, + || false, + |this| this.rtp_recv_sink_event(pad, event, id), + ) }) - .event_function(move |pad, parent, event| - RtpBin2::catch_panic_pad_function(parent, || false, |this| this.rtp_recv_sink_event(pad, event, id)) - ) .name(format!("rtp_recv_sink_{}", id)) .build(); - sinkpad.set_active(true).unwrap(); - this.add_pad(&sinkpad).unwrap(); session.rtp_recv_sinkpad = Some(sinkpad.clone()); - Some((sinkpad, None, id)) + Some((sinkpad, None, id, vec![])) }; let session = state.session_by_id(id); @@ -1710,14 +1848,16 @@ impl ElementImpl for RtpBin2 { ) }) .iterate_internal_links_function(|pad, parent| { - RtpBin2::catch_panic_pad_function(parent, || gst::Iterator::from_vec(vec![]), |this| this.iterate_internal_links(pad)) + RtpBin2::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this| this.iterate_internal_links(pad), + ) }) .name(format!("rtcp_recv_sink_{}", id)) .build(); - sinkpad.set_active(true).unwrap(); - this.add_pad(&sinkpad).unwrap(); session.rtcp_recv_sinkpad = Some(sinkpad.clone()); - Some((sinkpad, None, id)) + Some((sinkpad, None, id, vec![])) } }) }) @@ -1731,10 +1871,13 @@ impl ElementImpl for RtpBin2 { if session.rtcp_send_srcpad.is_some() { None } else { - let this = self.obj(); let srcpad = gst::Pad::builder_from_template(templ) .iterate_internal_links_function(|pad, parent| { - RtpBin2::catch_panic_pad_function(parent, || gst::Iterator::from_vec(vec![]), |this| this.iterate_internal_links(pad)) + RtpBin2::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this| this.iterate_internal_links(pad), + ) }) .name(format!("rtcp_send_src_{}", id)) .build(); @@ -1749,27 +1892,34 @@ impl ElementImpl for RtpBin2 { let segment = gst::FormattedSegment::::new(); let segment = gst::event::Segment::new(&segment); - srcpad.set_active(true).unwrap(); - - let _ = srcpad.store_sticky_event(&stream_start); - let _ = srcpad.store_sticky_event(&caps); - let _ = srcpad.store_sticky_event(&segment); - - this.add_pad(&srcpad).unwrap(); session.rtcp_send_srcpad = Some(srcpad.clone()); - Some((srcpad, None, id)) + Some((srcpad, None, id, vec![stream_start, caps, segment])) } }) }) } _ => None, } - .map(|(pad, otherpad, id)| { + .map(|(pad, otherpad, id, sticky_events)| { state.max_session_id = (id + 1).max(state.max_session_id); state.pads_session_id_map.insert(pad.clone(), id); - if let Some(pad) = otherpad { - state.pads_session_id_map.insert(pad, id); + if let Some(ref pad) = otherpad { + state.pads_session_id_map.insert(pad.clone(), id); } + + drop(state); + + pad.set_active(true).unwrap(); + for event in sticky_events { + let _ = pad.store_sticky_event(&event); + } + self.obj().add_pad(&pad).unwrap(); + + if let Some(pad) = otherpad { + pad.set_active(true).unwrap(); + self.obj().add_pad(&pad).unwrap(); + } + pad }) } @@ -1838,20 +1988,28 @@ impl ElementImpl for RtpBin2 { self.parent_release_pad(pad) } + #[allow(clippy::single_match)] fn change_state( &self, transition: gst::StateChange, ) -> Result { + match transition { + gst::StateChange::ReadyToPaused => { + let settings = self.settings.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + + state.sync_context = Some(sync::Context::new(settings.timestamping_mode)); + } + _ => (), + } + let mut success = self.parent_change_state(transition)?; match transition { gst::StateChange::ReadyToNull => { self.stop_rtcp_task(); } - gst::StateChange::ReadyToPaused => { - success = gst::StateChangeSuccess::NoPreroll; - } - gst::StateChange::PlayingToPaused => { + gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToReady => { @@ -1879,6 +2037,7 @@ impl ElementImpl for RtpBin2 { for pad in removed_pads.iter() { state.pads_session_id_map.remove(pad); } + state.sync_context = None; drop(state); for pad in removed_pads { @@ -1891,6 +2050,7 @@ impl ElementImpl for RtpBin2 { } _ => (), } + Ok(success) } } diff --git a/net/rtp/src/rtpbin2/mod.rs b/net/rtp/src/rtpbin2/mod.rs index f57a7a59..19c5b264 100644 --- a/net/rtp/src/rtpbin2/mod.rs +++ b/net/rtp/src/rtpbin2/mod.rs @@ -7,6 +7,7 @@ mod imp; mod jitterbuffer; mod session; mod source; +mod sync; mod time; glib::wrapper! { @@ -14,6 +15,11 @@ glib::wrapper! { } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + { + crate::rtpbin2::sync::TimestampingMode::static_type() + .mark_as_plugin_api(gst::PluginAPIFlags::empty()); + } gst::Element::register( Some(plugin), "rtpbin2", diff --git a/net/rtp/src/rtpbin2/session.rs b/net/rtp/src/rtpbin2/session.rs index 3cf60e5d..780528ce 100644 --- a/net/rtp/src/rtpbin2/session.rs +++ b/net/rtp/src/rtpbin2/session.rs @@ -147,6 +147,10 @@ pub enum RtcpRecvReply { TimerReconsideration, /// Request a key unit for the given SSRC of ours RequestKeyUnit { ssrcs: Vec, fir: bool }, + /// A new cname to ssrc mapping was found in a sdes: (cname, ssrc) + NewCName((String, u32)), + /// A new RTP to NTP mapping was received for an ssrc: (ssrc, RTP, NTP) + NewRtpNtp((u32, u32, u64)), } #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -625,6 +629,12 @@ impl Session { sr.packet_count(), ); + replies.push(RtcpRecvReply::NewRtpNtp(( + sr.ssrc(), + sr.rtp_timestamp(), + sr.ntp_timestamp(), + ))); + for rb in sr.report_blocks() { if let Some(reply) = self.handle_rb(sr.ssrc(), rb, from, now, ntp_time) { replies.push(reply); @@ -675,6 +685,15 @@ impl Session { source.set_state(SourceState::Normal); source.set_last_activity(now); } + + if item.type_() == SdesItem::CNAME { + if let Ok(s) = std::str::from_utf8(item.value()) { + replies.push(RtcpRecvReply::NewCName(( + s.to_owned(), + chunk.ssrc(), + ))); + } + } } } } @@ -1086,6 +1105,8 @@ impl Session { } // RFC 3550 6.3.5 + // FIXME: we should surface this information to the element in order + // to perform clean up of the sync context fn handle_timeouts(&mut self, now: Instant) { trace!("handling rtcp timeouts"); let td = RTCP_SOURCE_TIMEOUT_N_INTERVALS @@ -1917,7 +1938,18 @@ pub(crate) mod tests { assert_eq!( session.handle_rtcp_recv(rtcp, len, None, now, ntp_now), - vec![] + vec![ + RtcpRecvReply::NewRtpNtp(( + ssrcs[0], + 4, + system_time_to_ntp_time_u64(ntp_now).as_u64() + )), + RtcpRecvReply::NewRtpNtp(( + ssrcs[1], + 20, + system_time_to_ntp_time_u64(ntp_now).as_u64() + )) + ] ); let (rtcp_data, _new_now, new_ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); @@ -2211,7 +2243,10 @@ pub(crate) mod tests { let rtcp = Compound::parse(&data[..len]).unwrap(); assert_eq!( session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now), - vec![RtcpRecvReply::NewSsrc(ssrc)] + vec![ + RtcpRecvReply::NewSsrc(ssrc), + RtcpRecvReply::NewCName(("cname".to_string(), ssrc)) + ] ); let rtp_data = generate_rtp_packet(ssrc, 500, 0, 4); @@ -2235,7 +2270,10 @@ pub(crate) mod tests { let rtcp = Compound::parse(&data[..len]).unwrap(); assert_eq!( session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now), - vec![RtcpRecvReply::NewSsrc(new_ssrc)] + vec![ + RtcpRecvReply::NewSsrc(new_ssrc), + RtcpRecvReply::NewCName(("cname".to_string(), new_ssrc)) + ] ); let rtp_data = generate_rtp_packet(new_ssrc, 510, 10, 4); @@ -2484,7 +2522,10 @@ pub(crate) mod tests { let rtcp = Compound::parse(&data[..len]).unwrap(); assert_eq!( session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now), - vec![RtcpRecvReply::NewSsrc(recv_ssrc)] + vec![ + RtcpRecvReply::NewSsrc(recv_ssrc), + RtcpRecvReply::NewCName(("cname1".to_string(), recv_ssrc)) + ] ); assert!(session.is_point_to_point); @@ -2508,7 +2549,11 @@ pub(crate) mod tests { let rtcp = Compound::parse(&data[..len]).unwrap(); assert_eq!( session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now), - vec![RtcpRecvReply::NewSsrc(recv2_ssrc)] + vec![ + RtcpRecvReply::NewCName(("cname1".to_string(), recv_ssrc)), + RtcpRecvReply::NewSsrc(recv2_ssrc), + RtcpRecvReply::NewCName(("cname1".to_string(), recv2_ssrc)) + ] ); assert!(session.is_point_to_point); @@ -2532,7 +2577,10 @@ pub(crate) mod tests { let rtcp = Compound::parse(&data[..len]).unwrap(); assert_eq!( session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now), - vec![] + vec![ + RtcpRecvReply::NewCName(("cname1".to_string(), recv_ssrc)), + RtcpRecvReply::NewCName(("cname2".to_string(), recv2_ssrc)) + ] ); assert!(!session.is_point_to_point); } diff --git a/net/rtp/src/rtpbin2/sync.rs b/net/rtp/src/rtpbin2/sync.rs new file mode 100644 index 00000000..31354f02 --- /dev/null +++ b/net/rtp/src/rtpbin2/sync.rs @@ -0,0 +1,822 @@ +use gst::glib; +use gst::prelude::MulDiv; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use std::time::Duration; + +use crate::utils::ExtendedTimestamp; + +use super::time::NtpTime; + +#[derive(Default, Debug)] +struct Ssrc { + cname: Option>, + clock_rate: Option, + extended_timestamp: ExtendedTimestamp, + last_sr_ntp_timestamp: Option, + last_sr_rtp_ext: Option, + // Arrival, RTP timestamp (extended), PTS (potentially skew-corrected) + base_times: Option<(u64, u64, u64)>, + current_delay: Option, + observations: Observations, +} + +impl Ssrc { + fn new(clock_rate: Option) -> Self { + Self { + clock_rate, + ..Default::default() + } + } + + fn reset_times(&mut self) { + self.extended_timestamp = ExtendedTimestamp::default(); + self.last_sr_ntp_timestamp = None; + self.last_sr_rtp_ext = None; + self.base_times = None; + self.current_delay = None; + self.observations = Observations::default(); + } + + /* Returns whether the caller should reset timing associated + * values for this ssrc (eg largest delay) */ + fn set_clock_rate(&mut self, clock_rate: u32) -> bool { + if Some(clock_rate) == self.clock_rate { + // No changes + return false; + } + + self.clock_rate = Some(clock_rate); + self.reset_times(); + true + } + + fn add_sender_report(&mut self, rtp_timestamp: u32, ntp_timestamp: u64) { + self.last_sr_rtp_ext = Some(self.extended_timestamp.next(rtp_timestamp)); + self.last_sr_ntp_timestamp = Some(ntp_timestamp.into()); + // Reset so that the next call to calculate_pts recalculates the NTP / RTP delay + self.current_delay = None; + } +} + +#[derive(Debug)] +struct CnameLargestDelay { + largest_delay: i64, + all_sync: bool, +} + +/// Govern how to pick presentation timestamps for packets +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, glib::Enum)] +#[repr(u32)] +#[enum_type(name = "GstRtpBin2TimestampingMode")] +pub enum TimestampingMode { + /// Simply use arrival time as timestamp + #[allow(dead_code)] + #[enum_value(name = "Use arrival time as timestamp", nick = "arrival")] + Arrival, + /// Use RTP timestamps as is + #[allow(dead_code)] + #[enum_value(name = "Use RTP timestamps as is", nick = "rtp")] + Rtp, + /// Correct skew to synchronize sender and receiver clocks + #[default] + #[enum_value( + name = "Correct skew to synchronize sender and receiver clocks", + nick = "skew" + )] + Skew, +} + +#[derive(Debug)] +pub struct Context { + ssrcs: HashMap, + mode: TimestampingMode, + cnames_to_ssrcs: HashMap, Vec>, + cname_to_largest_delays: HashMap, CnameLargestDelay>, +} + +impl Context { + pub fn new(mode: TimestampingMode) -> Self { + Self { + ssrcs: HashMap::new(), + mode, + cnames_to_ssrcs: HashMap::new(), + cname_to_largest_delays: HashMap::new(), + } + } + + pub fn set_clock_rate(&mut self, ssrc_val: u32, clock_rate: u32) { + if let Some(ssrc) = self.ssrcs.get_mut(&ssrc_val) { + if ssrc.set_clock_rate(clock_rate) { + debug!("{ssrc_val:#08x} times reset after clock rate change"); + if let Some(ref cname) = ssrc.cname { + self.cname_to_largest_delays.remove(cname); + } + } + } else { + self.ssrcs.insert(ssrc_val, Ssrc::new(Some(clock_rate))); + } + } + + fn disassociate(&mut self, ssrc_val: u32, cname: &str) { + self.cname_to_largest_delays.remove(cname); + + if let Some(ssrcs) = self.cnames_to_ssrcs.get_mut(cname) { + ssrcs.retain(|&other| other != ssrc_val); + } + } + + // FIXME: call this on timeouts / BYE (maybe collisions too?) + #[allow(dead_code)] + pub fn remove_ssrc(&mut self, ssrc_val: u32) { + if let Some(ssrc) = self.ssrcs.remove(&ssrc_val) { + debug!("{ssrc_val:#08x} ssrc removed"); + if let Some(ref cname) = ssrc.cname { + self.disassociate(ssrc_val, cname) + } + } + } + + pub fn associate(&mut self, ssrc_val: u32, cname: &str) { + let ssrc = self + .ssrcs + .entry(ssrc_val) + .or_insert_with(|| Ssrc::new(None)); + + let cname = Arc::::from(cname); + if let Some(ref old_cname) = ssrc.cname { + if old_cname == &cname { + return; + } + + ssrc.cname = Some(cname.clone()); + self.disassociate(ssrc_val, cname.as_ref()); + } else { + ssrc.cname = Some(cname.clone()); + } + + let ssrcs = self.cnames_to_ssrcs.entry(cname.clone()).or_default(); + ssrcs.push(ssrc_val); + // Recalculate a new largest delay next time calculate_pts is called + self.cname_to_largest_delays.remove(cname.as_ref()); + } + + pub fn add_sender_report(&mut self, ssrc_val: u32, rtp_timestamp: u32, ntp_timestamp: u64) { + debug!("Adding new sender report for ssrc {ssrc_val:#08x}"); + + let ssrc = self + .ssrcs + .entry(ssrc_val) + .or_insert_with(|| Ssrc::new(None)); + + debug!( + "Latest NTP time: {:?}", + NtpTime::from(ntp_timestamp).as_duration().unwrap() + ); + + ssrc.add_sender_report(rtp_timestamp, ntp_timestamp) + } + + pub fn calculate_pts( + &mut self, + ssrc_val: u32, + timestamp: u32, + arrival_time: u64, + ) -> (u64, Option) { + let ssrc = self.ssrcs.get_mut(&ssrc_val).unwrap(); + let clock_rate = ssrc.clock_rate.unwrap() as u64; + + // Calculate an extended timestamp, calculations only work with extended timestamps + // from that point on + let rtp_ext_ns = ssrc + .extended_timestamp + .next(timestamp) + .mul_div_round(1_000_000_000, clock_rate) + .unwrap(); + + // Now potentially correct the skew by observing how RTP times and arrival times progress + let mut pts = match self.mode { + TimestampingMode::Skew => { + let (skew_corrected, discont) = ssrc.observations.process(rtp_ext_ns, arrival_time); + trace!( + "{ssrc_val:#08x} using skew corrected RTP ext: {}", + skew_corrected + ); + + if discont { + ssrc.reset_times(); + debug!("{ssrc_val:#08x} times reset after observations discontinuity"); + if let Some(ref cname) = ssrc.cname { + self.cname_to_largest_delays.remove(cname); + } + } + + skew_corrected + } + TimestampingMode::Rtp => { + trace!("{ssrc_val:#08x} using uncorrected RTP ext: {}", rtp_ext_ns); + + rtp_ext_ns + } + TimestampingMode::Arrival => { + trace!("{ssrc_val:#08x} using arrival time: {}", arrival_time); + + arrival_time + } + }; + + // Determine the first arrival time and the first RTP time for that ssrc + if ssrc.base_times.is_none() { + ssrc.base_times = Some((arrival_time, rtp_ext_ns, pts)); + } + + let (base_arrival_time, base_rtp_ext_ns, base_pts) = ssrc.base_times.unwrap(); + + // Base the PTS on the first arrival time + pts += base_arrival_time; + trace!("{ssrc_val:#08x} added up base arrival time: {}", pts); + // Now subtract the base PTS we calculated + pts = pts.saturating_sub(base_pts); + trace!("{ssrc_val:#08x} subtracted base PTS: {}", base_pts); + + trace!("{ssrc_val:#08x} PTS prior to potential SR offsetting: {pts}"); + + let mut ntp_time: Option = None; + + // TODO: add property for enabling / disabling offsetting based on + // NTP / RTP mapping, ie inter-stream synchronization + if let Some((last_sr_ntp, last_sr_rtp_ext)) = + ssrc.last_sr_ntp_timestamp.zip(ssrc.last_sr_rtp_ext) + { + let last_sr_rtp_ext_ns = last_sr_rtp_ext + .mul_div_round(1_000_000_000, clock_rate) + .unwrap(); + + // We have a new SR, we can now figure out an NTP time and calculate how it + // relates to arrival times + if ssrc.current_delay.is_none() { + if let Some(base_ntp_time) = if base_rtp_ext_ns > last_sr_rtp_ext_ns { + let rtp_range_ns = base_rtp_ext_ns - last_sr_rtp_ext_ns; + + (last_sr_ntp.as_duration().unwrap().as_nanos() as u64).checked_add(rtp_range_ns) + } else { + let rtp_range_ns = last_sr_rtp_ext_ns - base_rtp_ext_ns; + + (last_sr_ntp.as_duration().unwrap().as_nanos() as u64).checked_sub(rtp_range_ns) + } { + trace!( + "{ssrc_val:#08x} Base NTP time on first packet after new SR is {:?} ({:?})", + base_ntp_time, + Duration::from_nanos(base_ntp_time) + ); + + if base_ntp_time < base_arrival_time { + ssrc.current_delay = Some(base_arrival_time as i64 - base_ntp_time as i64); + } else { + ssrc.current_delay = + Some(-(base_ntp_time as i64 - base_arrival_time as i64)); + } + + trace!("{ssrc_val:#08x} Current delay is {:?}", ssrc.current_delay); + + if let Some(ref cname) = ssrc.cname { + // We should recalculate a new largest delay for this CNAME + self.cname_to_largest_delays.remove(cname); + } + } else { + warn!("{ssrc_val:#08x} Invalid NTP RTP time mapping, waiting for next SR"); + ssrc.last_sr_ntp_timestamp = None; + ssrc.last_sr_rtp_ext = None; + } + } + + ntp_time = if rtp_ext_ns > last_sr_rtp_ext_ns { + let rtp_range_ns = Duration::from_nanos(rtp_ext_ns - last_sr_rtp_ext_ns); + + last_sr_ntp + .as_duration() + .unwrap() + .checked_add(rtp_range_ns) + .map(NtpTime::from_duration) + } else { + let rtp_range_ns = Duration::from_nanos(last_sr_rtp_ext_ns - rtp_ext_ns); + + last_sr_ntp + .as_duration() + .unwrap() + .checked_sub(rtp_range_ns) + .map(NtpTime::from_duration) + }; + } + + // Finally, if we have a CNAME for this SSRC and we have managed to calculate + // a delay for all the other ssrcs for this CNAME, we can calculate by how much + // we need to delay this stream to sync it with the others, if at all. + if let Some(cname) = ssrc.cname.clone() { + let delay = ssrc.current_delay; + let cname_largest_delay = self + .cname_to_largest_delays + .entry(cname.clone()) + .or_insert_with(|| { + let mut cname_largest_delay = CnameLargestDelay { + largest_delay: std::i64::MIN, + all_sync: true, + }; + + trace!("{ssrc_val:#08x} searching for new largest delay"); + + let ssrc_vals = self.cnames_to_ssrcs.get(&cname).unwrap(); + + for ssrc_val in ssrc_vals { + let ssrc = self.ssrcs.get(ssrc_val).unwrap(); + + if let Some(delay) = ssrc.current_delay { + trace!("ssrc {ssrc_val:#08x} has delay {delay}",); + + if delay > cname_largest_delay.largest_delay { + cname_largest_delay.largest_delay = delay; + } + } else { + trace!("{ssrc_val:#08x} has no delay calculated yet"); + cname_largest_delay.all_sync = false; + } + } + + cname_largest_delay + }); + + trace!("{ssrc_val:#08x} Largest delay is {:?}", cname_largest_delay); + + if cname_largest_delay.all_sync { + let offset = (cname_largest_delay.largest_delay - delay.unwrap()) as u64; + + trace!("{ssrc_val:#08x} applying offset {}", offset); + + pts += offset; + } + } + + debug!("{ssrc_val:#08x} calculated PTS {pts}"); + + (pts, ntp_time) + } +} + +const WINDOW_LENGTH: u64 = 512; +const WINDOW_DURATION: u64 = 2_000_000_000; + +#[derive(Debug)] +struct Observations { + base_local_time: Option, + base_remote_time: Option, + highest_remote_time: Option, + deltas: VecDeque, + min_delta: i64, + skew: i64, + filling: bool, + window_size: usize, +} + +impl Default for Observations { + fn default() -> Self { + Self { + base_local_time: None, + base_remote_time: None, + highest_remote_time: None, + deltas: VecDeque::new(), + min_delta: 0, + skew: 0, + filling: true, + window_size: 0, + } + } +} + +impl Observations { + fn out_time(&self, base_local_time: u64, remote_diff: u64) -> (u64, bool) { + let out_time = base_local_time + remote_diff; + let out_time = if self.skew < 0 { + out_time.saturating_sub((-self.skew) as u64) + } else { + out_time + (self.skew as u64) + }; + + trace!("Skew {}, min delta {}", self.skew, self.min_delta); + trace!("Outputting {}", out_time); + + (out_time, false) + } + + // Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from + // Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays": + // http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546 + fn process(&mut self, remote_time: u64, local_time: u64) -> (u64, bool) { + trace!("Local time {}, remote time {}", local_time, remote_time,); + + let (base_remote_time, base_local_time) = + match (self.base_remote_time, self.base_local_time) { + (Some(remote), Some(local)) => (remote, local), + _ => { + debug!( + "Initializing base time: local {}, remote {}", + local_time, remote_time, + ); + self.base_remote_time = Some(remote_time); + self.base_local_time = Some(local_time); + self.highest_remote_time = Some(remote_time); + + return (local_time, false); + } + }; + + let highest_remote_time = self.highest_remote_time.unwrap(); + + let remote_diff = remote_time.saturating_sub(base_remote_time); + + /* Only update observations when remote times progress forward */ + if remote_time <= highest_remote_time { + return self.out_time(base_local_time, remote_diff); + } + + self.highest_remote_time = Some(remote_time); + + let local_diff = local_time.saturating_sub(base_local_time); + let delta = (local_diff as i64) - (remote_diff as i64); + + trace!( + "Local diff {}, remote diff {}, delta {}", + local_diff, + remote_diff, + delta, + ); + + if remote_diff > 0 && local_diff > 0 { + let slope = (local_diff as f64) / (remote_diff as f64); + if !(0.8..1.2).contains(&slope) { + warn!("Too small/big slope {}, resetting", slope); + + let discont = !self.deltas.is_empty(); + *self = Observations::default(); + + debug!( + "Initializing base time: local {}, remote {}", + local_time, remote_time, + ); + self.base_remote_time = Some(remote_time); + self.base_local_time = Some(local_time); + self.highest_remote_time = Some(remote_time); + + return (local_time, discont); + } + } + + if (delta > self.skew && delta - self.skew > 1_000_000_000) + || (delta < self.skew && self.skew - delta > 1_000_000_000) + { + warn!("Delta {} too far from skew {}, resetting", delta, self.skew); + + let discont = !self.deltas.is_empty(); + *self = Observations::default(); + + debug!( + "Initializing base time: local {}, remote {}", + local_time, remote_time, + ); + self.base_remote_time = Some(remote_time); + self.base_local_time = Some(local_time); + self.highest_remote_time = Some(remote_time); + + return (local_time, discont); + } + + if self.filling { + if self.deltas.is_empty() || delta < self.min_delta { + self.min_delta = delta; + } + self.deltas.push_back(delta); + + if remote_diff > WINDOW_DURATION || self.deltas.len() as u64 == WINDOW_LENGTH { + self.window_size = self.deltas.len(); + self.skew = self.min_delta; + self.filling = false; + } else { + let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64; + let perc_window = (self.deltas.len() as u64) + .mul_div_floor(100, WINDOW_LENGTH) + .unwrap() as i64; + let perc = std::cmp::max(perc_time, perc_window); + + self.skew = (perc * self.min_delta + ((10_000 - perc) * self.skew)) / 10_000; + } + } else { + let old = self.deltas.pop_front().unwrap(); + self.deltas.push_back(delta); + + if delta <= self.min_delta { + self.min_delta = delta; + } else if old == self.min_delta { + self.min_delta = self.deltas.iter().copied().min().unwrap(); + } + + self.skew = (self.min_delta + (124 * self.skew)) / 125; + } + + self.out_time(base_local_time, remote_diff) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + use crate::rtpbin2::session::tests::init_logs; + use crate::rtpbin2::time::system_time_to_ntp_time_u64; + + #[test] + fn test_single_stream_no_sr() { + init_logs(); + + let mut ctx = Context::new(TimestampingMode::Rtp); + + let mut now = 0; + + ctx.set_clock_rate(0x12345678, 90000); + + assert_eq!(ctx.calculate_pts(0x12345678, 0, now), (0, None)); + now += 1_000_000_000; + assert_eq!( + ctx.calculate_pts(0x12345678, 90000, now), + (1_000_000_000, None) + ); + } + + #[test] + fn test_single_stream_with_sr() { + init_logs(); + + let mut ctx = Context::new(TimestampingMode::Rtp); + + let mut now = 0; + + ctx.set_clock_rate(0x12345678, 90000); + + ctx.add_sender_report( + 0x12345678, + 0, + system_time_to_ntp_time_u64(std::time::UNIX_EPOCH).as_u64(), + ); + + assert_eq!( + ctx.calculate_pts(0x12345678, 0, now), + (0, Some(system_time_to_ntp_time_u64(std::time::UNIX_EPOCH))) + ); + now += 1_000_000_000; + assert_eq!( + ctx.calculate_pts(0x12345678, 90000, now), + ( + 1_000_000_000, + Some(system_time_to_ntp_time_u64( + std::time::UNIX_EPOCH + Duration::from_millis(1000) + )) + ) + ); + } + + #[test] + fn test_two_streams_with_sr() { + init_logs(); + + let mut ctx = Context::new(TimestampingMode::Rtp); + + let mut now = 0; + + ctx.set_clock_rate(0x12345, 90000); + ctx.set_clock_rate(0x67890, 90000); + ctx.associate(0x12345, "foo@bar"); + ctx.associate(0x67890, "foo@bar"); + + ctx.add_sender_report( + 0x12345, + 0, + system_time_to_ntp_time_u64(std::time::UNIX_EPOCH).as_u64(), + ); + + ctx.add_sender_report( + 0x67890, + 0, + system_time_to_ntp_time_u64(std::time::UNIX_EPOCH + Duration::from_millis(500)) + .as_u64(), + ); + + // NTP time 0 + assert_eq!( + ctx.calculate_pts(0x12345, 0, now), + (0, Some(system_time_to_ntp_time_u64(std::time::UNIX_EPOCH))) + ); + now += 500_000_000; + + // NTP time 500, arrival time 500 + assert_eq!( + ctx.calculate_pts(0x12345, 45000, now), + ( + 500_000_000, + Some(system_time_to_ntp_time_u64( + std::time::UNIX_EPOCH + Duration::from_millis(500) + )) + ) + ); + // NTP time 500, arrival time 500 + assert_eq!( + ctx.calculate_pts(0x67890, 0, now), + ( + 500_000_000, + Some(system_time_to_ntp_time_u64( + std::time::UNIX_EPOCH + Duration::from_millis(500) + )) + ) + ); + now += 500_000_000; + // NTP time 1000, arrival time 1000 + assert_eq!( + ctx.calculate_pts(0x12345, 90000, now), + ( + 1_000_000_000, + Some(system_time_to_ntp_time_u64( + std::time::UNIX_EPOCH + Duration::from_millis(1000) + )) + ) + ); + now += 500_000_000; + // NTP time 1500, arrival time 1500 + assert_eq!( + ctx.calculate_pts(0x67890, 90000, now), + ( + 1_500_000_000, + Some(system_time_to_ntp_time_u64( + std::time::UNIX_EPOCH + Duration::from_millis(1500) + )) + ) + ); + } + + #[test] + fn test_two_streams_no_sr_and_offset_arrival_times() { + init_logs(); + + let mut ctx = Context::new(TimestampingMode::Rtp); + + let mut now = 0; + + ctx.set_clock_rate(0x12345, 90000); + ctx.set_clock_rate(0x67890, 90000); + ctx.associate(0x12345, "foo@bar"); + ctx.associate(0x67890, "foo@bar"); + + assert_eq!(ctx.calculate_pts(0x12345, 0, now), (0, None)); + + now += 500_000_000; + + assert_eq!(ctx.calculate_pts(0x67890, 0, now), (500_000_000, None)); + assert_eq!(ctx.calculate_pts(0x12345, 45000, now), (500_000_000, None)); + } + + #[test] + fn test_two_streams_with_same_sr_and_offset_arrival_times() { + init_logs(); + + let mut ctx = Context::new(TimestampingMode::Rtp); + + let mut now = 0; + + ctx.set_clock_rate(0x12345, 90000); + ctx.set_clock_rate(0x67890, 90000); + ctx.associate(0x12345, "foo@bar"); + ctx.associate(0x67890, "foo@bar"); + + ctx.add_sender_report( + 0x12345, + 0, + system_time_to_ntp_time_u64(std::time::UNIX_EPOCH).as_u64(), + ); + + ctx.add_sender_report( + 0x67890, + 0, + system_time_to_ntp_time_u64(std::time::UNIX_EPOCH).as_u64(), + ); + + assert_eq!( + ctx.calculate_pts(0x12345, 0, now), + (0, Some(system_time_to_ntp_time_u64(std::time::UNIX_EPOCH))) + ); + + now += 500_000_000; + + assert_eq!( + ctx.calculate_pts(0x67890, 0, now), + ( + 500_000_000, + Some(system_time_to_ntp_time_u64(std::time::UNIX_EPOCH)) + ) + ); + + assert_eq!( + ctx.calculate_pts(0x12345, 45000, now), + ( + 1_000_000_000, + Some(system_time_to_ntp_time_u64( + std::time::UNIX_EPOCH + Duration::from_millis(500) + )) + ) + ); + + now += 500_000_000; + + assert_eq!( + ctx.calculate_pts(0x67890, 45000, now), + ( + 1_000_000_000, + Some(system_time_to_ntp_time_u64( + std::time::UNIX_EPOCH + Duration::from_millis(500) + )) + ) + ); + + // Now remove the delayed source and observe that the offset is gone + // for the other source + + ctx.remove_ssrc(0x67890); + + assert_eq!( + ctx.calculate_pts(0x12345, 90000, now), + ( + 1_000_000_000, + Some(system_time_to_ntp_time_u64( + std::time::UNIX_EPOCH + Duration::from_millis(1000) + )) + ) + ); + } + + #[test] + fn test_two_streams_with_sr_different_cnames() { + init_logs(); + + let mut ctx = Context::new(TimestampingMode::Rtp); + + let mut now = 0; + + ctx.set_clock_rate(0x12345, 90000); + ctx.set_clock_rate(0x67890, 90000); + ctx.associate(0x12345, "foo@bar"); + ctx.associate(0x67890, "foo@baz"); + + ctx.add_sender_report( + 0x12345, + 0, + system_time_to_ntp_time_u64(std::time::UNIX_EPOCH).as_u64(), + ); + + ctx.add_sender_report( + 0x67890, + 0, + system_time_to_ntp_time_u64(std::time::UNIX_EPOCH).as_u64(), + ); + + assert_eq!( + ctx.calculate_pts(0x12345, 0, now), + (0, Some(system_time_to_ntp_time_u64(std::time::UNIX_EPOCH))) + ); + + now += 500_000_000; + + assert_eq!( + ctx.calculate_pts(0x67890, 0, now), + ( + 500_000_000, + Some(system_time_to_ntp_time_u64(std::time::UNIX_EPOCH)) + ) + ); + + assert_eq!( + ctx.calculate_pts(0x12345, 45000, now), + ( + 500_000_000, + Some(system_time_to_ntp_time_u64( + std::time::UNIX_EPOCH + Duration::from_millis(500) + )) + ) + ); + + now += 500_000_000; + + assert_eq!( + ctx.calculate_pts(0x67890, 45000, now), + ( + 1_000_000_000, + Some(system_time_to_ntp_time_u64( + std::time::UNIX_EPOCH + Duration::from_millis(500) + )) + ) + ); + } +} diff --git a/net/rtp/src/rtpbin2/time.rs b/net/rtp/src/rtpbin2/time.rs index 7373e21a..61b9afcf 100644 --- a/net/rtp/src/rtpbin2/time.rs +++ b/net/rtp/src/rtpbin2/time.rs @@ -19,6 +19,10 @@ impl NtpTime { Self((dur.as_secs_f64() * F32) as u64) } + pub fn as_duration(&self) -> Result { + Duration::try_from_secs_f64(self.0 as f64 / F32) + } + pub fn as_u32(self) -> u32 { ((self.0 >> 16) & 0xffffffff) as u32 } diff --git a/net/rtp/src/utils.rs b/net/rtp/src/utils.rs index 2812a60b..b53f8e90 100644 --- a/net/rtp/src/utils.rs +++ b/net/rtp/src/utils.rs @@ -306,6 +306,55 @@ macro_rules! define_wrapping_comparable_u32_with_display { }; } +/// Stores information necessary to compute a series of extended timestamps +#[derive(Default, Debug)] +pub(crate) struct ExtendedTimestamp { + last_ext: Option, +} + +impl ExtendedTimestamp { + /// Produces the next extended timestamp from a new RTP timestamp + pub(crate) fn next(&mut self, rtp_timestamp: u32) -> u64 { + let ext = match self.last_ext { + None => (1u64 << 32) + rtp_timestamp as u64, + Some(last_ext) => { + // pick wraparound counter from previous timestamp and add to new timestamp + let mut ext = rtp_timestamp as u64 + (last_ext & !0xffffffff); + + // check for timestamp wraparound + if ext < last_ext { + let diff = last_ext - ext; + + if diff > std::i32::MAX as u64 { + // timestamp went backwards more than allowed, we wrap around and get + // updated extended timestamp. + ext += 1u64 << 32; + } + } else { + let diff = ext - last_ext; + + if diff > std::i32::MAX as u64 { + if ext < 1u64 << 32 { + // We can't ever get to such a case as our counter is opaque + unreachable!() + } else { + ext -= 1u64 << 32; + // We don't want the extended timestamp storage to go back, ever + return ext; + } + } + } + + ext + } + }; + + self.last_ext = Some(ext); + + ext + } +} + /// Stores information necessary to compute a series of extended seqnums #[derive(Default, Debug)] pub(crate) struct ExtendedSeqnum { @@ -586,6 +635,77 @@ mod tests { assert_eq!(try_cmp(0, 0x8000_0000), Err(ComparisonLimit)); } + #[test] + fn extended_timestamp_basic() { + let mut ext_ts = ExtendedTimestamp::default(); + + // No wraparound when timestamps are increasing + assert_eq!(ext_ts.next(0), (1 << 32)); + assert_eq!(ext_ts.next(10), (1 << 32) + 10); + assert_eq!(ext_ts.next(10), (1 << 32) + 10); + assert_eq!( + ext_ts.next(1 + std::i32::MAX as u32), + (1 << 32) + 1 + std::i32::MAX as u64 + ); + + // Even big bumps under G_MAXINT32 don't result in wrap-around + ext_ts = ExtendedTimestamp::default(); + + assert_eq!(ext_ts.next(1087500), (1 << 32) + 1087500); + assert_eq!(ext_ts.next(24), (1 << 32) + 24); + } + + #[test] + fn extended_timestamp_wraparound() { + let mut ext_ts = ExtendedTimestamp::default(); + assert_eq!( + ext_ts.next(std::u32::MAX - 90000 + 1), + (1 << 32) + std::u32::MAX as u64 - 90000 + 1 + ); + assert_eq!(ext_ts.next(0), (1 << 32) + std::u32::MAX as u64 + 1); + assert_eq!( + ext_ts.next(90000), + (1 << 32) + std::u32::MAX as u64 + 1 + 90000 + ); + } + + #[test] + fn extended_timestamp_wraparound_disordered() { + let mut ext_ts = ExtendedTimestamp::default(); + + assert_eq!( + ext_ts.next(std::u32::MAX - 90000 + 1), + (1 << 32) + std::u32::MAX as u64 - 90000 + 1 + ); + assert_eq!(ext_ts.next(0), (1 << 32) + std::u32::MAX as u64 + 1); + + // Unwrapping around + assert_eq!( + ext_ts.next(std::u32::MAX - 90000 + 1), + (1 << 32) + std::u32::MAX as u64 - 90000 + 1 + ); + assert_eq!( + ext_ts.next(90000), + (1 << 32) + std::u32::MAX as u64 + 1 + 90000 + ); + } + + #[test] + fn extended_timestamp_wraparound_disordered_backwards() { + let mut ext_ts = ExtendedTimestamp::default(); + + assert_eq!(ext_ts.next(90000), (1 << 32) + 90000); + + // Wraps backwards + assert_eq!( + ext_ts.next(std::u32::MAX - 90000 + 1), + std::u32::MAX as u64 - 90000 + 1 + ); + + // Wraps again forwards + assert_eq!(ext_ts.next(90000), (1 << 32) + 90000); + } + #[test] fn extended_seqnum_basic() { let mut ext_seq = ExtendedSeqnum::default();