diff --git a/net/rtp/src/rtpbin2/rtprecv.rs b/net/rtp/src/rtpbin2/rtprecv.rs index bd313c217..031a1c493 100644 --- a/net/rtp/src/rtpbin2/rtprecv.rs +++ b/net/rtp/src/rtpbin2/rtprecv.rs @@ -27,7 +27,7 @@ */ use std::collections::{BTreeMap, HashMap}; use std::net::SocketAddr; -use std::ops::ControlFlow; +use std::ops::{ControlFlow, Deref}; use std::pin::Pin; use std::sync::{Arc, Mutex, MutexGuard}; use std::task::{Poll, Waker}; @@ -228,14 +228,36 @@ struct JitterBufferStore { jitterbuffer: JitterBuffer, } -#[derive(Debug, Clone)] -struct RtpRecvSrcPad { +#[derive(Debug)] +struct RtpRecvSrcPadInner { pt: u8, ssrc: u32, pad: gst::Pad, jitter_buffer_store: Arc>, } +#[derive(Debug, Clone)] +struct RtpRecvSrcPad(Arc); + +impl RtpRecvSrcPad { + fn new(pt: u8, ssrc: u32, pad: gst::Pad, jb_store: JitterBufferStore) -> RtpRecvSrcPad { + RtpRecvSrcPad(Arc::new(RtpRecvSrcPadInner { + pt, + ssrc, + pad, + jitter_buffer_store: Arc::new(Mutex::new(jb_store)), + })) + } +} + +impl Deref for RtpRecvSrcPad { + type Target = RtpRecvSrcPadInner; + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + impl PartialEq for RtpRecvSrcPad { fn eq(&self, other: &Self) -> bool { self.pt == other.pt && self.ssrc == other.ssrc && self.pad == other.pad @@ -275,13 +297,13 @@ impl RtpRecvSrcPad { struct HeldRecvBuffer { hold_id: Option, buffer: gst::Buffer, - jb: Arc>, + recv_src_pad: RtpRecvSrcPad, } #[derive(Debug)] struct HeldRecvBufferList { list: gst::BufferList, - jb: Arc>, + recv_src_pad: RtpRecvSrcPad, } #[derive(Debug)] @@ -484,16 +506,16 @@ impl RecvSession { let settings = rtpbin.settings.lock().unwrap(); - let recv_pad = RtpRecvSrcPad { + let recv_pad = RtpRecvSrcPad::new( pt, ssrc, - pad: srcpad.clone(), - jitter_buffer_store: Arc::new(Mutex::new(JitterBufferStore { + srcpad.clone(), + JitterBufferStore { waker: None, store: BTreeMap::new(), jitterbuffer: JitterBuffer::new(settings.latency.into()), - })), - }; + }, + ); self.recv_flow_combiner .lock() @@ -516,7 +538,7 @@ struct State { enum RecvRtpBuffer { IsRtcp(gst::Buffer), SsrcCollision(u32), - Forward((gst::Buffer, Arc>)), + Forward((gst::Buffer, RtpRecvSrcPad)), Drop, } @@ -803,15 +825,14 @@ impl RtpRecv { let buf_mut = buffer.make_mut(); buf_mut.set_pts(pts); } - let (pad, new_pad) = session.get_or_create_rtp_src(self, pt, ssrc); - let jb = pad.jitter_buffer_store.clone(); - if new_pad { - items_to_pre_push.push(HeldRecvItem::NewPad(pad)); + let (recv_src_pad, is_new_pad) = session.get_or_create_rtp_src(self, pt, ssrc); + if is_new_pad { + items_to_pre_push.push(HeldRecvItem::NewPad(recv_src_pad.clone())); } held_buffers.push(HeldRecvBuffer { hold_id: Some(hold_id), buffer, - jb, + recv_src_pad, }); break; } @@ -850,12 +871,11 @@ impl RtpRecv { let buf_mut = buffer.make_mut(); buf_mut.set_pts(pts); } - let (pad, new_pad) = session.get_or_create_rtp_src(self, pt, ssrc); - let jb = pad.jitter_buffer_store.clone(); - if new_pad { - items_to_pre_push.push(HeldRecvItem::NewPad(pad)); + let (recv_src_pad, is_new_pad) = session.get_or_create_rtp_src(self, pt, ssrc); + if is_new_pad { + items_to_pre_push.push(HeldRecvItem::NewPad(recv_src_pad.clone())); } - return Ok(RecvRtpBuffer::Forward((buffer, jb))); + return Ok(RecvRtpBuffer::Forward((buffer, recv_src_pad))); } } } @@ -926,14 +946,21 @@ impl RtpRecv { // FIXME: Should block if too many packets are stored here because the source pad task // is blocked - let mut jitterbuffer_store = buffer.jb.lock().unwrap(); + let mut jb_store = buffer.recv_src_pad.jitter_buffer_store.lock().unwrap(); - let ret = jitterbuffer_store.jitterbuffer.queue_packet( + let ret = jb_store.jitterbuffer.queue_packet( &rtp, buffer.buffer.pts().unwrap().nseconds(), now, ); - gst::trace!(CAT, "jb queue buffer: {ret:?}"); + gst::trace!( + CAT, + "{}: jb queue buffer pts {} rtp ts {} marker {}: {ret:?}", + buffer.recv_src_pad.pad.name(), + buffer.buffer.pts().display(), + rtp.timestamp(), + rtp.marker_bit(), + ); match ret { jitterbuffer::QueueResult::Flushing => { // TODO: return flushing result upstream @@ -941,10 +968,10 @@ impl RtpRecv { jitterbuffer::QueueResult::Queued(id) => { drop(mapped); - jitterbuffer_store + jb_store .store .insert(id, JitterBufferItem::Packet(buffer.buffer)); - if let Some(waker) = jitterbuffer_store.waker.take() { + if let Some(waker) = jb_store.waker.take() { waker.wake() } } @@ -959,7 +986,7 @@ impl RtpRecv { HeldRecvItem::BufferList(list) => { // FIXME: Should block if too many packets are stored here because the source pad task // is blocked - let mut jitterbuffer_store = list.jb.lock().unwrap(); + let mut jb_store = list.recv_src_pad.jitter_buffer_store.lock().unwrap(); for buffer in list.list.iter_owned() { let mapped = buffer.map_readable().map_err(|e| { @@ -978,7 +1005,7 @@ impl RtpRecv { } }; - let ret = jitterbuffer_store.jitterbuffer.queue_packet( + let ret = jb_store.jitterbuffer.queue_packet( &rtp, buffer.pts().unwrap().nseconds(), now, @@ -991,11 +1018,9 @@ impl RtpRecv { jitterbuffer::QueueResult::Queued(id) => { drop(mapped); - jitterbuffer_store - .store - .insert(id, JitterBufferItem::Packet(buffer)); + jb_store.store.insert(id, JitterBufferItem::Packet(buffer)); - if let Some(waker) = jitterbuffer_store.waker.take() { + if let Some(waker) = jb_store.waker.take() { waker.wake() } } @@ -1031,7 +1056,7 @@ impl RtpRecv { smallvec::SmallVec::with_capacity(list.len() + 2); let mut held_buffers: smallvec::SmallVec<[HeldRecvBuffer; 4]> = Default::default(); let mut split_bufferlist = false; - let mut previous_jb = None; + let mut previous_recv_src_pad = None; let list_mut = list.make_mut(); let mut ret = Ok(()); list_mut.foreach_mut(|buffer, _i| { @@ -1057,17 +1082,19 @@ impl RtpRecv { } } Ok(RecvRtpBuffer::Drop) => ControlFlow::Continue(None), - Ok(RecvRtpBuffer::Forward((buffer, jb))) => { + Ok(RecvRtpBuffer::Forward((buffer, recv_src_pad))) => { // if all the buffers do not end up in the same jitterbuffer, then we need to // split if !split_bufferlist - && previous_jb + && previous_recv_src_pad .as_ref() - .is_some_and(|previous| !Arc::ptr_eq(previous, &jb)) + .is_some_and(|previous: &RtpRecvSrcPad| { + !Arc::ptr_eq(&previous.0, &recv_src_pad.0) + }) { split_bufferlist = true; } - previous_jb = Some(jb); + previous_recv_src_pad = Some(recv_src_pad); ControlFlow::Continue(Some(buffer)) } Err(e) => { @@ -1095,7 +1122,7 @@ impl RtpRecv { [HeldRecvItem::Buffer(HeldRecvBuffer { hold_id: None, buffer, - jb: previous_jb.clone().unwrap(), + recv_src_pad: previous_recv_src_pad.clone().unwrap(), })], now, ) { @@ -1117,7 +1144,7 @@ impl RtpRecv { id, [HeldRecvItem::BufferList(HeldRecvBufferList { list, - jb: previous_jb.unwrap(), + recv_src_pad: previous_recv_src_pad.unwrap(), })], now, )?; @@ -1161,14 +1188,14 @@ impl RtpRecv { .extend(held_buffers.into_iter().map(HeldRecvItem::Buffer)); state = self.handle_push_jitterbuffer(state, id, items_to_pre_push, now)?; - if let Some((buffer, jb)) = forward { + if let Some((buffer, recv_src_pad)) = forward { state = self.handle_push_jitterbuffer( state, id, [HeldRecvItem::Buffer(HeldRecvBuffer { hold_id: None, buffer, - jb, + recv_src_pad, })], now, )?;