rtpbin2: add support for RFC 4585 (RTP/AVPF)

Implements the timing rules for RTP/AVPF

Co-Authored-By: Sebastian Dröge <sebastian@centricular.com>
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
Matthew Waters 2023-12-15 15:27:29 +11:00
parent 27ad26c258
commit 2c86f18a99
4 changed files with 624 additions and 117 deletions

View file

@ -7263,6 +7263,18 @@
"type": "guint",
"writable": true
},
"rtp-profile": {
"blurb": "RTP Profile to use",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "avp (0)",
"mutable": "ready",
"readable": true,
"type": "GstRtpBin2Profile",
"writable": true
},
"stats": {
"blurb": "Statistics about the session",
"conditionally-available": false,

View file

@ -12,7 +12,9 @@ use futures::StreamExt;
use gst::{glib, prelude::*, subclass::prelude::*};
use once_cell::sync::Lazy;
use super::session::{RecvReply, RtcpRecvReply, SendReply, Session, RTCP_MIN_REPORT_INTERVAL};
use super::session::{
RecvReply, RtcpRecvReply, RtpProfile, SendReply, Session, RTCP_MIN_REPORT_INTERVAL,
};
use super::source::{ReceivedRb, SourceState};
use crate::rtpbin2::RUNTIME;
@ -28,10 +30,40 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, glib::Enum)]
#[repr(u32)]
#[enum_type(name = "GstRtpBin2Profile")]
enum Profile {
#[default]
#[enum_value(name = "AVP profile as specified in RFC 3550", nick = "avp")]
Avp,
#[enum_value(name = "AVPF profile as specified in RFC 4585", nick = "avpf")]
Avpf,
}
impl From<RtpProfile> for Profile {
fn from(value: RtpProfile) -> Self {
match value {
RtpProfile::Avp => Self::Avp,
RtpProfile::Avpf => Self::Avpf,
}
}
}
impl From<Profile> for RtpProfile {
fn from(value: Profile) -> Self {
match value {
Profile::Avp => Self::Avp,
Profile::Avpf => Self::Avpf,
}
}
}
#[derive(Debug, Clone)]
struct Settings {
latency: gst::ClockTime,
min_rtcp_interval: Duration,
profile: Profile,
}
impl Default for Settings {
@ -39,6 +71,7 @@ impl Default for Settings {
Settings {
latency: DEFAULT_LATENCY,
min_rtcp_interval: DEFAULT_MIN_RTCP_INTERVAL,
profile: Profile::default(),
}
}
}
@ -121,10 +154,15 @@ struct BinSession {
}
impl BinSession {
fn new(id: usize, min_rtcp_interval: Duration) -> Self {
fn new(id: usize, settings: &Settings) -> Self {
let mut inner = BinSessionInner::new(id);
inner
.session
.set_min_rtcp_interval(settings.min_rtcp_interval);
inner.session.set_profile(settings.profile.into());
Self {
id,
inner: Arc::new(Mutex::new(BinSessionInner::new(id, min_rtcp_interval))),
inner: Arc::new(Mutex::new(inner)),
}
}
}
@ -156,9 +194,7 @@ struct BinSessionInner {
}
impl BinSessionInner {
fn new(id: usize, min_rtcp_interval: Duration) -> Self {
let mut session = Session::new();
session.set_min_rtcp_interval(min_rtcp_interval);
fn new(id: usize) -> Self {
Self {
id,
@ -939,6 +975,12 @@ impl ObjectImpl for RtpBin2 {
.blurb("Statistics about the session")
.read_only()
.build(),
glib::ParamSpecEnum::builder::<Profile>("rtp-profile")
.nick("RTP Profile")
.blurb("RTP Profile to use")
.default_value(Profile::default())
.mutable_ready()
.build(),
]
});
@ -966,6 +1008,10 @@ impl ObjectImpl for RtpBin2 {
value.get::<u32>().expect("type checked upstream").into(),
);
}
"rtp-profile" => {
let mut settings = self.settings.lock().unwrap();
settings.profile = value.get::<Profile>().expect("Type checked upstream");
}
_ => unimplemented!(),
}
}
@ -984,6 +1030,10 @@ impl ObjectImpl for RtpBin2 {
let state = self.state.lock().unwrap();
state.stats().to_value()
}
"rtp-profile" => {
let settings = self.settings.lock().unwrap();
settings.profile.to_value()
}
_ => unimplemented!(),
}
}
@ -1070,7 +1120,7 @@ impl ElementImpl for RtpBin2 {
_caps: Option<&gst::Caps>, // XXX: do something with caps?
) -> Option<gst::Pad> {
let this = self.obj();
let min_rtcp_interval = self.settings.lock().unwrap().min_rtcp_interval;
let settings = self.settings.lock().unwrap().clone();
let mut state = self.state.lock().unwrap();
let max_session_id = state.max_session_id;
@ -1135,7 +1185,7 @@ impl ElementImpl for RtpBin2 {
new_pad(&mut session)
}
} else {
let session = BinSession::new(id, min_rtcp_interval);
let session = BinSession::new(id, &settings);
let mut inner = session.inner.lock().unwrap();
let ret = new_pad(&mut inner);
drop(inner);
@ -1178,7 +1228,7 @@ impl ElementImpl for RtpBin2 {
new_pad(&mut session)
}
} else {
let session = BinSession::new(id, min_rtcp_interval);
let session = BinSession::new(id, &settings);
let mut inner = session.inner.lock().unwrap();
let ret = new_pad(&mut inner);
drop(inner);

View file

@ -20,7 +20,6 @@ use gst::prelude::MulDiv;
// TODO: make configurable
pub const RTCP_MIN_REPORT_INTERVAL: Duration = Duration::from_secs(5);
// TODO: reduced minimum interval? (360 / session bandwidth)
const RTCP_SOURCE_TIMEOUT_N_INTERVALS: u32 = 5;
const RTCP_ADDRESS_CONFLICT_TIMEOUT: Duration = RTCP_MIN_REPORT_INTERVAL.saturating_mul(12);
@ -42,19 +41,36 @@ struct ByeState {
pmembers: usize,
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum RtpProfile {
#[default]
Avp,
Avpf,
}
impl RtpProfile {
fn is_feedback(&self) -> bool {
self == &Self::Avpf
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RequestEarlyRtcpResult {
NotScheduled,
Scheduled,
TimerReconsideration,
}
#[derive(Debug)]
pub struct Session {
// settings
min_rtcp_interval: Duration,
profile: RtpProfile,
// state
local_senders: HashMap<u32, LocalSendSource>,
local_receivers: HashMap<u32, LocalReceiveSource>,
remote_receivers: HashMap<u32, RemoteReceiveSource>,
remote_senders: HashMap<u32, RemoteSendSource>,
last_rtcp_sent_times: VecDeque<Instant>,
// holds the next rtcp send time and the number of members at the time when the time was
// calculated
next_rtcp_send: RtcpTimeMembers,
average_rtcp_size: usize,
last_sent_data: Option<Instant>,
hold_buffer_counter: usize,
@ -64,6 +80,20 @@ pub struct Session {
// used when we have not sent anything but need a ssrc for Rr
internal_rtcp_sender_src: Option<u32>,
bye_state: Option<ByeState>,
is_point_to_point: bool,
// rtcp scheduling state
// T_rr: holds the interval used to calculate the current `next_rtcp_send`
rtcp_interval: Option<Duration>,
// tp: last time that any rtcp was handled
last_rtcp_handle_time: Option<Instant>,
// tn: holds the next regular rtcp send time and the number of members at the
// time when the time was calculated
next_rtcp_send: RtcpTimeMembers,
// T_rr_last: last two times a regular rtcp packet was sent
last_rtcp_sent_times: VecDeque<Instant>,
// time for the next early rtcp to be sent
next_early_rtcp_time: Option<Instant>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -117,12 +147,14 @@ impl Session {
sdes.insert(SdesItem::CNAME, cname);
Self {
min_rtcp_interval: RTCP_MIN_REPORT_INTERVAL,
profile: RtpProfile::default(),
local_senders: HashMap::new(),
// also known as remote_senders
local_receivers: HashMap::new(),
remote_receivers: HashMap::new(),
remote_senders: HashMap::new(),
last_rtcp_sent_times: VecDeque::new(),
rtcp_interval: None,
next_rtcp_send: RtcpTimeMembers {
time: None,
p_members: 0,
@ -135,14 +167,22 @@ impl Session {
conflicting_addresses: HashMap::new(),
internal_rtcp_sender_src: None,
bye_state: None,
next_early_rtcp_time: None,
last_rtcp_handle_time: None,
is_point_to_point: true,
}
}
/// Set the minimum RTCP interval to use for this session
/// Set the minimum (regular) RTCP interval to use for this session
pub fn set_min_rtcp_interval(&mut self, min_rtcp_interval: Duration) {
self.min_rtcp_interval = min_rtcp_interval;
}
/// Set the RTP profile to use.
pub fn set_profile(&mut self, profile: RtpProfile) {
self.profile = profile;
}
fn n_members(&self) -> usize {
self.bye_state
.as_ref()
@ -191,6 +231,28 @@ impl Session {
.unwrap_or(self.next_rtcp_send.p_members)
}
fn update_point_to_point(&mut self) {
// This definition of point to point is from RFC 8108
// i.e. If there are multiple remote CNAMEs, then we are not point to point.
//
// XXX: We currently don't provide external overriding for this
let mut cname = None;
self.is_point_to_point = self
.remote_senders
.values()
.filter_map(|source| source.sdes().get(&SdesItem::CNAME))
.chain(
self.remote_receivers
.values()
.filter_map(|source| source.sdes().get(&SdesItem::CNAME)),
)
.all(|cn| {
let ret = cname.is_none() || cname == Some(cn);
cname = cname.or(Some(cn));
ret
});
}
/// Set the RTP clock rate for a particular payload type
pub fn set_pt_clock_rate(&mut self, pt: u8, clock_rate: u32) {
self.pt_map.insert(pt, clock_rate);
@ -409,6 +471,8 @@ impl Session {
let dur = prev.saturating_duration_since(now);
if self.next_rtcp_send.p_members > 0 {
let member_factor = initial_n_members as f64 / self.next_rtcp_send.p_members as f64;
// FIXME: Does something have to be adjusted here in feedback profiles for
// T_rr_interval?
*prev = now + dur.mul_f64(member_factor);
self.next_rtcp_send.p_members = n_members;
if let Some(last_rtcp) = self.last_rtcp_sent_times.front_mut() {
@ -592,9 +656,12 @@ impl Session {
}
}
Ok(Packet::Unknown(_unk)) => (),
// TODO: in RFC4585 profile, need to listen for feedback messages and remove any
// that we would have sent
Err(_) => (),
}
}
self.update_point_to_point();
replies
}
@ -603,6 +670,8 @@ impl Session {
mut rtcp: CompoundBuilder<'a>,
now: Instant,
ntp_now: SystemTime,
minimum: bool, // RFC 4585
ssrcs_reported: &mut Vec<u32>,
) -> CompoundBuilder<'a> {
let ntp_time = system_time_to_ntp_time_u64(ntp_now);
if self
@ -640,7 +709,6 @@ impl Session {
// assume that the rtp times and clock times advance at a rate
// close to 1.0 and do a direct linear extrapolation to get the rtp
// time for 'now'
trace!("clock-rate {clock_rate}");
(dur_since_last_rtp.as_nanos() as u64).mul_div_round(
clock_rate as u64,
gst::ClockTime::SECOND.nseconds(),
@ -657,15 +725,29 @@ impl Session {
.rtp_timestamp(rtp_timestamp);
sender_srs.push((sender.ssrc(), ntp_now, ntp_time, rtp_timestamp));
if !ssrcs_reported.iter().any(|&ssrc| ssrc == sender.ssrc()) {
ssrcs_reported.push(sender.ssrc());
}
for sender in self.remote_senders.values() {
if sender.state() != SourceState::Normal {
continue;
// Don't include RBs in minimal RTCP packets
if !minimum {
for sender in self.remote_senders.values() {
if sender.state() != SourceState::Normal {
continue;
}
let rb = sender.generate_report_block(ntp_now);
sr = sr.add_report_block(rb.into());
if !ssrcs_reported.iter().any(|&ssrc| ssrc == sender.ssrc()) {
ssrcs_reported.push(sender.ssrc());
}
}
let rb = sender.generate_report_block(ntp_now);
sr = sr.add_report_block(rb.into());
}
rtcp = rtcp.add_packet(sr);
// A minimal RTCP packet only contains a single SR
if minimum {
break;
}
}
for (ssrc, ntp_now, ntp_time, rtp_timestamp) in sender_srs {
self.local_senders
@ -711,6 +793,8 @@ impl Session {
mut rtcp: CompoundBuilder<'a>,
now: Instant,
ntp_now: SystemTime,
minimum: bool, // RFC 4585
ssrcs_reported: &mut Vec<u32>,
) -> CompoundBuilder<'a> {
if self
.local_senders
@ -729,32 +813,44 @@ impl Session {
.entry(ssrc)
.and_modify(|source| source.set_last_activity(now));
let mut rr = ReceiverReport::builder(ssrc);
for sender in self.remote_senders.values() {
if sender.state() != SourceState::Normal {
continue;
// Don't include RBs in minimal RTCP packets
if !minimum {
for sender in self.remote_senders.values() {
if sender.state() != SourceState::Normal {
continue;
}
let rb = sender.generate_report_block(ntp_now);
rr = rr.add_report_block(rb.into());
if !ssrcs_reported.iter().any(|&ssrc| ssrc == sender.ssrc()) {
ssrcs_reported.push(sender.ssrc());
}
}
let rb = sender.generate_report_block(ntp_now);
rr = rr.add_report_block(rb.into());
}
rtcp = rtcp.add_packet(rr);
}
rtcp
}
fn generate_sdes<'a>(&self, rtcp: CompoundBuilder<'a>) -> CompoundBuilder<'a> {
fn generate_sdes<'a>(
&self,
rtcp: CompoundBuilder<'a>,
minimum: bool, // RFC 4585
) -> CompoundBuilder<'a> {
let mut sdes = Sdes::builder();
let mut have_chunk = false;
if !self.local_senders.is_empty() {
for sender in self.local_senders.values() {
let sdes_map = sender.sdes();
if !sdes_map.is_empty() {
let mut chunk = SdesChunk::builder(sender.ssrc());
for (ty, val) in sdes_map {
for sender in self.local_senders.values() {
let sdes_map = sender.sdes();
if !sdes_map.is_empty() {
let mut chunk = SdesChunk::builder(sender.ssrc());
for (ty, val) in sdes_map {
if !minimum || *ty == SdesItem::CNAME {
chunk = chunk.add_item_owned(SdesItem::builder(*ty, val));
}
have_chunk = true;
sdes = sdes.add_chunk(chunk);
}
have_chunk = true;
sdes = sdes.add_chunk(chunk);
}
}
for receiver in self.local_receivers.values() {
@ -762,7 +858,9 @@ impl Session {
if !sdes_map.is_empty() {
let mut chunk = SdesChunk::builder(receiver.ssrc());
for (ty, val) in sdes_map {
chunk = chunk.add_item_owned(SdesItem::builder(*ty, val));
if !minimum || *ty == SdesItem::CNAME {
chunk = chunk.add_item_owned(SdesItem::builder(*ty, val));
}
}
have_chunk = true;
sdes = sdes.add_chunk(chunk);
@ -834,7 +932,10 @@ impl Session {
// RFC 3550 6.3.5
fn handle_timeouts(&mut self, now: Instant) {
trace!("handling rtcp timeouts");
let td = RTCP_SOURCE_TIMEOUT_N_INTERVALS * self.deterministic_rtcp_duration(false);
let td = RTCP_SOURCE_TIMEOUT_N_INTERVALS
* self
.deterministic_rtcp_duration(false)
.max(Duration::from_secs(5));
// delete all sources that are too old
self.local_receivers
@ -884,26 +985,57 @@ impl Session {
.retain(|_addr, time| now - *time < RTCP_ADDRESS_CONFLICT_TIMEOUT);
}
/// Produce a RTCP packet (or None if it is too early to send a RTCP packet). After this call returns
/// a packet, the next time to send a RTCP packet can be retrieved from `poll_rtcp_send_timeout`
/// Produce a RTCP packet (or `None` if it is too early to send a RTCP packet). After this call returns,
/// the next time to send a RTCP packet can be retrieved from `poll_rtcp_send_timeout`
// TODO: return RtcpPacketBuilder thing
pub fn poll_rtcp_send(&mut self, now: Instant, ntp_now: SystemTime) -> Option<Vec<u8>> {
let Some(next_rtcp_send) = self.next_rtcp_send.time else {
trace!("no next check time yet");
return None;
};
if now < next_rtcp_send {
return None;
let is_early = self.next_early_rtcp_time.is_some() && !self.last_rtcp_sent_times.is_empty();
if is_early {
if let Some(next_early_rtcp_time) = self.next_early_rtcp_time {
if now < next_early_rtcp_time {
trace!("next early time {next_early_rtcp_time:?} not reached at {now:?}, nothing to produce");
return None;
}
}
} else {
if now < next_rtcp_send {
trace!("next time {next_rtcp_send:?} not reached at {now:?}, nothing to produce");
return None;
}
// timer reconsideration
self.update_point_to_point();
let interval = self.rtcp_interval();
let test_next_rtcp_time = self.last_rtcp_handle_time.unwrap() + interval;
if test_next_rtcp_time > now {
trace!("timer reconsideration considers this wakeup {now:?} too early, nothing to produce. reconsidered time {test_next_rtcp_time:?}");
self.next_rtcp_send.time = Some(test_next_rtcp_time);
return None;
}
}
trace!("generating rtcp packet at {now:?}, ntp:{ntp_now:?}");
debug!(
"generating rtcp packet at {now:?}, ntp:{ntp_now:?}, duration since last rtcp {:?}",
self.last_rtcp_sent_times
.front()
.copied()
.unwrap_or(now)
.duration_since(now)
);
let data = {
let (data, ssrcs_reported) = {
let mut rtcp = Compound::builder();
let mut ssrcs_reported = vec![];
// TODO: implement round robin of sr/rrs
rtcp = self.generate_sr(rtcp, now, ntp_now);
rtcp = self.generate_rr(rtcp, now, ntp_now);
rtcp = self.generate_sdes(rtcp);
rtcp = self.generate_sr(rtcp, now, ntp_now, is_early, &mut ssrcs_reported);
rtcp = self.generate_rr(rtcp, now, ntp_now, is_early, &mut ssrcs_reported);
rtcp = self.generate_sdes(rtcp, is_early);
rtcp = self.generate_bye(rtcp, now);
let size = rtcp.calculate_size().unwrap();
@ -911,36 +1043,68 @@ impl Session {
assert!(size < RTCP_MTU);
let mut data = vec![0; size];
rtcp.write_into(&mut data).unwrap();
data
(data, ssrcs_reported)
};
for receiver in self.remote_senders.values_mut() {
receiver.update_last_rtcp();
for ssrc in ssrcs_reported {
if let Some(receiver) = self.remote_senders.get_mut(&ssrc) {
receiver.update_last_rtcp();
}
}
self.update_rtcp_average(data.len() + UDP_IP_OVERHEAD_BYTES);
self.handle_timeouts(now);
if !is_early {
self.handle_timeouts(now);
self.next_rtcp_send = RtcpTimeMembers {
time: Some(self.next_rtcp_time(now)),
p_members: self.n_members(),
};
self.last_rtcp_sent_times.push_front(now);
while self.last_rtcp_sent_times.len() > 2 {
self.last_rtcp_sent_times.pop_back();
self.update_point_to_point();
let mut interval = self.rtcp_interval();
self.rtcp_interval = Some(interval);
// Handle T_rr_interval
if self.profile.is_feedback() && !self.min_rtcp_interval.is_zero() {
let new_interval = interval.max(rtcp_dither(self.min_rtcp_interval));
trace!("Updating interval from {interval:?} to {new_interval:?}");
interval = new_interval;
}
self.next_rtcp_send = RtcpTimeMembers {
time: Some(now + interval),
p_members: self.n_members(),
};
trace!("next rtcp time {:?}", self.next_rtcp_send.time);
self.last_rtcp_sent_times.push_front(now);
while self.last_rtcp_sent_times.len() > 2 {
self.last_rtcp_sent_times.pop_back();
}
self.bye_state = None;
}
self.bye_state = None;
self.last_rtcp_handle_time = Some(now);
self.next_early_rtcp_time = None;
Some(data)
}
/// Returns the next time to send a RTCP packet.
pub fn poll_rtcp_send_timeout(&mut self, now: Instant) -> Option<Instant> {
trace!(
"poll-rtcp-send-timeout early time {:?}, next {:?}",
self.next_early_rtcp_time,
self.next_rtcp_send.time
);
if let Some(early_time) = self.next_early_rtcp_time {
return Some(early_time);
}
if self.next_rtcp_send.time.is_none() {
self.update_point_to_point();
let interval = self.rtcp_interval();
self.rtcp_interval = Some(interval);
self.last_rtcp_handle_time = Some(now);
self.next_rtcp_send = RtcpTimeMembers {
time: Some(self.next_rtcp_time(now)),
time: Some(now + interval),
p_members: self.n_members(),
};
trace!(
"poll-rtcp-send-timeout initial rtcp time {:?}",
self.next_rtcp_send.time
);
}
self.next_rtcp_send.time
}
@ -962,8 +1126,22 @@ impl Session {
(n_members, rtcp_bw)
};
let min_rtcp_interval = if !self.last_rtcp_sent_times.is_empty() && self.bye_state.is_none()
{
let min_rtcp_interval = if we_sent {
Duration::ZERO
} else if self.profile.is_feedback() {
// RFC 4585 3.4d), 3.5.1
//
// If not the first RTCP then Tmin is initialized to 0, otherwise to 1s.
// Also for a multi-party session it is initialized to 0.
if self.is_point_to_point
&& !self.last_rtcp_sent_times.is_empty()
&& self.bye_state.is_none()
{
Duration::ZERO
} else {
Duration::from_secs(1)
}
} else if !self.last_rtcp_sent_times.is_empty() && self.bye_state.is_none() {
self.min_rtcp_interval
} else {
self.min_rtcp_interval / 2
@ -971,8 +1149,12 @@ impl Session {
// 1_000_000_000 / (e-1.5)
let compensation_ns = 820_829_366;
trace!(
"members {n}, RTCP bandwidth {rtcp_bw}, average RTCP size {}",
self.average_rtcp_size
);
let t_nanos = (compensation_ns
.mul_div_round(self.average_rtcp_size as u64 * n, rtcp_bw as u64))
.mul_div_round(self.average_rtcp_size as u64 * n, rtcp_bw.max(1) as u64))
.unwrap()
.max(min_rtcp_interval.as_nanos() as u64);
trace!("deterministic rtcp interval {t_nanos}ns");
@ -995,12 +1177,96 @@ impl Session {
}
fn calculated_rtcp_duration(&self, we_sent: bool) -> Duration {
// From RFC 3550: only active data senders may use the reduced minimum
let dur = self.deterministic_rtcp_duration(we_sent);
rtcp_dither(dur)
}
let mut rng = rand::thread_rng();
// need a factor in [0.5, 1.5]
let factor = rng.gen::<f64>();
dur.mul_f64(factor + 0.5)
// returns whether to keep the feedback
fn request_early_rtcp(&mut self, now: Instant, max_delay: Duration) -> RequestEarlyRtcpResult {
// Implementation of RFC 4585 3.5.2
if !self.profile.is_feedback() {
warn!("requested early RTCP without relevant feedback RTP profile, ignoring");
return RequestEarlyRtcpResult::NotScheduled;
}
if let Some(early_time) = self.next_early_rtcp_time {
if now + max_delay >= early_time {
debug!("early RTCP already scheduled for {early_time:?}");
return RequestEarlyRtcpResult::Scheduled;
} else {
debug!("early RTCP already scheduled for {early_time:?} but too late");
return RequestEarlyRtcpResult::NotScheduled;
}
}
let Some(next_rtcp_send_time) = self.next_rtcp_send.time else {
debug!("No regular RTCP scheduled yet, scheduling now");
return RequestEarlyRtcpResult::TimerReconsideration;
};
// no regular RTCP sent yet, we cannot send early without the first regular RTCP being sent
let Some(&last_rtcp_sent) = self.last_rtcp_sent_times.front() else {
if now + max_delay >= next_rtcp_send_time {
debug!("early RTCP can't be scheduled until first regular RTCP is sent but regular RTCP scheduled early enough");
return RequestEarlyRtcpResult::Scheduled;
} else {
debug!("early RTCP can't be scheduled until first regular RTCP is sent and regular RTCP scheduled too late");
return RequestEarlyRtcpResult::NotScheduled;
}
};
self.update_point_to_point();
// safe as all previous invariants ensure that the `handle_time` must have been set
let last_rtcp_handle_time = self.last_rtcp_handle_time.unwrap();
let t_rr = self.rtcp_interval.unwrap();
let t_dither_max = if self.is_point_to_point {
Duration::from_secs(0)
} else {
t_rr / 2
};
if now + t_dither_max > next_rtcp_send_time {
if now + max_delay >= next_rtcp_send_time {
debug!("early RTCP not scheduled because regular RTCP is early enough");
return RequestEarlyRtcpResult::Scheduled;
} else {
debug!("early RTCP can't be scheduled because regular RTCP is scheduled soon but too late");
return RequestEarlyRtcpResult::NotScheduled;
}
}
let (allow_early, offset) = if last_rtcp_sent == last_rtcp_handle_time {
// last rtcp was a regular rtcp, we can send early after that
(true, Duration::ZERO)
} else if last_rtcp_handle_time + t_rr <= now + max_delay {
// 1. last rtcp packet was not regular rtcp so must be early rtcp
// 2. More than t_rr has passed since so a regular rtcp packet has been suppressed thus
// allowing early send again
(
true,
(last_rtcp_handle_time + t_rr).saturating_duration_since(now),
)
} else {
debug!("Can't allow early RTCP again yet");
(false, Duration::ZERO)
};
if !allow_early {
if next_rtcp_send_time - now < max_delay {
debug!("early RTCP not scheduled but regular RTCP scheduled time is soon enough");
RequestEarlyRtcpResult::Scheduled
} else {
debug!("early RTCP not scheduled and regular RTCP scheduled time is too late");
RequestEarlyRtcpResult::NotScheduled
}
} else {
let mut rng = rand::thread_rng();
self.next_early_rtcp_time = Some(now + t_dither_max.mul_f64(rng.gen()) + offset);
debug!("early RTCP scheduled at {:?}", self.next_early_rtcp_time);
RequestEarlyRtcpResult::TimerReconsideration
}
}
pub fn schedule_bye(&mut self, reason: &str, now: Instant) {
@ -1008,10 +1274,6 @@ impl Session {
return;
}
if self.n_members() <= 50 {
return;
}
for source in self.local_senders.values_mut() {
source.mark_bye(reason);
}
@ -1019,6 +1281,10 @@ impl Session {
source.mark_bye(reason);
}
if self.n_members() <= 50 {
return;
}
self.bye_state = Some(ByeState {
members: 1,
pmembers: 1,
@ -1028,15 +1294,27 @@ impl Session {
self.last_rtcp_sent_times.push_front(now);
// FIXME: use actual BYE packet size
self.average_rtcp_size = 100;
self.update_point_to_point();
let mut interval = self.rtcp_interval();
self.rtcp_interval = Some(interval);
// Handle T_rr_interval
if self.profile.is_feedback() && !self.min_rtcp_interval.is_zero() {
let new_interval = interval.max(rtcp_dither(self.min_rtcp_interval));
trace!("Updating interval from {interval:?} to {new_interval:?}");
interval = new_interval;
}
self.next_rtcp_send = RtcpTimeMembers {
time: Some(self.next_rtcp_time(now)),
time: Some(now + interval),
p_members: self.n_members(),
};
trace!("next rtcp time {:?}", self.next_rtcp_send.time);
}
fn next_rtcp_time(&self, now: Instant) -> Instant {
now + self
.calculated_rtcp_duration(!self.local_senders.is_empty() && self.bye_state.is_none())
fn rtcp_interval(&self) -> Duration {
let interval = self
.calculated_rtcp_duration(!self.local_senders.is_empty() && self.bye_state.is_none());
trace!("Calculated RTCP interval {interval:?}");
interval
}
/// Retrieve a list of all ssrc's currently handled by this session
@ -1091,6 +1369,13 @@ fn generate_ssrc() -> u32 {
rng.gen::<u32>()
}
fn rtcp_dither(dur: Duration) -> Duration {
let mut rng = rand::thread_rng();
// need a factor in [0.5, 1.5]
let factor = rng.gen::<f64>();
dur.mul_f64(factor + 0.5)
}
#[cfg(test)]
pub(crate) mod tests {
use rtp_types::RtpPacketBuilder;
@ -1160,7 +1445,7 @@ pub(crate) mod tests {
fn generate_rtp_packet(ssrc: u32, seq_no: u16, rtp_ts: u32, payload_len: usize) -> Vec<u8> {
init_logs();
let mut rtp_data = [0; 128];
let mut rtp_data = [0; 1200];
let payload = vec![1; payload_len];
let len = RtpPacketBuilder::new()
.payload_type(TEST_PT)
@ -1181,6 +1466,27 @@ pub(crate) mod tests {
(new_now, ntp_now + new_now.duration_since(old_now))
}
fn next_rtcp_packet(
session: &mut Session,
mut now: Instant,
mut ntp_now: SystemTime,
) -> (Vec<u8>, Instant, SystemTime) {
let mut ret = None;
while ret.is_none() {
ret = session.poll_rtcp_send(now, ntp_now);
if let Some(rtcp) = ret {
return (rtcp, now, ntp_now);
} else {
(now, ntp_now) = increment_rtcp_times(
now,
session.poll_rtcp_send_timeout(now).unwrap(),
ntp_now,
);
}
}
unreachable!();
}
#[test]
fn send_new_ssrc() {
init_logs();
@ -1262,10 +1568,8 @@ pub(crate) mod tests {
RecvReply::Passthrough
);
let (now, ntp_now) =
increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now);
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap();
let rtcp = Compound::parse(&rtcp_data).unwrap();
let mut n_rb_ssrcs = 0;
let mut found_sdes_cname = false;
@ -1328,8 +1632,10 @@ pub(crate) mod tests {
// generate packets at the 'same time' as rtcp so some calculated timestamps will match
let (now, ntp_now) =
increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now);
// ensure that timer reconsideration will not suppress
let (now, ntp_now) = increment_rtcp_times(now, now + RTCP_MIN_REPORT_INTERVAL, ntp_now);
let rtp_data = generate_rtp_packet(ssrcs[0], 100, 4, 8);
let rtp_data = generate_rtp_packet(ssrcs[0], 100, 4, 1024);
let packet = RtpPacket::parse(&rtp_data).unwrap();
assert_eq!(
session.handle_send(&packet, now),
@ -1337,7 +1643,7 @@ pub(crate) mod tests {
);
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
let rtp_data = generate_rtp_packet(ssrcs[1], 200, 4, 8);
let rtp_data = generate_rtp_packet(ssrcs[1], 200, 4, 1024);
let packet = RtpPacket::parse(&rtp_data).unwrap();
assert_eq!(
session.handle_send(&packet, now),
@ -1357,7 +1663,7 @@ pub(crate) mod tests {
}
// we sent 1 packet on each ssrc, rtcp should reflect that
assert_eq!(sr.packet_count(), 1);
assert_eq!(sr.octet_count() as usize, 8);
assert_eq!(sr.octet_count() as usize, 1024);
assert_eq!(
sr.ntp_timestamp(),
system_time_to_ntp_time_u64(ntp_now).as_u64()
@ -1427,11 +1733,8 @@ pub(crate) mod tests {
vec![]
);
// generate packets at the 'same time' as rtcp so some calculated timestamps will match
let (new_now, new_ntp_now) =
increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now);
let (rtcp_data, _new_now, new_ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let rtcp_data = session.poll_rtcp_send(new_now, new_ntp_now).unwrap();
let rtcp = Compound::parse(&rtcp_data).unwrap();
for p in rtcp {
match p {
@ -1479,9 +1782,11 @@ pub(crate) mod tests {
// get the next rtcp packet times and send at the same time
let (now, ntp_now) =
increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now);
// ensure that timer reconsideration will not suppress
let (now, ntp_now) = increment_rtcp_times(now, now + RTCP_MIN_REPORT_INTERVAL, ntp_now);
// send from two ssrcs
let rtp_data = generate_rtp_packet(ssrcs[0], 100, 0, 4);
let rtp_data = generate_rtp_packet(ssrcs[0], 100, 0, 1024);
let packet = RtpPacket::parse(&rtp_data).unwrap();
assert_eq!(
session.handle_send(&packet, now),
@ -1489,7 +1794,7 @@ pub(crate) mod tests {
);
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
let rtp_data = generate_rtp_packet(ssrcs[1], 200, 0, 4);
let rtp_data = generate_rtp_packet(ssrcs[1], 200, 0, 1024);
let packet = RtpPacket::parse(&rtp_data).unwrap();
assert_eq!(
session.handle_send(&packet, now),
@ -1535,11 +1840,8 @@ pub(crate) mod tests {
RecvReply::Passthrough
);
// get the next rtcp packet
let (now, ntp_now) =
increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now);
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap();
trace!("rtcp data {rtcp_data:?}");
let rtcp = Compound::parse(&rtcp_data).unwrap();
let mut n_sr_ssrcs = 0;
@ -1580,11 +1882,8 @@ pub(crate) mod tests {
RecvReply::Passthrough
);
// get the next rtcp packet
let (now, ntp_now) =
increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now);
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap();
let rtcp = Compound::parse(&rtcp_data).unwrap();
for p in rtcp {
trace!("{p:?}");
@ -1620,11 +1919,8 @@ pub(crate) mod tests {
);
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
// get the next rtcp packet
let (now, ntp_now) =
increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now);
let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap();
let rtcp = Compound::parse(&rtcp_data).unwrap();
for p in rtcp {
trace!("{p:?}");
@ -1640,10 +1936,7 @@ pub(crate) mod tests {
let mut seen_rr = false;
for _ in 0..=5 {
let (now, ntp_now) =
increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now);
let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap();
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let rtcp = Compound::parse(&rtcp_data).unwrap();
for p in rtcp {
trace!("{p:?}");
@ -1681,11 +1974,7 @@ pub(crate) mod tests {
);
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
// get the next rtcp packet
let (now, ntp_now) =
increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now);
let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap();
let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let rtcp = Compound::parse(&rtcp_data).unwrap();
for p in rtcp {
@ -1845,9 +2134,7 @@ pub(crate) mod tests {
);
// send initial rtcp
let (now, ntp_now) =
increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now);
let _rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap();
let (_rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let rtcp = Compound::builder().add_packet(Bye::builder().add_source(ssrc));
let mut data = vec![0; 128];
@ -1880,9 +2167,7 @@ pub(crate) mod tests {
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
// send initial rtcp
let (now, ntp_now) =
increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now);
let _rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap();
let (_rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let source = session.mut_local_send_source_by_ssrc(ssrc).unwrap();
source.mark_bye("Cya");
@ -1891,9 +2176,7 @@ pub(crate) mod tests {
// data after bye should be dropped
assert_eq!(session.handle_send(&packet, now), SendReply::Drop);
let (now, ntp_now) =
increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now);
let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap();
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let rtcp = Compound::parse(&rtcp_data).unwrap();
let mut received_bye = false;
@ -1914,4 +2197,156 @@ pub(crate) mod tests {
}
assert!(received_bye);
}
#[test]
fn early_rtcp() {
let mut session = Session::new();
session.set_pt_clock_rate(TEST_PT, TEST_CLOCK_RATE);
session.set_profile(RtpProfile::Avpf);
let now = Instant::now();
let ntp_now = SystemTime::now();
let ssrc = 0x11223344;
let send_ssrc = 0x55667788;
let send2_ssrc = 0x99aabbcc;
let rtp_data = generate_rtp_packet(ssrc, 500, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
session_recv_first_packet_disable_probation(&mut session, &packet, now);
assert_eq!(
session.handle_recv(&packet, None, now),
RecvReply::Passthrough
);
let rtp_data = generate_rtp_packet(send_ssrc, 500, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
assert_eq!(
session.handle_send(&packet, now),
SendReply::NewSsrc(send_ssrc, TEST_PT)
);
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
let rtp_data = generate_rtp_packet(send2_ssrc, 500, 0, 4);
let packet = RtpPacket::parse(&rtp_data).unwrap();
assert_eq!(
session.handle_send(&packet, now),
SendReply::NewSsrc(send2_ssrc, TEST_PT)
);
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
// complete first regular rtcp
let (_rtcp_data, now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let source = session.mut_local_send_source_by_ssrc(send_ssrc).unwrap();
source.set_sdes_item(SdesItem::NAME, b"name");
// request early rtcp. The resulting RTCP packet, will only have the CNAME sdes item and a
// single SR/RR.
session.request_early_rtcp(now, RTCP_MIN_REPORT_INTERVAL);
assert!(session.next_early_rtcp_time.is_some());
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let rtcp = Compound::parse(&rtcp_data).unwrap();
let mut n_sr_ssrc = 0;
let mut n_sdes = 0;
for p in rtcp {
trace!("{p:?}");
match p {
Ok(Packet::Sr(sr)) => {
assert!([send_ssrc, send2_ssrc].contains(&sr.ssrc()));
n_sr_ssrc += 1;
}
Ok(Packet::Sdes(sdes)) => {
assert_eq!(n_sdes, 0);
for chunk in sdes.chunks() {
assert!([send_ssrc, send2_ssrc].contains(&chunk.ssrc()));
for item in chunk.items() {
assert_eq!(item.type_(), SdesItem::CNAME);
n_sdes += 1;
}
}
}
_ => unreachable!(),
}
}
assert_eq!(n_sdes, 2);
assert_eq!(n_sr_ssrc, 1);
}
#[test]
fn point_to_point() {
let mut session = Session::new();
assert!(session.is_point_to_point);
session.set_pt_clock_rate(TEST_PT, TEST_CLOCK_RATE);
let now = Instant::now();
let ntp_now = SystemTime::now();
let recv_ssrc = 0x11223344;
let recv2_ssrc = 0x55667788;
let from = "127.0.0.1:8080".parse().unwrap();
// single remote cname is still point to point
let mut data = vec![0; 128];
let len = Compound::builder()
.add_packet(
Sdes::builder().add_chunk(
SdesChunk::builder(recv_ssrc)
.add_item(SdesItem::builder(SdesItem::CNAME, "cname1")),
),
)
.write_into(&mut data)
.unwrap();
let rtcp = Compound::parse(&data[..len]).unwrap();
assert_eq!(
session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now),
vec![RtcpRecvReply::NewSsrc(recv_ssrc)]
);
assert!(session.is_point_to_point);
// multiple ssrc but same cname is still point to point
let mut data = vec![0; 128];
let len = Compound::builder()
.add_packet(
Sdes::builder().add_chunk(
SdesChunk::builder(recv_ssrc)
.add_item(SdesItem::builder(SdesItem::CNAME, "cname1")),
),
)
.add_packet(
Sdes::builder().add_chunk(
SdesChunk::builder(recv2_ssrc)
.add_item(SdesItem::builder(SdesItem::CNAME, "cname1")),
),
)
.write_into(&mut data)
.unwrap();
let rtcp = Compound::parse(&data[..len]).unwrap();
assert_eq!(
session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now),
vec![RtcpRecvReply::NewSsrc(recv2_ssrc)]
);
assert!(session.is_point_to_point);
// multiple ssrc and different cname is NOT point to point
let mut data = vec![0; 128];
let len = Compound::builder()
.add_packet(
Sdes::builder().add_chunk(
SdesChunk::builder(recv_ssrc)
.add_item(SdesItem::builder(SdesItem::CNAME, "cname1")),
),
)
.add_packet(
Sdes::builder().add_chunk(
SdesChunk::builder(recv2_ssrc)
.add_item(SdesItem::builder(SdesItem::CNAME, "cname2")),
),
)
.write_into(&mut data)
.unwrap();
let rtcp = Compound::parse(&data[..len]).unwrap();
assert_eq!(
session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now),
vec![]
);
assert!(!session.is_point_to_point);
}
}

View file

@ -722,6 +722,11 @@ impl RemoteSendSource {
}
}
/// Retrieve the SDES items currently received for this remote sender
pub fn sdes(&self) -> &HashMap<u8, String> {
&self.source.sdes
}
pub(crate) fn set_last_activity(&mut self, time: Instant) {
self.source.last_activity = time;
}
@ -1004,6 +1009,11 @@ impl RemoteReceiveSource {
}
}
/// Retrieve the SDES items currently received for this remote receiver
pub fn sdes(&self) -> &HashMap<u8, String> {
&self.source.sdes
}
/// Retrieve the last time that activity was seen on this source
pub fn last_activity(&self) -> Instant {
self.source.last_activity