rtpbin2: Add handling for receiving NACK/PLI and FIR

Co-Authored-By: Matthew Waters <matthew@centricular.com>
Co-Authored-By: Mathieu Duponchelle <mathieu@centricular.com>
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
Sebastian Dröge 2023-12-30 17:00:19 +02:00 committed by Matthew Waters
parent 2c86f18a99
commit 66c9840ad8
5 changed files with 141 additions and 4 deletions

2
Cargo.lock generated
View file

@ -5831,7 +5831,7 @@ dependencies = [
[[package]] [[package]]
name = "rtcp-types" name = "rtcp-types"
version = "0.0.1" version = "0.0.1"
source = "git+https://github.com/ystreet/rtcp-types#f7fddfb87e9d7f4fed0b967fedc34995dd81ca86" source = "git+https://github.com/ystreet/rtcp-types#c1da8a1a193a0c02d798fea5f16863b69abd9000"
dependencies = [ dependencies = [
"thiserror", "thiserror",
] ]

View file

@ -64,4 +64,4 @@ versioning = false
import_library = false import_library = false
[package.metadata.capi.pkg_config] [package.metadata.capi.pkg_config]
requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gstreamer-net-1.0, gobject-2.0, glib-2.0, gmodule-2.0, gio-2.0" requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gstreamer-net-1.0, gstreamer-video-1.0 gobject-2.0, glib-2.0, gmodule-2.0, gio-2.0"

View file

@ -791,6 +791,9 @@ impl RtpBin2 {
let replies = session let replies = session
.session .session
.handle_rtcp_recv(rtcp, mapped.len(), addr, now, ntp_now); .handle_rtcp_recv(rtcp, mapped.len(), addr, now, ntp_now);
let rtp_send_sinkpad = session.rtp_send_sinkpad.clone();
drop(session);
for reply in replies { for reply in replies {
match reply { match reply {
RtcpRecvReply::NewSsrc(_ssrc) => (), // TODO: handle new ssrc RtcpRecvReply::NewSsrc(_ssrc) => (), // TODO: handle new ssrc
@ -801,6 +804,20 @@ impl RtpBin2 {
waker.wake_by_ref(); waker.wake_by_ref();
} }
} }
RtcpRecvReply::RequestKeyUnit { ssrcs, fir } => {
if let Some(ref rtp_send_sinkpad) = rtp_send_sinkpad {
gst::debug!(CAT, imp: self, "Sending force-keyunit event for ssrcs {ssrcs:?} (all headers: {fir})");
// TODO what to do with the ssrc?
let event = gst_video::UpstreamForceKeyUnitEvent::builder()
.all_headers(fir)
.other_field("ssrcs", &gst::Array::new(ssrcs))
.build();
let _ = rtp_send_sinkpad.push_event(event);
} else {
gst::debug!(CAT, imp: self, "Can't send force-keyunit event because of missing sinkpad");
}
}
} }
} }
drop(mapped); drop(mapped);

View file

