From 1e4a966c92429e99582de338602e742094d32aa7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 2 Jan 2024 11:06:46 +0200 Subject: [PATCH] rtpbin2: Add support for sending NACK/PLI and FIR Co-Authored-By: Matthew Waters Part-of: --- net/rtp/src/rtpbin2/imp.rs | 70 ++++++++++++++++++++++++++- net/rtp/src/rtpbin2/session.rs | 87 ++++++++++++++++++++++++++++++++++ net/rtp/src/rtpbin2/source.rs | 64 ++++++++++++++++++++++++- 3 files changed, 219 insertions(+), 2 deletions(-) diff --git a/net/rtp/src/rtpbin2/imp.rs b/net/rtp/src/rtpbin2/imp.rs index f1eaea77..90aee1c2 100644 --- a/net/rtp/src/rtpbin2/imp.rs +++ b/net/rtp/src/rtpbin2/imp.rs @@ -13,7 +13,8 @@ use gst::{glib, prelude::*, subclass::prelude::*}; use once_cell::sync::Lazy; use super::session::{ - RecvReply, RtcpRecvReply, RtpProfile, SendReply, Session, RTCP_MIN_REPORT_INTERVAL, + KeyUnitRequestType, RecvReply, RequestRemoteKeyUnitReply, RtcpRecvReply, RtpProfile, SendReply, + Session, RTCP_MIN_REPORT_INTERVAL, }; use super::source::{ReceivedRb, SourceState}; @@ -245,6 +246,7 @@ impl BinSessionInner { (pad.pad.clone(), false) } else { let src_templ = rtpbin.obj().pad_template("rtp_recv_src_%u_%u_%u").unwrap(); + let id = self.id; let srcpad = gst::Pad::builder_from_template(&src_templ) .iterate_internal_links_function(|pad, parent| { RtpBin2::catch_panic_pad_function( @@ -260,6 +262,13 @@ impl BinSessionInner { |this| this.src_query(pad, query), ) }) + .event_function(move |pad, parent, event| { + RtpBin2::catch_panic_pad_function( + parent, + || false, + |this| this.rtp_recv_src_event(pad, event, id, pt, ssrc), + ) + }) .name(format!("rtp_recv_src_{}_{}_{}", self.id, pt, ssrc)) .build(); srcpad.set_active(true).unwrap(); @@ -953,6 +962,65 @@ impl RtpBin2 { None } } + + fn rtp_recv_src_event( + &self, + pad: &gst::Pad, + event: gst::Event, + id: usize, + pt: u8, + ssrc: u32, + ) -> bool { + match event.view() { + gst::EventView::CustomUpstream(custom) => { + if let Ok(fku) = gst_video::UpstreamForceKeyUnitEvent::parse(custom) { + let all_headers = fku.all_headers; + let count = fku.count; + + let state = self.state.lock().unwrap(); + if let Some(session) = state.session_by_id(id) { + let now = Instant::now(); + let mut session = session.inner.lock().unwrap(); + let caps = session.caps_from_pt_ssrc(pt, ssrc); + let s = caps.structure(0).unwrap(); + + let pli = s.has_field("rtcp-fb-nack-pli"); + let fir = s.has_field("rtcp-fb-ccm-fir") && all_headers; + + let typ = if fir { + KeyUnitRequestType::Fir(count) + } else { + KeyUnitRequestType::Pli + }; + + if pli || fir { + let replies = session.session.request_remote_key_unit(now, typ, ssrc); + + let waker = state.rtcp_waker.clone(); + drop(session); + drop(state); + + for reply in replies { + match reply { + RequestRemoteKeyUnitReply::TimerReconsideration => { + if let Some(ref waker) = waker { + // reconsider timers means that we wake the rtcp task to get a new timeout + waker.wake_by_ref(); + } + } + } + } + } + } + + // Don't forward + return true; + } + gst::Pad::event_default(pad, Some(&*self.obj()), event) + } + _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), + } + } } #[glib::object_subclass] diff --git a/net/rtp/src/rtpbin2/session.rs b/net/rtp/src/rtpbin2/session.rs index 5be1ff35..3b7b787c 100644 --- a/net/rtp/src/rtpbin2/session.rs +++ b/net/rtp/src/rtpbin2/session.rs @@ -48,6 +48,12 @@ pub enum RtpProfile { Avpf, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum KeyUnitRequestType { + Pli, + Fir(u32), +} + impl RtpProfile { fn is_feedback(&self) -> bool { self == &Self::Avpf @@ -142,6 +148,12 @@ pub enum RtcpRecvReply { RequestKeyUnit { ssrcs: Vec, fir: bool }, } +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum RequestRemoteKeyUnitReply { + /// RTCP timer needs to be reconsidered. Call poll_rtcp_send_timeout() to get the new time + TimerReconsideration, +} + impl Session { pub fn new() -> Self { let cname = generate_cname(); @@ -1003,6 +1015,50 @@ impl Session { rtcp } + fn generate_pli<'a>( + &mut self, + mut rtcp: CompoundBuilder<'a>, + _now: Instant, + ) -> CompoundBuilder<'a> { + let ssrc = self.ensure_internal_send_src(); + + for source in self.remote_senders.values_mut() { + let pli = source.generate_pli(); + if let Some(pli) = pli { + debug!("Generating PLI for sender {}: {:?}", source.ssrc(), pli); + rtcp = rtcp.add_packet( + rtcp_types::PayloadFeedback::builder_owned(pli) + .sender_ssrc(ssrc) + .media_ssrc(source.ssrc()), + ); + } + } + rtcp + } + + fn generate_fir<'a>( + &mut self, + mut rtcp: CompoundBuilder<'a>, + _now: Instant, + ) -> CompoundBuilder<'a> { + let mut have_fir = false; + let mut fir = rtcp_types::Fir::builder(); + + for source in self.remote_senders.values_mut() { + fir = source.generate_fir(fir, &mut have_fir); + } + + if have_fir { + let ssrc = self.ensure_internal_send_src(); + + debug!("Generating FIR: {:?}", fir); + rtcp = + rtcp.add_packet(rtcp_types::PayloadFeedback::builder_owned(fir).sender_ssrc(ssrc)); + } + + rtcp + } + // RFC 3550 6.3.5 fn handle_timeouts(&mut self, now: Instant) { trace!("handling rtcp timeouts"); @@ -1110,6 +1166,8 @@ impl Session { rtcp = self.generate_sr(rtcp, now, ntp_now, is_early, &mut ssrcs_reported); rtcp = self.generate_rr(rtcp, now, ntp_now, is_early, &mut ssrcs_reported); rtcp = self.generate_sdes(rtcp, is_early); + rtcp = self.generate_pli(rtcp, now); + rtcp = self.generate_fir(rtcp, now); rtcp = self.generate_bye(rtcp, now); let size = rtcp.calculate_size().unwrap(); @@ -1429,6 +1487,35 @@ impl Session { fn mut_remote_sender_source_by_ssrc(&mut self, ssrc: u32) -> Option<&mut RemoteSendSource> { self.remote_senders.get_mut(&ssrc) } + + pub(crate) fn request_remote_key_unit( + &mut self, + now: Instant, + typ: KeyUnitRequestType, + ssrc: u32, + ) -> Vec { + let mut replies = Vec::new(); + + if !self.remote_senders.contains_key(&ssrc) { + trace!("No remote sender with ssrc {ssrc} known"); + return replies; + }; + + debug!("Requesting remote key-unit for ssrc {ssrc} of type {typ:?}"); + + // FIXME: Use hard-coded 5s interval here + let res = self.request_early_rtcp(now, Duration::from_secs(5)); + if res == RequestEarlyRtcpResult::TimerReconsideration { + replies.push(RequestRemoteKeyUnitReply::TimerReconsideration); + } + + if res != RequestEarlyRtcpResult::NotScheduled { + let source = self.remote_senders.get_mut(&ssrc).unwrap(); + source.request_remote_key_unit(now, typ); + } + + replies + } } fn generate_cname() -> String { diff --git a/net/rtp/src/rtpbin2/source.rs b/net/rtp/src/rtpbin2/source.rs index b5f01a4c..b73df13a 100644 --- a/net/rtp/src/rtpbin2/source.rs +++ b/net/rtp/src/rtpbin2/source.rs @@ -8,7 +8,10 @@ use std::{ use rtcp_types::{ReportBlock, ReportBlockBuilder}; -use super::time::{system_time_to_ntp_time_u64, NtpTime}; +use super::{ + session::KeyUnitRequestType, + time::{system_time_to_ntp_time_u64, NtpTime}, +}; use gst::prelude::MulDiv; @@ -428,6 +431,15 @@ pub struct RemoteSendSource { last_sent_rb: Option, last_received_rb: HashMap, last_request_key_unit: HashMap, + + // If a NACK/PLI is pending with the next RTCP packet + send_pli: bool, + // If a FIR is pending with the next RTCP packet + send_fir: bool, + // Sequence number of the next FIR + send_fir_seqnum: u8, + // Count from the ForceKeyUnitEvent to de-duplicate FIR + send_fir_count: Option, } // The first time we recev a packet for jitter calculations @@ -454,6 +466,10 @@ impl RemoteSendSource { last_sent_rb: None, last_received_rb: HashMap::new(), last_request_key_unit: HashMap::new(), + send_pli: false, + send_fir: false, + send_fir_seqnum: 0, + send_fir_count: None, } } @@ -911,6 +927,48 @@ impl RemoteSendSource { allowed } + + pub(crate) fn request_remote_key_unit(&mut self, _now: Instant, typ: KeyUnitRequestType) { + match typ { + KeyUnitRequestType::Fir(count) => { + if self + .send_fir_count + .map_or(true, |previous_count| previous_count != count) + { + self.send_fir_seqnum = self.send_fir_seqnum.wrapping_add(1); + } + self.send_fir = true; + self.send_fir_count = Some(count); + } + KeyUnitRequestType::Pli if !self.send_fir => { + self.send_pli = true; + } + _ => {} + } + } + + pub(crate) fn generate_pli(&mut self) -> Option { + if self.send_pli { + self.send_pli = false; + Some(rtcp_types::Pli::builder()) + } else { + None + } + } + + pub(crate) fn generate_fir( + &mut self, + fir: rtcp_types::FirBuilder, + added: &mut bool, + ) -> rtcp_types::FirBuilder { + if self.send_fir { + self.send_fir = false; + *added = true; + fir.add_ssrc(self.ssrc(), self.send_fir_seqnum) + } else { + fir + } + } } #[derive(Debug)] @@ -1064,6 +1122,10 @@ impl RemoteReceiveSource { last_sent_rb: None, last_received_rb: HashMap::new(), last_request_key_unit: self.last_request_key_unit, + send_pli: false, + send_fir: false, + send_fir_seqnum: 0, + send_fir_count: None, } }