diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index a5e079ab..60062713 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -7263,6 +7263,18 @@ "type": "guint", "writable": true }, + "rtp-profile": { + "blurb": "RTP Profile to use", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "avp (0)", + "mutable": "ready", + "readable": true, + "type": "GstRtpBin2Profile", + "writable": true + }, "stats": { "blurb": "Statistics about the session", "conditionally-available": false, diff --git a/net/rtp/src/rtpbin2/imp.rs b/net/rtp/src/rtpbin2/imp.rs index 3953b304..bfddf6e4 100644 --- a/net/rtp/src/rtpbin2/imp.rs +++ b/net/rtp/src/rtpbin2/imp.rs @@ -12,7 +12,9 @@ use futures::StreamExt; use gst::{glib, prelude::*, subclass::prelude::*}; use once_cell::sync::Lazy; -use super::session::{RecvReply, RtcpRecvReply, SendReply, Session, RTCP_MIN_REPORT_INTERVAL}; +use super::session::{ + RecvReply, RtcpRecvReply, RtpProfile, SendReply, Session, RTCP_MIN_REPORT_INTERVAL, +}; use super::source::{ReceivedRb, SourceState}; use crate::rtpbin2::RUNTIME; @@ -28,10 +30,40 @@ static CAT: Lazy = Lazy::new(|| { ) }); +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, glib::Enum)] +#[repr(u32)] +#[enum_type(name = "GstRtpBin2Profile")] +enum Profile { + #[default] + #[enum_value(name = "AVP profile as specified in RFC 3550", nick = "avp")] + Avp, + #[enum_value(name = "AVPF profile as specified in RFC 4585", nick = "avpf")] + Avpf, +} + +impl From for Profile { + fn from(value: RtpProfile) -> Self { + match value { + RtpProfile::Avp => Self::Avp, + RtpProfile::Avpf => Self::Avpf, + } + } +} + +impl From for RtpProfile { + fn from(value: Profile) -> Self { + match value { + Profile::Avp => Self::Avp, + Profile::Avpf => Self::Avpf, + } + } +} + #[derive(Debug, Clone)] struct Settings { latency: gst::ClockTime, min_rtcp_interval: Duration, + profile: Profile, } impl Default for Settings { @@ -39,6 +71,7 @@ impl Default for Settings { Settings { latency: DEFAULT_LATENCY, min_rtcp_interval: DEFAULT_MIN_RTCP_INTERVAL, + profile: Profile::default(), } } } @@ -121,10 +154,15 @@ struct BinSession { } impl BinSession { - fn new(id: usize, min_rtcp_interval: Duration) -> Self { + fn new(id: usize, settings: &Settings) -> Self { + let mut inner = BinSessionInner::new(id); + inner + .session + .set_min_rtcp_interval(settings.min_rtcp_interval); + inner.session.set_profile(settings.profile.into()); Self { id, - inner: Arc::new(Mutex::new(BinSessionInner::new(id, min_rtcp_interval))), + inner: Arc::new(Mutex::new(inner)), } } } @@ -156,9 +194,7 @@ struct BinSessionInner { } impl BinSessionInner { - fn new(id: usize, min_rtcp_interval: Duration) -> Self { - let mut session = Session::new(); - session.set_min_rtcp_interval(min_rtcp_interval); + fn new(id: usize) -> Self { Self { id, @@ -939,6 +975,12 @@ impl ObjectImpl for RtpBin2 { .blurb("Statistics about the session") .read_only() .build(), + glib::ParamSpecEnum::builder::("rtp-profile") + .nick("RTP Profile") + .blurb("RTP Profile to use") + .default_value(Profile::default()) + .mutable_ready() + .build(), ] }); @@ -966,6 +1008,10 @@ impl ObjectImpl for RtpBin2 { value.get::().expect("type checked upstream").into(), ); } + "rtp-profile" => { + let mut settings = self.settings.lock().unwrap(); + settings.profile = value.get::().expect("Type checked upstream"); + } _ => unimplemented!(), } } @@ -984,6 +1030,10 @@ impl ObjectImpl for RtpBin2 { let state = self.state.lock().unwrap(); state.stats().to_value() } + "rtp-profile" => { + let settings = self.settings.lock().unwrap(); + settings.profile.to_value() + } _ => unimplemented!(), } } @@ -1070,7 +1120,7 @@ impl ElementImpl for RtpBin2 { _caps: Option<&gst::Caps>, // XXX: do something with caps? ) -> Option { let this = self.obj(); - let min_rtcp_interval = self.settings.lock().unwrap().min_rtcp_interval; + let settings = self.settings.lock().unwrap().clone(); let mut state = self.state.lock().unwrap(); let max_session_id = state.max_session_id; @@ -1135,7 +1185,7 @@ impl ElementImpl for RtpBin2 { new_pad(&mut session) } } else { - let session = BinSession::new(id, min_rtcp_interval); + let session = BinSession::new(id, &settings); let mut inner = session.inner.lock().unwrap(); let ret = new_pad(&mut inner); drop(inner); @@ -1178,7 +1228,7 @@ impl ElementImpl for RtpBin2 { new_pad(&mut session) } } else { - let session = BinSession::new(id, min_rtcp_interval); + let session = BinSession::new(id, &settings); let mut inner = session.inner.lock().unwrap(); let ret = new_pad(&mut inner); drop(inner); diff --git a/net/rtp/src/rtpbin2/session.rs b/net/rtp/src/rtpbin2/session.rs index cb51b26c..062e5733 100644 --- a/net/rtp/src/rtpbin2/session.rs +++ b/net/rtp/src/rtpbin2/session.rs @@ -20,7 +20,6 @@ use gst::prelude::MulDiv; // TODO: make configurable pub const RTCP_MIN_REPORT_INTERVAL: Duration = Duration::from_secs(5); -// TODO: reduced minimum interval? (360 / session bandwidth) const RTCP_SOURCE_TIMEOUT_N_INTERVALS: u32 = 5; const RTCP_ADDRESS_CONFLICT_TIMEOUT: Duration = RTCP_MIN_REPORT_INTERVAL.saturating_mul(12); @@ -42,19 +41,36 @@ struct ByeState { pmembers: usize, } +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub enum RtpProfile { + #[default] + Avp, + Avpf, +} + +impl RtpProfile { + fn is_feedback(&self) -> bool { + self == &Self::Avpf + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RequestEarlyRtcpResult { + NotScheduled, + Scheduled, + TimerReconsideration, +} + #[derive(Debug)] pub struct Session { // settings min_rtcp_interval: Duration, + profile: RtpProfile, // state local_senders: HashMap, local_receivers: HashMap, remote_receivers: HashMap, remote_senders: HashMap, - last_rtcp_sent_times: VecDeque, - // holds the next rtcp send time and the number of members at the time when the time was - // calculated - next_rtcp_send: RtcpTimeMembers, average_rtcp_size: usize, last_sent_data: Option, hold_buffer_counter: usize, @@ -64,6 +80,20 @@ pub struct Session { // used when we have not sent anything but need a ssrc for Rr internal_rtcp_sender_src: Option, bye_state: Option, + is_point_to_point: bool, + + // rtcp scheduling state + // T_rr: holds the interval used to calculate the current `next_rtcp_send` + rtcp_interval: Option, + // tp: last time that any rtcp was handled + last_rtcp_handle_time: Option, + // tn: holds the next regular rtcp send time and the number of members at the + // time when the time was calculated + next_rtcp_send: RtcpTimeMembers, + // T_rr_last: last two times a regular rtcp packet was sent + last_rtcp_sent_times: VecDeque, + // time for the next early rtcp to be sent + next_early_rtcp_time: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -117,12 +147,14 @@ impl Session { sdes.insert(SdesItem::CNAME, cname); Self { min_rtcp_interval: RTCP_MIN_REPORT_INTERVAL, + profile: RtpProfile::default(), local_senders: HashMap::new(), // also known as remote_senders local_receivers: HashMap::new(), remote_receivers: HashMap::new(), remote_senders: HashMap::new(), last_rtcp_sent_times: VecDeque::new(), + rtcp_interval: None, next_rtcp_send: RtcpTimeMembers { time: None, p_members: 0, @@ -135,14 +167,22 @@ impl Session { conflicting_addresses: HashMap::new(), internal_rtcp_sender_src: None, bye_state: None, + next_early_rtcp_time: None, + last_rtcp_handle_time: None, + is_point_to_point: true, } } - /// Set the minimum RTCP interval to use for this session + /// Set the minimum (regular) RTCP interval to use for this session pub fn set_min_rtcp_interval(&mut self, min_rtcp_interval: Duration) { self.min_rtcp_interval = min_rtcp_interval; } + /// Set the RTP profile to use. + pub fn set_profile(&mut self, profile: RtpProfile) { + self.profile = profile; + } + fn n_members(&self) -> usize { self.bye_state .as_ref() @@ -191,6 +231,28 @@ impl Session { .unwrap_or(self.next_rtcp_send.p_members) } + fn update_point_to_point(&mut self) { + // This definition of point to point is from RFC 8108 + // i.e. If there are multiple remote CNAMEs, then we are not point to point. + // + // XXX: We currently don't provide external overriding for this + let mut cname = None; + self.is_point_to_point = self + .remote_senders + .values() + .filter_map(|source| source.sdes().get(&SdesItem::CNAME)) + .chain( + self.remote_receivers + .values() + .filter_map(|source| source.sdes().get(&SdesItem::CNAME)), + ) + .all(|cn| { + let ret = cname.is_none() || cname == Some(cn); + cname = cname.or(Some(cn)); + ret + }); + } + /// Set the RTP clock rate for a particular payload type pub fn set_pt_clock_rate(&mut self, pt: u8, clock_rate: u32) { self.pt_map.insert(pt, clock_rate); @@ -409,6 +471,8 @@ impl Session { let dur = prev.saturating_duration_since(now); if self.next_rtcp_send.p_members > 0 { let member_factor = initial_n_members as f64 / self.next_rtcp_send.p_members as f64; + // FIXME: Does something have to be adjusted here in feedback profiles for + // T_rr_interval? *prev = now + dur.mul_f64(member_factor); self.next_rtcp_send.p_members = n_members; if let Some(last_rtcp) = self.last_rtcp_sent_times.front_mut() { @@ -592,9 +656,12 @@ impl Session { } } Ok(Packet::Unknown(_unk)) => (), + // TODO: in RFC4585 profile, need to listen for feedback messages and remove any + // that we would have sent Err(_) => (), } } + self.update_point_to_point(); replies } @@ -603,6 +670,8 @@ impl Session { mut rtcp: CompoundBuilder<'a>, now: Instant, ntp_now: SystemTime, + minimum: bool, // RFC 4585 + ssrcs_reported: &mut Vec, ) -> CompoundBuilder<'a> { let ntp_time = system_time_to_ntp_time_u64(ntp_now); if self @@ -640,7 +709,6 @@ impl Session { // assume that the rtp times and clock times advance at a rate // close to 1.0 and do a direct linear extrapolation to get the rtp // time for 'now' - trace!("clock-rate {clock_rate}"); (dur_since_last_rtp.as_nanos() as u64).mul_div_round( clock_rate as u64, gst::ClockTime::SECOND.nseconds(), @@ -657,15 +725,29 @@ impl Session { .rtp_timestamp(rtp_timestamp); sender_srs.push((sender.ssrc(), ntp_now, ntp_time, rtp_timestamp)); + if !ssrcs_reported.iter().any(|&ssrc| ssrc == sender.ssrc()) { + ssrcs_reported.push(sender.ssrc()); + } - for sender in self.remote_senders.values() { - if sender.state() != SourceState::Normal { - continue; + // Don't include RBs in minimal RTCP packets + if !minimum { + for sender in self.remote_senders.values() { + if sender.state() != SourceState::Normal { + continue; + } + let rb = sender.generate_report_block(ntp_now); + sr = sr.add_report_block(rb.into()); + if !ssrcs_reported.iter().any(|&ssrc| ssrc == sender.ssrc()) { + ssrcs_reported.push(sender.ssrc()); + } } - let rb = sender.generate_report_block(ntp_now); - sr = sr.add_report_block(rb.into()); } rtcp = rtcp.add_packet(sr); + + // A minimal RTCP packet only contains a single SR + if minimum { + break; + } } for (ssrc, ntp_now, ntp_time, rtp_timestamp) in sender_srs { self.local_senders @@ -711,6 +793,8 @@ impl Session { mut rtcp: CompoundBuilder<'a>, now: Instant, ntp_now: SystemTime, + minimum: bool, // RFC 4585 + ssrcs_reported: &mut Vec, ) -> CompoundBuilder<'a> { if self .local_senders @@ -729,32 +813,44 @@ impl Session { .entry(ssrc) .and_modify(|source| source.set_last_activity(now)); let mut rr = ReceiverReport::builder(ssrc); - for sender in self.remote_senders.values() { - if sender.state() != SourceState::Normal { - continue; + + // Don't include RBs in minimal RTCP packets + if !minimum { + for sender in self.remote_senders.values() { + if sender.state() != SourceState::Normal { + continue; + } + let rb = sender.generate_report_block(ntp_now); + rr = rr.add_report_block(rb.into()); + if !ssrcs_reported.iter().any(|&ssrc| ssrc == sender.ssrc()) { + ssrcs_reported.push(sender.ssrc()); + } } - let rb = sender.generate_report_block(ntp_now); - rr = rr.add_report_block(rb.into()); } + rtcp = rtcp.add_packet(rr); } rtcp } - fn generate_sdes<'a>(&self, rtcp: CompoundBuilder<'a>) -> CompoundBuilder<'a> { + fn generate_sdes<'a>( + &self, + rtcp: CompoundBuilder<'a>, + minimum: bool, // RFC 4585 + ) -> CompoundBuilder<'a> { let mut sdes = Sdes::builder(); let mut have_chunk = false; - if !self.local_senders.is_empty() { - for sender in self.local_senders.values() { - let sdes_map = sender.sdes(); - if !sdes_map.is_empty() { - let mut chunk = SdesChunk::builder(sender.ssrc()); - for (ty, val) in sdes_map { + for sender in self.local_senders.values() { + let sdes_map = sender.sdes(); + if !sdes_map.is_empty() { + let mut chunk = SdesChunk::builder(sender.ssrc()); + for (ty, val) in sdes_map { + if !minimum || *ty == SdesItem::CNAME { chunk = chunk.add_item_owned(SdesItem::builder(*ty, val)); } - have_chunk = true; - sdes = sdes.add_chunk(chunk); } + have_chunk = true; + sdes = sdes.add_chunk(chunk); } } for receiver in self.local_receivers.values() { @@ -762,7 +858,9 @@ impl Session { if !sdes_map.is_empty() { let mut chunk = SdesChunk::builder(receiver.ssrc()); for (ty, val) in sdes_map { - chunk = chunk.add_item_owned(SdesItem::builder(*ty, val)); + if !minimum || *ty == SdesItem::CNAME { + chunk = chunk.add_item_owned(SdesItem::builder(*ty, val)); + } } have_chunk = true; sdes = sdes.add_chunk(chunk); @@ -834,7 +932,10 @@ impl Session { // RFC 3550 6.3.5 fn handle_timeouts(&mut self, now: Instant) { trace!("handling rtcp timeouts"); - let td = RTCP_SOURCE_TIMEOUT_N_INTERVALS * self.deterministic_rtcp_duration(false); + let td = RTCP_SOURCE_TIMEOUT_N_INTERVALS + * self + .deterministic_rtcp_duration(false) + .max(Duration::from_secs(5)); // delete all sources that are too old self.local_receivers @@ -884,26 +985,57 @@ impl Session { .retain(|_addr, time| now - *time < RTCP_ADDRESS_CONFLICT_TIMEOUT); } - /// Produce a RTCP packet (or None if it is too early to send a RTCP packet). After this call returns - /// a packet, the next time to send a RTCP packet can be retrieved from `poll_rtcp_send_timeout` + /// Produce a RTCP packet (or `None` if it is too early to send a RTCP packet). After this call returns, + /// the next time to send a RTCP packet can be retrieved from `poll_rtcp_send_timeout` // TODO: return RtcpPacketBuilder thing pub fn poll_rtcp_send(&mut self, now: Instant, ntp_now: SystemTime) -> Option> { let Some(next_rtcp_send) = self.next_rtcp_send.time else { + trace!("no next check time yet"); return None; }; - if now < next_rtcp_send { - return None; + let is_early = self.next_early_rtcp_time.is_some() && !self.last_rtcp_sent_times.is_empty(); + + if is_early { + if let Some(next_early_rtcp_time) = self.next_early_rtcp_time { + if now < next_early_rtcp_time { + trace!("next early time {next_early_rtcp_time:?} not reached at {now:?}, nothing to produce"); + return None; + } + } + } else { + if now < next_rtcp_send { + trace!("next time {next_rtcp_send:?} not reached at {now:?}, nothing to produce"); + return None; + } + + // timer reconsideration + self.update_point_to_point(); + let interval = self.rtcp_interval(); + let test_next_rtcp_time = self.last_rtcp_handle_time.unwrap() + interval; + if test_next_rtcp_time > now { + trace!("timer reconsideration considers this wakeup {now:?} too early, nothing to produce. reconsidered time {test_next_rtcp_time:?}"); + self.next_rtcp_send.time = Some(test_next_rtcp_time); + return None; + } } - trace!("generating rtcp packet at {now:?}, ntp:{ntp_now:?}"); + debug!( + "generating rtcp packet at {now:?}, ntp:{ntp_now:?}, duration since last rtcp {:?}", + self.last_rtcp_sent_times + .front() + .copied() + .unwrap_or(now) + .duration_since(now) + ); - let data = { + let (data, ssrcs_reported) = { let mut rtcp = Compound::builder(); + let mut ssrcs_reported = vec![]; // TODO: implement round robin of sr/rrs - rtcp = self.generate_sr(rtcp, now, ntp_now); - rtcp = self.generate_rr(rtcp, now, ntp_now); - rtcp = self.generate_sdes(rtcp); + rtcp = self.generate_sr(rtcp, now, ntp_now, is_early, &mut ssrcs_reported); + rtcp = self.generate_rr(rtcp, now, ntp_now, is_early, &mut ssrcs_reported); + rtcp = self.generate_sdes(rtcp, is_early); rtcp = self.generate_bye(rtcp, now); let size = rtcp.calculate_size().unwrap(); @@ -911,36 +1043,68 @@ impl Session { assert!(size < RTCP_MTU); let mut data = vec![0; size]; rtcp.write_into(&mut data).unwrap(); - data + (data, ssrcs_reported) }; - for receiver in self.remote_senders.values_mut() { - receiver.update_last_rtcp(); + for ssrc in ssrcs_reported { + if let Some(receiver) = self.remote_senders.get_mut(&ssrc) { + receiver.update_last_rtcp(); + } } self.update_rtcp_average(data.len() + UDP_IP_OVERHEAD_BYTES); - self.handle_timeouts(now); + if !is_early { + self.handle_timeouts(now); - self.next_rtcp_send = RtcpTimeMembers { - time: Some(self.next_rtcp_time(now)), - p_members: self.n_members(), - }; - self.last_rtcp_sent_times.push_front(now); - while self.last_rtcp_sent_times.len() > 2 { - self.last_rtcp_sent_times.pop_back(); + self.update_point_to_point(); + let mut interval = self.rtcp_interval(); + self.rtcp_interval = Some(interval); + // Handle T_rr_interval + if self.profile.is_feedback() && !self.min_rtcp_interval.is_zero() { + let new_interval = interval.max(rtcp_dither(self.min_rtcp_interval)); + trace!("Updating interval from {interval:?} to {new_interval:?}"); + interval = new_interval; + } + self.next_rtcp_send = RtcpTimeMembers { + time: Some(now + interval), + p_members: self.n_members(), + }; + trace!("next rtcp time {:?}", self.next_rtcp_send.time); + self.last_rtcp_sent_times.push_front(now); + while self.last_rtcp_sent_times.len() > 2 { + self.last_rtcp_sent_times.pop_back(); + } + self.bye_state = None; } - self.bye_state = None; + self.last_rtcp_handle_time = Some(now); + self.next_early_rtcp_time = None; Some(data) } /// Returns the next time to send a RTCP packet. pub fn poll_rtcp_send_timeout(&mut self, now: Instant) -> Option { + trace!( + "poll-rtcp-send-timeout early time {:?}, next {:?}", + self.next_early_rtcp_time, + self.next_rtcp_send.time + ); + if let Some(early_time) = self.next_early_rtcp_time { + return Some(early_time); + } if self.next_rtcp_send.time.is_none() { + self.update_point_to_point(); + let interval = self.rtcp_interval(); + self.rtcp_interval = Some(interval); + self.last_rtcp_handle_time = Some(now); self.next_rtcp_send = RtcpTimeMembers { - time: Some(self.next_rtcp_time(now)), + time: Some(now + interval), p_members: self.n_members(), }; + trace!( + "poll-rtcp-send-timeout initial rtcp time {:?}", + self.next_rtcp_send.time + ); } self.next_rtcp_send.time } @@ -962,8 +1126,22 @@ impl Session { (n_members, rtcp_bw) }; - let min_rtcp_interval = if !self.last_rtcp_sent_times.is_empty() && self.bye_state.is_none() - { + let min_rtcp_interval = if we_sent { + Duration::ZERO + } else if self.profile.is_feedback() { + // RFC 4585 3.4d), 3.5.1 + // + // If not the first RTCP then Tmin is initialized to 0, otherwise to 1s. + // Also for a multi-party session it is initialized to 0. + if self.is_point_to_point + && !self.last_rtcp_sent_times.is_empty() + && self.bye_state.is_none() + { + Duration::ZERO + } else { + Duration::from_secs(1) + } + } else if !self.last_rtcp_sent_times.is_empty() && self.bye_state.is_none() { self.min_rtcp_interval } else { self.min_rtcp_interval / 2 @@ -971,8 +1149,12 @@ impl Session { // 1_000_000_000 / (e-1.5) let compensation_ns = 820_829_366; + trace!( + "members {n}, RTCP bandwidth {rtcp_bw}, average RTCP size {}", + self.average_rtcp_size + ); let t_nanos = (compensation_ns - .mul_div_round(self.average_rtcp_size as u64 * n, rtcp_bw as u64)) + .mul_div_round(self.average_rtcp_size as u64 * n, rtcp_bw.max(1) as u64)) .unwrap() .max(min_rtcp_interval.as_nanos() as u64); trace!("deterministic rtcp interval {t_nanos}ns"); @@ -995,12 +1177,96 @@ impl Session { } fn calculated_rtcp_duration(&self, we_sent: bool) -> Duration { + // From RFC 3550: only active data senders may use the reduced minimum let dur = self.deterministic_rtcp_duration(we_sent); + rtcp_dither(dur) + } - let mut rng = rand::thread_rng(); - // need a factor in [0.5, 1.5] - let factor = rng.gen::(); - dur.mul_f64(factor + 0.5) + // returns whether to keep the feedback + fn request_early_rtcp(&mut self, now: Instant, max_delay: Duration) -> RequestEarlyRtcpResult { + // Implementation of RFC 4585 3.5.2 + if !self.profile.is_feedback() { + warn!("requested early RTCP without relevant feedback RTP profile, ignoring"); + return RequestEarlyRtcpResult::NotScheduled; + } + + if let Some(early_time) = self.next_early_rtcp_time { + if now + max_delay >= early_time { + debug!("early RTCP already scheduled for {early_time:?}"); + return RequestEarlyRtcpResult::Scheduled; + } else { + debug!("early RTCP already scheduled for {early_time:?} but too late"); + return RequestEarlyRtcpResult::NotScheduled; + } + } + let Some(next_rtcp_send_time) = self.next_rtcp_send.time else { + debug!("No regular RTCP scheduled yet, scheduling now"); + return RequestEarlyRtcpResult::TimerReconsideration; + }; + + // no regular RTCP sent yet, we cannot send early without the first regular RTCP being sent + let Some(&last_rtcp_sent) = self.last_rtcp_sent_times.front() else { + if now + max_delay >= next_rtcp_send_time { + debug!("early RTCP can't be scheduled until first regular RTCP is sent but regular RTCP scheduled early enough"); + return RequestEarlyRtcpResult::Scheduled; + } else { + debug!("early RTCP can't be scheduled until first regular RTCP is sent and regular RTCP scheduled too late"); + return RequestEarlyRtcpResult::NotScheduled; + } + }; + + self.update_point_to_point(); + + // safe as all previous invariants ensure that the `handle_time` must have been set + let last_rtcp_handle_time = self.last_rtcp_handle_time.unwrap(); + + let t_rr = self.rtcp_interval.unwrap(); + let t_dither_max = if self.is_point_to_point { + Duration::from_secs(0) + } else { + t_rr / 2 + }; + + if now + t_dither_max > next_rtcp_send_time { + if now + max_delay >= next_rtcp_send_time { + debug!("early RTCP not scheduled because regular RTCP is early enough"); + return RequestEarlyRtcpResult::Scheduled; + } else { + debug!("early RTCP can't be scheduled because regular RTCP is scheduled soon but too late"); + return RequestEarlyRtcpResult::NotScheduled; + } + } + + let (allow_early, offset) = if last_rtcp_sent == last_rtcp_handle_time { + // last rtcp was a regular rtcp, we can send early after that + (true, Duration::ZERO) + } else if last_rtcp_handle_time + t_rr <= now + max_delay { + // 1. last rtcp packet was not regular rtcp so must be early rtcp + // 2. More than t_rr has passed since so a regular rtcp packet has been suppressed thus + // allowing early send again + ( + true, + (last_rtcp_handle_time + t_rr).saturating_duration_since(now), + ) + } else { + debug!("Can't allow early RTCP again yet"); + (false, Duration::ZERO) + }; + + if !allow_early { + if next_rtcp_send_time - now < max_delay { + debug!("early RTCP not scheduled but regular RTCP scheduled time is soon enough"); + RequestEarlyRtcpResult::Scheduled + } else { + debug!("early RTCP not scheduled and regular RTCP scheduled time is too late"); + RequestEarlyRtcpResult::NotScheduled + } + } else { + let mut rng = rand::thread_rng(); + self.next_early_rtcp_time = Some(now + t_dither_max.mul_f64(rng.gen()) + offset); + debug!("early RTCP scheduled at {:?}", self.next_early_rtcp_time); + RequestEarlyRtcpResult::TimerReconsideration + } } pub fn schedule_bye(&mut self, reason: &str, now: Instant) { @@ -1008,10 +1274,6 @@ impl Session { return; } - if self.n_members() <= 50 { - return; - } - for source in self.local_senders.values_mut() { source.mark_bye(reason); } @@ -1019,6 +1281,10 @@ impl Session { source.mark_bye(reason); } + if self.n_members() <= 50 { + return; + } + self.bye_state = Some(ByeState { members: 1, pmembers: 1, @@ -1028,15 +1294,27 @@ impl Session { self.last_rtcp_sent_times.push_front(now); // FIXME: use actual BYE packet size self.average_rtcp_size = 100; + self.update_point_to_point(); + let mut interval = self.rtcp_interval(); + self.rtcp_interval = Some(interval); + // Handle T_rr_interval + if self.profile.is_feedback() && !self.min_rtcp_interval.is_zero() { + let new_interval = interval.max(rtcp_dither(self.min_rtcp_interval)); + trace!("Updating interval from {interval:?} to {new_interval:?}"); + interval = new_interval; + } self.next_rtcp_send = RtcpTimeMembers { - time: Some(self.next_rtcp_time(now)), + time: Some(now + interval), p_members: self.n_members(), }; + trace!("next rtcp time {:?}", self.next_rtcp_send.time); } - fn next_rtcp_time(&self, now: Instant) -> Instant { - now + self - .calculated_rtcp_duration(!self.local_senders.is_empty() && self.bye_state.is_none()) + fn rtcp_interval(&self) -> Duration { + let interval = self + .calculated_rtcp_duration(!self.local_senders.is_empty() && self.bye_state.is_none()); + trace!("Calculated RTCP interval {interval:?}"); + interval } /// Retrieve a list of all ssrc's currently handled by this session @@ -1091,6 +1369,13 @@ fn generate_ssrc() -> u32 { rng.gen::() } +fn rtcp_dither(dur: Duration) -> Duration { + let mut rng = rand::thread_rng(); + // need a factor in [0.5, 1.5] + let factor = rng.gen::(); + dur.mul_f64(factor + 0.5) +} + #[cfg(test)] pub(crate) mod tests { use rtp_types::RtpPacketBuilder; @@ -1160,7 +1445,7 @@ pub(crate) mod tests { fn generate_rtp_packet(ssrc: u32, seq_no: u16, rtp_ts: u32, payload_len: usize) -> Vec { init_logs(); - let mut rtp_data = [0; 128]; + let mut rtp_data = [0; 1200]; let payload = vec![1; payload_len]; let len = RtpPacketBuilder::new() .payload_type(TEST_PT) @@ -1181,6 +1466,27 @@ pub(crate) mod tests { (new_now, ntp_now + new_now.duration_since(old_now)) } + fn next_rtcp_packet( + session: &mut Session, + mut now: Instant, + mut ntp_now: SystemTime, + ) -> (Vec, Instant, SystemTime) { + let mut ret = None; + while ret.is_none() { + ret = session.poll_rtcp_send(now, ntp_now); + if let Some(rtcp) = ret { + return (rtcp, now, ntp_now); + } else { + (now, ntp_now) = increment_rtcp_times( + now, + session.poll_rtcp_send_timeout(now).unwrap(), + ntp_now, + ); + } + } + unreachable!(); + } + #[test] fn send_new_ssrc() { init_logs(); @@ -1262,10 +1568,8 @@ pub(crate) mod tests { RecvReply::Passthrough ); - let (now, ntp_now) = - increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now); + let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); - let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap(); let mut n_rb_ssrcs = 0; let mut found_sdes_cname = false; @@ -1328,8 +1632,10 @@ pub(crate) mod tests { // generate packets at the 'same time' as rtcp so some calculated timestamps will match let (now, ntp_now) = increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now); + // ensure that timer reconsideration will not suppress + let (now, ntp_now) = increment_rtcp_times(now, now + RTCP_MIN_REPORT_INTERVAL, ntp_now); - let rtp_data = generate_rtp_packet(ssrcs[0], 100, 4, 8); + let rtp_data = generate_rtp_packet(ssrcs[0], 100, 4, 1024); let packet = RtpPacket::parse(&rtp_data).unwrap(); assert_eq!( session.handle_send(&packet, now), @@ -1337,7 +1643,7 @@ pub(crate) mod tests { ); assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); - let rtp_data = generate_rtp_packet(ssrcs[1], 200, 4, 8); + let rtp_data = generate_rtp_packet(ssrcs[1], 200, 4, 1024); let packet = RtpPacket::parse(&rtp_data).unwrap(); assert_eq!( session.handle_send(&packet, now), @@ -1357,7 +1663,7 @@ pub(crate) mod tests { } // we sent 1 packet on each ssrc, rtcp should reflect that assert_eq!(sr.packet_count(), 1); - assert_eq!(sr.octet_count() as usize, 8); + assert_eq!(sr.octet_count() as usize, 1024); assert_eq!( sr.ntp_timestamp(), system_time_to_ntp_time_u64(ntp_now).as_u64() @@ -1427,11 +1733,8 @@ pub(crate) mod tests { vec![] ); - // generate packets at the 'same time' as rtcp so some calculated timestamps will match - let (new_now, new_ntp_now) = - increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now); + let (rtcp_data, _new_now, new_ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); - let rtcp_data = session.poll_rtcp_send(new_now, new_ntp_now).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap(); for p in rtcp { match p { @@ -1479,9 +1782,11 @@ pub(crate) mod tests { // get the next rtcp packet times and send at the same time let (now, ntp_now) = increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now); + // ensure that timer reconsideration will not suppress + let (now, ntp_now) = increment_rtcp_times(now, now + RTCP_MIN_REPORT_INTERVAL, ntp_now); // send from two ssrcs - let rtp_data = generate_rtp_packet(ssrcs[0], 100, 0, 4); + let rtp_data = generate_rtp_packet(ssrcs[0], 100, 0, 1024); let packet = RtpPacket::parse(&rtp_data).unwrap(); assert_eq!( session.handle_send(&packet, now), @@ -1489,7 +1794,7 @@ pub(crate) mod tests { ); assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); - let rtp_data = generate_rtp_packet(ssrcs[1], 200, 0, 4); + let rtp_data = generate_rtp_packet(ssrcs[1], 200, 0, 1024); let packet = RtpPacket::parse(&rtp_data).unwrap(); assert_eq!( session.handle_send(&packet, now), @@ -1535,11 +1840,8 @@ pub(crate) mod tests { RecvReply::Passthrough ); - // get the next rtcp packet - let (now, ntp_now) = - increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now); + let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); - let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); trace!("rtcp data {rtcp_data:?}"); let rtcp = Compound::parse(&rtcp_data).unwrap(); let mut n_sr_ssrcs = 0; @@ -1580,11 +1882,8 @@ pub(crate) mod tests { RecvReply::Passthrough ); - // get the next rtcp packet - let (now, ntp_now) = - increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now); + let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); - let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap(); for p in rtcp { trace!("{p:?}"); @@ -1620,11 +1919,8 @@ pub(crate) mod tests { ); assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); - // get the next rtcp packet - let (now, ntp_now) = - increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now); + let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); - let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap(); for p in rtcp { trace!("{p:?}"); @@ -1640,10 +1936,7 @@ pub(crate) mod tests { let mut seen_rr = false; for _ in 0..=5 { - let (now, ntp_now) = - increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now); - - let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); + let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let rtcp = Compound::parse(&rtcp_data).unwrap(); for p in rtcp { trace!("{p:?}"); @@ -1681,11 +1974,7 @@ pub(crate) mod tests { ); assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); - // get the next rtcp packet - let (now, ntp_now) = - increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now); - - let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); + let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let rtcp = Compound::parse(&rtcp_data).unwrap(); for p in rtcp { @@ -1845,9 +2134,7 @@ pub(crate) mod tests { ); // send initial rtcp - let (now, ntp_now) = - increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now); - let _rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); + let (_rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let rtcp = Compound::builder().add_packet(Bye::builder().add_source(ssrc)); let mut data = vec![0; 128]; @@ -1880,9 +2167,7 @@ pub(crate) mod tests { assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); // send initial rtcp - let (now, ntp_now) = - increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now); - let _rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); + let (_rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let source = session.mut_local_send_source_by_ssrc(ssrc).unwrap(); source.mark_bye("Cya"); @@ -1891,9 +2176,7 @@ pub(crate) mod tests { // data after bye should be dropped assert_eq!(session.handle_send(&packet, now), SendReply::Drop); - let (now, ntp_now) = - increment_rtcp_times(now, session.poll_rtcp_send_timeout(now).unwrap(), ntp_now); - let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); + let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let rtcp = Compound::parse(&rtcp_data).unwrap(); let mut received_bye = false; @@ -1914,4 +2197,156 @@ pub(crate) mod tests { } assert!(received_bye); } + + #[test] + fn early_rtcp() { + let mut session = Session::new(); + session.set_pt_clock_rate(TEST_PT, TEST_CLOCK_RATE); + session.set_profile(RtpProfile::Avpf); + let now = Instant::now(); + let ntp_now = SystemTime::now(); + let ssrc = 0x11223344; + let send_ssrc = 0x55667788; + let send2_ssrc = 0x99aabbcc; + + let rtp_data = generate_rtp_packet(ssrc, 500, 0, 4); + let packet = RtpPacket::parse(&rtp_data).unwrap(); + session_recv_first_packet_disable_probation(&mut session, &packet, now); + assert_eq!( + session.handle_recv(&packet, None, now), + RecvReply::Passthrough + ); + + let rtp_data = generate_rtp_packet(send_ssrc, 500, 0, 4); + let packet = RtpPacket::parse(&rtp_data).unwrap(); + assert_eq!( + session.handle_send(&packet, now), + SendReply::NewSsrc(send_ssrc, TEST_PT) + ); + assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); + + let rtp_data = generate_rtp_packet(send2_ssrc, 500, 0, 4); + let packet = RtpPacket::parse(&rtp_data).unwrap(); + assert_eq!( + session.handle_send(&packet, now), + SendReply::NewSsrc(send2_ssrc, TEST_PT) + ); + assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); + + // complete first regular rtcp + let (_rtcp_data, now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + + let source = session.mut_local_send_source_by_ssrc(send_ssrc).unwrap(); + source.set_sdes_item(SdesItem::NAME, b"name"); + + // request early rtcp. The resulting RTCP packet, will only have the CNAME sdes item and a + // single SR/RR. + session.request_early_rtcp(now, RTCP_MIN_REPORT_INTERVAL); + assert!(session.next_early_rtcp_time.is_some()); + + let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let rtcp = Compound::parse(&rtcp_data).unwrap(); + let mut n_sr_ssrc = 0; + let mut n_sdes = 0; + for p in rtcp { + trace!("{p:?}"); + match p { + Ok(Packet::Sr(sr)) => { + assert!([send_ssrc, send2_ssrc].contains(&sr.ssrc())); + n_sr_ssrc += 1; + } + Ok(Packet::Sdes(sdes)) => { + assert_eq!(n_sdes, 0); + for chunk in sdes.chunks() { + assert!([send_ssrc, send2_ssrc].contains(&chunk.ssrc())); + for item in chunk.items() { + assert_eq!(item.type_(), SdesItem::CNAME); + n_sdes += 1; + } + } + } + _ => unreachable!(), + } + } + assert_eq!(n_sdes, 2); + assert_eq!(n_sr_ssrc, 1); + } + + #[test] + fn point_to_point() { + let mut session = Session::new(); + assert!(session.is_point_to_point); + session.set_pt_clock_rate(TEST_PT, TEST_CLOCK_RATE); + let now = Instant::now(); + let ntp_now = SystemTime::now(); + let recv_ssrc = 0x11223344; + let recv2_ssrc = 0x55667788; + let from = "127.0.0.1:8080".parse().unwrap(); + + // single remote cname is still point to point + let mut data = vec![0; 128]; + let len = Compound::builder() + .add_packet( + Sdes::builder().add_chunk( + SdesChunk::builder(recv_ssrc) + .add_item(SdesItem::builder(SdesItem::CNAME, "cname1")), + ), + ) + .write_into(&mut data) + .unwrap(); + let rtcp = Compound::parse(&data[..len]).unwrap(); + assert_eq!( + session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now), + vec![RtcpRecvReply::NewSsrc(recv_ssrc)] + ); + assert!(session.is_point_to_point); + + // multiple ssrc but same cname is still point to point + let mut data = vec![0; 128]; + let len = Compound::builder() + .add_packet( + Sdes::builder().add_chunk( + SdesChunk::builder(recv_ssrc) + .add_item(SdesItem::builder(SdesItem::CNAME, "cname1")), + ), + ) + .add_packet( + Sdes::builder().add_chunk( + SdesChunk::builder(recv2_ssrc) + .add_item(SdesItem::builder(SdesItem::CNAME, "cname1")), + ), + ) + .write_into(&mut data) + .unwrap(); + let rtcp = Compound::parse(&data[..len]).unwrap(); + assert_eq!( + session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now), + vec![RtcpRecvReply::NewSsrc(recv2_ssrc)] + ); + assert!(session.is_point_to_point); + + // multiple ssrc and different cname is NOT point to point + let mut data = vec![0; 128]; + let len = Compound::builder() + .add_packet( + Sdes::builder().add_chunk( + SdesChunk::builder(recv_ssrc) + .add_item(SdesItem::builder(SdesItem::CNAME, "cname1")), + ), + ) + .add_packet( + Sdes::builder().add_chunk( + SdesChunk::builder(recv2_ssrc) + .add_item(SdesItem::builder(SdesItem::CNAME, "cname2")), + ), + ) + .write_into(&mut data) + .unwrap(); + let rtcp = Compound::parse(&data[..len]).unwrap(); + assert_eq!( + session.handle_rtcp_recv(rtcp, len, Some(from), now, ntp_now), + vec![] + ); + assert!(!session.is_point_to_point); + } } diff --git a/net/rtp/src/rtpbin2/source.rs b/net/rtp/src/rtpbin2/source.rs index 9f8c33ab..11d915c5 100644 --- a/net/rtp/src/rtpbin2/source.rs +++ b/net/rtp/src/rtpbin2/source.rs @@ -722,6 +722,11 @@ impl RemoteSendSource { } } + /// Retrieve the SDES items currently received for this remote sender + pub fn sdes(&self) -> &HashMap { + &self.source.sdes + } + pub(crate) fn set_last_activity(&mut self, time: Instant) { self.source.last_activity = time; } @@ -1004,6 +1009,11 @@ impl RemoteReceiveSource { } } + /// Retrieve the SDES items currently received for this remote receiver + pub fn sdes(&self) -> &HashMap { + &self.source.sdes + } + /// Retrieve the last time that activity was seen on this source pub fn last_activity(&self) -> Instant { self.source.last_activity