rtpbin2: Add support for sending NACK/PLI and FIR

Co-Authored-By: Matthew Waters <matthew@centricular.com>
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
Sebastian Dröge 2024-01-02 11:06:46 +02:00 committed by Matthew Waters
parent 66c9840ad8
commit 1e4a966c92
3 changed files with 219 additions and 2 deletions

View file

@ -13,7 +13,8 @@ use gst::{glib, prelude::*, subclass::prelude::*};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use super::session::{ use super::session::{
RecvReply, RtcpRecvReply, RtpProfile, SendReply, Session, RTCP_MIN_REPORT_INTERVAL, KeyUnitRequestType, RecvReply, RequestRemoteKeyUnitReply, RtcpRecvReply, RtpProfile, SendReply,
Session, RTCP_MIN_REPORT_INTERVAL,
}; };
use super::source::{ReceivedRb, SourceState}; use super::source::{ReceivedRb, SourceState};
@ -245,6 +246,7 @@ impl BinSessionInner {
(pad.pad.clone(), false) (pad.pad.clone(), false)
} else { } else {
let src_templ = rtpbin.obj().pad_template("rtp_recv_src_%u_%u_%u").unwrap(); let src_templ = rtpbin.obj().pad_template("rtp_recv_src_%u_%u_%u").unwrap();
let id = self.id;
let srcpad = gst::Pad::builder_from_template(&src_templ) let srcpad = gst::Pad::builder_from_template(&src_templ)
.iterate_internal_links_function(|pad, parent| { .iterate_internal_links_function(|pad, parent| {
RtpBin2::catch_panic_pad_function( RtpBin2::catch_panic_pad_function(
@ -260,6 +262,13 @@ impl BinSessionInner {
|this| this.src_query(pad, query), |this| this.src_query(pad, query),
) )
}) })
.event_function(move |pad, parent, event| {
RtpBin2::catch_panic_pad_function(
parent,
|| false,
|this| this.rtp_recv_src_event(pad, event, id, pt, ssrc),
)
})
.name(format!("rtp_recv_src_{}_{}_{}", self.id, pt, ssrc)) .name(format!("rtp_recv_src_{}_{}_{}", self.id, pt, ssrc))
.build(); .build();
srcpad.set_active(true).unwrap(); srcpad.set_active(true).unwrap();
@ -953,6 +962,65 @@ impl RtpBin2 {
None None
} }
} }
fn rtp_recv_src_event(
&self,
pad: &gst::Pad,
event: gst::Event,
id: usize,
pt: u8,
ssrc: u32,
) -> bool {
match event.view() {
gst::EventView::CustomUpstream(custom) => {
if let Ok(fku) = gst_video::UpstreamForceKeyUnitEvent::parse(custom) {
let all_headers = fku.all_headers;
let count = fku.count;
let state = self.state.lock().unwrap();
if let Some(session) = state.session_by_id(id) {
let now = Instant::now();
let mut session = session.inner.lock().unwrap();
let caps = session.caps_from_pt_ssrc(pt, ssrc);
let s = caps.structure(0).unwrap();
let pli = s.has_field("rtcp-fb-nack-pli");
let fir = s.has_field("rtcp-fb-ccm-fir") && all_headers;
let typ = if fir {
KeyUnitRequestType::Fir(count)
} else {
KeyUnitRequestType::Pli
};
if pli || fir {
let replies = session.session.request_remote_key_unit(now, typ, ssrc);
let waker = state.rtcp_waker.clone();
drop(session);
drop(state);
for reply in replies {
match reply {
RequestRemoteKeyUnitReply::TimerReconsideration => {
if let Some(ref waker) = waker {
// reconsider timers means that we wake the rtcp task to get a new timeout
waker.wake_by_ref();
}
}
}
}
}
}
// Don't forward
return true;
}
gst::Pad::event_default(pad, Some(&*self.obj()), event)
}
_ => gst::Pad::event_default(pad, Some(&*self.obj()), event),
}
}
} }
#[glib::object_subclass] #[glib::object_subclass]

View file

