rtpbin2: Add handling for receiving NACK/PLI and FIR

Co-Authored-By: Matthew Waters <matthew@centricular.com>
Co-Authored-By: Mathieu Duponchelle <mathieu@centricular.com>
This commit is contained in:
Sebastian Dröge 2023-12-30 17:00:19 +02:00 committed by Matthew Waters
parent 1c04618f8d
commit 9a9a30bf10
5 changed files with 147 additions and 8 deletions

3
Cargo.lock generated
View file

@ -2652,6 +2652,7 @@ dependencies = [
"gstreamer-check",
"gstreamer-net",
"gstreamer-rtp",
"gstreamer-video",
"log",
"once_cell",
"rand",
@ -5407,7 +5408,7 @@ dependencies = [
[[package]]
name = "rtcp-types"
version = "0.0.1"
source = "git+https://github.com/ystreet/rtcp-types#f7fddfb87e9d7f4fed0b967fedc34995dd81ca86"
source = "git+https://github.com/ystreet/rtcp-types#c1da8a1a193a0c02d798fea5f16863b69abd9000"
dependencies = [
"thiserror",
]

View file

@ -14,10 +14,11 @@ bitstream-io = "2.1"
chrono = { version = "0.4", default-features = false }
futures = "0.3"
gio.workspace = true
gst = { workspace = true, features = ["v1_20"] }
gst-base = { workspace = true, features = ["v1_20"] }
gst-net = { workspace = true, features = ["v1_20"] }
gst-rtp = { workspace = true, features = ["v1_20"] }
gst = { workspace = true, features = ["v1_20"] }
gst-base = { workspace = true, features = ["v1_20"] }
gst-net = { workspace = true, features = ["v1_20"] }
gst-rtp = { workspace = true, features = ["v1_20"] }
gst-video = { workspace = true, features = ["v1_20"] }
log = "0.4"
once_cell.workspace = true
rand = { version = "0.8", default-features = false, features = ["std", "std_rng" ] }
@ -56,4 +57,4 @@ versioning = false
import_library = false
[package.metadata.capi.pkg_config]
requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gstreamer-net-1.0, gobject-2.0, glib-2.0, gmodule-2.0, gio-2.0"
requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gstreamer-net-1.0, gstreamer-video-1.0 gobject-2.0, glib-2.0, gmodule-2.0, gio-2.0"

View file

@ -791,6 +791,9 @@ impl RtpBin2 {
let replies = session
.session
.handle_rtcp_recv(rtcp, mapped.len(), addr, now, ntp_now);
let rtp_send_sinkpad = session.rtp_send_sinkpad.clone();
drop(session);
for reply in replies {
match reply {
RtcpRecvReply::NewSsrc(_ssrc) => (), // TODO: handle new ssrc
@ -801,6 +804,20 @@ impl RtpBin2 {
waker.wake_by_ref();
}
}
RtcpRecvReply::RequestKeyUnit { ssrcs, fir } => {
if let Some(ref rtp_send_sinkpad) = rtp_send_sinkpad {
gst::debug!(CAT, imp: self, "Sending force-keyunit event for ssrcs {ssrcs:?} (all headers: {fir})");
// TODO what to do with the ssrc?
let event = gst_video::UpstreamForceKeyUnitEvent::builder()
.all_headers(fir)
.other_field("ssrcs", &gst::Array::new(ssrcs))
.build();
let _ = rtp_send_sinkpad.push_event(event);
} else {
gst::debug!(CAT, imp: self, "Can't send force-keyunit event because of missing sinkpad");
}
}
}
}
drop(mapped);

View file

@ -129,7 +129,7 @@ pub enum SendReply {
SsrcCollision(u32),
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RtcpRecvReply {
/// A new ssrc was discovered. If you want to change things about the new ssrc, then do it now
/// before pushing the buffer again
@ -138,6 +138,8 @@ pub enum RtcpRecvReply {
SsrcCollision(u32),
/// RTCP timer needs to be reconsidered. Call poll_rtcp_send_timeout() to get the new time
TimerReconsideration,
/// Request a key unit for the given SSRC of ours
RequestKeyUnit { ssrcs: Vec<u32>, fir: bool },
}
impl Session {
@ -434,6 +436,7 @@ impl Session {
ntp_time: SystemTime,
) -> Option<RtcpRecvReply> {
let mut ret = None;
if let Some(source) = self.local_senders.get_mut(&rb.ssrc()) {
source.add_last_rb(sender_ssrc, rb, now, ntp_time);
source.set_last_activity(now);
@ -452,6 +455,7 @@ impl Session {
source.set_last_activity(now);
source.add_last_rb(sender_ssrc, rb, now, ntp_time);
}
ret
}
@ -655,7 +659,27 @@ impl Session {
}
}
}
Ok(Packet::Unknown(_unk)) => (),
Ok(Packet::PayloadFeedback(pf)) => {
if let Ok(_pli) = pf.parse_fci::<rtcp_types::Pli>() {
self.handle_remote_request_key_unit(
now,
&mut replies,
false,
pf.sender_ssrc(),
std::iter::once(pf.media_ssrc()),
);
} else if let Ok(fir) = pf.parse_fci::<rtcp_types::Fir>() {
self.handle_remote_request_key_unit(
now,
&mut replies,
true,
pf.sender_ssrc(),
// TODO: What to do with the sequence?
fir.entries().map(|entry| entry.ssrc()),
);
}
}
Ok(Packet::TransportFeedback(_)) | Ok(Packet::Unknown(_)) => (),
// TODO: in RFC4585 profile, need to listen for feedback messages and remove any
// that we would have sent
Err(_) => (),
@ -665,6 +689,56 @@ impl Session {
replies
}
fn handle_remote_request_key_unit(
&mut self,
now: Instant,
replies: &mut Vec<RtcpRecvReply>,
fir: bool,
sender_ssrc: u32,
media_ssrcs: impl Iterator<Item = u32>,
) {
if !self.remote_receivers.contains_key(&sender_ssrc)
&& !self.remote_senders.contains_key(&sender_ssrc)
{
trace!("No remote source known for sender ssrc {sender_ssrc}");
}
let mut ssrcs = Vec::new();
for media_ssrc in media_ssrcs {
let Some(sender) = self.local_senders.get(&media_ssrc) else {
trace!("Not a local sender for ssrc {media_ssrc}");
continue;
};
let Some(rb) = sender
.received_report_blocks()
.find(|(ssrc, _rb)| *ssrc == sender_ssrc)
else {
trace!("No RB for sender ssrc {sender_ssrc} yet");
continue;
};
if let Some(sender) = self.remote_senders.get_mut(&sender_ssrc) {
if !sender.remote_request_key_unit_allowed(now, rb.1) {
trace!("Requesting key-unit not allowed again yet");
continue;
}
} else if let Some(sender) = self.remote_receivers.get_mut(&sender_ssrc) {
if !sender.remote_request_key_unit_allowed(now, rb.1) {
trace!("Requesting key-unit not allowed again yet");
continue;
}
}
trace!("Requesting key-unit from sender ssrc {sender_ssrc} for media ssrc {media_ssrc} (fir: {fir})");
ssrcs.push(media_ssrc);
}
if !ssrcs.is_empty() {
replies.push(RtcpRecvReply::RequestKeyUnit { ssrcs, fir });
}
}
fn generate_sr<'a>(
&mut self,
mut rtcp: CompoundBuilder<'a>,

View file

@ -427,6 +427,7 @@ pub struct RemoteSendSource {
bitrate: Bitrate,
last_sent_rb: Option<Rb>,
last_received_rb: HashMap<u32, ReceivedRb>,
last_request_key_unit: HashMap<u32, Instant>,
}
// The first time we recev a packet for jitter calculations
@ -452,6 +453,7 @@ impl RemoteSendSource {
bitrate: Bitrate::new(BITRATE_WINDOW),
last_sent_rb: None,
last_received_rb: HashMap::new(),
last_request_key_unit: HashMap::new(),
}
}
@ -886,8 +888,29 @@ impl RemoteSendSource {
RemoteReceiveSource {
source: self.source,
rtcp_from: self.rtcp_from,
last_request_key_unit: self.last_request_key_unit,
}
}
pub(crate) fn remote_request_key_unit_allowed(
&mut self,
now: Instant,
rb: &ReceivedRb,
) -> bool {
let rtt = rb.round_trip_time();
// Allow up to one key-unit request per RTT and SSRC.
let mut allowed = false;
self.last_request_key_unit
.entry(rb.rb.ssrc)
.and_modify(|previous| {
allowed = now.duration_since(*previous) >= rtt;
*previous = now;
})
.or_insert_with(|| now);
allowed
}
}
#[derive(Debug)]
@ -969,6 +992,7 @@ impl LocalReceiveSource {
pub struct RemoteReceiveSource {
source: Source,
rtcp_from: Option<SocketAddr>,
last_request_key_unit: HashMap<u32, Instant>,
}
impl RemoteReceiveSource {
@ -976,6 +1000,7 @@ impl RemoteReceiveSource {
Self {
source: Source::new(ssrc),
rtcp_from: None,
last_request_key_unit: HashMap::new(),
}
}
@ -1038,8 +1063,29 @@ impl RemoteReceiveSource {
bitrate: Bitrate::new(BITRATE_WINDOW),
last_sent_rb: None,
last_received_rb: HashMap::new(),
last_request_key_unit: self.last_request_key_unit,
}
}
pub(crate) fn remote_request_key_unit_allowed(
&mut self,
now: Instant,
rb: &ReceivedRb,
) -> bool {
let rtt = rb.round_trip_time();
// Allow up to one key-unit request per RTT.
let mut allowed = false;
self.last_request_key_unit
.entry(rb.rb.ssrc)
.and_modify(|previous| {
allowed = now.duration_since(*previous) >= rtt;
*previous = now;
})
.or_insert_with(|| now);
allowed
}
}
#[derive(Debug)]