diff --git a/net/rtp/src/rtpbin2/source.rs b/net/rtp/src/rtpbin2/source.rs index 0201de06..a4f0f22e 100644 --- a/net/rtp/src/rtpbin2/source.rs +++ b/net/rtp/src/rtpbin2/source.rs @@ -8,6 +8,8 @@ use std::{ use rtcp_types::{ReportBlock, ReportBlockBuilder}; +use crate::utils::ExtendedSeqnum; + use super::{ session::KeyUnitRequestType, time::{system_time_to_ntp_time_u64, NtpTime}, @@ -168,7 +170,7 @@ impl ReceivedRb { #[derive(Debug)] pub struct LocalSendSource { source: Source, - ext_seqnum: u64, + ext_seqnum: ExtendedSeqnum, last_rtp_sent: Option<(u32, Instant)>, sent_bytes: u64, sent_packets: u64, @@ -183,7 +185,7 @@ impl LocalSendSource { pub(crate) fn new(ssrc: u32) -> Self { Self { source: Source::new(ssrc), - ext_seqnum: 0, + ext_seqnum: ExtendedSeqnum::default(), last_rtp_sent: None, sent_bytes: 0, sent_packets: 0, @@ -213,20 +215,7 @@ impl LocalSendSource { ) { self.bitrate.add_entry(bytes, time); - let mut ext_seqnum = seqnum as u64 + (self.ext_seqnum & !0xffff); - - if ext_seqnum < self.ext_seqnum { - let diff = self.ext_seqnum - ext_seqnum; - if diff > 0x7fff { - ext_seqnum += 1 << 16; - } - } else { - let diff = ext_seqnum - self.ext_seqnum; - if diff > 0x7fff { - ext_seqnum -= 1 << 16; - } - } - self.ext_seqnum = ext_seqnum; + let _ext_seqnum = self.ext_seqnum.next(seqnum); self.source.payload_type = Some(payload_type); @@ -419,7 +408,7 @@ pub struct RemoteSendSource { rtp_from: Option, rtcp_from: Option, initial_seqnum: Option, - ext_seqnum: Option, + ext_seqnum: ExtendedSeqnum, recv_bytes: u64, recv_packets: u64, recv_packets_at_last_rtcp: u64, @@ -455,7 +444,7 @@ impl RemoteSendSource { rtp_from: None, rtcp_from: None, initial_seqnum: None, - ext_seqnum: None, + ext_seqnum: ExtendedSeqnum::default(), recv_bytes: 0, recv_packets: 0, recv_packets_at_last_rtcp: 0, @@ -535,8 +524,8 @@ impl RemoteSendSource { self.recv_bytes = 0; self.recv_packets = 0; self.recv_packets_at_last_rtcp = 0; - self.initial_seqnum = self.ext_seqnum; - self.ext_seqnum_at_last_rtcp = match self.ext_seqnum { + self.initial_seqnum = self.ext_seqnum.current(); + self.ext_seqnum_at_last_rtcp = match self.ext_seqnum.current() { Some(ext) => ext, None => 0x10000 + seqnum as u64, }; @@ -560,30 +549,16 @@ impl RemoteSendSource { return SourceRecvReply::Ignore; } - let (max_seq, mut ext_seqnum) = match self.ext_seqnum { - Some(ext) => ((ext & 0xffff) as u16, seqnum as u64 + (ext & !0xffff)), - None => (seqnum.wrapping_sub(1), 0x10000 + seqnum as u64), - }; + let previous_seqnum = self.ext_seqnum.current(); + let ext_seqnum = self.ext_seqnum.next(seqnum); trace!( - "source {} max seq {max_seq}, ext_seqnum {ext_seqnum}", + "source {} previous {previous_seqnum:?}, ext_seqnum {ext_seqnum}", self.ssrc() ); - let diff = if seqnum < max_seq { - let mut diff = max_seq - seqnum; - - if diff > 0x7fff { - ext_seqnum += 1 << 16; - diff = u16::MAX - diff; - } - -(diff as i32 - 1) - } else { - let mut diff = seqnum - max_seq; - if diff > 0x7fff { - ext_seqnum -= 1 << 16; - diff = u16::MAX - diff; - } - diff as i32 + let diff = match previous_seqnum { + Some(ext) => (ext_seqnum as i64).wrapping_sub(ext as i64), + None => 0, }; trace!("source {} in state {:?} received seqnum {seqnum} with a difference of {diff} from the previous seqnum", self.ssrc(), self.state()); @@ -650,9 +625,9 @@ impl RemoteSendSource { self.set_state(SourceState::Normal); SourceRecvReply::Passthrough } - } else if diff >= 1 && diff < DEFAULT_MAX_DROPOUT as i32 { + } else if diff >= 1 && diff < DEFAULT_MAX_DROPOUT as i64 { SourceRecvReply::Passthrough - } else if diff < -(DEFAULT_MAX_MISORDER as i32) || diff >= DEFAULT_MAX_DROPOUT as i32 { + } else if diff < -(DEFAULT_MAX_MISORDER as i64) || diff >= DEFAULT_MAX_DROPOUT as i64 { debug!("non-consecutive packet outside of configured limits, dropping"); // TODO: we will want to perform a few tasks here that the C jitterbuffer @@ -696,7 +671,6 @@ impl RemoteSendSource { } trace!("setting ext seqnum to {ext_seqnum}"); - self.ext_seqnum = Some(ext_seqnum); self.recv_packet_add_to_stats( rtp_timestamp, time, @@ -779,7 +753,7 @@ impl RemoteSendSource { } fn extended_sequence_number(&self) -> u32 { - (self.ext_seqnum.unwrap_or(0) & 0xffff_ffff) as u32 + (self.ext_seqnum.current().unwrap_or(0) & 0xffff_ffff) as u32 } pub(crate) fn generate_report_block(&self, ntp_time: SystemTime) -> Rb { @@ -805,6 +779,7 @@ impl RemoteSendSource { let expected_since_last_rtcp = self .ext_seqnum + .current() .unwrap_or(0) .saturating_sub(self.ext_seqnum_at_last_rtcp); let recv_packets_since_last_rtcp = self.recv_packets - self.recv_packets_at_last_rtcp; @@ -848,7 +823,7 @@ impl RemoteSendSource { pub(crate) fn update_last_rtcp(&mut self) { self.recv_packets_at_last_rtcp = self.recv_packets; - if let Some(ext) = self.ext_seqnum { + if let Some(ext) = self.ext_seqnum.current() { self.ext_seqnum_at_last_rtcp = ext; } } @@ -860,7 +835,8 @@ impl RemoteSendSource { /// The total number of packets lost over the lifetime of this source pub fn packets_lost(&self) -> i64 { - let expected = self.ext_seqnum.unwrap_or(0) - self.initial_seqnum.unwrap_or(0) + 1; + let expected = + self.ext_seqnum.current().unwrap_or(0) - self.initial_seqnum.unwrap_or(0) + 1; expected as i64 - self.recv_packets as i64 } @@ -1126,7 +1102,7 @@ impl RemoteReceiveSource { rtp_from: None, rtcp_from: self.rtcp_from, initial_seqnum: None, - ext_seqnum: None, + ext_seqnum: ExtendedSeqnum::default(), recv_bytes: 0, recv_packets: 0, recv_packets_at_last_rtcp: 0, diff --git a/net/rtp/src/utils.rs b/net/rtp/src/utils.rs index bd0ef8c0..70031a96 100644 --- a/net/rtp/src/utils.rs +++ b/net/rtp/src/utils.rs @@ -362,7 +362,12 @@ pub(crate) struct ExtendedSeqnum { } impl ExtendedSeqnum { - /// Produces the next extended timestamp from a new RTP timestamp + /// The current extended sequence number + pub(crate) fn current(&self) -> Option { + self.last_ext + } + + /// Produces the next extended sequence number from a new RTP sequence number pub(crate) fn next(&mut self, rtp_seqnum: u16) -> u64 { let ext = match self.last_ext { None => (1u64 << 16) + rtp_seqnum as u64,