@ -48,6 +48,12 @@ pub enum RtpProfile {
Avpf, Avpf,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KeyUnitRequestType {
Pli,
Fir(u32),
}
impl RtpProfile { impl RtpProfile {
fn is_feedback(&self) -> bool { fn is_feedback(&self) -> bool {
self == &Self::Avpf self == &Self::Avpf
@ -142,6 +148,12 @@ pub enum RtcpRecvReply {
RequestKeyUnit { ssrcs: Vec<u32>, fir: bool }, RequestKeyUnit { ssrcs: Vec<u32>, fir: bool },
} }
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum RequestRemoteKeyUnitReply {
/// RTCP timer needs to be reconsidered. Call poll_rtcp_send_timeout() to get the new time
TimerReconsideration,
}
impl Session { impl Session {
pub fn new() -> Self { pub fn new() -> Self {
let cname = generate_cname(); let cname = generate_cname();
@ -1003,6 +1015,50 @@ impl Session {
rtcp rtcp
} }
fn generate_pli<'a>(
&mut self,
mut rtcp: CompoundBuilder<'a>,
_now: Instant,
) -> CompoundBuilder<'a> {
let ssrc = self.ensure_internal_send_src();
for source in self.remote_senders.values_mut() {
let pli = source.generate_pli();
if let Some(pli) = pli {
debug!("Generating PLI for sender {}: {:?}", source.ssrc(), pli);
rtcp = rtcp.add_packet(
rtcp_types::PayloadFeedback::builder_owned(pli)
.sender_ssrc(ssrc)
.media_ssrc(source.ssrc()),
);
}
}
rtcp
}
fn generate_fir<'a>(
&mut self,
mut rtcp: CompoundBuilder<'a>,
_now: Instant,
) -> CompoundBuilder<'a> {
let mut have_fir = false;
let mut fir = rtcp_types::Fir::builder();
for source in self.remote_senders.values_mut() {
fir = source.generate_fir(fir, &mut have_fir);
}
if have_fir {
let ssrc = self.ensure_internal_send_src();
debug!("Generating FIR: {:?}", fir);
rtcp =
rtcp.add_packet(rtcp_types::PayloadFeedback::builder_owned(fir).sender_ssrc(ssrc));
}
rtcp
}
// RFC 3550 6.3.5 // RFC 3550 6.3.5
fn handle_timeouts(&mut self, now: Instant) { fn handle_timeouts(&mut self, now: Instant) {
trace!("handling rtcp timeouts"); trace!("handling rtcp timeouts");
@ -1110,6 +1166,8 @@ impl Session {
rtcp = self.generate_sr(rtcp, now, ntp_now, is_early, &mut ssrcs_reported); rtcp = self.generate_sr(rtcp, now, ntp_now, is_early, &mut ssrcs_reported);
rtcp = self.generate_rr(rtcp, now, ntp_now, is_early, &mut ssrcs_reported); rtcp = self.generate_rr(rtcp, now, ntp_now, is_early, &mut ssrcs_reported);
rtcp = self.generate_sdes(rtcp, is_early); rtcp = self.generate_sdes(rtcp, is_early);
rtcp = self.generate_pli(rtcp, now);
rtcp = self.generate_fir(rtcp, now);
rtcp = self.generate_bye(rtcp, now); rtcp = self.generate_bye(rtcp, now);
let size = rtcp.calculate_size().unwrap(); let size = rtcp.calculate_size().unwrap();
@ -1429,6 +1487,35 @@ impl Session {
fn mut_remote_sender_source_by_ssrc(&mut self, ssrc: u32) -> Option<&mut RemoteSendSource> { fn mut_remote_sender_source_by_ssrc(&mut self, ssrc: u32) -> Option<&mut RemoteSendSource> {
self.remote_senders.get_mut(&ssrc) self.remote_senders.get_mut(&ssrc)
} }
pub(crate) fn request_remote_key_unit(
&mut self,
now: Instant,
typ: KeyUnitRequestType,
ssrc: u32,
) -> Vec<RequestRemoteKeyUnitReply> {
let mut replies = Vec::new();
if !self.remote_senders.contains_key(&ssrc) {
trace!("No remote sender with ssrc {ssrc} known");
return replies;
};
debug!("Requesting remote key-unit for ssrc {ssrc} of type {typ:?}");
// FIXME: Use hard-coded 5s interval here
let res = self.request_early_rtcp(now, Duration::from_secs(5));
if res == RequestEarlyRtcpResult::TimerReconsideration {
replies.push(RequestRemoteKeyUnitReply::TimerReconsideration);
}
if res != RequestEarlyRtcpResult::NotScheduled {
let source = self.remote_senders.get_mut(&ssrc).unwrap();
source.request_remote_key_unit(now, typ);
}
replies
}
} }
fn generate_cname() -> String { fn generate_cname() -> String {

View file

@ -8,7 +8,10 @@ use std::{
use rtcp_types::{ReportBlock, ReportBlockBuilder}; use rtcp_types::{ReportBlock, ReportBlockBuilder};
use super::time::{system_time_to_ntp_time_u64, NtpTime}; use super::{
session::KeyUnitRequestType,
time::{system_time_to_ntp_time_u64, NtpTime},
};
use gst::prelude::MulDiv; use gst::prelude::MulDiv;
@ -428,6 +431,15 @@ pub struct RemoteSendSource {
last_sent_rb: Option<Rb>, last_sent_rb: Option<Rb>,
last_received_rb: HashMap<u32, ReceivedRb>, last_received_rb: HashMap<u32, ReceivedRb>,
last_request_key_unit: HashMap<u32, Instant>, last_request_key_unit: HashMap<u32, Instant>,
// If a NACK/PLI is pending with the next RTCP packet
send_pli: bool,
// If a FIR is pending with the next RTCP packet
send_fir: bool,
// Sequence number of the next FIR
send_fir_seqnum: u8,
// Count from the ForceKeyUnitEvent to de-duplicate FIR
send_fir_count: Option<u32>,
} }
// The first time we recev a packet for jitter calculations // The first time we recev a packet for jitter calculations
@ -454,6 +466,10 @@ impl RemoteSendSource {
last_sent_rb: None, last_sent_rb: None,
last_received_rb: HashMap::new(), last_received_rb: HashMap::new(),
last_request_key_unit: HashMap::new(), last_request_key_unit: HashMap::new(),
send_pli: false,
send_fir: false,
send_fir_seqnum: 0,
send_fir_count: None,
} }
} }
@ -911,6 +927,48 @@ impl RemoteSendSource {
allowed allowed
} }
pub(crate) fn request_remote_key_unit(&mut self, _now: Instant, typ: KeyUnitRequestType) {
match typ {
KeyUnitRequestType::Fir(count) => {
if self
.send_fir_count
.map_or(true, |previous_count| previous_count != count)
{
self.send_fir_seqnum = self.send_fir_seqnum.wrapping_add(1);
}
self.send_fir = true;
self.send_fir_count = Some(count);
}
KeyUnitRequestType::Pli if !self.send_fir => {
self.send_pli = true;
}
_ => {}
}
}
pub(crate) fn generate_pli(&mut self) -> Option<rtcp_types::PliBuilder> {
if self.send_pli {
self.send_pli = false;
Some(rtcp_types::Pli::builder())
} else {
None
}
}
pub(crate) fn generate_fir(
&mut self,
fir: rtcp_types::FirBuilder,
added: &mut bool,
) -> rtcp_types::FirBuilder {
if self.send_fir {
self.send_fir = false;
*added = true;
fir.add_ssrc(self.ssrc(), self.send_fir_seqnum)
} else {
fir
}
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -1064,6 +1122,10 @@ impl RemoteReceiveSource {
last_sent_rb: None, last_sent_rb: None,
last_received_rb: HashMap::new(), last_received_rb: HashMap::new(),
last_request_key_unit: self.last_request_key_unit, last_request_key_unit: self.last_request_key_unit,
send_pli: false,
send_fir: false,
send_fir_seqnum: 0,
send_fir_count: None,
} }
} }