From a19c9229d8ccd1e780cc3efbf3a03148342f3f4f Mon Sep 17 00:00:00 2001 From: Albert Sjolund Date: Fri, 16 May 2025 11:02:30 +0200 Subject: [PATCH] gcc: handle out of order packets Before, if a reference time wraparound happened and the arrival time of newer packets was much lower, the map would grow infinitely. Handle specifically the case where the difference in timestamp is very large, if the input data contains a wraparound. Contains a unit/regression test for this behaviour. This will be combined with a fix for TWCC parsing in rtptwcc.c in gstreamer core. Part-of: --- net/rtp/src/gcc/imp.rs | 69 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 61 insertions(+), 8 deletions(-) diff --git a/net/rtp/src/gcc/imp.rs b/net/rtp/src/gcc/imp.rs index d96170278..69de65a1b 100644 --- a/net/rtp/src/gcc/imp.rs +++ b/net/rtp/src/gcc/imp.rs @@ -387,17 +387,35 @@ impl Detector { } fn evict_old_received_packets(&mut self) { - let last_arrival = self - .last_received_packets - .values() - .next_back() - .unwrap() - .arrival; - - while last_arrival - self.oldest_packet_in_window_ts() > PACKETS_RECEIVED_WINDOW { + let last_arrival = self.newest_packet_in_window_ts(); + while last_arrival > self.oldest_packet_in_window_ts() + && last_arrival - self.oldest_packet_in_window_ts() > PACKETS_RECEIVED_WINDOW + { let oldest_seqnum = *self.last_received_packets.iter().next().unwrap().0; self.last_received_packets.remove(&oldest_seqnum); } + // In the case of a possible wraparound due to reference time exceeding 24 bits + // remove values that are chronologically incorrect. This *shouldn't* happen, but + // if the input data contains a wraparound, memory will start leaking otherwise. + // Simply checking the size of the map won't work, as the data accumulated will then + // be incorrect; the map has to be wiped when such data arrives. + // The value of whole_days() is picked to simply be very high. If the timestamp jumps + // 24 hours then an error must have occured. It could have been any very large number. + while last_arrival < self.oldest_packet_in_window_ts() + && (self.oldest_packet_in_window_ts() - last_arrival).whole_days() > 0 + { + let oldest_seqnum = *self.last_received_packets.iter().next().unwrap().0; + self.last_received_packets.remove(&oldest_seqnum); + } + } + + fn newest_packet_in_window_ts(&self) -> Duration { + self.last_received_packets + .iter() + .next_back() + .unwrap() + .1 + .arrival } /// Returns the effective received bitrate during the last PACKETS_RECEIVED_WINDOW @@ -1408,3 +1426,38 @@ impl ElementImpl for BandwidthEstimator { PAD_TEMPLATES.as_ref() } } + +#[cfg(test)] +mod tests { + use super::{Detector, Estimator, Packet}; + use time::Duration; + #[test] + fn test_detector_ensure_no_leak() { + gst::init().unwrap(); + let mut detector = Detector::new(Estimator::LinearRegression); + for i in 0..100_i64 { + let pkt = Packet { + departure: Duration::ZERO, + // Maximum i24 value. + arrival: Duration::new((1 << 23) - 1 - (100 - i), 0), + size: 0, + seqnum: i as u64, + }; + detector.update_last_received_packets(pkt); + } + + for i in 0..100_i64 { + let pkt = Packet { + departure: Duration::ZERO, + // Minimum i24 value. + arrival: Duration::new(-(1 << 23) + i, 0), + size: 0, + seqnum: 100 + i as u64, + }; + detector.update_last_received_packets(pkt); + } + // the actual number of packets should be 2, but it depends on the window size, + // so just ensure it is lower than 10, as it will be 100 if it is failing. + assert!(detector.last_received_packets.len() < 10); + } +}