diff --git a/net/rtp/src/rtpbin2/config.rs b/net/rtp/src/rtpbin2/config.rs index 7757341d..079ebd66 100644 --- a/net/rtp/src/rtpbin2/config.rs +++ b/net/rtp/src/rtpbin2/config.rs @@ -144,9 +144,14 @@ mod imp { fn signals() -> &'static [glib::subclass::Signal] { static SIGNALS: Lazy> = Lazy::new(|| { - vec![glib::subclass::Signal::builder("new-ssrc") - .param_types([u32::static_type()]) - .build()] + vec![ + glib::subclass::Signal::builder("new-ssrc") + .param_types([u32::static_type()]) + .build(), + glib::subclass::Signal::builder("bye-ssrc") + .param_types([u32::static_type()]) + .build(), + ] }); SIGNALS.as_ref() @@ -243,4 +248,45 @@ mod tests { let buf4 = h.pull().unwrap(); assert_eq!(buf4, buf2); } + + #[test] + fn bye_send_ssrc() { + test_init(); + let ssrc = 0x12345678; + let (bye_ssrc_sender, bye_ssrc_receiver) = std::sync::mpsc::sync_channel(16); + let rtpbin2 = gst::ElementFactory::make("rtpbin2").build().unwrap(); + let mut h = gst_check::Harness::with_element( + &rtpbin2, + Some("rtp_send_sink_0"), + Some("rtp_send_src_0"), + ); + let mut h_rtcp = gst_check::Harness::with_element(&rtpbin2, None, Some("rtcp_send_src_0")); + let session = h + .element() + .unwrap() + .emit_by_name::("get-session", &[&0u32]); + session.connect("bye-ssrc", false, move |args| { + let bye_ssrc = args[1].get::().unwrap(); + assert_eq!(bye_ssrc, ssrc); + bye_ssrc_sender.send(ssrc).unwrap(); + None + }); + h.set_src_caps_str("application/x-rtp,payload=96,clock-rate=90000"); + let mut segment = gst::Segment::new(); + segment.set_format(gst::Format::Time); + h.push_event(gst::event::Segment::builder(&segment).build()); + let buf1 = gst::Buffer::from_mut_slice(generate_rtp_packet(ssrc, 0x34, 0x10, 16)); + h.push(buf1.clone()).unwrap(); + let buf2 = gst::Buffer::from_mut_slice(generate_rtp_packet(ssrc, 0x35, 0x10, 16)); + h.push(buf2.clone()).unwrap(); + + let buf3 = h.pull().unwrap(); + assert_eq!(buf3, buf1); + let buf4 = h.pull().unwrap(); + assert_eq!(buf4, buf2); + + h.push_event(gst::event::Eos::builder().build()); + let _rtcp = h_rtcp.pull().unwrap(); + assert_eq!(bye_ssrc_receiver.recv().unwrap(), ssrc); + } } diff --git a/net/rtp/src/rtpbin2/imp.rs b/net/rtp/src/rtpbin2/imp.rs index 23c4ded2..bdec146c 100644 --- a/net/rtp/src/rtpbin2/imp.rs +++ b/net/rtp/src/rtpbin2/imp.rs @@ -14,8 +14,8 @@ use once_cell::sync::Lazy; use super::jitterbuffer::{self, JitterBuffer}; use super::session::{ - KeyUnitRequestType, RecvReply, RequestRemoteKeyUnitReply, RtcpRecvReply, RtpProfile, SendReply, - Session, RTCP_MIN_REPORT_INTERVAL, + KeyUnitRequestType, RecvReply, RequestRemoteKeyUnitReply, RtcpRecvReply, RtcpSendReply, + RtpProfile, SendReply, Session, RTCP_MIN_REPORT_INTERVAL, }; use super::source::{ReceivedRb, SourceState}; use super::sync; @@ -102,7 +102,7 @@ impl RtcpSendStream { } impl futures::stream::Stream for RtcpSendStream { - type Item = (Vec, usize); + type Item = (usize, RtcpSendReply); fn poll_next( self: Pin<&mut Self>, @@ -114,8 +114,8 @@ impl futures::stream::Stream for RtcpSendStream { let mut lowest_wait = None; for session in state.sessions.iter_mut() { let mut session = session.inner.lock().unwrap(); - if let Some(data) = session.session.poll_rtcp_send(now, ntp_now) { - return Poll::Ready(Some((data, session.id))); + if let Some(reply) = session.session.poll_rtcp_send(now, ntp_now) { + return Poll::Ready(Some((session.id, reply))); } if let Some(wait) = session.session.poll_rtcp_send_timeout(now) { if lowest_wait.map_or(true, |lowest_wait| wait < lowest_wait) { @@ -808,20 +808,28 @@ impl RtpBin2 { async fn rtcp_task(state: Arc>) { let mut stream = RtcpSendStream::new(state.clone()); - while let Some((data, session_id)) = stream.next().await { + while let Some((session_id, reply)) = stream.next().await { let state = state.lock().unwrap(); let Some(session) = state.session_by_id(session_id) else { continue; }; - let Some(rtcp_srcpad) = session.inner.lock().unwrap().rtcp_send_srcpad.clone() else { - continue; - }; - RUNTIME.spawn_blocking(move || { - let buffer = gst::Buffer::from_mut_slice(data); - if let Err(e) = rtcp_srcpad.push(buffer) { - gst::warning!(CAT, obj: rtcp_srcpad, "Failed to send rtcp data: flow return {e:?}"); + match reply { + RtcpSendReply::Data(data) => { + let Some(rtcp_srcpad) = session.inner.lock().unwrap().rtcp_send_srcpad.clone() + else { + continue; + }; + RUNTIME.spawn_blocking(move || { + let buffer = gst::Buffer::from_mut_slice(data); + if let Err(e) = rtcp_srcpad.push(buffer) { + gst::warning!(CAT, obj: rtcp_srcpad, "Failed to send rtcp data: flow return {e:?}"); + } + }); } - }); + RtcpSendReply::SsrcBye(ssrc) => { + session.config.emit_by_name::<()>("bye-ssrc", &[&ssrc]) + } + } } } @@ -1265,6 +1273,9 @@ impl RtpBin2 { .unwrap() .add_sender_report(ssrc, rtp, ntp); } + RtcpRecvReply::SsrcBye(ssrc) => { + session.config.emit_by_name::<()>("bye-ssrc", &[&ssrc]) + } } } drop(mapped); diff --git a/net/rtp/src/rtpbin2/session.rs b/net/rtp/src/rtpbin2/session.rs index 780528ce..e7107161 100644 --- a/net/rtp/src/rtpbin2/session.rs +++ b/net/rtp/src/rtpbin2/session.rs @@ -101,6 +101,7 @@ pub struct Session { last_rtcp_sent_times: VecDeque, // time for the next early rtcp to be sent next_early_rtcp_time: Option, + pending_rtcp_send: VecDeque, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -151,6 +152,16 @@ pub enum RtcpRecvReply { NewCName((String, u32)), /// A new RTP to NTP mapping was received for an ssrc: (ssrc, RTP, NTP) NewRtpNtp((u32, u32, u64)), + /// A ssrc has byed + SsrcBye(u32), +} + +#[derive(Debug)] +pub enum RtcpSendReply { + /// Data needs to be sent + Data(Vec), + /// A ssrc has byed + SsrcBye(u32), } #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -190,6 +201,7 @@ impl Session { next_early_rtcp_time: None, last_rtcp_handle_time: None, is_point_to_point: true, + pending_rtcp_send: VecDeque::new(), } } @@ -550,10 +562,12 @@ impl Session { source.set_last_activity(now); source.set_state(SourceState::Bye); check_reconsideration = true; + replies.push(RtcpRecvReply::SsrcBye(ssrc)); } else if let Some(source) = self.remote_receivers.get_mut(&ssrc) { source.set_last_activity(now); source.set_state(SourceState::Bye); check_reconsideration = true; + replies.push(RtcpRecvReply::SsrcBye(ssrc)); } // XXX: do we need to handle an unknown ssrc here? // TODO: signal rtcp timeout needs recalcuating @@ -1047,6 +1061,8 @@ impl Session { for (reason, ssrcs) in bye_reason_ssrcs.iter() { let mut bye = Bye::builder().reason_owned(reason); for ssrc in ssrcs.iter() { + self.pending_rtcp_send + .push_front(RtcpSendReply::SsrcBye(*ssrc)); bye = bye.add_source(*ssrc); if let Some(source) = self.local_senders.get_mut(ssrc) { source.bye_sent_at(now); @@ -1165,7 +1181,11 @@ impl Session { /// Produce a RTCP packet (or `None` if it is too early to send a RTCP packet). After this call returns, /// the next time to send a RTCP packet can be retrieved from `poll_rtcp_send_timeout` // TODO: return RtcpPacketBuilder thing - pub fn poll_rtcp_send(&mut self, now: Instant, ntp_now: SystemTime) -> Option> { + pub fn poll_rtcp_send(&mut self, now: Instant, ntp_now: SystemTime) -> Option { + if let Some(event) = self.pending_rtcp_send.pop_back() { + return Some(event); + } + let Some(next_rtcp_send) = self.next_rtcp_send.time else { trace!("no next check time yet"); return None; @@ -1258,7 +1278,7 @@ impl Session { } self.last_rtcp_handle_time = Some(now); self.next_early_rtcp_time = None; - Some(data) + Some(RtcpSendReply::Data(data)) } /// Returns the next time to send a RTCP packet. @@ -1678,7 +1698,7 @@ pub(crate) mod tests { session: &mut Session, mut now: Instant, mut ntp_now: SystemTime, - ) -> (Vec, Instant, SystemTime) { + ) -> (RtcpSendReply, Instant, SystemTime) { let mut ret = None; while ret.is_none() { ret = session.poll_rtcp_send(now, ntp_now); @@ -1777,6 +1797,9 @@ pub(crate) mod tests { ); let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::Data(rtcp_data) = rtcp_data else { + unreachable!(); + }; let rtcp = Compound::parse(&rtcp_data).unwrap(); let mut n_rb_ssrcs = 0; @@ -1860,6 +1883,9 @@ pub(crate) mod tests { assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); + let RtcpSendReply::Data(rtcp_data) = rtcp_data else { + unreachable!(); + }; let rtcp = Compound::parse(&rtcp_data).unwrap(); let mut n_rb_ssrcs = 0; for p in rtcp { @@ -1953,6 +1979,9 @@ pub(crate) mod tests { ); let (rtcp_data, _new_now, new_ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::Data(rtcp_data) = rtcp_data else { + unreachable!(); + }; let rtcp = Compound::parse(&rtcp_data).unwrap(); for p in rtcp { @@ -2022,6 +2051,9 @@ pub(crate) mod tests { assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); + let RtcpSendReply::Data(rtcp_data) = rtcp_data else { + unreachable!(); + }; trace!("rtcp data {rtcp_data:?}"); let rtcp = Compound::parse(&rtcp_data).unwrap(); let mut n_sr_ssrcs = 0; @@ -2060,6 +2092,9 @@ pub(crate) mod tests { ); let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::Data(rtcp_data) = rtcp_data else { + unreachable!(); + }; trace!("rtcp data {rtcp_data:?}"); let rtcp = Compound::parse(&rtcp_data).unwrap(); @@ -2102,6 +2137,9 @@ pub(crate) mod tests { ); let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::Data(rtcp_data) = rtcp_data else { + unreachable!(); + }; let rtcp = Compound::parse(&rtcp_data).unwrap(); for p in rtcp { @@ -2139,6 +2177,9 @@ pub(crate) mod tests { assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::Data(rtcp_data) = rtcp_data else { + unreachable!(); + }; let rtcp = Compound::parse(&rtcp_data).unwrap(); for p in rtcp { @@ -2156,6 +2197,9 @@ pub(crate) mod tests { let mut seen_rr = false; for _ in 0..=5 { let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::Data(rtcp_data) = rtcp_data else { + unreachable!(); + }; let rtcp = Compound::parse(&rtcp_data).unwrap(); for p in rtcp { trace!("{p:?}"); @@ -2194,6 +2238,9 @@ pub(crate) mod tests { assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::Data(rtcp_data) = rtcp_data else { + unreachable!(); + }; let rtcp = Compound::parse(&rtcp_data).unwrap(); for p in rtcp { @@ -2359,7 +2406,10 @@ pub(crate) mod tests { ); // send initial rtcp - let (_rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::Data(_rtcp_data) = rtcp_data else { + unreachable!(); + }; let rtcp = Compound::builder().add_packet(Bye::builder().add_source(ssrc)); let mut data = vec![0; 128]; @@ -2369,7 +2419,10 @@ pub(crate) mod tests { let rtcp = Compound::parse(data).unwrap(); assert_eq!( session.handle_rtcp_recv(rtcp, len, None, now, ntp_now), - vec![RtcpRecvReply::TimerReconsideration] + vec![ + RtcpRecvReply::SsrcBye(ssrc), + RtcpRecvReply::TimerReconsideration + ] ); let source = session.mut_remote_sender_source_by_ssrc(ssrc).unwrap(); assert_eq!(source.state(), SourceState::Bye); @@ -2392,7 +2445,10 @@ pub(crate) mod tests { assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); // send initial rtcp - let (_rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::Data(_rtcp_data) = rtcp_data else { + unreachable!(); + }; let source = session.mut_local_send_source_by_ssrc(ssrc).unwrap(); source.mark_bye("Cya"); @@ -2401,7 +2457,10 @@ pub(crate) mod tests { // data after bye should be dropped assert_eq!(session.handle_send(&packet, now), SendReply::Drop); - let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::Data(rtcp_data) = rtcp_data else { + unreachable!(); + }; let rtcp = Compound::parse(&rtcp_data).unwrap(); let mut received_bye = false; @@ -2421,6 +2480,11 @@ pub(crate) mod tests { } } assert!(received_bye); + let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::SsrcBye(bye_ssrc) = rtcp_data else { + unreachable!(); + }; + assert_eq!(bye_ssrc, ssrc); } #[test] @@ -2459,7 +2523,10 @@ pub(crate) mod tests { assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); // complete first regular rtcp - let (_rtcp_data, now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let (rtcp_data, now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::Data(_rtcp_data) = rtcp_data else { + unreachable!(); + }; let source = session.mut_local_send_source_by_ssrc(send_ssrc).unwrap(); source.set_sdes_item(SdesItem::NAME, b"name"); @@ -2470,6 +2537,9 @@ pub(crate) mod tests { assert!(session.next_early_rtcp_time.is_some()); let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); + let RtcpSendReply::Data(rtcp_data) = rtcp_data else { + unreachable!(); + }; let rtcp = Compound::parse(&rtcp_data).unwrap(); let mut n_sr_ssrc = 0; let mut n_sdes = 0;