@ -129,7 +129,7 @@ pub enum SendReply {
SsrcCollision(u32), SsrcCollision(u32),
} }
#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum RtcpRecvReply { pub enum RtcpRecvReply {
/// A new ssrc was discovered. If you want to change things about the new ssrc, then do it now /// A new ssrc was discovered. If you want to change things about the new ssrc, then do it now
/// before pushing the buffer again /// before pushing the buffer again
@ -138,6 +138,8 @@ pub enum RtcpRecvReply {
SsrcCollision(u32), SsrcCollision(u32),
/// RTCP timer needs to be reconsidered. Call poll_rtcp_send_timeout() to get the new time /// RTCP timer needs to be reconsidered. Call poll_rtcp_send_timeout() to get the new time
TimerReconsideration, TimerReconsideration,
/// Request a key unit for the given SSRC of ours
RequestKeyUnit { ssrcs: Vec<u32>, fir: bool },
} }
impl Session { impl Session {
@ -434,6 +436,7 @@ impl Session {
ntp_time: SystemTime, ntp_time: SystemTime,
) -> Option<RtcpRecvReply> { ) -> Option<RtcpRecvReply> {
let mut ret = None; let mut ret = None;
if let Some(source) = self.local_senders.get_mut(&rb.ssrc()) { if let Some(source) = self.local_senders.get_mut(&rb.ssrc()) {
source.add_last_rb(sender_ssrc, rb, now, ntp_time); source.add_last_rb(sender_ssrc, rb, now, ntp_time);
source.set_last_activity(now); source.set_last_activity(now);
@ -452,6 +455,7 @@ impl Session {
source.set_last_activity(now); source.set_last_activity(now);
source.add_last_rb(sender_ssrc, rb, now, ntp_time); source.add_last_rb(sender_ssrc, rb, now, ntp_time);
} }
ret ret
} }
@ -655,7 +659,27 @@ impl Session {
} }
} }
} }
Ok(Packet::Unknown(_unk)) => (), Ok(Packet::PayloadFeedback(pf)) => {
if let Ok(_pli) = pf.parse_fci::<rtcp_types::Pli>() {
self.handle_remote_request_key_unit(
now,
&mut replies,
false,
pf.sender_ssrc(),
std::iter::once(pf.media_ssrc()),
);
} else if let Ok(fir) = pf.parse_fci::<rtcp_types::Fir>() {
self.handle_remote_request_key_unit(
now,
&mut replies,
true,
pf.sender_ssrc(),
// TODO: What to do with the sequence?
fir.entries().map(|entry| entry.ssrc()),
);
}
}
Ok(Packet::TransportFeedback(_)) | Ok(Packet::Unknown(_)) => (),
// TODO: in RFC4585 profile, need to listen for feedback messages and remove any // TODO: in RFC4585 profile, need to listen for feedback messages and remove any
// that we would have sent // that we would have sent
Err(_) => (), Err(_) => (),
@ -665,6 +689,56 @@ impl Session {
replies replies
} }
fn handle_remote_request_key_unit(
&mut self,
now: Instant,
replies: &mut Vec<RtcpRecvReply>,
fir: bool,
sender_ssrc: u32,
media_ssrcs: impl Iterator<Item = u32>,
) {
if !self.remote_receivers.contains_key(&sender_ssrc)
&& !self.remote_senders.contains_key(&sender_ssrc)
{
trace!("No remote source known for sender ssrc {sender_ssrc}");
}
let mut ssrcs = Vec::new();
for media_ssrc in media_ssrcs {
let Some(sender) = self.local_senders.get(&media_ssrc) else {
trace!("Not a local sender for ssrc {media_ssrc}");
continue;
};
let Some(rb) = sender
.received_report_blocks()
.find(|(ssrc, _rb)| *ssrc == sender_ssrc)
else {
trace!("No RB for sender ssrc {sender_ssrc} yet");
continue;
};
if let Some(sender) = self.remote_senders.get_mut(&sender_ssrc) {
if !sender.remote_request_key_unit_allowed(now, rb.1) {
trace!("Requesting key-unit not allowed again yet");
continue;
}
} else if let Some(sender) = self.remote_receivers.get_mut(&sender_ssrc) {
if !sender.remote_request_key_unit_allowed(now, rb.1) {
trace!("Requesting key-unit not allowed again yet");
continue;
}
}
trace!("Requesting key-unit from sender ssrc {sender_ssrc} for media ssrc {media_ssrc} (fir: {fir})");
ssrcs.push(media_ssrc);
}
if !ssrcs.is_empty() {
replies.push(RtcpRecvReply::RequestKeyUnit { ssrcs, fir });
}
}
fn generate_sr<'a>( fn generate_sr<'a>(
&mut self, &mut self,
mut rtcp: CompoundBuilder<'a>, mut rtcp: CompoundBuilder<'a>,

View file

@ -427,6 +427,7 @@ pub struct RemoteSendSource {
bitrate: Bitrate, bitrate: Bitrate,
last_sent_rb: Option<Rb>, last_sent_rb: Option<Rb>,
last_received_rb: HashMap<u32, ReceivedRb>, last_received_rb: HashMap<u32, ReceivedRb>,
last_request_key_unit: HashMap<u32, Instant>,
} }
// The first time we recev a packet for jitter calculations // The first time we recev a packet for jitter calculations
@ -452,6 +453,7 @@ impl RemoteSendSource {
bitrate: Bitrate::new(BITRATE_WINDOW), bitrate: Bitrate::new(BITRATE_WINDOW),
last_sent_rb: None, last_sent_rb: None,
last_received_rb: HashMap::new(), last_received_rb: HashMap::new(),
last_request_key_unit: HashMap::new(),
} }
} }
@ -886,8 +888,29 @@ impl RemoteSendSource {
RemoteReceiveSource { RemoteReceiveSource {
source: self.source, source: self.source,
rtcp_from: self.rtcp_from, rtcp_from: self.rtcp_from,
last_request_key_unit: self.last_request_key_unit,
} }
} }
pub(crate) fn remote_request_key_unit_allowed(
&mut self,
now: Instant,
rb: &ReceivedRb,
) -> bool {
let rtt = rb.round_trip_time();
// Allow up to one key-unit request per RTT and SSRC.
let mut allowed = false;
self.last_request_key_unit
.entry(rb.rb.ssrc)
.and_modify(|previous| {
allowed = now.duration_since(*previous) >= rtt;
*previous = now;
})
.or_insert_with(|| now);
allowed
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -969,6 +992,7 @@ impl LocalReceiveSource {
pub struct RemoteReceiveSource { pub struct RemoteReceiveSource {
source: Source, source: Source,
rtcp_from: Option<SocketAddr>, rtcp_from: Option<SocketAddr>,
last_request_key_unit: HashMap<u32, Instant>,
} }
impl RemoteReceiveSource { impl RemoteReceiveSource {
@ -976,6 +1000,7 @@ impl RemoteReceiveSource {
Self { Self {
source: Source::new(ssrc), source: Source::new(ssrc),
rtcp_from: None, rtcp_from: None,
last_request_key_unit: HashMap::new(),
} }
} }
@ -1038,8 +1063,29 @@ impl RemoteReceiveSource {
bitrate: Bitrate::new(BITRATE_WINDOW), bitrate: Bitrate::new(BITRATE_WINDOW),
last_sent_rb: None, last_sent_rb: None,
last_received_rb: HashMap::new(), last_received_rb: HashMap::new(),
last_request_key_unit: self.last_request_key_unit,
} }
} }
pub(crate) fn remote_request_key_unit_allowed(
&mut self,
now: Instant,
rb: &ReceivedRb,
) -> bool {
let rtt = rb.round_trip_time();
// Allow up to one key-unit request per RTT.
let mut allowed = false;
self.last_request_key_unit
.entry(rb.rb.ssrc)
.and_modify(|previous| {
allowed = now.duration_since(*previous) >= rtt;
*previous = now;
})
.or_insert_with(|| now);
allowed
}
} }
#[derive(Debug)] #[derive(Debug)]