rtp/source: use extended sequence number helper

Instead of rolling our own

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1618>
This commit is contained in:
Matthew Waters 2024-06-12 17:49:26 +10:00 committed by GStreamer Marge Bot
parent 47d62b6d78
commit 84a9f9c61f
2 changed files with 29 additions and 48 deletions

View file

@ -8,6 +8,8 @@ use std::{
use rtcp_types::{ReportBlock, ReportBlockBuilder};
use crate::utils::ExtendedSeqnum;
use super::{
session::KeyUnitRequestType,
time::{system_time_to_ntp_time_u64, NtpTime},
@ -168,7 +170,7 @@ impl ReceivedRb {
#[derive(Debug)]
pub struct LocalSendSource {
source: Source,
ext_seqnum: u64,
ext_seqnum: ExtendedSeqnum,
last_rtp_sent: Option<(u32, Instant)>,
sent_bytes: u64,
sent_packets: u64,
@ -183,7 +185,7 @@ impl LocalSendSource {
pub(crate) fn new(ssrc: u32) -> Self {
Self {
source: Source::new(ssrc),
ext_seqnum: 0,
ext_seqnum: ExtendedSeqnum::default(),
last_rtp_sent: None,
sent_bytes: 0,
sent_packets: 0,
@ -213,20 +215,7 @@ impl LocalSendSource {
) {
self.bitrate.add_entry(bytes, time);
let mut ext_seqnum = seqnum as u64 + (self.ext_seqnum & !0xffff);
if ext_seqnum < self.ext_seqnum {
let diff = self.ext_seqnum - ext_seqnum;
if diff > 0x7fff {
ext_seqnum += 1 << 16;
}
} else {
let diff = ext_seqnum - self.ext_seqnum;
if diff > 0x7fff {
ext_seqnum -= 1 << 16;
}
}
self.ext_seqnum = ext_seqnum;
let _ext_seqnum = self.ext_seqnum.next(seqnum);
self.source.payload_type = Some(payload_type);
@ -419,7 +408,7 @@ pub struct RemoteSendSource {
rtp_from: Option<SocketAddr>,
rtcp_from: Option<SocketAddr>,
initial_seqnum: Option<u64>,
ext_seqnum: Option<u64>,
ext_seqnum: ExtendedSeqnum,
recv_bytes: u64,
recv_packets: u64,
recv_packets_at_last_rtcp: u64,
@ -455,7 +444,7 @@ impl RemoteSendSource {
rtp_from: None,
rtcp_from: None,
initial_seqnum: None,
ext_seqnum: None,
ext_seqnum: ExtendedSeqnum::default(),
recv_bytes: 0,
recv_packets: 0,
recv_packets_at_last_rtcp: 0,
@ -535,8 +524,8 @@ impl RemoteSendSource {
self.recv_bytes = 0;
self.recv_packets = 0;
self.recv_packets_at_last_rtcp = 0;
self.initial_seqnum = self.ext_seqnum;
self.ext_seqnum_at_last_rtcp = match self.ext_seqnum {
self.initial_seqnum = self.ext_seqnum.current();
self.ext_seqnum_at_last_rtcp = match self.ext_seqnum.current() {
Some(ext) => ext,
None => 0x10000 + seqnum as u64,
};
@ -560,30 +549,16 @@ impl RemoteSendSource {
return SourceRecvReply::Ignore;
}
let (max_seq, mut ext_seqnum) = match self.ext_seqnum {
Some(ext) => ((ext & 0xffff) as u16, seqnum as u64 + (ext & !0xffff)),
None => (seqnum.wrapping_sub(1), 0x10000 + seqnum as u64),
};
let previous_seqnum = self.ext_seqnum.current();
let ext_seqnum = self.ext_seqnum.next(seqnum);
trace!(
"source {} max seq {max_seq}, ext_seqnum {ext_seqnum}",
"source {} previous {previous_seqnum:?}, ext_seqnum {ext_seqnum}",
self.ssrc()
);
let diff = if seqnum < max_seq {
let mut diff = max_seq - seqnum;
if diff > 0x7fff {
ext_seqnum += 1 << 16;
diff = u16::MAX - diff;
}
-(diff as i32 - 1)
} else {
let mut diff = seqnum - max_seq;
if diff > 0x7fff {
ext_seqnum -= 1 << 16;
diff = u16::MAX - diff;
}
diff as i32
let diff = match previous_seqnum {
Some(ext) => (ext_seqnum as i64).wrapping_sub(ext as i64),
None => 0,
};
trace!("source {} in state {:?} received seqnum {seqnum} with a difference of {diff} from the previous seqnum", self.ssrc(), self.state());
@ -650,9 +625,9 @@ impl RemoteSendSource {
self.set_state(SourceState::Normal);
SourceRecvReply::Passthrough
}
} else if diff >= 1 && diff < DEFAULT_MAX_DROPOUT as i32 {
} else if diff >= 1 && diff < DEFAULT_MAX_DROPOUT as i64 {
SourceRecvReply::Passthrough
} else if diff < -(DEFAULT_MAX_MISORDER as i32) || diff >= DEFAULT_MAX_DROPOUT as i32 {
} else if diff < -(DEFAULT_MAX_MISORDER as i64) || diff >= DEFAULT_MAX_DROPOUT as i64 {
debug!("non-consecutive packet outside of configured limits, dropping");
// TODO: we will want to perform a few tasks here that the C jitterbuffer
@ -696,7 +671,6 @@ impl RemoteSendSource {
}
trace!("setting ext seqnum to {ext_seqnum}");
self.ext_seqnum = Some(ext_seqnum);
self.recv_packet_add_to_stats(
rtp_timestamp,
time,
@ -779,7 +753,7 @@ impl RemoteSendSource {
}
fn extended_sequence_number(&self) -> u32 {
(self.ext_seqnum.unwrap_or(0) & 0xffff_ffff) as u32
(self.ext_seqnum.current().unwrap_or(0) & 0xffff_ffff) as u32
}
pub(crate) fn generate_report_block(&self, ntp_time: SystemTime) -> Rb {
@ -805,6 +779,7 @@ impl RemoteSendSource {
let expected_since_last_rtcp = self
.ext_seqnum
.current()
.unwrap_or(0)
.saturating_sub(self.ext_seqnum_at_last_rtcp);
let recv_packets_since_last_rtcp = self.recv_packets - self.recv_packets_at_last_rtcp;
@ -848,7 +823,7 @@ impl RemoteSendSource {
pub(crate) fn update_last_rtcp(&mut self) {
self.recv_packets_at_last_rtcp = self.recv_packets;
if let Some(ext) = self.ext_seqnum {
if let Some(ext) = self.ext_seqnum.current() {
self.ext_seqnum_at_last_rtcp = ext;
}
}
@ -860,7 +835,8 @@ impl RemoteSendSource {
/// The total number of packets lost over the lifetime of this source
pub fn packets_lost(&self) -> i64 {
let expected = self.ext_seqnum.unwrap_or(0) - self.initial_seqnum.unwrap_or(0) + 1;
let expected =
self.ext_seqnum.current().unwrap_or(0) - self.initial_seqnum.unwrap_or(0) + 1;
expected as i64 - self.recv_packets as i64
}
@ -1126,7 +1102,7 @@ impl RemoteReceiveSource {
rtp_from: None,
rtcp_from: self.rtcp_from,
initial_seqnum: None,
ext_seqnum: None,
ext_seqnum: ExtendedSeqnum::default(),
recv_bytes: 0,
recv_packets: 0,
recv_packets_at_last_rtcp: 0,

View file

@ -362,7 +362,12 @@ pub(crate) struct ExtendedSeqnum {
}
impl ExtendedSeqnum {
/// Produces the next extended timestamp from a new RTP timestamp
/// The current extended sequence number
pub(crate) fn current(&self) -> Option<u64> {
self.last_ext
}
/// Produces the next extended sequence number from a new RTP sequence number
pub(crate) fn next(&mut self, rtp_seqnum: u16) -> u64 {
let ext = match self.last_ext {
None => (1u64 << 16) + rtp_seqnum as u64,