From c575c93d89b7e40d88305c895a912b52d55afdc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Wed, 9 Jul 2025 19:07:51 +0200 Subject: [PATCH] rtprecv: pass RtpRecvSrcPad around instead of only the JitterBufferStore This allows accessing the src pad too (e.g. to get its name), which can ease log intepretation when multiple ssrc are involved and more will also ease subsequent commits. Part-of: --- net/rtp/src/rtpbin2/rtprecv.rs | 109 ++++++++++++++++++++------------- 1 file changed, 68 insertions(+), 41 deletions(-) diff --git a/net/rtp/src/rtpbin2/rtprecv.rs b/net/rtp/src/rtpbin2/rtprecv.rs index bd313c217..031a1c493 100644 --- a/net/rtp/src/rtpbin2/rtprecv.rs +++ b/net/rtp/src/rtpbin2/rtprecv.rs @@ -27,7 +27,7 @@ */ use std::collections::{BTreeMap, HashMap}; use std::net::SocketAddr; -use std::ops::ControlFlow; +use std::ops::{ControlFlow, Deref}; use std::pin::Pin; use std::sync::{Arc, Mutex, MutexGuard}; use std::task::{Poll, Waker}; @@ -228,14 +228,36 @@ struct JitterBufferStore { jitterbuffer: JitterBuffer, } -#[derive(Debug, Clone)] -struct RtpRecvSrcPad { +#[derive(Debug)] +struct RtpRecvSrcPadInner { pt: u8, ssrc: u32, pad: gst::Pad, jitter_buffer_store: Arc>, } +#[derive(Debug, Clone)] +struct RtpRecvSrcPad(Arc); + +impl RtpRecvSrcPad { + fn new(pt: u8, ssrc: u32, pad: gst::Pad, jb_store: JitterBufferStore) -> RtpRecvSrcPad { + RtpRecvSrcPad(Arc::new(RtpRecvSrcPadInner { + pt, + ssrc, + pad, + jitter_buffer_store: Arc::new(Mutex::new(jb_store)), + })) + } +} + +impl Deref for RtpRecvSrcPad { + type Target = RtpRecvSrcPadInner; + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + impl PartialEq for RtpRecvSrcPad { fn eq(&self, other: &Self) -> bool { self.pt == other.pt && self.ssrc == other.ssrc && self.pad == other.pad @@ -275,13 +297,13 @@ impl RtpRecvSrcPad { struct HeldRecvBuffer { hold_id: Option, buffer: gst::Buffer, - jb: Arc>, + recv_src_pad: RtpRecvSrcPad, } #[derive(Debug)] struct HeldRecvBufferList { list: gst::BufferList, - jb: Arc>, + recv_src_pad: RtpRecvSrcPad, } #[derive(Debug)] @@ -484,16 +506,16 @@ impl RecvSession { let settings = rtpbin.settings.lock().unwrap(); - let recv_pad = RtpRecvSrcPad { + let recv_pad = RtpRecvSrcPad::new( pt, ssrc, - pad: srcpad.clone(), - jitter_buffer_store: Arc::new(Mutex::new(JitterBufferStore { + srcpad.clone(), + JitterBufferStore { waker: None, store: BTreeMap::new(), jitterbuffer: JitterBuffer::new(settings.latency.into()), - })), - }; + }, + ); self.recv_flow_combiner .lock() @@ -516,7 +538,7 @@ struct State { enum RecvRtpBuffer { IsRtcp(gst::Buffer), SsrcCollision(u32), - Forward((gst::Buffer, Arc>)), + Forward((gst::Buffer, RtpRecvSrcPad)), Drop, } @@ -803,15 +825,14 @@ impl RtpRecv { let buf_mut = buffer.make_mut(); buf_mut.set_pts(pts); } - let (pad, new_pad) = session.get_or_create_rtp_src(self, pt, ssrc); - let jb = pad.jitter_buffer_store.clone(); - if new_pad { - items_to_pre_push.push(HeldRecvItem::NewPad(pad)); + let (recv_src_pad, is_new_pad) = session.get_or_create_rtp_src(self, pt, ssrc); + if is_new_pad { + items_to_pre_push.push(HeldRecvItem::NewPad(recv_src_pad.clone())); } held_buffers.push(HeldRecvBuffer { hold_id: Some(hold_id), buffer, - jb, + recv_src_pad, }); break; } @@ -850,12 +871,11 @@ impl RtpRecv { let buf_mut = buffer.make_mut(); buf_mut.set_pts(pts); } - let (pad, new_pad) = session.get_or_create_rtp_src(self, pt, ssrc); - let jb = pad.jitter_buffer_store.clone(); - if new_pad { - items_to_pre_push.push(HeldRecvItem::NewPad(pad)); + let (recv_src_pad, is_new_pad) = session.get_or_create_rtp_src(self, pt, ssrc); + if is_new_pad { + items_to_pre_push.push(HeldRecvItem::NewPad(recv_src_pad.clone())); } - return Ok(RecvRtpBuffer::Forward((buffer, jb))); + return Ok(RecvRtpBuffer::Forward((buffer, recv_src_pad))); } } } @@ -926,14 +946,21 @@ impl RtpRecv { // FIXME: Should block if too many packets are stored here because the source pad task // is blocked - let mut jitterbuffer_store = buffer.jb.lock().unwrap(); + let mut jb_store = buffer.recv_src_pad.jitter_buffer_store.lock().unwrap(); - let ret = jitterbuffer_store.jitterbuffer.queue_packet( + let ret = jb_store.jitterbuffer.queue_packet( &rtp, buffer.buffer.pts().unwrap().nseconds(), now, ); - gst::trace!(CAT, "jb queue buffer: {ret:?}"); + gst::trace!( + CAT, + "{}: jb queue buffer pts {} rtp ts {} marker {}: {ret:?}", + buffer.recv_src_pad.pad.name(), + buffer.buffer.pts().display(), + rtp.timestamp(), + rtp.marker_bit(), + ); match ret { jitterbuffer::QueueResult::Flushing => { // TODO: return flushing result upstream @@ -941,10 +968,10 @@ impl RtpRecv { jitterbuffer::QueueResult::Queued(id) => { drop(mapped); - jitterbuffer_store + jb_store .store .insert(id, JitterBufferItem::Packet(buffer.buffer)); - if let Some(waker) = jitterbuffer_store.waker.take() { + if let Some(waker) = jb_store.waker.take() { waker.wake() } } @@ -959,7 +986,7 @@ impl RtpRecv { HeldRecvItem::BufferList(list) => { // FIXME: Should block if too many packets are stored here because the source pad task // is blocked - let mut jitterbuffer_store = list.jb.lock().unwrap(); + let mut jb_store = list.recv_src_pad.jitter_buffer_store.lock().unwrap(); for buffer in list.list.iter_owned() { let mapped = buffer.map_readable().map_err(|e| { @@ -978,7 +1005,7 @@ impl RtpRecv { } }; - let ret = jitterbuffer_store.jitterbuffer.queue_packet( + let ret = jb_store.jitterbuffer.queue_packet( &rtp, buffer.pts().unwrap().nseconds(), now, @@ -991,11 +1018,9 @@ impl RtpRecv { jitterbuffer::QueueResult::Queued(id) => { drop(mapped); - jitterbuffer_store - .store - .insert(id, JitterBufferItem::Packet(buffer)); + jb_store.store.insert(id, JitterBufferItem::Packet(buffer)); - if let Some(waker) = jitterbuffer_store.waker.take() { + if let Some(waker) = jb_store.waker.take() { waker.wake() } } @@ -1031,7 +1056,7 @@ impl RtpRecv { smallvec::SmallVec::with_capacity(list.len() + 2); let mut held_buffers: smallvec::SmallVec<[HeldRecvBuffer; 4]> = Default::default(); let mut split_bufferlist = false; - let mut previous_jb = None; + let mut previous_recv_src_pad = None; let list_mut = list.make_mut(); let mut ret = Ok(()); list_mut.foreach_mut(|buffer, _i| { @@ -1057,17 +1082,19 @@ impl RtpRecv { } } Ok(RecvRtpBuffer::Drop) => ControlFlow::Continue(None), - Ok(RecvRtpBuffer::Forward((buffer, jb))) => { + Ok(RecvRtpBuffer::Forward((buffer, recv_src_pad))) => { // if all the buffers do not end up in the same jitterbuffer, then we need to // split if !split_bufferlist - && previous_jb + && previous_recv_src_pad .as_ref() - .is_some_and(|previous| !Arc::ptr_eq(previous, &jb)) + .is_some_and(|previous: &RtpRecvSrcPad| { + !Arc::ptr_eq(&previous.0, &recv_src_pad.0) + }) { split_bufferlist = true; } - previous_jb = Some(jb); + previous_recv_src_pad = Some(recv_src_pad); ControlFlow::Continue(Some(buffer)) } Err(e) => { @@ -1095,7 +1122,7 @@ impl RtpRecv { [HeldRecvItem::Buffer(HeldRecvBuffer { hold_id: None, buffer, - jb: previous_jb.clone().unwrap(), + recv_src_pad: previous_recv_src_pad.clone().unwrap(), })], now, ) { @@ -1117,7 +1144,7 @@ impl RtpRecv { id, [HeldRecvItem::BufferList(HeldRecvBufferList { list, - jb: previous_jb.unwrap(), + recv_src_pad: previous_recv_src_pad.unwrap(), })], now, )?; @@ -1161,14 +1188,14 @@ impl RtpRecv { .extend(held_buffers.into_iter().map(HeldRecvItem::Buffer)); state = self.handle_push_jitterbuffer(state, id, items_to_pre_push, now)?; - if let Some((buffer, jb)) = forward { + if let Some((buffer, recv_src_pad)) = forward { state = self.handle_push_jitterbuffer( state, id, [HeldRecvItem::Buffer(HeldRecvBuffer { hold_id: None, buffer, - jb, + recv_src_pad, })], now, )?;