diff --git a/net/rtp/src/rtpbin2/rtpsend.rs b/net/rtp/src/rtpbin2/rtpsend.rs index 40eea848..04104c0e 100644 --- a/net/rtp/src/rtpbin2/rtpsend.rs +++ b/net/rtp/src/rtpbin2/rtpsend.rs @@ -198,28 +198,39 @@ impl SendSession { async fn rtcp_task(state: Arc>, session_id: usize) { let mut stream = RtcpSendStream::new(state.clone(), session_id); + // we use a semaphore instead of mutex for ordering + // i.e. we should only allow a single pad push, but still allow other rtcp tasks to + // continue operating + let sem = Arc::new(tokio::sync::Semaphore::new(1)); while let Some(reply) = stream.next().await { - let state = state.lock().unwrap(); - let Some(session) = state.session_by_id(session_id) else { - continue; - }; - match reply { - RtcpSendReply::Data(data) => { - let Some(rtcp_srcpad) = session.rtcp_send_srcpad.clone() else { - continue; - }; - drop(state); - RUNTIME.spawn_blocking(move || { - let buffer = gst::Buffer::from_mut_slice(data); - if let Err(e) = rtcp_srcpad.push(buffer) { - gst::warning!(CAT, obj: rtcp_srcpad, "Failed to send rtcp data: flow return {e:?}"); - } - }); + let send = { + let state = state.lock().unwrap(); + let Some(session) = state.session_by_id(session_id) else { + continue; + }; + match reply { + RtcpSendReply::Data(data) => { + session.rtcp_send_srcpad.clone().map(|pad| (pad, data)) + } + RtcpSendReply::SsrcBye(ssrc) => { + session + .internal_session + .config + .emit_by_name::<()>("bye-ssrc", &[&ssrc]); + None + } } - RtcpSendReply::SsrcBye(ssrc) => session - .internal_session - .config - .emit_by_name::<()>("bye-ssrc", &[&ssrc]), + }; + + if let Some((rtcp_srcpad, data)) = send { + let acquired = sem.clone().acquire_owned().await; + RUNTIME.spawn_blocking(move || { + let buffer = gst::Buffer::from_mut_slice(data); + if let Err(e) = rtcp_srcpad.push(buffer) { + gst::warning!(CAT, obj: rtcp_srcpad, "Failed to send rtcp data: flow return {e:?}"); + } + drop(acquired); + }); } } }