diff --git a/net/rtp/src/rtpbin2/rtprecv.rs b/net/rtp/src/rtpbin2/rtprecv.rs index 7242e1ac..92ca398f 100644 --- a/net/rtp/src/rtpbin2/rtprecv.rs +++ b/net/rtp/src/rtpbin2/rtprecv.rs @@ -643,9 +643,14 @@ impl RtpRecv { let now = Instant::now(); let mut buffers_to_push = vec![]; + let mut ssrc_collision = vec![]; loop { match session_inner.session.handle_recv(&rtp, addr, now) { - RecvReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision + RecvReply::SsrcCollision(ssrc) => { + if !ssrc_collision.iter().any(|&needle| needle == ssrc) { + ssrc_collision.push(ssrc); + } + } RecvReply::NewSsrc(ssrc, _pt) => { drop(session_inner); internal_session @@ -711,8 +716,25 @@ impl RtpRecv { } } + let send_rtp_sink = session_inner.rtp_send_sinkpad.clone(); + drop(session_inner); + if let Some(pad) = send_rtp_sink { + // XXX: Another option is to have us rewrite ssrc's instead of asking upstream to do + // so. + for ssrc in ssrc_collision { + pad.send_event( + gst::event::CustomUpstream::builder( + gst::Structure::builder("GstRTPCollision") + .field("ssrc", ssrc) + .build(), + ) + .build(), + ); + } + } + for mut held in buffers_to_push { // TODO: handle other processing if held.new_pad { @@ -821,7 +843,20 @@ impl RtpRecv { .config .emit_by_name::<()>("new-ssrc", &[&ssrc]); } - RtcpRecvReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision + RtcpRecvReply::SsrcCollision(ssrc) => { + if let Some(pad) = rtp_send_sinkpad.as_ref() { + // XXX: Another option is to have us rewrite ssrc's instead of asking + // upstream to do so. + pad.send_event( + gst::event::CustomUpstream::builder( + gst::Structure::builder("GstRTPCollision") + .field("ssrc", ssrc) + .build(), + ) + .build(), + ); + } + } RtcpRecvReply::TimerReconsideration => { let state = self.state.lock().unwrap(); let session = state.session_by_id(id).unwrap(); diff --git a/net/rtp/src/rtpbin2/rtpsend.rs b/net/rtp/src/rtpbin2/rtpsend.rs index 8aaf7c3a..40eea848 100644 --- a/net/rtp/src/rtpbin2/rtpsend.rs +++ b/net/rtp/src/rtpbin2/rtpsend.rs @@ -320,14 +320,20 @@ impl RtpSend { }; let srcpad = session.rtp_send_srcpad.clone().unwrap(); + let sinkpad = session.rtp_send_sinkpad.clone().unwrap(); let session = session.internal_session.clone(); let mut session_inner = session.inner.lock().unwrap(); drop(state); let now = Instant::now(); + let mut ssrc_collision = vec![]; loop { match session_inner.session.handle_send(&rtp, now) { - SendReply::SsrcCollision(_ssrc) => (), // TODO: handle ssrc collision + SendReply::SsrcCollision(ssrc) => { + if !ssrc_collision.iter().any(|&needle| needle == ssrc) { + ssrc_collision.push(ssrc); + } + } SendReply::NewSsrc(ssrc, _pt) => { drop(session_inner); session.config.emit_by_name::<()>("new-ssrc", &[&ssrc]); @@ -340,6 +346,20 @@ impl RtpSend { // TODO: handle other processing drop(mapped); drop(session_inner); + + for ssrc in ssrc_collision { + // XXX: Another option is to have us rewrite ssrc's instead of asking upstream to do + // so. + sinkpad.send_event( + gst::event::CustomUpstream::builder( + gst::Structure::builder("GstRTPCollision") + .field("ssrc", ssrc) + .build(), + ) + .build(), + ); + } + srcpad.push(buffer) }