diff --git a/Cargo.lock b/Cargo.lock index e85d93f2f..28793654b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5831,7 +5831,7 @@ dependencies = [ [[package]] name = "rtcp-types" version = "0.0.1" -source = "git+https://github.com/ystreet/rtcp-types#f7fddfb87e9d7f4fed0b967fedc34995dd81ca86" +source = "git+https://github.com/ystreet/rtcp-types#c1da8a1a193a0c02d798fea5f16863b69abd9000" dependencies = [ "thiserror", ] diff --git a/net/rtp/Cargo.toml b/net/rtp/Cargo.toml index ae2a5c256..e9e7083b2 100644 --- a/net/rtp/Cargo.toml +++ b/net/rtp/Cargo.toml @@ -64,4 +64,4 @@ versioning = false import_library = false [package.metadata.capi.pkg_config] -requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gstreamer-net-1.0, gobject-2.0, glib-2.0, gmodule-2.0, gio-2.0" +requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gstreamer-net-1.0, gstreamer-video-1.0 gobject-2.0, glib-2.0, gmodule-2.0, gio-2.0" diff --git a/net/rtp/src/rtpbin2/imp.rs b/net/rtp/src/rtpbin2/imp.rs index bfddf6e4a..f1eaea770 100644 --- a/net/rtp/src/rtpbin2/imp.rs +++ b/net/rtp/src/rtpbin2/imp.rs @@ -791,6 +791,9 @@ impl RtpBin2 { let replies = session .session .handle_rtcp_recv(rtcp, mapped.len(), addr, now, ntp_now); + let rtp_send_sinkpad = session.rtp_send_sinkpad.clone(); + drop(session); + for reply in replies { match reply { RtcpRecvReply::NewSsrc(_ssrc) => (), // TODO: handle new ssrc @@ -801,6 +804,20 @@ impl RtpBin2 { waker.wake_by_ref(); } } + RtcpRecvReply::RequestKeyUnit { ssrcs, fir } => { + if let Some(ref rtp_send_sinkpad) = rtp_send_sinkpad { + gst::debug!(CAT, imp: self, "Sending force-keyunit event for ssrcs {ssrcs:?} (all headers: {fir})"); + // TODO what to do with the ssrc? + let event = gst_video::UpstreamForceKeyUnitEvent::builder() + .all_headers(fir) + .other_field("ssrcs", &gst::Array::new(ssrcs)) + .build(); + + let _ = rtp_send_sinkpad.push_event(event); + } else { + gst::debug!(CAT, imp: self, "Can't send force-keyunit event because of missing sinkpad"); + } + } } } drop(mapped); diff --git a/net/rtp/src/rtpbin2/session.rs b/net/rtp/src/rtpbin2/session.rs index 062e5733a..5be1ff353 100644 --- a/net/rtp/src/rtpbin2/session.rs +++ b/net/rtp/src/rtpbin2/session.rs @@ -129,7 +129,7 @@ pub enum SendReply { SsrcCollision(u32), } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum RtcpRecvReply { /// A new ssrc was discovered. If you want to change things about the new ssrc, then do it now /// before pushing the buffer again @@ -138,6 +138,8 @@ pub enum RtcpRecvReply { SsrcCollision(u32), /// RTCP timer needs to be reconsidered. Call poll_rtcp_send_timeout() to get the new time TimerReconsideration, + /// Request a key unit for the given SSRC of ours + RequestKeyUnit { ssrcs: Vec, fir: bool }, } impl Session { @@ -434,6 +436,7 @@ impl Session { ntp_time: SystemTime, ) -> Option { let mut ret = None; + if let Some(source) = self.local_senders.get_mut(&rb.ssrc()) { source.add_last_rb(sender_ssrc, rb, now, ntp_time); source.set_last_activity(now); @@ -452,6 +455,7 @@ impl Session { source.set_last_activity(now); source.add_last_rb(sender_ssrc, rb, now, ntp_time); } + ret } @@ -655,7 +659,27 @@ impl Session { } } } - Ok(Packet::Unknown(_unk)) => (), + Ok(Packet::PayloadFeedback(pf)) => { + if let Ok(_pli) = pf.parse_fci::() { + self.handle_remote_request_key_unit( + now, + &mut replies, + false, + pf.sender_ssrc(), + std::iter::once(pf.media_ssrc()), + ); + } else if let Ok(fir) = pf.parse_fci::() { + self.handle_remote_request_key_unit( + now, + &mut replies, + true, + pf.sender_ssrc(), + // TODO: What to do with the sequence? + fir.entries().map(|entry| entry.ssrc()), + ); + } + } + Ok(Packet::TransportFeedback(_)) | Ok(Packet::Unknown(_)) => (), // TODO: in RFC4585 profile, need to listen for feedback messages and remove any // that we would have sent Err(_) => (), @@ -665,6 +689,56 @@ impl Session { replies } + fn handle_remote_request_key_unit( + &mut self, + now: Instant, + replies: &mut Vec, + fir: bool, + sender_ssrc: u32, + media_ssrcs: impl Iterator, + ) { + if !self.remote_receivers.contains_key(&sender_ssrc) + && !self.remote_senders.contains_key(&sender_ssrc) + { + trace!("No remote source known for sender ssrc {sender_ssrc}"); + } + + let mut ssrcs = Vec::new(); + for media_ssrc in media_ssrcs { + let Some(sender) = self.local_senders.get(&media_ssrc) else { + trace!("Not a local sender for ssrc {media_ssrc}"); + continue; + }; + + let Some(rb) = sender + .received_report_blocks() + .find(|(ssrc, _rb)| *ssrc == sender_ssrc) + else { + trace!("No RB for sender ssrc {sender_ssrc} yet"); + continue; + }; + + if let Some(sender) = self.remote_senders.get_mut(&sender_ssrc) { + if !sender.remote_request_key_unit_allowed(now, rb.1) { + trace!("Requesting key-unit not allowed again yet"); + continue; + } + } else if let Some(sender) = self.remote_receivers.get_mut(&sender_ssrc) { + if !sender.remote_request_key_unit_allowed(now, rb.1) { + trace!("Requesting key-unit not allowed again yet"); + continue; + } + } + + trace!("Requesting key-unit from sender ssrc {sender_ssrc} for media ssrc {media_ssrc} (fir: {fir})"); + ssrcs.push(media_ssrc); + } + + if !ssrcs.is_empty() { + replies.push(RtcpRecvReply::RequestKeyUnit { ssrcs, fir }); + } + } + fn generate_sr<'a>( &mut self, mut rtcp: CompoundBuilder<'a>, diff --git a/net/rtp/src/rtpbin2/source.rs b/net/rtp/src/rtpbin2/source.rs index 11d915c5c..b5f01a4c8 100644 --- a/net/rtp/src/rtpbin2/source.rs +++ b/net/rtp/src/rtpbin2/source.rs @@ -427,6 +427,7 @@ pub struct RemoteSendSource { bitrate: Bitrate, last_sent_rb: Option, last_received_rb: HashMap, + last_request_key_unit: HashMap, } // The first time we recev a packet for jitter calculations @@ -452,6 +453,7 @@ impl RemoteSendSource { bitrate: Bitrate::new(BITRATE_WINDOW), last_sent_rb: None, last_received_rb: HashMap::new(), + last_request_key_unit: HashMap::new(), } } @@ -886,8 +888,29 @@ impl RemoteSendSource { RemoteReceiveSource { source: self.source, rtcp_from: self.rtcp_from, + last_request_key_unit: self.last_request_key_unit, } } + + pub(crate) fn remote_request_key_unit_allowed( + &mut self, + now: Instant, + rb: &ReceivedRb, + ) -> bool { + let rtt = rb.round_trip_time(); + + // Allow up to one key-unit request per RTT and SSRC. + let mut allowed = false; + self.last_request_key_unit + .entry(rb.rb.ssrc) + .and_modify(|previous| { + allowed = now.duration_since(*previous) >= rtt; + *previous = now; + }) + .or_insert_with(|| now); + + allowed + } } #[derive(Debug)] @@ -969,6 +992,7 @@ impl LocalReceiveSource { pub struct RemoteReceiveSource { source: Source, rtcp_from: Option, + last_request_key_unit: HashMap, } impl RemoteReceiveSource { @@ -976,6 +1000,7 @@ impl RemoteReceiveSource { Self { source: Source::new(ssrc), rtcp_from: None, + last_request_key_unit: HashMap::new(), } } @@ -1038,8 +1063,29 @@ impl RemoteReceiveSource { bitrate: Bitrate::new(BITRATE_WINDOW), last_sent_rb: None, last_received_rb: HashMap::new(), + last_request_key_unit: self.last_request_key_unit, } } + + pub(crate) fn remote_request_key_unit_allowed( + &mut self, + now: Instant, + rb: &ReceivedRb, + ) -> bool { + let rtt = rb.round_trip_time(); + + // Allow up to one key-unit request per RTT. + let mut allowed = false; + self.last_request_key_unit + .entry(rb.rb.ssrc) + .and_modify(|previous| { + allowed = now.duration_since(*previous) >= rtt; + *previous = now; + }) + .or_insert_with(|| now); + + allowed + } } #[derive(Debug)]