rtpbin2: expose session signals for new/bye ssrc

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
Matthew Waters 2024-02-26 13:46:46 +11:00
parent d480c6c2d3
commit 0121d78482
3 changed files with 152 additions and 25 deletions

View file

@ -144,9 +144,14 @@ mod imp {
fn signals() -> &'static [glib::subclass::Signal] { fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| { static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| {
vec![glib::subclass::Signal::builder("new-ssrc") vec![
glib::subclass::Signal::builder("new-ssrc")
.param_types([u32::static_type()]) .param_types([u32::static_type()])
.build()] .build(),
glib::subclass::Signal::builder("bye-ssrc")
.param_types([u32::static_type()])
.build(),
]
}); });
SIGNALS.as_ref() SIGNALS.as_ref()
@ -243,4 +248,45 @@ mod tests {
let buf4 = h.pull().unwrap(); let buf4 = h.pull().unwrap();
assert_eq!(buf4, buf2); assert_eq!(buf4, buf2);
} }
#[test]
fn bye_send_ssrc() {
test_init();
let ssrc = 0x12345678;
let (bye_ssrc_sender, bye_ssrc_receiver) = std::sync::mpsc::sync_channel(16);
let rtpbin2 = gst::ElementFactory::make("rtpbin2").build().unwrap();
let mut h = gst_check::Harness::with_element(
&rtpbin2,
Some("rtp_send_sink_0"),
Some("rtp_send_src_0"),
);
let mut h_rtcp = gst_check::Harness::with_element(&rtpbin2, None, Some("rtcp_send_src_0"));
let session = h
.element()
.unwrap()
.emit_by_name::<gst::glib::Object>("get-session", &[&0u32]);
session.connect("bye-ssrc", false, move |args| {
let bye_ssrc = args[1].get::<u32>().unwrap();
assert_eq!(bye_ssrc, ssrc);
bye_ssrc_sender.send(ssrc).unwrap();
None
});
h.set_src_caps_str("application/x-rtp,payload=96,clock-rate=90000");
let mut segment = gst::Segment::new();
segment.set_format(gst::Format::Time);
h.push_event(gst::event::Segment::builder(&segment).build());
let buf1 = gst::Buffer::from_mut_slice(generate_rtp_packet(ssrc, 0x34, 0x10, 16));
h.push(buf1.clone()).unwrap();
let buf2 = gst::Buffer::from_mut_slice(generate_rtp_packet(ssrc, 0x35, 0x10, 16));
h.push(buf2.clone()).unwrap();
let buf3 = h.pull().unwrap();
assert_eq!(buf3, buf1);
let buf4 = h.pull().unwrap();
assert_eq!(buf4, buf2);
h.push_event(gst::event::Eos::builder().build());
let _rtcp = h_rtcp.pull().unwrap();
assert_eq!(bye_ssrc_receiver.recv().unwrap(), ssrc);
}
} }

View file

