rtpbin2: implement and use synchronization context

Co-authored-by: Sebastian Dröge <sebastian@centricular.com>
Co-authored-by: Matthew Waters <matthew@centricular.com>
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
Mathieu Duponchelle 2024-01-04 17:31:36 +01:00 committed by Matthew Waters
parent 1865899621
commit 74ec83a0ff
7 changed files with 1247 additions and 55 deletions

View file

@ -7297,6 +7297,18 @@
"readable": true,
"type": "guint",
"writable": false
},
"timestamping-mode": {
"blurb": "Govern how to pick presentation timestamps for packets",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "skew (2)",
"mutable": "ready",
"readable": true,
"type": "GstRtpBin2TimestampingMode",
"writable": true
}
},
"rank": "none"
@ -8522,6 +8534,26 @@
}
}
},
"GstRtpBin2TimestampingMode": {
"kind": "enum",
"values": [
{
"desc": "Use arrival time as timestamp",
"name": "arrival",
"value": "0"
},
{
"desc": "Use RTP timestamps as is",
"name": "rtp",
"value": "1"
},
{
"desc": "Correct skew to synchronize sender and receiver clocks",
"name": "skew",
"value": "2"
}
]
},
"GstRtpGCCBwEEstimator": {
"kind": "enum",
"values": [

View file

@ -19,6 +19,7 @@ use super::session::{
Session, RTCP_MIN_REPORT_INTERVAL,
};
use super::source::{ReceivedRb, SourceState};
use super::sync;
use crate::rtpbin2::RUNTIME;
@ -69,6 +70,7 @@ struct Settings {
min_rtcp_interval: Duration,
profile: Profile,
reduced_size_rtcp: bool,
timestamping_mode: sync::TimestampingMode,
}
impl Default for Settings {
@ -78,6 +80,7 @@ impl Default for Settings {
min_rtcp_interval: DEFAULT_MIN_RTCP_INTERVAL,
profile: Profile::default(),
reduced_size_rtcp: DEFAULT_REDUCED_SIZE_RTCP,
timestamping_mode: sync::TimestampingMode::default(),
}
}
}
@ -390,7 +393,9 @@ impl BinSessionInner {
};
let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap();
let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, pad.push(buffer));
let flow = pad.push(buffer);
gst::trace!(CAT, obj: pad, "Pushed buffer, flow ret {:?}", flow);
let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow);
// TODO: store flow, return only on session pads?
})?;
@ -496,6 +501,7 @@ struct State {
rtcp_waker: Option<Waker>,
max_session_id: usize,
pads_session_id_map: HashMap<gst::Pad, usize>,
sync_context: Option<sync::Context>,
}
impl State {
@ -854,11 +860,11 @@ impl RtpBin2 {
fn rtp_recv_sink_chain(
&self,
_pad: &gst::Pad,
pad: &gst::Pad,
id: usize,
mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let state = self.state.lock().unwrap();
let mut state = self.state.lock().unwrap();
let Some(session) = state.session_by_id(id) else {
return Err(gst::FlowError::Error);
};
@ -869,10 +875,29 @@ impl RtpBin2 {
//
// Check if this makes sense or if this leads to issue with eg interleaved
// TCP.
if buffer.dts().is_none() {
let buf_mut = buffer.make_mut();
buf_mut.set_dts(self.obj().current_running_time());
}
let arrival_time = match buffer.dts() {
Some(dts) => {
let session_inner = session.inner.lock().unwrap();
let segment = session_inner.rtp_recv_sink_segment.as_ref().unwrap();
// TODO: use running_time_full if we care to support that
match segment.to_running_time(dts) {
Some(time) => time,
None => {
gst::error!(CAT, obj: pad, "out of segment DTS are not supported");
return Err(gst::FlowError::Error);
}
}
}
None => match self.obj().current_running_time() {
Some(time) => time,
None => {
gst::error!(CAT, obj: pad, "Failed to get current time");
return Err(gst::FlowError::Error);
}
},
};
gst::trace!(CAT, obj: pad, "using arrival time {}", arrival_time);
let addr: Option<SocketAddr> =
buffer
@ -906,7 +931,45 @@ impl RtpBin2 {
};
let session = session.clone();
let mut session_inner = session.inner.lock().unwrap();
let current_caps = session_inner.rtp_recv_sink_caps.clone();
let ssrc_map = session_inner
.caps_map
.entry(rtp.payload_type())
.or_default();
if ssrc_map.get(&rtp.ssrc()).is_none() {
if let Some(mut caps) =
current_caps.filter(|caps| Self::clock_rate_from_caps(caps).is_some())
{
state
.sync_context
.as_mut()
.unwrap()
.set_clock_rate(rtp.ssrc(), Self::clock_rate_from_caps(&caps).unwrap());
{
// Ensure the caps we send out hold a payload field
let caps = caps.make_mut();
let s = caps.structure_mut(0).unwrap();
s.set("payload", rtp.payload_type() as i32);
}
ssrc_map.insert(rtp.ssrc(), caps);
}
}
// TODO: Put NTP time as `gst::ReferenceTimeStampMeta` on the buffers if selected via property
let (pts, _ntp_time) = state.sync_context.as_mut().unwrap().calculate_pts(
rtp.ssrc(),
rtp.timestamp(),
arrival_time.nseconds(),
);
let segment = session_inner.rtp_recv_sink_segment.as_ref().unwrap();
let pts = segment
.position_from_running_time(gst::ClockTime::from_nseconds(pts))
.unwrap();
gst::debug!(CAT, "Calculated PTS: {}", pts);
drop(state);
// Start jitterbuffer task now if not started yet
@ -922,6 +985,10 @@ impl RtpBin2 {
let pt = rtp.payload_type();
let ssrc = rtp.ssrc();
drop(mapped);
{
let buf_mut = buffer.make_mut();
buf_mut.set_pts(pts);
}
let (pad, new_pad) = session_inner.get_or_create_rtp_recv_src(self, pt, ssrc);
session_inner.recv_store.push(HeldRecvBuffer {
hold_id: Some(hold_id),
@ -956,6 +1023,10 @@ impl RtpBin2 {
let pt = rtp.payload_type();
let ssrc = rtp.ssrc();
drop(mapped);
{
let buf_mut = buffer.make_mut();
buf_mut.set_pts(pts);
}
let (pad, new_pad) = session_inner.get_or_create_rtp_recv_src(self, pt, ssrc);
buffers_to_push.push(HeldRecvBuffer {
hold_id: None,
@ -998,7 +1069,7 @@ impl RtpBin2 {
match jitterbuffer_store.jitterbuffer.queue(
&rtp,
held.buffer.dts().unwrap().nseconds(),
held.buffer.pts().unwrap().nseconds(),
now,
) {
jitterbuffer::QueueResult::Queued(id) => {
@ -1136,6 +1207,20 @@ impl RtpBin2 {
gst::debug!(CAT, imp: self, "Can't send force-keyunit event because of missing sinkpad");
}
}
RtcpRecvReply::NewCName((cname, ssrc)) => {
let mut state = self.state.lock().unwrap();
state.sync_context.as_mut().unwrap().associate(ssrc, &cname);
}
RtcpRecvReply::NewRtpNtp((ssrc, rtp, ntp)) => {
let mut state = self.state.lock().unwrap();
state
.sync_context
.as_mut()
.unwrap()
.add_sender_report(ssrc, rtp, ntp);
}
}
}
drop(mapped);
@ -1304,6 +1389,20 @@ impl RtpBin2 {
}
}
fn clock_rate_from_caps(caps: &gst::CapsRef) -> Option<u32> {
let Some(s) = caps.structure(0) else {
return None;
};
let Some(clock_rate) = s.get::<i32>("clock-rate").ok() else {
return None;
};
if clock_rate > 0 {
Some(clock_rate as u32)
} else {
None
}
}
fn pt_clock_rate_from_caps(caps: &gst::CapsRef) -> Option<(u8, u32)> {
let Some(s) = caps.structure(0) else {
return None;
@ -1430,6 +1529,12 @@ impl ObjectImpl for RtpBin2 {
.default_value(DEFAULT_REDUCED_SIZE_RTCP)
.mutable_ready()
.build(),
glib::ParamSpecEnum::builder::<sync::TimestampingMode>("timestamping-mode")
.nick("Timestamping Mode")
.blurb("Govern how to pick presentation timestamps for packets")
.default_value(sync::TimestampingMode::default())
.mutable_ready()
.build(),
]
});
@ -1465,6 +1570,12 @@ impl ObjectImpl for RtpBin2 {
let mut settings = self.settings.lock().unwrap();
settings.reduced_size_rtcp = value.get::<bool>().expect("Type checked upstream");
}
"timestamping-mode" => {
let mut settings = self.settings.lock().unwrap();
settings.timestamping_mode = value
.get::<sync::TimestampingMode>()
.expect("Type checked upstream");
}
_ => unimplemented!(),
}
}
@ -1491,6 +1602,10 @@ impl ObjectImpl for RtpBin2 {
let settings = self.settings.lock().unwrap();
settings.reduced_size_rtcp.to_value()
}
"timestamping-mode" => {
let settings = self.settings.lock().unwrap();
settings.timestamping_mode.to_value()
}
_ => unimplemented!(),
}
}
@ -1576,7 +1691,6 @@ impl ElementImpl for RtpBin2 {
name: Option<&str>,
_caps: Option<&gst::Caps>, // XXX: do something with caps?
) -> Option<gst::Pad> {
let this = self.obj();
let settings = self.settings.lock().unwrap().clone();
let mut state = self.state.lock().unwrap();
let max_session_id = state.max_session_id;
@ -1599,7 +1713,12 @@ impl ElementImpl for RtpBin2 {
match templ.name_template() {
"rtp_send_sink_%u" => {
sess_parse(name, "rtp_send_sink_", max_session_id).and_then(|id| {
let new_pad = move |session: &mut BinSessionInner| -> Option<(gst::Pad, Option<gst::Pad>, usize)> {
let new_pad = move |session: &mut BinSessionInner| -> Option<(
gst::Pad,
Option<gst::Pad>,
usize,
Vec<gst::Event>,
)> {
let sinkpad = gst::Pad::builder_from_template(templ)
.chain_function(move |_pad, parent, buffer| {
RtpBin2::catch_panic_pad_function(
@ -1609,28 +1728,36 @@ impl ElementImpl for RtpBin2 {
)
})
.iterate_internal_links_function(|pad, parent| {
RtpBin2::catch_panic_pad_function(parent, || gst::Iterator::from_vec(vec![]), |this| this.iterate_internal_links(pad))
RtpBin2::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|this| this.iterate_internal_links(pad),
)
})
.event_function(move |pad, parent, event| {
RtpBin2::catch_panic_pad_function(
parent,
|| false,
|this| this.rtp_send_sink_event(pad, event, id),
)
})
.event_function(move |pad, parent, event|
RtpBin2::catch_panic_pad_function(parent, || false, |this| this.rtp_send_sink_event(pad, event, id))
)
.flags(gst::PadFlags::PROXY_CAPS)
.name(format!("rtp_send_sink_{}", id))
.build();
sinkpad.set_active(true).unwrap();
this.add_pad(&sinkpad).unwrap();
let src_templ = self.obj().pad_template("rtp_send_src_%u").unwrap();
let srcpad = gst::Pad::builder_from_template(&src_templ)
.iterate_internal_links_function(|pad, parent| {
RtpBin2::catch_panic_pad_function(parent, || gst::Iterator::from_vec(vec![]), |this| this.iterate_internal_links(pad))
RtpBin2::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|this| this.iterate_internal_links(pad),
)
})
.name(format!("rtp_send_src_{}", id))
.build();
srcpad.set_active(true).unwrap();
this.add_pad(&srcpad).unwrap();
session.rtp_send_sinkpad = Some(sinkpad.clone());
session.rtp_send_srcpad = Some(srcpad.clone());
Some((sinkpad, Some(srcpad), id))
Some((sinkpad, Some(srcpad), id, vec![]))
};
let session = state.session_by_id(id);
@ -1653,7 +1780,12 @@ impl ElementImpl for RtpBin2 {
}
"rtp_recv_sink_%u" => {
sess_parse(name, "rtp_recv_sink_", max_session_id).and_then(|id| {
let new_pad = move |session: &mut BinSessionInner| -> Option<(gst::Pad, Option<gst::Pad>, usize)> {
let new_pad = move |session: &mut BinSessionInner| -> Option<(
gst::Pad,
Option<gst::Pad>,
usize,
Vec<gst::Event>,
)> {
let sinkpad = gst::Pad::builder_from_template(templ)
.chain_function(move |pad, parent, buffer| {
RtpBin2::catch_panic_pad_function(
@ -1663,17 +1795,23 @@ impl ElementImpl for RtpBin2 {
)
})
.iterate_internal_links_function(|pad, parent| {
RtpBin2::catch_panic_pad_function(parent, || gst::Iterator::from_vec(vec![]), |this| this.iterate_internal_links(pad))
RtpBin2::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|this| this.iterate_internal_links(pad),
)
})
.event_function(move |pad, parent, event| {
RtpBin2::catch_panic_pad_function(
parent,
|| false,
|this| this.rtp_recv_sink_event(pad, event, id),
)
})
.event_function(move |pad, parent, event|
RtpBin2::catch_panic_pad_function(parent, || false, |this| this.rtp_recv_sink_event(pad, event, id))
)
.name(format!("rtp_recv_sink_{}", id))
.build();
sinkpad.set_active(true).unwrap();
this.add_pad(&sinkpad).unwrap();
session.rtp_recv_sinkpad = Some(sinkpad.clone());
Some((sinkpad, None, id))
Some((sinkpad, None, id, vec![]))
};
let session = state.session_by_id(id);
@ -1710,14 +1848,16 @@ impl ElementImpl for RtpBin2 {
)
})
.iterate_internal_links_function(|pad, parent| {
RtpBin2::catch_panic_pad_function(parent, || gst::Iterator::from_vec(vec![]), |this| this.iterate_internal_links(pad))
RtpBin2::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|this| this.iterate_internal_links(pad),
)
})
.name(format!("rtcp_recv_sink_{}", id))
.build();
sinkpad.set_active(true).unwrap();
this.add_pad(&sinkpad).unwrap();
session.rtcp_recv_sinkpad = Some(sinkpad.clone());
Some((sinkpad, None, id))
Some((sinkpad, None, id, vec![]))
}
})
})
@ -1731,10 +1871,13 @@ impl ElementImpl for RtpBin2 {
if session.rtcp_send_srcpad.is_some() {
None
} else {
let this = self.obj();
let srcpad = gst::Pad::builder_from_template(templ)
.iterate_internal_links_function(|pad, parent| {
RtpBin2::catch_panic_pad_function(parent, || gst::Iterator::from_vec(vec![]), |this| this.iterate_internal_links(pad))
RtpBin2::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|this| this.iterate_internal_links(pad),
)
})
.name(format!("rtcp_send_src_{}", id))
.build();
@ -1749,27 +1892,34 @@ impl ElementImpl for RtpBin2 {
let segment = gst::FormattedSegment::<gst::ClockTime>::new();
let segment = gst::event::Segment::new(&segment);
srcpad.set_active(true).unwrap();
let _ = srcpad.store_sticky_event(&stream_start);
let _ = srcpad.store_sticky_event(&caps);
let _ = srcpad.store_sticky_event(&segment);
this.add_pad(&srcpad).unwrap();
session.rtcp_send_srcpad = Some(srcpad.clone());
Some((srcpad, None, id))
Some((srcpad, None, id, vec![stream_start, caps, segment]))
}
})
})
}
_ => None,
}
.map(|(pad, otherpad, id)| {
.map(|(pad, otherpad, id, sticky_events)| {
state.max_session_id = (id + 1).max(state.max_session_id);
state.pads_session_id_map.insert(pad.clone(), id);
if let Some(pad) = otherpad {
state.pads_session_id_map.insert(pad, id);
if let Some(ref pad) = otherpad {
state.pads_session_id_map.insert(pad.clone(), id);
}
drop(state);
pad.set_active(true).unwrap();
for event in sticky_events {
let _ = pad.store_sticky_event(&event);
}
self.obj().add_pad(&pad).unwrap();
if let Some(pad) = otherpad {
pad.set_active(true).unwrap();
self.obj().add_pad(&pad).unwrap();
}
pad
})
}
@ -1838,20 +1988,28 @@ impl ElementImpl for RtpBin2 {
self.parent_release_pad(pad)
}
#[allow(clippy::single_match)]
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::ReadyToPaused => {
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
state.sync_context = Some(sync::Context::new(settings.timestamping_mode));
}
_ => (),
}
let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToNull => {
self.stop_rtcp_task();
}
gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PlayingToPaused => {
gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
@ -1879,6 +2037,7 @@ impl ElementImpl for RtpBin2 {
for pad in removed_pads.iter() {
state.pads_session_id_map.remove(pad);
}
state.sync_context = None;
drop(state);
for pad in removed_pads {
@ -1891,6 +2050,7 @@ impl ElementImpl for RtpBin2 {
}
_ => (),
}
Ok(success)
}
}

View file

@ -7,6 +7,7 @@ mod imp;
mod jitterbuffer;
mod session;
mod source;
mod sync;
mod time;
glib::wrapper! {
@ -14,6 +15,11 @@ glib::wrapper! {
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
#[cfg(feature = "doc")]
{
crate::rtpbin2::sync::TimestampingMode::static_type()
.mark_as_plugin_api(gst::PluginAPIFlags::empty());
}
gst::Element::register(
Some(plugin),
"rtpbin2",

View file

@ -147,6 +147,10 @@ pub enum RtcpRecvReply {
TimerReconsideration,
/// Request a key unit for the given SSRC of ours
RequestKeyUnit { ssrcs: Vec<u32>, fir: bool },
/// A new cname to ssrc mapping was found in a sdes: (cname, ssrc)
NewCName((String, u32)),
/// A new RTP to NTP mapping was received for an ssrc: (ssrc, RTP, NTP)
NewRtpNtp((u32, u32, u64)),
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@ -625,6 +629,12 @@ impl Session {
sr.packet_count(),
);
replies.push(RtcpRecvReply::NewRtpNtp((
sr.ssrc(),
sr.rtp_timestamp(),
sr.ntp_timestamp(),
)));
for rb in sr.report_blocks() {
if let Some(reply) = self.handle_rb(sr.ssrc(), rb, from, now, ntp_time) {
replies.push(reply);
@ -675,6 +685,15 @@ impl Session {
source.set_state(SourceState::Normal);
source.set_last_activity(now);
}
if item.type_() == SdesItem::CNAME {
if let Ok(s) = std::str::from_utf8(item.value()) {
replies.push(RtcpRecvReply::NewCName((
s.to_owned(),
chunk.ssrc(),
)));
}
}
}
}
}
@ -1086,6 +1105,8 @@ impl Session {
}
// RFC 3550 6.3.5
// FIXME: we should surface this information to the element in order
// to perform clean up of the sync context
fn handle_timeouts(&mut self, now: Instant) {
trace!("handling rtcp timeouts");
let td = RTCP_SOURCE_TIMEOUT_N_INTERVALS
@ -1917,7 +1938,18 @@ pub(crate) mod tests {
assert_eq!(
session.handle_rtcp_recv(rtcp, len, None, now, ntp_now),
vec![]
vec![
RtcpRecvReply::NewRtpNtp((
ssrcs[0],
4,
system_time_to_ntp_time_u64(ntp_now).as_u64()
)),
RtcpRecvReply::NewRtpNtp((
ssrcs[1],
20,
system_time_to_ntp_time_u64(ntp_now).as_u64()
))
]
);
let (rtcp_data, _new_now, new_ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
@ -2211,7 +2243,10 @@ pub(crate) mod tests {
let rtcp = Compound::parse(&data[..len]).unwrap();
assert_eq!(
session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now),
vec![RtcpRecvReply::NewSsrc(ssrc)]
vec![
RtcpRecvReply::NewSsrc(ssrc),
RtcpRecvReply::NewCName(("cname".to_string(), ssrc))
]
);
let rtp_data = generate_rtp_packet(ssrc, 500, 0, 4);
@ -2235,7 +2270,10 @@ pub(crate) mod tests {
let rtcp = Compound::parse(&data[..len]).unwrap();
assert_eq!(
session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now),
vec![RtcpRecvReply::NewSsrc(new_ssrc)]
vec![
RtcpRecvReply::NewSsrc(new_ssrc),
RtcpRecvReply::NewCName(("cname".to_string(), new_ssrc))
]
);
let rtp_data = generate_rtp_packet(new_ssrc, 510, 10, 4);
@ -2484,7 +2522,10 @@ pub(crate) mod tests {
let rtcp = Compound::parse(&data[..len]).unwrap();
assert_eq!(
session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now),
vec![RtcpRecvReply::NewSsrc(recv_ssrc)]
vec![
RtcpRecvReply::NewSsrc(recv_ssrc),
RtcpRecvReply::NewCName(("cname1".to_string(), recv_ssrc))
]
);
assert!(session.is_point_to_point);
@ -2508,7 +2549,11 @@ pub(crate) mod tests {
let rtcp = Compound::parse(&data[..len]).unwrap();
assert_eq!(
session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now),
vec![RtcpRecvReply::NewSsrc(recv2_ssrc)]
vec![
RtcpRecvReply::NewCName(("cname1".to_string(), recv_ssrc)),
RtcpRecvReply::NewSsrc(recv2_ssrc),
RtcpRecvReply::NewCName(("cname1".to_string(), recv2_ssrc))
]
);
assert!(session.is_point_to_point);
@ -2532,7 +2577,10 @@ pub(crate) mod tests {
let rtcp = Compound::parse(&data[..len]).unwrap();
assert_eq!(
session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now),
vec![]
vec![
RtcpRecvReply::NewCName(("cname1".to_string(), recv_ssrc)),
RtcpRecvReply::NewCName(("cname2".to_string(), recv2_ssrc))
]
);
assert!(!session.is_point_to_point);
}

822
net/rtp/src/rtpbin2/sync.rs Normal file
View file

@ -0,0 +1,822 @@
use gst::glib;
use gst::prelude::MulDiv;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use crate::utils::ExtendedTimestamp;
use super::time::NtpTime;
/// Synchronization state tracked for a single RTP ssrc.
#[derive(Default, Debug)]
struct Ssrc {
    /// CNAME from RTCP SDES; shared with the CNAME maps in `Context`
    cname: Option<Arc<str>>,
    /// RTP clock rate in Hz; must be set before PTS calculation
    clock_rate: Option<u32>,
    /// Extends 32-bit RTP timestamps to 64 bits across wraparounds
    extended_timestamp: ExtendedTimestamp,
    /// NTP timestamp from the most recent sender report
    last_sr_ntp_timestamp: Option<NtpTime>,
    /// Extended RTP timestamp from the most recent sender report
    last_sr_rtp_ext: Option<u64>,
    // Arrival, RTP timestamp (extended), PTS (potentially skew-corrected)
    base_times: Option<(u64, u64, u64)>,
    /// Signed nanosecond offset between the base arrival time and the
    /// base NTP time; None until derivable from a sender report
    current_delay: Option<i64>,
    /// Clock skew estimator for this ssrc
    observations: Observations,
}
impl Ssrc {
fn new(clock_rate: Option<u32>) -> Self {
Self {
clock_rate,
..Default::default()
}
}
fn reset_times(&mut self) {
self.extended_timestamp = ExtendedTimestamp::default();
self.last_sr_ntp_timestamp = None;
self.last_sr_rtp_ext = None;
self.base_times = None;
self.current_delay = None;
self.observations = Observations::default();
}
/* Returns whether the caller should reset timing associated
* values for this ssrc (eg largest delay) */
fn set_clock_rate(&mut self, clock_rate: u32) -> bool {
if Some(clock_rate) == self.clock_rate {
// No changes
return false;
}
self.clock_rate = Some(clock_rate);
self.reset_times();
true
}
fn add_sender_report(&mut self, rtp_timestamp: u32, ntp_timestamp: u64) {
self.last_sr_rtp_ext = Some(self.extended_timestamp.next(rtp_timestamp));
self.last_sr_ntp_timestamp = Some(ntp_timestamp.into());
// Reset so that the next call to calculate_pts recalculates the NTP / RTP delay
self.current_delay = None;
}
}
/// Cached result of scanning the delays of every ssrc sharing a CNAME.
#[derive(Debug)]
struct CnameLargestDelay {
    /// Largest `current_delay` among the CNAME's ssrcs, in nanoseconds
    largest_delay: i64,
    /// Whether every ssrc of the CNAME had a delay calculated
    all_sync: bool,
}
/// Govern how to pick presentation timestamps for packets
///
/// Registered as the `GstRtpBin2TimestampingMode` GObject enum
/// (nicks: "arrival", "rtp", "skew"); `Skew` is the default variant.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, glib::Enum)]
#[repr(u32)]
#[enum_type(name = "GstRtpBin2TimestampingMode")]
pub enum TimestampingMode {
    /// Simply use arrival time as timestamp
    #[allow(dead_code)]
    #[enum_value(name = "Use arrival time as timestamp", nick = "arrival")]
    Arrival,
    /// Use RTP timestamps as is
    #[allow(dead_code)]
    #[enum_value(name = "Use RTP timestamps as is", nick = "rtp")]
    Rtp,
    /// Correct skew to synchronize sender and receiver clocks
    #[default]
    #[enum_value(
        name = "Correct skew to synchronize sender and receiver clocks",
        nick = "skew"
    )]
    Skew,
}
/// Inter-stream synchronization context: tracks per-ssrc timing state
/// and groups ssrcs by CNAME so streams of one sender can be aligned.
#[derive(Debug)]
pub struct Context {
    /// Per-ssrc timing state
    ssrcs: HashMap<u32, Ssrc>,
    /// How PTS are derived (arrival time, raw RTP, or skew-corrected)
    mode: TimestampingMode,
    /// Which ssrcs currently share each CNAME
    cnames_to_ssrcs: HashMap<Arc<str>, Vec<u32>>,
    /// Cached largest delay per CNAME; entries are removed whenever
    /// member state changes and lazily rebuilt in `calculate_pts`
    cname_to_largest_delays: HashMap<Arc<str>, CnameLargestDelay>,
}
impl Context {
    /// Creates a new synchronization context using the provided
    /// timestamping mode for all ssrcs.
    pub fn new(mode: TimestampingMode) -> Self {
        Self {
            ssrcs: HashMap::new(),
            mode,
            cnames_to_ssrcs: HashMap::new(),
            cname_to_largest_delays: HashMap::new(),
        }
    }

    /// Sets (or updates) the RTP clock rate for an ssrc, creating the
    /// entry if needed. An actual rate change resets the ssrc's timing
    /// state and invalidates its CNAME group's cached largest delay.
    pub fn set_clock_rate(&mut self, ssrc_val: u32, clock_rate: u32) {
        if let Some(ssrc) = self.ssrcs.get_mut(&ssrc_val) {
            if ssrc.set_clock_rate(clock_rate) {
                debug!("{ssrc_val:#08x} times reset after clock rate change");
                if let Some(ref cname) = ssrc.cname {
                    self.cname_to_largest_delays.remove(cname);
                }
            }
        } else {
            self.ssrcs.insert(ssrc_val, Ssrc::new(Some(clock_rate)));
        }
    }

    /// Removes an ssrc from the given CNAME group and drops the group's
    /// cached largest delay so it gets recalculated.
    fn disassociate(&mut self, ssrc_val: u32, cname: &str) {
        self.cname_to_largest_delays.remove(cname);

        if let Some(ssrcs) = self.cnames_to_ssrcs.get_mut(cname) {
            ssrcs.retain(|&other| other != ssrc_val);
        }
    }

    // FIXME: call this on timeouts / BYE (maybe collisions too?)
    /// Completely forgets an ssrc, including its CNAME group membership.
    #[allow(dead_code)]
    pub fn remove_ssrc(&mut self, ssrc_val: u32) {
        if let Some(ssrc) = self.ssrcs.remove(&ssrc_val) {
            debug!("{ssrc_val:#08x} ssrc removed");
            if let Some(ref cname) = ssrc.cname {
                self.disassociate(ssrc_val, cname)
            }
        }
    }

    /// Associates an ssrc with a CNAME (learned from RTCP SDES),
    /// creating the ssrc entry if needed.
    pub fn associate(&mut self, ssrc_val: u32, cname: &str) {
        let ssrc = self
            .ssrcs
            .entry(ssrc_val)
            .or_insert_with(|| Ssrc::new(None));
        let cname = Arc::<str>::from(cname);

        if let Some(old_cname) = ssrc.cname.replace(cname.clone()) {
            if old_cname == cname {
                // Already associated with this CNAME, nothing to do
                return;
            }

            // The CNAME changed: remove the ssrc from its *previous*
            // CNAME group (and invalidate that group's cached delay),
            // not from the group it is about to join.
            self.disassociate(ssrc_val, old_cname.as_ref());
        }

        let ssrcs = self.cnames_to_ssrcs.entry(cname.clone()).or_default();
        ssrcs.push(ssrc_val);

        // Recalculate a new largest delay next time calculate_pts is called
        self.cname_to_largest_delays.remove(cname.as_ref());
    }

    /// Records the RTP/NTP timestamp mapping from a sender report for
    /// an ssrc, creating the entry if needed.
    pub fn add_sender_report(&mut self, ssrc_val: u32, rtp_timestamp: u32, ntp_timestamp: u64) {
        debug!("Adding new sender report for ssrc {ssrc_val:#08x}");

        let ssrc = self
            .ssrcs
            .entry(ssrc_val)
            .or_insert_with(|| Ssrc::new(None));

        debug!(
            "Latest NTP time: {:?}",
            NtpTime::from(ntp_timestamp).as_duration().unwrap()
        );

        ssrc.add_sender_report(rtp_timestamp, ntp_timestamp)
    }

    /// Calculates the PTS (in nanoseconds) for a packet of `ssrc_val`
    /// with RTP timestamp `timestamp` arriving at `arrival_time`
    /// (nanoseconds), according to the configured timestamping mode.
    /// Also returns the packet's NTP time when a sender report has
    /// provided an RTP/NTP mapping.
    ///
    /// Panics if the ssrc is unknown or has no clock rate: callers must
    /// have called `set_clock_rate()` for this ssrc beforehand.
    pub fn calculate_pts(
        &mut self,
        ssrc_val: u32,
        timestamp: u32,
        arrival_time: u64,
    ) -> (u64, Option<NtpTime>) {
        let ssrc = self.ssrcs.get_mut(&ssrc_val).unwrap();
        let clock_rate = ssrc.clock_rate.unwrap() as u64;

        // Calculate an extended timestamp, calculations only work with extended timestamps
        // from that point on
        let rtp_ext_ns = ssrc
            .extended_timestamp
            .next(timestamp)
            .mul_div_round(1_000_000_000, clock_rate)
            .unwrap();

        // Now potentially correct the skew by observing how RTP times and arrival times progress
        let mut pts = match self.mode {
            TimestampingMode::Skew => {
                let (skew_corrected, discont) = ssrc.observations.process(rtp_ext_ns, arrival_time);
                trace!(
                    "{ssrc_val:#08x} using skew corrected RTP ext: {}",
                    skew_corrected
                );
                if discont {
                    ssrc.reset_times();
                    debug!("{ssrc_val:#08x} times reset after observations discontinuity");
                    if let Some(ref cname) = ssrc.cname {
                        self.cname_to_largest_delays.remove(cname);
                    }
                }
                skew_corrected
            }
            TimestampingMode::Rtp => {
                trace!("{ssrc_val:#08x} using uncorrected RTP ext: {}", rtp_ext_ns);
                rtp_ext_ns
            }
            TimestampingMode::Arrival => {
                trace!("{ssrc_val:#08x} using arrival time: {}", arrival_time);
                arrival_time
            }
        };

        // Determine the first arrival time and the first RTP time for that ssrc
        if ssrc.base_times.is_none() {
            ssrc.base_times = Some((arrival_time, rtp_ext_ns, pts));
        }

        let (base_arrival_time, base_rtp_ext_ns, base_pts) = ssrc.base_times.unwrap();

        // Base the PTS on the first arrival time
        pts += base_arrival_time;
        trace!("{ssrc_val:#08x} added up base arrival time: {}", pts);

        // Now subtract the base PTS we calculated
        pts = pts.saturating_sub(base_pts);
        trace!("{ssrc_val:#08x} subtracted base PTS: {}", base_pts);

        trace!("{ssrc_val:#08x} PTS prior to potential SR offsetting: {pts}");

        let mut ntp_time: Option<NtpTime> = None;

        // TODO: add property for enabling / disabling offsetting based on
        // NTP / RTP mapping, ie inter-stream synchronization
        if let Some((last_sr_ntp, last_sr_rtp_ext)) =
            ssrc.last_sr_ntp_timestamp.zip(ssrc.last_sr_rtp_ext)
        {
            let last_sr_rtp_ext_ns = last_sr_rtp_ext
                .mul_div_round(1_000_000_000, clock_rate)
                .unwrap();

            // We have a new SR, we can now figure out an NTP time and calculate how it
            // relates to arrival times
            if ssrc.current_delay.is_none() {
                if let Some(base_ntp_time) = if base_rtp_ext_ns > last_sr_rtp_ext_ns {
                    let rtp_range_ns = base_rtp_ext_ns - last_sr_rtp_ext_ns;
                    (last_sr_ntp.as_duration().unwrap().as_nanos() as u64).checked_add(rtp_range_ns)
                } else {
                    let rtp_range_ns = last_sr_rtp_ext_ns - base_rtp_ext_ns;
                    (last_sr_ntp.as_duration().unwrap().as_nanos() as u64).checked_sub(rtp_range_ns)
                } {
                    trace!(
                        "{ssrc_val:#08x} Base NTP time on first packet after new SR is {:?} ({:?})",
                        base_ntp_time,
                        Duration::from_nanos(base_ntp_time)
                    );

                    if base_ntp_time < base_arrival_time {
                        ssrc.current_delay = Some(base_arrival_time as i64 - base_ntp_time as i64);
                    } else {
                        ssrc.current_delay =
                            Some(-(base_ntp_time as i64 - base_arrival_time as i64));
                    }
                    trace!("{ssrc_val:#08x} Current delay is {:?}", ssrc.current_delay);

                    if let Some(ref cname) = ssrc.cname {
                        // We should recalculate a new largest delay for this CNAME
                        self.cname_to_largest_delays.remove(cname);
                    }
                } else {
                    warn!("{ssrc_val:#08x} Invalid NTP RTP time mapping, waiting for next SR");
                    ssrc.last_sr_ntp_timestamp = None;
                    ssrc.last_sr_rtp_ext = None;
                }
            }

            ntp_time = if rtp_ext_ns > last_sr_rtp_ext_ns {
                let rtp_range_ns = Duration::from_nanos(rtp_ext_ns - last_sr_rtp_ext_ns);

                last_sr_ntp
                    .as_duration()
                    .unwrap()
                    .checked_add(rtp_range_ns)
                    .map(NtpTime::from_duration)
            } else {
                let rtp_range_ns = Duration::from_nanos(last_sr_rtp_ext_ns - rtp_ext_ns);

                last_sr_ntp
                    .as_duration()
                    .unwrap()
                    .checked_sub(rtp_range_ns)
                    .map(NtpTime::from_duration)
            };
        }

        // Finally, if we have a CNAME for this SSRC and we have managed to calculate
        // a delay for all the other ssrcs for this CNAME, we can calculate by how much
        // we need to delay this stream to sync it with the others, if at all.
        if let Some(cname) = ssrc.cname.clone() {
            let delay = ssrc.current_delay;
            let cname_largest_delay = self
                .cname_to_largest_delays
                .entry(cname.clone())
                .or_insert_with(|| {
                    let mut cname_largest_delay = CnameLargestDelay {
                        largest_delay: std::i64::MIN,
                        all_sync: true,
                    };

                    trace!("{ssrc_val:#08x} searching for new largest delay");

                    let ssrc_vals = self.cnames_to_ssrcs.get(&cname).unwrap();

                    for ssrc_val in ssrc_vals {
                        let ssrc = self.ssrcs.get(ssrc_val).unwrap();

                        if let Some(delay) = ssrc.current_delay {
                            trace!("ssrc {ssrc_val:#08x} has delay {delay}",);
                            if delay > cname_largest_delay.largest_delay {
                                cname_largest_delay.largest_delay = delay;
                            }
                        } else {
                            trace!("{ssrc_val:#08x} has no delay calculated yet");
                            cname_largest_delay.all_sync = false;
                        }
                    }

                    cname_largest_delay
                });

            trace!("{ssrc_val:#08x} Largest delay is {:?}", cname_largest_delay);

            if cname_largest_delay.all_sync {
                let offset = (cname_largest_delay.largest_delay - delay.unwrap()) as u64;

                trace!("{ssrc_val:#08x} applying offset {}", offset);

                pts += offset;
            }
        }

        debug!("{ssrc_val:#08x} calculated PTS {pts}");

        (pts, ntp_time)
    }
}
// Maximum number of deltas kept in the skew estimation window
const WINDOW_LENGTH: u64 = 512;
// Maximum remote-time span covered by the window, in nanoseconds (2s)
const WINDOW_DURATION: u64 = 2_000_000_000;
/// State of the sliding-window clock skew estimator.
#[derive(Debug)]
struct Observations {
    /// Local time of the first observation, in nanoseconds
    base_local_time: Option<u64>,
    /// Remote (RTP-derived) time of the first observation, in nanoseconds
    base_remote_time: Option<u64>,
    /// Highest remote time seen so far; the estimator only updates when
    /// remote times progress forward
    highest_remote_time: Option<u64>,
    /// Recent (local diff - remote diff) deltas, in nanoseconds
    deltas: VecDeque<i64>,
    /// Smallest delta currently in the window
    min_delta: i64,
    /// Current skew estimate applied to output times, in nanoseconds
    skew: i64,
    /// True while the initial observation window is still being filled
    filling: bool,
    /// Number of deltas kept once the window has been filled
    window_size: usize,
}
impl Default for Observations {
fn default() -> Self {
Self {
base_local_time: None,
base_remote_time: None,
highest_remote_time: None,
deltas: VecDeque::new(),
min_delta: 0,
skew: 0,
filling: true,
window_size: 0,
}
}
}
impl Observations {
    /// (Re)starts observation: the given pair of times becomes the base
    /// for all subsequent diff calculations.
    fn init_base_times(&mut self, remote_time: u64, local_time: u64) {
        debug!(
            "Initializing base time: local {}, remote {}",
            local_time, remote_time,
        );
        self.base_remote_time = Some(remote_time);
        self.base_local_time = Some(local_time);
        self.highest_remote_time = Some(remote_time);
    }

    /// Maps a remote time diff back onto the local clock, applying the
    /// current skew estimate. The returned bool (discontinuity) is
    /// always false on this path.
    fn out_time(&self, base_local_time: u64, remote_diff: u64) -> (u64, bool) {
        let out_time = base_local_time + remote_diff;
        let out_time = if self.skew < 0 {
            out_time.saturating_sub((-self.skew) as u64)
        } else {
            out_time + (self.skew as u64)
        };
        trace!("Skew {}, min delta {}", self.skew, self.min_delta);
        trace!("Outputting {}", out_time);
        (out_time, false)
    }

    // Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from
    // Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays":
    // http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546
    /// Feeds one (remote, local) time observation into the estimator and
    /// returns the skew-corrected remote time, plus a flag telling the
    /// caller that a discontinuity was detected and times were reset.
    fn process(&mut self, remote_time: u64, local_time: u64) -> (u64, bool) {
        trace!("Local time {}, remote time {}", local_time, remote_time,);

        let (base_remote_time, base_local_time) =
            match (self.base_remote_time, self.base_local_time) {
                (Some(remote), Some(local)) => (remote, local),
                _ => {
                    // Very first observation: nothing to correct yet
                    self.init_base_times(remote_time, local_time);
                    return (local_time, false);
                }
            };

        let highest_remote_time = self.highest_remote_time.unwrap();
        let remote_diff = remote_time.saturating_sub(base_remote_time);

        /* Only update observations when remote times progress forward */
        if remote_time <= highest_remote_time {
            return self.out_time(base_local_time, remote_diff);
        }

        self.highest_remote_time = Some(remote_time);

        let local_diff = local_time.saturating_sub(base_local_time);
        let delta = (local_diff as i64) - (remote_diff as i64);

        trace!(
            "Local diff {}, remote diff {}, delta {}",
            local_diff,
            remote_diff,
            delta,
        );

        if remote_diff > 0 && local_diff > 0 {
            let slope = (local_diff as f64) / (remote_diff as f64);
            if !(0.8..1.2).contains(&slope) {
                // Clocks are progressing at wildly different rates:
                // treat this as a discontinuity and restart estimation
                warn!("Too small/big slope {}, resetting", slope);

                let discont = !self.deltas.is_empty();
                *self = Observations::default();
                self.init_base_times(remote_time, local_time);

                return (local_time, discont);
            }
        }

        if (delta > self.skew && delta - self.skew > 1_000_000_000)
            || (delta < self.skew && self.skew - delta > 1_000_000_000)
        {
            // More than one second away from the current estimate:
            // discontinuity, restart estimation
            warn!("Delta {} too far from skew {}, resetting", delta, self.skew);

            let discont = !self.deltas.is_empty();
            *self = Observations::default();
            self.init_base_times(remote_time, local_time);

            return (local_time, discont);
        }

        if self.filling {
            if self.deltas.is_empty() || delta < self.min_delta {
                self.min_delta = delta;
            }
            self.deltas.push_back(delta);

            if remote_diff > WINDOW_DURATION || self.deltas.len() as u64 == WINDOW_LENGTH {
                // Window is full: lock in its size and the minimum delta
                self.window_size = self.deltas.len();
                self.skew = self.min_delta;
                self.filling = false;
            } else {
                // Blend min_delta into the skew proportionally to how
                // full the window is (by time or by count)
                let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64;
                let perc_window = (self.deltas.len() as u64)
                    .mul_div_floor(100, WINDOW_LENGTH)
                    .unwrap() as i64;
                let perc = std::cmp::max(perc_time, perc_window);

                self.skew = (perc * self.min_delta + ((10_000 - perc) * self.skew)) / 10_000;
            }
        } else {
            // Steady state: slide the window by one delta and keep
            // min_delta up to date
            let old = self.deltas.pop_front().unwrap();
            self.deltas.push_back(delta);

            if delta <= self.min_delta {
                self.min_delta = delta;
            } else if old == self.min_delta {
                // The evicted delta was the minimum, rescan the window
                self.min_delta = self.deltas.iter().copied().min().unwrap();
            }

            // Exponential moving average towards the window minimum
            self.skew = (self.min_delta + (124 * self.skew)) / 125;
        }

        self.out_time(base_local_time, remote_diff)
    }
}
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::rtpbin2::session::tests::init_logs;
    use crate::rtpbin2::time::system_time_to_ntp_time_u64;
    // Single SSRC, no RTCP sender report: PTS follows the RTP timestamps
    // (TimestampingMode::Rtp) and no NTP mapping is available, so the
    // second tuple member of calculate_pts() stays None.
    #[test]
    fn test_single_stream_no_sr() {
        init_logs();
        let mut ctx = Context::new(TimestampingMode::Rtp);
        let mut now = 0;
        ctx.set_clock_rate(0x12345678, 90000);
        // First packet establishes the base: PTS 0.
        assert_eq!(ctx.calculate_pts(0x12345678, 0, now), (0, None));
        now += 1_000_000_000;
        // 90000 ticks at a 90 kHz clock rate == 1 s of stream time.
        assert_eq!(
            ctx.calculate_pts(0x12345678, 90000, now),
            (1_000_000_000, None)
        );
    }
    // Single SSRC with a sender report mapping RTP time 0 to the NTP epoch:
    // calculate_pts() now also reports the NTP time of each packet, and it
    // advances together with the PTS.
    #[test]
    fn test_single_stream_with_sr() {
        init_logs();
        let mut ctx = Context::new(TimestampingMode::Rtp);
        let mut now = 0;
        ctx.set_clock_rate(0x12345678, 90000);
        ctx.add_sender_report(
            0x12345678,
            0,
            system_time_to_ntp_time_u64(std::time::UNIX_EPOCH).as_u64(),
        );
        assert_eq!(
            ctx.calculate_pts(0x12345678, 0, now),
            (0, Some(system_time_to_ntp_time_u64(std::time::UNIX_EPOCH)))
        );
        now += 1_000_000_000;
        assert_eq!(
            ctx.calculate_pts(0x12345678, 90000, now),
            (
                1_000_000_000,
                Some(system_time_to_ntp_time_u64(
                    std::time::UNIX_EPOCH + Duration::from_millis(1000)
                ))
            )
        );
    }
    // Two SSRCs sharing the same CNAME ("foo@bar"), with the second stream's
    // sender report 500 ms later than the first: the second stream's packets
    // are placed 500 ms later on the shared timeline.
    #[test]
    fn test_two_streams_with_sr() {
        init_logs();
        let mut ctx = Context::new(TimestampingMode::Rtp);
        let mut now = 0;
        ctx.set_clock_rate(0x12345, 90000);
        ctx.set_clock_rate(0x67890, 90000);
        ctx.associate(0x12345, "foo@bar");
        ctx.associate(0x67890, "foo@bar");
        ctx.add_sender_report(
            0x12345,
            0,
            system_time_to_ntp_time_u64(std::time::UNIX_EPOCH).as_u64(),
        );
        ctx.add_sender_report(
            0x67890,
            0,
            system_time_to_ntp_time_u64(std::time::UNIX_EPOCH + Duration::from_millis(500))
                .as_u64(),
        );
        // NTP time 0
        assert_eq!(
            ctx.calculate_pts(0x12345, 0, now),
            (0, Some(system_time_to_ntp_time_u64(std::time::UNIX_EPOCH)))
        );
        now += 500_000_000;
        // NTP time 500, arrival time 500
        assert_eq!(
            ctx.calculate_pts(0x12345, 45000, now),
            (
                500_000_000,
                Some(system_time_to_ntp_time_u64(
                    std::time::UNIX_EPOCH + Duration::from_millis(500)
                ))
            )
        );
        // NTP time 500, arrival time 500
        assert_eq!(
            ctx.calculate_pts(0x67890, 0, now),
            (
                500_000_000,
                Some(system_time_to_ntp_time_u64(
                    std::time::UNIX_EPOCH + Duration::from_millis(500)
                ))
            )
        );
        now += 500_000_000;
        // NTP time 1000, arrival time 1000
        assert_eq!(
            ctx.calculate_pts(0x12345, 90000, now),
            (
                1_000_000_000,
                Some(system_time_to_ntp_time_u64(
                    std::time::UNIX_EPOCH + Duration::from_millis(1000)
                ))
            )
        );
        now += 500_000_000;
        // NTP time 1500, arrival time 1500
        assert_eq!(
            ctx.calculate_pts(0x67890, 90000, now),
            (
                1_500_000_000,
                Some(system_time_to_ntp_time_u64(
                    std::time::UNIX_EPOCH + Duration::from_millis(1500)
                ))
            )
        );
    }
    // Two SSRCs with a shared CNAME but no sender reports at all: PTS is
    // derived from RTP timestamps and arrival times only, and no NTP times
    // are produced.
    #[test]
    fn test_two_streams_no_sr_and_offset_arrival_times() {
        init_logs();
        let mut ctx = Context::new(TimestampingMode::Rtp);
        let mut now = 0;
        ctx.set_clock_rate(0x12345, 90000);
        ctx.set_clock_rate(0x67890, 90000);
        ctx.associate(0x12345, "foo@bar");
        ctx.associate(0x67890, "foo@bar");
        assert_eq!(ctx.calculate_pts(0x12345, 0, now), (0, None));
        now += 500_000_000;
        assert_eq!(ctx.calculate_pts(0x67890, 0, now), (500_000_000, None));
        assert_eq!(ctx.calculate_pts(0x12345, 45000, now), (500_000_000, None));
    }
    // Both streams map RTP time 0 to the same NTP time, but the second
    // stream's first packet arrives 500 ms late: the first stream is delayed
    // by the same amount (45000 ticks -> PTS 1 s, not 500 ms) so the two
    // remain synchronized on the output timeline.
    #[test]
    fn test_two_streams_with_same_sr_and_offset_arrival_times() {
        init_logs();
        let mut ctx = Context::new(TimestampingMode::Rtp);
        let mut now = 0;
        ctx.set_clock_rate(0x12345, 90000);
        ctx.set_clock_rate(0x67890, 90000);
        ctx.associate(0x12345, "foo@bar");
        ctx.associate(0x67890, "foo@bar");
        ctx.add_sender_report(
            0x12345,
            0,
            system_time_to_ntp_time_u64(std::time::UNIX_EPOCH).as_u64(),
        );
        ctx.add_sender_report(
            0x67890,
            0,
            system_time_to_ntp_time_u64(std::time::UNIX_EPOCH).as_u64(),
        );
        assert_eq!(
            ctx.calculate_pts(0x12345, 0, now),
            (0, Some(system_time_to_ntp_time_u64(std::time::UNIX_EPOCH)))
        );
        now += 500_000_000;
        assert_eq!(
            ctx.calculate_pts(0x67890, 0, now),
            (
                500_000_000,
                Some(system_time_to_ntp_time_u64(std::time::UNIX_EPOCH))
            )
        );
        assert_eq!(
            ctx.calculate_pts(0x12345, 45000, now),
            (
                1_000_000_000,
                Some(system_time_to_ntp_time_u64(
                    std::time::UNIX_EPOCH + Duration::from_millis(500)
                ))
            )
        );
        now += 500_000_000;
        assert_eq!(
            ctx.calculate_pts(0x67890, 45000, now),
            (
                1_000_000_000,
                Some(system_time_to_ntp_time_u64(
                    std::time::UNIX_EPOCH + Duration::from_millis(500)
                ))
            )
        );
        // Now remove the delayed source and observe that the offset is gone
        // for the other source
        ctx.remove_ssrc(0x67890);
        assert_eq!(
            ctx.calculate_pts(0x12345, 90000, now),
            (
                1_000_000_000,
                Some(system_time_to_ntp_time_u64(
                    std::time::UNIX_EPOCH + Duration::from_millis(1000)
                ))
            )
        );
    }
    // Same setup as above, but the two streams have different CNAMEs: they
    // are not synchronized with each other, so the first stream's 45000-tick
    // packet keeps PTS 500 ms despite the second stream's late arrival.
    #[test]
    fn test_two_streams_with_sr_different_cnames() {
        init_logs();
        let mut ctx = Context::new(TimestampingMode::Rtp);
        let mut now = 0;
        ctx.set_clock_rate(0x12345, 90000);
        ctx.set_clock_rate(0x67890, 90000);
        ctx.associate(0x12345, "foo@bar");
        ctx.associate(0x67890, "foo@baz");
        ctx.add_sender_report(
            0x12345,
            0,
            system_time_to_ntp_time_u64(std::time::UNIX_EPOCH).as_u64(),
        );
        ctx.add_sender_report(
            0x67890,
            0,
            system_time_to_ntp_time_u64(std::time::UNIX_EPOCH).as_u64(),
        );
        assert_eq!(
            ctx.calculate_pts(0x12345, 0, now),
            (0, Some(system_time_to_ntp_time_u64(std::time::UNIX_EPOCH)))
        );
        now += 500_000_000;
        assert_eq!(
            ctx.calculate_pts(0x67890, 0, now),
            (
                500_000_000,
                Some(system_time_to_ntp_time_u64(std::time::UNIX_EPOCH))
            )
        );
        assert_eq!(
            ctx.calculate_pts(0x12345, 45000, now),
            (
                500_000_000,
                Some(system_time_to_ntp_time_u64(
                    std::time::UNIX_EPOCH + Duration::from_millis(500)
                ))
            )
        );
        now += 500_000_000;
        assert_eq!(
            ctx.calculate_pts(0x67890, 45000, now),
            (
                1_000_000_000,
                Some(system_time_to_ntp_time_u64(
                    std::time::UNIX_EPOCH + Duration::from_millis(500)
                ))
            )
        );
    }
}

View file

@ -19,6 +19,10 @@ impl NtpTime {
Self((dur.as_secs_f64() * F32) as u64)
}
pub fn as_duration(&self) -> Result<Duration, std::time::TryFromFloatSecsError> {
    // Undo the `F32` scaling applied on construction to get back to
    // fractional seconds; fails if the value cannot be represented
    // as a `Duration`.
    let seconds = self.0 as f64 / F32;
    Duration::try_from_secs_f64(seconds)
}
pub fn as_u32(self) -> u32 {
    // Keep the middle 32 bits of the 64-bit NTP value: the truncating
    // cast to u32 discards the high bits, so no explicit mask is needed
    // after the shift.
    (self.0 >> 16) as u32
}

View file

@ -306,6 +306,55 @@ macro_rules! define_wrapping_comparable_u32_with_display {
};
}
/// Stores information necessary to compute a series of extended timestamps
#[derive(Default, Debug)]
pub(crate) struct ExtendedTimestamp {
    // Most recently stored extended timestamp; None until the first packet.
    last_ext: Option<u64>,
}
impl ExtendedTimestamp {
    /// Produces the next extended timestamp from a new RTP timestamp
    ///
    /// The extended counter starts one full 32-bit cycle in (offset by
    /// `1 << 32`), so a backwards wrap can never underflow.
    pub(crate) fn next(&mut self, rtp_timestamp: u32) -> u64 {
        let last_ext = match self.last_ext {
            None => {
                // First timestamp: seed the counter in the second cycle.
                let ext = (1u64 << 32) + rtp_timestamp as u64;
                self.last_ext = Some(ext);
                return ext;
            }
            Some(last_ext) => last_ext,
        };
        // Combine the wraparound counter of the previous extended timestamp
        // with the new 32-bit timestamp.
        let mut ext = rtp_timestamp as u64 + (last_ext & !0xffffffff);
        if ext < last_ext {
            if last_ext - ext > i32::MAX as u64 {
                // Timestamp went backwards by more than allowed: assume the
                // 32-bit counter wrapped forward and bump the cycle count.
                ext += 1u64 << 32;
            }
        } else if ext - last_ext > i32::MAX as u64 {
            if ext < 1u64 << 32 {
                // The stored counter always lives at or above 1 << 32, so
                // this branch can never be reached.
                unreachable!()
            } else {
                // Forward jump that is too large: a reordered packet from
                // before a wrap. Unwrap it for the caller, but do NOT store
                // it — the extended timestamp storage must never go back.
                ext -= 1u64 << 32;
                return ext;
            }
        }
        self.last_ext = Some(ext);
        ext
    }
}
/// Stores information necessary to compute a series of extended seqnums
#[derive(Default, Debug)]
pub(crate) struct ExtendedSeqnum {
@ -586,6 +635,77 @@ mod tests {
assert_eq!(try_cmp(0, 0x8000_0000), Err(ComparisonLimit));
}
#[test]
fn extended_timestamp_basic() {
    // All extended timestamps start one full 32-bit cycle in.
    const FIRST_CYCLE: u64 = 1 << 32;
    let mut ts = ExtendedTimestamp::default();
    // Monotonically increasing timestamps never trigger a wraparound,
    // and repeating a timestamp is stable.
    assert_eq!(ts.next(0), FIRST_CYCLE);
    assert_eq!(ts.next(10), FIRST_CYCLE + 10);
    assert_eq!(ts.next(10), FIRST_CYCLE + 10);
    assert_eq!(
        ts.next(1 + i32::MAX as u32),
        FIRST_CYCLE + 1 + i32::MAX as u64
    );
    // Even big bumps under G_MAXINT32 don't result in wrap-around
    let mut ts = ExtendedTimestamp::default();
    assert_eq!(ts.next(1087500), FIRST_CYCLE + 1087500);
    assert_eq!(ts.next(24), FIRST_CYCLE + 24);
}
#[test]
fn extended_timestamp_wraparound() {
    const FIRST_CYCLE: u64 = 1 << 32;
    let mut ts = ExtendedTimestamp::default();
    // Start one second (at 90 kHz) before the 32-bit boundary...
    assert_eq!(
        ts.next(u32::MAX - 90000 + 1),
        FIRST_CYCLE + u32::MAX as u64 - 90000 + 1
    );
    // ...then cross it: the extended value keeps increasing monotonically.
    assert_eq!(ts.next(0), FIRST_CYCLE + u32::MAX as u64 + 1);
    assert_eq!(
        ts.next(90000),
        FIRST_CYCLE + u32::MAX as u64 + 1 + 90000
    );
}
#[test]
fn extended_timestamp_wraparound_disordered() {
    const FIRST_CYCLE: u64 = 1 << 32;
    let mut ts = ExtendedTimestamp::default();
    assert_eq!(
        ts.next(u32::MAX - 90000 + 1),
        FIRST_CYCLE + u32::MAX as u64 - 90000 + 1
    );
    assert_eq!(ts.next(0), FIRST_CYCLE + u32::MAX as u64 + 1);
    // A late packet from before the wrap point unwraps back one cycle...
    assert_eq!(
        ts.next(u32::MAX - 90000 + 1),
        FIRST_CYCLE + u32::MAX as u64 - 90000 + 1
    );
    // ...and the next in-order timestamp wraps forward again.
    assert_eq!(
        ts.next(90000),
        FIRST_CYCLE + u32::MAX as u64 + 1 + 90000
    );
}
#[test]
fn extended_timestamp_wraparound_disordered_backwards() {
    const FIRST_CYCLE: u64 = 1 << 32;
    let mut ts = ExtendedTimestamp::default();
    assert_eq!(ts.next(90000), FIRST_CYCLE + 90000);
    // A timestamp far in the past wraps backwards one cycle...
    assert_eq!(
        ts.next(u32::MAX - 90000 + 1),
        u32::MAX as u64 - 90000 + 1
    );
    // ...but is not stored, so the next timestamp wraps forward again.
    assert_eq!(ts.next(90000), FIRST_CYCLE + 90000);
}
#[test]
fn extended_seqnum_basic() {
let mut ext_seq = ExtendedSeqnum::default();