rtprecv: pass RtpRecvSrcPad around instead of only the JitterBufferStore

This allows accessing the src pad too (e.g. to get its name), which can ease
log intepretation when multiple ssrc are involved and more will also ease
subsequent commits.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2346>
This commit is contained in:
François Laignel 2025-07-09 19:07:51 +02:00 committed by GStreamer Marge Bot
parent 618a348b28
commit c575c93d89

View file

@ -27,7 +27,7 @@
*/
use std::collections::{BTreeMap, HashMap};
use std::net::SocketAddr;
use std::ops::ControlFlow;
use std::ops::{ControlFlow, Deref};
use std::pin::Pin;
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Poll, Waker};
@ -228,14 +228,36 @@ struct JitterBufferStore {
jitterbuffer: JitterBuffer,
}
#[derive(Debug, Clone)]
struct RtpRecvSrcPad {
#[derive(Debug)]
struct RtpRecvSrcPadInner {
pt: u8,
ssrc: u32,
pad: gst::Pad,
jitter_buffer_store: Arc<Mutex<JitterBufferStore>>,
}
#[derive(Debug, Clone)]
struct RtpRecvSrcPad(Arc<RtpRecvSrcPadInner>);
impl RtpRecvSrcPad {
fn new(pt: u8, ssrc: u32, pad: gst::Pad, jb_store: JitterBufferStore) -> RtpRecvSrcPad {
RtpRecvSrcPad(Arc::new(RtpRecvSrcPadInner {
pt,
ssrc,
pad,
jitter_buffer_store: Arc::new(Mutex::new(jb_store)),
}))
}
}
impl Deref for RtpRecvSrcPad {
type Target = RtpRecvSrcPadInner;
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl PartialEq for RtpRecvSrcPad {
fn eq(&self, other: &Self) -> bool {
self.pt == other.pt && self.ssrc == other.ssrc && self.pad == other.pad
@ -275,13 +297,13 @@ impl RtpRecvSrcPad {
struct HeldRecvBuffer {
hold_id: Option<usize>,
buffer: gst::Buffer,
jb: Arc<Mutex<JitterBufferStore>>,
recv_src_pad: RtpRecvSrcPad,
}
#[derive(Debug)]
struct HeldRecvBufferList {
list: gst::BufferList,
jb: Arc<Mutex<JitterBufferStore>>,
recv_src_pad: RtpRecvSrcPad,
}
#[derive(Debug)]
@ -484,16 +506,16 @@ impl RecvSession {
let settings = rtpbin.settings.lock().unwrap();
let recv_pad = RtpRecvSrcPad {
let recv_pad = RtpRecvSrcPad::new(
pt,
ssrc,
pad: srcpad.clone(),
jitter_buffer_store: Arc::new(Mutex::new(JitterBufferStore {
srcpad.clone(),
JitterBufferStore {
waker: None,
store: BTreeMap::new(),
jitterbuffer: JitterBuffer::new(settings.latency.into()),
})),
};
},
);
self.recv_flow_combiner
.lock()
@ -516,7 +538,7 @@ struct State {
enum RecvRtpBuffer {
IsRtcp(gst::Buffer),
SsrcCollision(u32),
Forward((gst::Buffer, Arc<Mutex<JitterBufferStore>>)),
Forward((gst::Buffer, RtpRecvSrcPad)),
Drop,
}
@ -803,15 +825,14 @@ impl RtpRecv {
let buf_mut = buffer.make_mut();
buf_mut.set_pts(pts);
}
let (pad, new_pad) = session.get_or_create_rtp_src(self, pt, ssrc);
let jb = pad.jitter_buffer_store.clone();
if new_pad {
items_to_pre_push.push(HeldRecvItem::NewPad(pad));
let (recv_src_pad, is_new_pad) = session.get_or_create_rtp_src(self, pt, ssrc);
if is_new_pad {
items_to_pre_push.push(HeldRecvItem::NewPad(recv_src_pad.clone()));
}
held_buffers.push(HeldRecvBuffer {
hold_id: Some(hold_id),
buffer,
jb,
recv_src_pad,
});
break;
}
@ -850,12 +871,11 @@ impl RtpRecv {
let buf_mut = buffer.make_mut();
buf_mut.set_pts(pts);
}
let (pad, new_pad) = session.get_or_create_rtp_src(self, pt, ssrc);
let jb = pad.jitter_buffer_store.clone();
if new_pad {
items_to_pre_push.push(HeldRecvItem::NewPad(pad));
let (recv_src_pad, is_new_pad) = session.get_or_create_rtp_src(self, pt, ssrc);
if is_new_pad {
items_to_pre_push.push(HeldRecvItem::NewPad(recv_src_pad.clone()));
}
return Ok(RecvRtpBuffer::Forward((buffer, jb)));
return Ok(RecvRtpBuffer::Forward((buffer, recv_src_pad)));
}
}
}
@ -926,14 +946,21 @@ impl RtpRecv {
// FIXME: Should block if too many packets are stored here because the source pad task
// is blocked
let mut jitterbuffer_store = buffer.jb.lock().unwrap();
let mut jb_store = buffer.recv_src_pad.jitter_buffer_store.lock().unwrap();
let ret = jitterbuffer_store.jitterbuffer.queue_packet(
let ret = jb_store.jitterbuffer.queue_packet(
&rtp,
buffer.buffer.pts().unwrap().nseconds(),
now,
);
gst::trace!(CAT, "jb queue buffer: {ret:?}");
gst::trace!(
CAT,
"{}: jb queue buffer pts {} rtp ts {} marker {}: {ret:?}",
buffer.recv_src_pad.pad.name(),
buffer.buffer.pts().display(),
rtp.timestamp(),
rtp.marker_bit(),
);
match ret {
jitterbuffer::QueueResult::Flushing => {
// TODO: return flushing result upstream
@ -941,10 +968,10 @@ impl RtpRecv {
jitterbuffer::QueueResult::Queued(id) => {
drop(mapped);
jitterbuffer_store
jb_store
.store
.insert(id, JitterBufferItem::Packet(buffer.buffer));
if let Some(waker) = jitterbuffer_store.waker.take() {
if let Some(waker) = jb_store.waker.take() {
waker.wake()
}
}
@ -959,7 +986,7 @@ impl RtpRecv {
HeldRecvItem::BufferList(list) => {
// FIXME: Should block if too many packets are stored here because the source pad task
// is blocked
let mut jitterbuffer_store = list.jb.lock().unwrap();
let mut jb_store = list.recv_src_pad.jitter_buffer_store.lock().unwrap();
for buffer in list.list.iter_owned() {
let mapped = buffer.map_readable().map_err(|e| {
@ -978,7 +1005,7 @@ impl RtpRecv {
}
};
let ret = jitterbuffer_store.jitterbuffer.queue_packet(
let ret = jb_store.jitterbuffer.queue_packet(
&rtp,
buffer.pts().unwrap().nseconds(),
now,
@ -991,11 +1018,9 @@ impl RtpRecv {
jitterbuffer::QueueResult::Queued(id) => {
drop(mapped);
jitterbuffer_store
.store
.insert(id, JitterBufferItem::Packet(buffer));
jb_store.store.insert(id, JitterBufferItem::Packet(buffer));
if let Some(waker) = jitterbuffer_store.waker.take() {
if let Some(waker) = jb_store.waker.take() {
waker.wake()
}
}
@ -1031,7 +1056,7 @@ impl RtpRecv {
smallvec::SmallVec::with_capacity(list.len() + 2);
let mut held_buffers: smallvec::SmallVec<[HeldRecvBuffer; 4]> = Default::default();
let mut split_bufferlist = false;
let mut previous_jb = None;
let mut previous_recv_src_pad = None;
let list_mut = list.make_mut();
let mut ret = Ok(());
list_mut.foreach_mut(|buffer, _i| {
@ -1057,17 +1082,19 @@ impl RtpRecv {
}
}
Ok(RecvRtpBuffer::Drop) => ControlFlow::Continue(None),
Ok(RecvRtpBuffer::Forward((buffer, jb))) => {
Ok(RecvRtpBuffer::Forward((buffer, recv_src_pad))) => {
// if all the buffers do not end up in the same jitterbuffer, then we need to
// split
if !split_bufferlist
&& previous_jb
&& previous_recv_src_pad
.as_ref()
.is_some_and(|previous| !Arc::ptr_eq(previous, &jb))
.is_some_and(|previous: &RtpRecvSrcPad| {
!Arc::ptr_eq(&previous.0, &recv_src_pad.0)
})
{
split_bufferlist = true;
}
previous_jb = Some(jb);
previous_recv_src_pad = Some(recv_src_pad);
ControlFlow::Continue(Some(buffer))
}
Err(e) => {
@ -1095,7 +1122,7 @@ impl RtpRecv {
[HeldRecvItem::Buffer(HeldRecvBuffer {
hold_id: None,
buffer,
jb: previous_jb.clone().unwrap(),
recv_src_pad: previous_recv_src_pad.clone().unwrap(),
})],
now,
) {
@ -1117,7 +1144,7 @@ impl RtpRecv {
id,
[HeldRecvItem::BufferList(HeldRecvBufferList {
list,
jb: previous_jb.unwrap(),
recv_src_pad: previous_recv_src_pad.unwrap(),
})],
now,
)?;
@ -1161,14 +1188,14 @@ impl RtpRecv {
.extend(held_buffers.into_iter().map(HeldRecvItem::Buffer));
state = self.handle_push_jitterbuffer(state, id, items_to_pre_push, now)?;
if let Some((buffer, jb)) = forward {
if let Some((buffer, recv_src_pad)) = forward {
state = self.handle_push_jitterbuffer(
state,
id,
[HeldRecvItem::Buffer(HeldRecvBuffer {
hold_id: None,
buffer,
jb,
recv_src_pad,
})],
now,
)?;