@ -14,8 +14,8 @@ use once_cell::sync::Lazy;
use super::jitterbuffer::{self, JitterBuffer}; use super::jitterbuffer::{self, JitterBuffer};
use super::session::{ use super::session::{
KeyUnitRequestType, RecvReply, RequestRemoteKeyUnitReply, RtcpRecvReply, RtpProfile, SendReply, KeyUnitRequestType, RecvReply, RequestRemoteKeyUnitReply, RtcpRecvReply, RtcpSendReply,
Session, RTCP_MIN_REPORT_INTERVAL, RtpProfile, SendReply, Session, RTCP_MIN_REPORT_INTERVAL,
}; };
use super::source::{ReceivedRb, SourceState}; use super::source::{ReceivedRb, SourceState};
use super::sync; use super::sync;
@ -102,7 +102,7 @@ impl RtcpSendStream {
} }
impl futures::stream::Stream for RtcpSendStream { impl futures::stream::Stream for RtcpSendStream {
type Item = (Vec<u8>, usize); type Item = (usize, RtcpSendReply);
fn poll_next( fn poll_next(
self: Pin<&mut Self>, self: Pin<&mut Self>,
@ -114,8 +114,8 @@ impl futures::stream::Stream for RtcpSendStream {
let mut lowest_wait = None; let mut lowest_wait = None;
for session in state.sessions.iter_mut() { for session in state.sessions.iter_mut() {
let mut session = session.inner.lock().unwrap(); let mut session = session.inner.lock().unwrap();
if let Some(data) = session.session.poll_rtcp_send(now, ntp_now) { if let Some(reply) = session.session.poll_rtcp_send(now, ntp_now) {
return Poll::Ready(Some((data, session.id))); return Poll::Ready(Some((session.id, reply)));
} }
if let Some(wait) = session.session.poll_rtcp_send_timeout(now) { if let Some(wait) = session.session.poll_rtcp_send_timeout(now) {
if lowest_wait.map_or(true, |lowest_wait| wait < lowest_wait) { if lowest_wait.map_or(true, |lowest_wait| wait < lowest_wait) {
@ -808,12 +808,15 @@ impl RtpBin2 {
async fn rtcp_task(state: Arc<Mutex<State>>) { async fn rtcp_task(state: Arc<Mutex<State>>) {
let mut stream = RtcpSendStream::new(state.clone()); let mut stream = RtcpSendStream::new(state.clone());
while let Some((data, session_id)) = stream.next().await { while let Some((session_id, reply)) = stream.next().await {
let state = state.lock().unwrap(); let state = state.lock().unwrap();
let Some(session) = state.session_by_id(session_id) else { let Some(session) = state.session_by_id(session_id) else {
continue; continue;
}; };
let Some(rtcp_srcpad) = session.inner.lock().unwrap().rtcp_send_srcpad.clone() else { match reply {
RtcpSendReply::Data(data) => {
let Some(rtcp_srcpad) = session.inner.lock().unwrap().rtcp_send_srcpad.clone()
else {
continue; continue;
}; };
RUNTIME.spawn_blocking(move || { RUNTIME.spawn_blocking(move || {
@ -823,6 +826,11 @@ impl RtpBin2 {
} }
}); });
} }
RtcpSendReply::SsrcBye(ssrc) => {
session.config.emit_by_name::<()>("bye-ssrc", &[&ssrc])
}
}
}
} }
fn stop_rtcp_task(&self) { fn stop_rtcp_task(&self) {
@ -1265,6 +1273,9 @@ impl RtpBin2 {
.unwrap() .unwrap()
.add_sender_report(ssrc, rtp, ntp); .add_sender_report(ssrc, rtp, ntp);
} }
RtcpRecvReply::SsrcBye(ssrc) => {
session.config.emit_by_name::<()>("bye-ssrc", &[&ssrc])
}
} }
} }
drop(mapped); drop(mapped);

View file

@ -101,6 +101,7 @@ pub struct Session {
last_rtcp_sent_times: VecDeque<Instant>, last_rtcp_sent_times: VecDeque<Instant>,
// time for the next early rtcp to be sent // time for the next early rtcp to be sent
next_early_rtcp_time: Option<Instant>, next_early_rtcp_time: Option<Instant>,
pending_rtcp_send: VecDeque<RtcpSendReply>,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -151,6 +152,16 @@ pub enum RtcpRecvReply {
NewCName((String, u32)), NewCName((String, u32)),
/// A new RTP to NTP mapping was received for an ssrc: (ssrc, RTP, NTP) /// A new RTP to NTP mapping was received for an ssrc: (ssrc, RTP, NTP)
NewRtpNtp((u32, u32, u64)), NewRtpNtp((u32, u32, u64)),
/// A ssrc has byed
SsrcBye(u32),
}
#[derive(Debug)]
pub enum RtcpSendReply {
/// Data needs to be sent
Data(Vec<u8>),
/// A ssrc has byed
SsrcBye(u32),
} }
#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[derive(Debug, Copy, Clone, PartialEq, Eq)]
@ -190,6 +201,7 @@ impl Session {
next_early_rtcp_time: None, next_early_rtcp_time: None,
last_rtcp_handle_time: None, last_rtcp_handle_time: None,
is_point_to_point: true, is_point_to_point: true,
pending_rtcp_send: VecDeque::new(),
} }
} }
@ -550,10 +562,12 @@ impl Session {
source.set_last_activity(now); source.set_last_activity(now);
source.set_state(SourceState::Bye); source.set_state(SourceState::Bye);
check_reconsideration = true; check_reconsideration = true;
replies.push(RtcpRecvReply::SsrcBye(ssrc));
} else if let Some(source) = self.remote_receivers.get_mut(&ssrc) { } else if let Some(source) = self.remote_receivers.get_mut(&ssrc) {
source.set_last_activity(now); source.set_last_activity(now);
source.set_state(SourceState::Bye); source.set_state(SourceState::Bye);
check_reconsideration = true; check_reconsideration = true;
replies.push(RtcpRecvReply::SsrcBye(ssrc));
} }
// XXX: do we need to handle an unknown ssrc here? // XXX: do we need to handle an unknown ssrc here?
// TODO: signal rtcp timeout needs recalcuating // TODO: signal rtcp timeout needs recalcuating
@ -1047,6 +1061,8 @@ impl Session {
for (reason, ssrcs) in bye_reason_ssrcs.iter() { for (reason, ssrcs) in bye_reason_ssrcs.iter() {
let mut bye = Bye::builder().reason_owned(reason); let mut bye = Bye::builder().reason_owned(reason);
for ssrc in ssrcs.iter() { for ssrc in ssrcs.iter() {
self.pending_rtcp_send
.push_front(RtcpSendReply::SsrcBye(*ssrc));
bye = bye.add_source(*ssrc); bye = bye.add_source(*ssrc);
if let Some(source) = self.local_senders.get_mut(ssrc) { if let Some(source) = self.local_senders.get_mut(ssrc) {
source.bye_sent_at(now); source.bye_sent_at(now);
@ -1165,7 +1181,11 @@ impl Session {
/// Produce a RTCP packet (or `None` if it is too early to send a RTCP packet). After this call returns, /// Produce a RTCP packet (or `None` if it is too early to send a RTCP packet). After this call returns,
/// the next time to send a RTCP packet can be retrieved from `poll_rtcp_send_timeout` /// the next time to send a RTCP packet can be retrieved from `poll_rtcp_send_timeout`
// TODO: return RtcpPacketBuilder thing // TODO: return RtcpPacketBuilder thing
pub fn poll_rtcp_send(&mut self, now: Instant, ntp_now: SystemTime) -> Option<Vec<u8>> { pub fn poll_rtcp_send(&mut self, now: Instant, ntp_now: SystemTime) -> Option<RtcpSendReply> {
if let Some(event) = self.pending_rtcp_send.pop_back() {
return Some(event);
}
let Some(next_rtcp_send) = self.next_rtcp_send.time else { let Some(next_rtcp_send) = self.next_rtcp_send.time else {
trace!("no next check time yet"); trace!("no next check time yet");
return None; return None;
@ -1258,7 +1278,7 @@ impl Session {
} }
self.last_rtcp_handle_time = Some(now); self.last_rtcp_handle_time = Some(now);
self.next_early_rtcp_time = None; self.next_early_rtcp_time = None;
Some(data) Some(RtcpSendReply::Data(data))
} }
/// Returns the next time to send a RTCP packet. /// Returns the next time to send a RTCP packet.
@ -1678,7 +1698,7 @@ pub(crate) mod tests {
session: &mut Session, session: &mut Session,
mut now: Instant, mut now: Instant,
mut ntp_now: SystemTime, mut ntp_now: SystemTime,
) -> (Vec<u8>, Instant, SystemTime) { ) -> (RtcpSendReply, Instant, SystemTime) {
let mut ret = None; let mut ret = None;
while ret.is_none() { while ret.is_none() {
ret = session.poll_rtcp_send(now, ntp_now); ret = session.poll_rtcp_send(now, ntp_now);
@ -1777,6 +1797,9 @@ pub(crate) mod tests {
); );
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::Data(rtcp_data) = rtcp_data else {
unreachable!();
};
let rtcp = Compound::parse(&rtcp_data).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap();
let mut n_rb_ssrcs = 0; let mut n_rb_ssrcs = 0;
@ -1860,6 +1883,9 @@ pub(crate) mod tests {
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap();
let RtcpSendReply::Data(rtcp_data) = rtcp_data else {
unreachable!();
};
let rtcp = Compound::parse(&rtcp_data).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap();
let mut n_rb_ssrcs = 0; let mut n_rb_ssrcs = 0;
for p in rtcp { for p in rtcp {
@ -1953,6 +1979,9 @@ pub(crate) mod tests {
); );
let (rtcp_data, _new_now, new_ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let (rtcp_data, _new_now, new_ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::Data(rtcp_data) = rtcp_data else {
unreachable!();
};
let rtcp = Compound::parse(&rtcp_data).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap();
for p in rtcp { for p in rtcp {
@ -2022,6 +2051,9 @@ pub(crate) mod tests {
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap(); let rtcp_data = session.poll_rtcp_send(now, ntp_now).unwrap();
let RtcpSendReply::Data(rtcp_data) = rtcp_data else {
unreachable!();
};
trace!("rtcp data {rtcp_data:?}"); trace!("rtcp data {rtcp_data:?}");
let rtcp = Compound::parse(&rtcp_data).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap();
let mut n_sr_ssrcs = 0; let mut n_sr_ssrcs = 0;
@ -2060,6 +2092,9 @@ pub(crate) mod tests {
); );
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::Data(rtcp_data) = rtcp_data else {
unreachable!();
};
trace!("rtcp data {rtcp_data:?}"); trace!("rtcp data {rtcp_data:?}");
let rtcp = Compound::parse(&rtcp_data).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap();
@ -2102,6 +2137,9 @@ pub(crate) mod tests {
); );
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::Data(rtcp_data) = rtcp_data else {
unreachable!();
};
let rtcp = Compound::parse(&rtcp_data).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap();
for p in rtcp { for p in rtcp {
@ -2139,6 +2177,9 @@ pub(crate) mod tests {
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::Data(rtcp_data) = rtcp_data else {
unreachable!();
};
let rtcp = Compound::parse(&rtcp_data).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap();
for p in rtcp { for p in rtcp {
@ -2156,6 +2197,9 @@ pub(crate) mod tests {
let mut seen_rr = false; let mut seen_rr = false;
for _ in 0..=5 { for _ in 0..=5 {
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::Data(rtcp_data) = rtcp_data else {
unreachable!();
};
let rtcp = Compound::parse(&rtcp_data).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap();
for p in rtcp { for p in rtcp {
trace!("{p:?}"); trace!("{p:?}");
@ -2194,6 +2238,9 @@ pub(crate) mod tests {
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::Data(rtcp_data) = rtcp_data else {
unreachable!();
};
let rtcp = Compound::parse(&rtcp_data).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap();
for p in rtcp { for p in rtcp {
@ -2359,7 +2406,10 @@ pub(crate) mod tests {
); );
// send initial rtcp // send initial rtcp
let (_rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::Data(_rtcp_data) = rtcp_data else {
unreachable!();
};
let rtcp = Compound::builder().add_packet(Bye::builder().add_source(ssrc)); let rtcp = Compound::builder().add_packet(Bye::builder().add_source(ssrc));
let mut data = vec![0; 128]; let mut data = vec![0; 128];
@ -2369,7 +2419,10 @@ pub(crate) mod tests {
let rtcp = Compound::parse(data).unwrap(); let rtcp = Compound::parse(data).unwrap();
assert_eq!( assert_eq!(
session.handle_rtcp_recv(rtcp, len, None, now, ntp_now), session.handle_rtcp_recv(rtcp, len, None, now, ntp_now),
vec![RtcpRecvReply::TimerReconsideration] vec![
RtcpRecvReply::SsrcBye(ssrc),
RtcpRecvReply::TimerReconsideration
]
); );
let source = session.mut_remote_sender_source_by_ssrc(ssrc).unwrap(); let source = session.mut_remote_sender_source_by_ssrc(ssrc).unwrap();
assert_eq!(source.state(), SourceState::Bye); assert_eq!(source.state(), SourceState::Bye);
@ -2392,7 +2445,10 @@ pub(crate) mod tests {
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
// send initial rtcp // send initial rtcp
let (_rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::Data(_rtcp_data) = rtcp_data else {
unreachable!();
};
let source = session.mut_local_send_source_by_ssrc(ssrc).unwrap(); let source = session.mut_local_send_source_by_ssrc(ssrc).unwrap();
source.mark_bye("Cya"); source.mark_bye("Cya");
@ -2401,7 +2457,10 @@ pub(crate) mod tests {
// data after bye should be dropped // data after bye should be dropped
assert_eq!(session.handle_send(&packet, now), SendReply::Drop); assert_eq!(session.handle_send(&packet, now), SendReply::Drop);
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let (rtcp_data, now, ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::Data(rtcp_data) = rtcp_data else {
unreachable!();
};
let rtcp = Compound::parse(&rtcp_data).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap();
let mut received_bye = false; let mut received_bye = false;
@ -2421,6 +2480,11 @@ pub(crate) mod tests {
} }
} }
assert!(received_bye); assert!(received_bye);
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::SsrcBye(bye_ssrc) = rtcp_data else {
unreachable!();
};
assert_eq!(bye_ssrc, ssrc);
} }
#[test] #[test]
@ -2459,7 +2523,10 @@ pub(crate) mod tests {
assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough); assert_eq!(session.handle_send(&packet, now), SendReply::Passthrough);
// complete first regular rtcp // complete first regular rtcp
let (_rtcp_data, now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let (rtcp_data, now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::Data(_rtcp_data) = rtcp_data else {
unreachable!();
};
let source = session.mut_local_send_source_by_ssrc(send_ssrc).unwrap(); let source = session.mut_local_send_source_by_ssrc(send_ssrc).unwrap();
source.set_sdes_item(SdesItem::NAME, b"name"); source.set_sdes_item(SdesItem::NAME, b"name");
@ -2470,6 +2537,9 @@ pub(crate) mod tests {
assert!(session.next_early_rtcp_time.is_some()); assert!(session.next_early_rtcp_time.is_some());
let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now); let (rtcp_data, _now, _ntp_now) = next_rtcp_packet(&mut session, now, ntp_now);
let RtcpSendReply::Data(rtcp_data) = rtcp_data else {
unreachable!();
};
let rtcp = Compound::parse(&rtcp_data).unwrap(); let rtcp = Compound::parse(&rtcp_data).unwrap();
let mut n_sr_ssrc = 0; let mut n_sr_ssrc = 0;
let mut n_sdes = 0; let mut n_sdes = 0;