rtpbin2: handle ssrc collisions

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
Matthew Waters 2024-05-24 12:58:51 +10:00
parent 9485265769
commit 525179f666
2 changed files with 58 additions and 3 deletions

View file

@ -643,9 +643,14 @@ impl RtpRecv {
let now = Instant::now(); let now = Instant::now();
let mut buffers_to_push = vec![]; let mut buffers_to_push = vec![];
let mut ssrc_collision = vec![];
loop { loop {
match session_inner.session.handle_recv(&rtp, addr, now) { match session_inner.session.handle_recv(&rtp, addr, now) {
RecvReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision RecvReply::SsrcCollision(ssrc) => {
if !ssrc_collision.iter().any(|&needle| needle == ssrc) {
ssrc_collision.push(ssrc);
}
}
RecvReply::NewSsrc(ssrc, _pt) => { RecvReply::NewSsrc(ssrc, _pt) => {
drop(session_inner); drop(session_inner);
internal_session internal_session
@ -711,8 +716,25 @@ impl RtpRecv {
} }
} }
let send_rtp_sink = session_inner.rtp_send_sinkpad.clone();
drop(session_inner); drop(session_inner);
if let Some(pad) = send_rtp_sink {
// XXX: Another option is to have us rewrite ssrc's instead of asking upstream to do
// so.
for ssrc in ssrc_collision {
pad.send_event(
gst::event::CustomUpstream::builder(
gst::Structure::builder("GstRTPCollision")
.field("ssrc", ssrc)
.build(),
)
.build(),
);
}
}
for mut held in buffers_to_push { for mut held in buffers_to_push {
// TODO: handle other processing // TODO: handle other processing
if held.new_pad { if held.new_pad {
@ -821,7 +843,20 @@ impl RtpRecv {
.config .config
.emit_by_name::<()>("new-ssrc", &[&ssrc]); .emit_by_name::<()>("new-ssrc", &[&ssrc]);
} }
RtcpRecvReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision RtcpRecvReply::SsrcCollision(ssrc) => {
if let Some(pad) = rtp_send_sinkpad.as_ref() {
// XXX: Another option is to have us rewrite ssrc's instead of asking
// upstream to do so.
pad.send_event(
gst::event::CustomUpstream::builder(
gst::Structure::builder("GstRTPCollision")
.field("ssrc", ssrc)
.build(),
)
.build(),
);
}
}
RtcpRecvReply::TimerReconsideration => { RtcpRecvReply::TimerReconsideration => {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let session = state.session_by_id(id).unwrap(); let session = state.session_by_id(id).unwrap();

View file

@ -320,14 +320,20 @@ impl RtpSend {
}; };
let srcpad = session.rtp_send_srcpad.clone().unwrap(); let srcpad = session.rtp_send_srcpad.clone().unwrap();
let sinkpad = session.rtp_send_sinkpad.clone().unwrap();
let session = session.internal_session.clone(); let session = session.internal_session.clone();
let mut session_inner = session.inner.lock().unwrap(); let mut session_inner = session.inner.lock().unwrap();
drop(state); drop(state);
let now = Instant::now(); let now = Instant::now();
let mut ssrc_collision = vec![];
loop { loop {
match session_inner.session.handle_send(&rtp, now) { match session_inner.session.handle_send(&rtp, now) {
SendReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision SendReply::SsrcCollision(ssrc) => {
if !ssrc_collision.iter().any(|&needle| needle == ssrc) {
ssrc_collision.push(ssrc);
}
}
SendReply::NewSsrc(ssrc, _pt) => { SendReply::NewSsrc(ssrc, _pt) => {
drop(session_inner); drop(session_inner);
session.config.emit_by_name::<()>("new-ssrc", &[&ssrc]); session.config.emit_by_name::<()>("new-ssrc", &[&ssrc]);
@ -340,6 +346,20 @@ impl RtpSend {
// TODO: handle other processing // TODO: handle other processing
drop(mapped); drop(mapped);
drop(session_inner); drop(session_inner);
for ssrc in ssrc_collision {
// XXX: Another option is to have us rewrite ssrc's instead of asking upstream to do
// so.
sinkpad.send_event(
gst::event::CustomUpstream::builder(
gst::Structure::builder("GstRTPCollision")
.field("ssrc", ssrc)
.build(),
)
.build(),
);
}
srcpad.push(buffer) srcpad.push(buffer)
} }