rtpsend: ensure only a single rtcp pad push

Otherwise, it can occur that multiple rtcp packets may be produced out
of order.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
Matthew Waters 2024-05-27 23:35:33 +10:00
parent 525179f666
commit df32e1ebfa

View file

@ -198,29 +198,40 @@ impl SendSession {
async fn rtcp_task(state: Arc<Mutex<State>>, session_id: usize) { async fn rtcp_task(state: Arc<Mutex<State>>, session_id: usize) {
let mut stream = RtcpSendStream::new(state.clone(), session_id); let mut stream = RtcpSendStream::new(state.clone(), session_id);
// we use a semaphore instead of mutex for ordering
// i.e. we should only allow a single pad push, but still allow other rtcp tasks to
// continue operating
let sem = Arc::new(tokio::sync::Semaphore::new(1));
while let Some(reply) = stream.next().await { while let Some(reply) = stream.next().await {
let send = {
let state = state.lock().unwrap(); let state = state.lock().unwrap();
let Some(session) = state.session_by_id(session_id) else { let Some(session) = state.session_by_id(session_id) else {
continue; continue;
}; };
match reply { match reply {
RtcpSendReply::Data(data) => { RtcpSendReply::Data(data) => {
let Some(rtcp_srcpad) = session.rtcp_send_srcpad.clone() else { session.rtcp_send_srcpad.clone().map(|pad| (pad, data))
continue; }
RtcpSendReply::SsrcBye(ssrc) => {
session
.internal_session
.config
.emit_by_name::<()>("bye-ssrc", &[&ssrc]);
None
}
}
}; };
drop(state);
if let Some((rtcp_srcpad, data)) = send {
let acquired = sem.clone().acquire_owned().await;
RUNTIME.spawn_blocking(move || { RUNTIME.spawn_blocking(move || {
let buffer = gst::Buffer::from_mut_slice(data); let buffer = gst::Buffer::from_mut_slice(data);
if let Err(e) = rtcp_srcpad.push(buffer) { if let Err(e) = rtcp_srcpad.push(buffer) {
gst::warning!(CAT, obj: rtcp_srcpad, "Failed to send rtcp data: flow return {e:?}"); gst::warning!(CAT, obj: rtcp_srcpad, "Failed to send rtcp data: flow return {e:?}");
} }
drop(acquired);
}); });
} }
RtcpSendReply::SsrcBye(ssrc) => session
.internal_session
.config
.emit_by_name::<()>("bye-ssrc", &[&ssrc]),
}
} }
} }