rtpbin2/config: add a new-ssrc signal

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
Matthew Waters 2024-02-07 16:08:41 +11:00
parent 06f40e72cb
commit 7d5789032a
2 changed files with 75 additions and 14 deletions

View file

@ -134,12 +134,24 @@ mod imp {
_ => unreachable!(), _ => unreachable!(),
} }
} }
fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| {
vec![glib::subclass::Signal::builder("new-ssrc")
.param_types([u32::static_type()])
.build()]
});
SIGNALS.as_ref()
}
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::test_init; use std::sync::{atomic::AtomicBool, Arc};
use crate::{rtpbin2::session::tests::generate_rtp_packet, test_init};
use super::*; use super::*;
@ -186,4 +198,42 @@ mod tests {
let prop = session.property::<gst::Structure>("pt-map"); let prop = session.property::<gst::Structure>("pt-map");
assert!(prop.has_name("application/x-rtpbin2-pt-map")); assert!(prop.has_name("application/x-rtpbin2-pt-map"));
} }
#[test]
fn new_send_ssrc() {
test_init();
let ssrc = 0x12345678;
let new_ssrc_hit = Arc::new(AtomicBool::new(false));
let rtpbin2 = gst::ElementFactory::make("rtpbin2").build().unwrap();
let mut h = gst_check::Harness::with_element(
&rtpbin2,
Some("rtp_send_sink_0"),
Some("rtp_send_src_0"),
);
let session = h
.element()
.unwrap()
.emit_by_name::<gst::glib::Object>("get-session", &[&0u32]);
let ssrc_hit = new_ssrc_hit.clone();
session.connect("new-ssrc", false, move |args| {
let new_ssrc = args[1].get::<u32>().unwrap();
ssrc_hit.store(true, std::sync::atomic::Ordering::SeqCst);
assert_eq!(new_ssrc, ssrc);
None
});
h.set_src_caps_str("application/x-rtp,payload=96,clock-rate=90000");
let mut segment = gst::Segment::new();
segment.set_format(gst::Format::Time);
h.push_event(gst::event::Segment::builder(&segment).build());
let buf1 = gst::Buffer::from_mut_slice(generate_rtp_packet(ssrc, 0x34, 0x10, 16));
h.push(buf1.clone()).unwrap();
assert!(new_ssrc_hit.load(std::sync::atomic::Ordering::SeqCst));
let buf2 = gst::Buffer::from_mut_slice(generate_rtp_packet(ssrc, 0x35, 0x10, 16));
h.push(buf2.clone()).unwrap();
let buf3 = h.pull().unwrap();
assert_eq!(buf3, buf1);
let buf4 = h.pull().unwrap();
assert_eq!(buf4, buf2);
}
} }

View file

@ -1008,7 +1008,11 @@ impl RtpBin2 {
loop { loop {
match session_inner.session.handle_recv(&rtp, addr, now) { match session_inner.session.handle_recv(&rtp, addr, now) {
RecvReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision RecvReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision
RecvReply::NewSsrc(_ssrc, _pt) => (), // TODO: signal new ssrc externally RecvReply::NewSsrc(ssrc, _pt) => {
drop(session_inner);
session.config.emit_by_name::<()>("new-ssrc", &[&ssrc]);
session_inner = session.inner.lock().unwrap();
}
RecvReply::Hold(hold_id) => { RecvReply::Hold(hold_id) => {
let pt = rtp.payload_type(); let pt = rtp.payload_type();
let ssrc = rtp.ssrc(); let ssrc = rtp.ssrc();
@ -1149,22 +1153,26 @@ impl RtpBin2 {
}; };
let session = session.clone(); let session = session.clone();
let mut session = session.inner.lock().unwrap(); let mut session_inner = session.inner.lock().unwrap();
drop(state); drop(state);
let now = Instant::now(); let now = Instant::now();
loop { loop {
match session.session.handle_send(&rtp, now) { match session_inner.session.handle_send(&rtp, now) {
SendReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision SendReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision
SendReply::NewSsrc(_ssrc, _pt) => (), // TODO; signal ssrc externally SendReply::NewSsrc(ssrc, _pt) => {
drop(session_inner);
session.config.emit_by_name::<()>("new-ssrc", &[&ssrc]);
session_inner = session.inner.lock().unwrap();
}
SendReply::Passthrough => break, SendReply::Passthrough => break,
SendReply::Drop => return Ok(gst::FlowSuccess::Ok), SendReply::Drop => return Ok(gst::FlowSuccess::Ok),
} }
} }
// TODO: handle other processing // TODO: handle other processing
drop(mapped); drop(mapped);
let srcpad = session.rtp_send_srcpad.clone().unwrap(); let srcpad = session_inner.rtp_send_srcpad.clone().unwrap();
drop(session); drop(session_inner);
srcpad.push(buffer) srcpad.push(buffer)
} }
@ -1201,21 +1209,24 @@ impl RtpBin2 {
}; };
let session = session.clone(); let session = session.clone();
let mut session = session.inner.lock().unwrap(); let mut session_inner = session.inner.lock().unwrap();
let waker = state.rtcp_waker.clone(); let waker = state.rtcp_waker.clone();
drop(state); drop(state);
let now = Instant::now(); let now = Instant::now();
let ntp_now = SystemTime::now(); let ntp_now = SystemTime::now();
let replies = session let replies =
session_inner
.session .session
.handle_rtcp_recv(rtcp, mapped.len(), addr, now, ntp_now); .handle_rtcp_recv(rtcp, mapped.len(), addr, now, ntp_now);
let rtp_send_sinkpad = session.rtp_send_sinkpad.clone(); let rtp_send_sinkpad = session_inner.rtp_send_sinkpad.clone();
drop(session); drop(session_inner);
for reply in replies { for reply in replies {
match reply { match reply {
RtcpRecvReply::NewSsrc(_ssrc) => (), // TODO: handle new ssrc RtcpRecvReply::NewSsrc(ssrc) => {
session.config.emit_by_name::<()>("new-ssrc", &[&ssrc]);
}
RtcpRecvReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision RtcpRecvReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision
RtcpRecvReply::TimerReconsideration => { RtcpRecvReply::TimerReconsideration => {
if let Some(ref waker) = waker { if let Some(ref waker) = waker {