diff --git a/net/rtp/src/rtpbin2/rtprecv.rs b/net/rtp/src/rtpbin2/rtprecv.rs index a3f6de5f..866ec0fc 100644 --- a/net/rtp/src/rtpbin2/rtprecv.rs +++ b/net/rtp/src/rtpbin2/rtprecv.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap}; use std::net::SocketAddr; +use std::ops::ControlFlow; use std::pin::Pin; use std::sync::{Arc, Mutex, MutexGuard}; use std::task::{Poll, Waker}; @@ -194,8 +195,30 @@ impl RtpRecvSrcPad { struct HeldRecvBuffer { hold_id: Option, buffer: gst::Buffer, - pad: RtpRecvSrcPad, - new_pad: bool, + jb: Arc>, +} + +#[derive(Debug)] +struct HeldRecvBufferList { + list: gst::BufferList, + jb: Arc>, +} + +#[derive(Debug)] +enum HeldRecvItem { + NewPad(RtpRecvSrcPad), + Buffer(HeldRecvBuffer), + BufferList(HeldRecvBufferList), +} + +impl HeldRecvItem { + fn hold_id(&self) -> Option { + match self { + Self::NewPad(_) => None, + Self::Buffer(buf) => buf.hold_id, + Self::BufferList(_list) => None, + } + } } #[derive(Debug)] @@ -209,7 +232,7 @@ struct RecvSession { rtp_recv_sink_segment: Option>, rtp_recv_sink_seqnum: Option, - recv_store: Vec, + recv_store: Vec, rtp_recv_srcpads: Vec, recv_flow_combiner: Arc>, @@ -239,7 +262,7 @@ impl RecvSession { } } - fn start_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> { + fn start_rtp_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> { gst::debug!(CAT, obj: pad, "Starting rtp recv src task"); let recv_pad = self @@ -301,7 +324,7 @@ impl RecvSession { Ok(()) } - fn stop_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> { + fn stop_rtp_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> { gst::debug!(CAT, obj: pad, "Stopping rtp recv src task"); let recv_pad = self .rtp_recv_srcpads @@ -318,7 +341,7 @@ impl RecvSession { Ok(()) } - fn get_or_create_rtp_recv_src( + fn get_or_create_rtp_src( &mut self, rtpbin: &RtpRecv, pt: u8, @@ -352,7 +375,7 @@ impl RecvSession { RtpRecv::catch_panic_pad_function( parent, || false, - |this| this.rtp_recv_src_event(pad, event, id, pt, ssrc), + |this| this.rtp_src_event(pad, event, id, pt, ssrc), ) }) .activatemode_function({ @@ -364,7 +387,7 @@ impl RecvSession { glib::bool_error!("rtprecv does not exist anymore"), )); }; - this.rtp_recv_src_activatemode(pad, mode, active, id) + this.rtp_src_activatemode(pad, mode, active, id) } }) .name(format!("rtp_src_{}_{}_{}", id, pt, ssrc)) @@ -401,7 +424,13 @@ struct State { sessions: Vec, max_session_id: usize, pads_session_id_map: HashMap, - sync_context: Option, +} + +enum RecvRtpBuffer { + IsRtcp(gst::Buffer), + SsrcCollision(u32), + Forward((gst::Buffer, Arc>)), + Drop, } impl State { @@ -441,10 +470,11 @@ impl State { pub struct RtpRecv { settings: Mutex, state: Arc>, + sync_context: Arc>>, } impl RtpRecv { - fn rtp_recv_src_activatemode( + fn rtp_src_activatemode( &self, pad: &gst::Pad, mode: gst::PadMode, @@ -465,9 +495,9 @@ impl RtpRecv { }; if active { - session.start_rtp_recv_task(pad)?; + session.start_rtp_task(pad)?; } else { - session.stop_rtp_recv_task(pad)?; + session.stop_rtp_task(pad)?; gst::debug!(CAT, obj: pad, "Stopping task"); @@ -535,17 +565,15 @@ impl RtpRecv { gst::Iterator::from_vec(vec![]) } - fn rtp_recv_sink_chain( + fn handle_buffer_locked( &self, pad: &gst::Pad, - id: usize, + session: &mut RecvSession, mut buffer: gst::Buffer, - ) -> Result { - let mut state = self.state.lock().unwrap(); - let Some(session) = state.mut_session_by_id(id) else { - return Err(gst::FlowError::Error); - }; - + now: Instant, + items_to_pre_push: &mut smallvec::SmallVec<[HeldRecvItem; P]>, + held_buffers: &mut smallvec::SmallVec<[HeldRecvBuffer; H]>, + ) -> Result { // TODO: this is different from the old C implementation, where we // simply used the RTP timestamps as they were instead of doing any // sort of skew calculations. @@ -573,8 +601,6 @@ impl RtpRecv { }, }; - gst::trace!(CAT, obj: pad, "using arrival time {}", arrival_time); - let addr: Option = buffer .meta::() @@ -589,6 +615,7 @@ impl RtpRecv { gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}"); gst::FlowError::Error })?; + let rtp = match rtp_types::RtpPacket::parse(&mapped) { Ok(rtp) => rtp, Err(e) => { @@ -598,59 +625,47 @@ impl RtpRecv { .is_ok_and(|mut rtcp| rtcp.next().is_some_and(|rtcp| rtcp.is_ok())) { drop(mapped); - return Self::rtcp_recv_sink_chain(self, id, buffer); + return Ok(RecvRtpBuffer::IsRtcp(buffer)); } gst::error!(CAT, imp: self, "Failed to parse input as valid rtp packet: {e:?}"); - return Ok(gst::FlowSuccess::Ok); + return Ok(RecvRtpBuffer::Drop); } }; - let internal_session = session.internal_session.clone(); + gst::trace!(CAT, obj: pad, "using arrival time {}", arrival_time); + let internal_session = session.internal_session.clone(); let mut session_inner = internal_session.inner.lock().unwrap(); - if state - .sync_context - .as_ref() - .unwrap() - .clock_rate(rtp.ssrc()) - .is_none() - { - let clock_rate = session_inner - .session - .clock_rate_from_pt(rtp.payload_type()) - .unwrap(); - state - .sync_context - .as_mut() - .unwrap() - .set_clock_rate(rtp.ssrc(), clock_rate); - } + let pts = { + let mut sync_context = self.sync_context.lock().unwrap(); + let sync_context = sync_context.as_mut().unwrap(); + if !sync_context.has_clock_rate(rtp.ssrc()) { + let clock_rate = session_inner + .session + .clock_rate_from_pt(rtp.payload_type()) + .unwrap(); + sync_context.set_clock_rate(rtp.ssrc(), clock_rate); + } + + // TODO: Put NTP time as `gst::ReferenceTimeStampMeta` on the buffers if selected via property + let (pts, _ntp_time) = + sync_context.calculate_pts(rtp.ssrc(), rtp.timestamp(), arrival_time.nseconds()); + pts + }; - // TODO: Put NTP time as `gst::ReferenceTimeStampMeta` on the buffers if selected via property - let (pts, _ntp_time) = state.sync_context.as_mut().unwrap().calculate_pts( - rtp.ssrc(), - rtp.timestamp(), - arrival_time.nseconds(), - ); - let session = state.mut_session_by_id(id).unwrap(); let segment = session.rtp_recv_sink_segment.as_ref().unwrap(); let pts = segment .position_from_running_time(gst::ClockTime::from_nseconds(pts)) .unwrap(); - gst::debug!(CAT, "Calculated PTS: {}", pts); + gst::debug!(CAT, obj: pad, "Calculated PTS: {}", pts); - let now = Instant::now(); - let mut buffers_to_push = vec![]; - let mut ssrc_collision = vec![]; loop { - match session_inner.session.handle_recv(&rtp, addr, now) { - RecvReply::SsrcCollision(ssrc) => { - if !ssrc_collision.iter().any(|&needle| needle == ssrc) { - ssrc_collision.push(ssrc); - } - } + let recv_ret = session_inner.session.handle_recv(&rtp, addr, now); + gst::trace!(CAT, obj: pad, "session handle_recv ret: {recv_ret:?}"); + match recv_ret { + RecvReply::SsrcCollision(ssrc) => return Ok(RecvRtpBuffer::SsrcCollision(ssrc)), RecvReply::NewSsrc(ssrc, _pt) => { drop(session_inner); internal_session @@ -666,36 +681,45 @@ impl RtpRecv { let buf_mut = buffer.make_mut(); buf_mut.set_pts(pts); } - let (pad, new_pad) = session.get_or_create_rtp_recv_src(self, pt, ssrc); - session.recv_store.push(HeldRecvBuffer { + let (pad, new_pad) = session.get_or_create_rtp_src(self, pt, ssrc); + let jb = pad.jitter_buffer_store.clone(); + if new_pad { + items_to_pre_push.push(HeldRecvItem::NewPad(pad)); + } + held_buffers.push(HeldRecvBuffer { hold_id: Some(hold_id), buffer, - pad, - new_pad, + jb, }); break; } RecvReply::Drop(hold_id) => { - if let Some(pos) = session + if let Some(pos) = held_buffers.iter().position(|b| b.hold_id == Some(hold_id)) + { + held_buffers.remove(pos); + } else if let Some(pos) = session .recv_store .iter() - .position(|b| b.hold_id.unwrap() == hold_id) + .position(|b| b.hold_id() == Some(hold_id)) { session.recv_store.remove(pos); } } RecvReply::Forward(hold_id) => { - if let Some(pos) = session + if let Some(pos) = held_buffers.iter().position(|b| b.hold_id == Some(hold_id)) + { + items_to_pre_push.push(HeldRecvItem::Buffer(held_buffers.remove(pos))); + } else if let Some(pos) = session .recv_store .iter() - .position(|b| b.hold_id.unwrap() == hold_id) + .position(|b| b.hold_id() == Some(hold_id)) { - buffers_to_push.push(session.recv_store.remove(pos)); + items_to_pre_push.push(session.recv_store.remove(pos)); } else { unreachable!(); } } - RecvReply::Ignore => break, + RecvReply::Ignore => return Ok(RecvRtpBuffer::Drop), RecvReply::Passthrough => { let pt = rtp.payload_type(); let ssrc = rtp.ssrc(); @@ -704,20 +728,26 @@ impl RtpRecv { let buf_mut = buffer.make_mut(); buf_mut.set_pts(pts); } - let (pad, new_pad) = session.get_or_create_rtp_recv_src(self, pt, ssrc); - buffers_to_push.push(HeldRecvBuffer { - hold_id: None, - buffer, - pad, - new_pad, - }); - break; + let (pad, new_pad) = session.get_or_create_rtp_src(self, pt, ssrc); + let jb = pad.jitter_buffer_store.clone(); + if new_pad { + items_to_pre_push.push(HeldRecvItem::NewPad(pad)); + } + return Ok(RecvRtpBuffer::Forward((buffer, jb))); } } } - let send_rtp_sink = session_inner.rtp_send_sinkpad.clone(); + Ok(RecvRtpBuffer::Drop) + } + fn handle_ssrc_collision( + &self, + session: &mut RecvSession, + ssrc_collision: impl IntoIterator, + ) -> Result { + let session_inner = session.internal_session.inner.lock().unwrap(); + let send_rtp_sink = session_inner.rtp_send_sinkpad.clone(); drop(session_inner); if let Some(pad) = send_rtp_sink { @@ -735,63 +765,290 @@ impl RtpRecv { } } + Ok(gst::FlowSuccess::Ok) + } + + fn handle_push_jitterbuffer<'a>( + &'a self, + mut state: MutexGuard<'a, State>, + id: usize, + buffers_to_push: impl IntoIterator, + now: Instant, + ) -> Result, gst::FlowError> { for mut held in buffers_to_push { - // TODO: handle other processing - if held.new_pad { - state.pads_session_id_map.insert(held.pad.pad.clone(), id); - // drops the state lock - held.pad.activate(state, id); - self.obj().add_pad(&held.pad.pad).unwrap(); - state = self.state.lock().unwrap(); - } - - let mapped = held.buffer.map_readable().map_err(|e| { - gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}"); - gst::FlowError::Error - })?; - let rtp = match rtp_types::RtpPacket::parse(&mapped) { - Ok(rtp) => rtp, - Err(e) => { - gst::error!(CAT, imp: self, "Failed to parse input as valid rtp packet: {e:?}"); - return Ok(gst::FlowSuccess::Ok); + match held { + HeldRecvItem::NewPad(ref mut pad) => { + // TODO: handle other processing + state.pads_session_id_map.insert(pad.pad.clone(), id); + // drops the state lock + pad.activate(state, id); + self.obj().add_pad(&pad.pad).unwrap(); + state = self.state.lock().unwrap(); } - }; + HeldRecvItem::Buffer(buffer) => { + let mapped = buffer.buffer.map_readable().map_err(|e| { + gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}"); + gst::FlowError::Error + })?; + let rtp = match rtp_types::RtpPacket::parse(&mapped) { + Ok(rtp) => rtp, + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse input as valid rtp packet: {e:?}"); + return Ok(state); + } + }; - // FIXME: Should block if too many packets are stored here because the source pad task - // is blocked - let mut jitterbuffer_store = held.pad.jitter_buffer_store.lock().unwrap(); + // FIXME: Should block if too many packets are stored here because the source pad task + // is blocked + let mut jitterbuffer_store = buffer.jb.lock().unwrap(); - match jitterbuffer_store.jitterbuffer.queue_packet( - &rtp, - held.buffer.pts().unwrap().nseconds(), - now, - ) { - jitterbuffer::QueueResult::Flushing => { - // TODO: return flushing result upstream - } - jitterbuffer::QueueResult::Queued(id) => { - drop(mapped); + let ret = jitterbuffer_store.jitterbuffer.queue_packet( + &rtp, + buffer.buffer.pts().unwrap().nseconds(), + now, + ); + gst::trace!(CAT, "jb queue buffer: {ret:?}"); + match ret { + jitterbuffer::QueueResult::Flushing => { + // TODO: return flushing result upstream + } + jitterbuffer::QueueResult::Queued(id) => { + drop(mapped); - jitterbuffer_store - .store - .insert(id, JitterBufferItem::Packet(held.buffer)); - if let Some(waker) = jitterbuffer_store.waker.take() { - waker.wake() + jitterbuffer_store + .store + .insert(id, JitterBufferItem::Packet(buffer.buffer)); + if let Some(waker) = jitterbuffer_store.waker.take() { + waker.wake() + } + } + jitterbuffer::QueueResult::Late => { + gst::warning!(CAT, "Late buffer was dropped"); + } + jitterbuffer::QueueResult::Duplicate => { + gst::warning!(CAT, "Duplicate buffer was dropped"); + } } } - jitterbuffer::QueueResult::Late => { - gst::warning!(CAT, "Late buffer was dropped"); - } - jitterbuffer::QueueResult::Duplicate => { - gst::warning!(CAT, "Duplicate buffer was dropped"); + HeldRecvItem::BufferList(list) => { + // FIXME: Should block if too many packets are stored here because the source pad task + // is blocked + let mut jitterbuffer_store = list.jb.lock().unwrap(); + + for buffer in list.list.iter_owned() { + let mapped = buffer.map_readable().map_err(|e| { + gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}"); + gst::FlowError::Error + })?; + let rtp = match rtp_types::RtpPacket::parse(&mapped) { + Ok(rtp) => rtp, + Err(e) => { + gst::error!(CAT, imp: self, "Failed to parse input as valid rtp packet: {e:?}"); + return Ok(state); + } + }; + + let ret = jitterbuffer_store.jitterbuffer.queue_packet( + &rtp, + buffer.pts().unwrap().nseconds(), + now, + ); + gst::trace!(CAT, "jb queue buffer in list: {ret:?}"); + match ret { + jitterbuffer::QueueResult::Flushing => { + return Err(gst::FlowError::Flushing); + } + jitterbuffer::QueueResult::Queued(id) => { + drop(mapped); + + jitterbuffer_store + .store + .insert(id, JitterBufferItem::Packet(buffer)); + + if let Some(waker) = jitterbuffer_store.waker.take() { + waker.wake() + } + } + jitterbuffer::QueueResult::Late => { + gst::warning!(CAT, "Late buffer was dropped"); + } + jitterbuffer::QueueResult::Duplicate => { + gst::warning!(CAT, "Duplicate buffer was dropped"); + } + } + } } } } + Ok(state) + } + + fn rtp_sink_chain_list( + &self, + pad: &gst::Pad, + id: usize, + mut list: gst::BufferList, + ) -> Result { + let mut state = self.state.lock().unwrap(); + let Some(session) = state.mut_session_by_id(id) else { + return Err(gst::FlowError::Error); + }; + + let now = Instant::now(); + let mut ssrc_collision: smallvec::SmallVec<[u32; 4]> = Default::default(); + let mut items_to_pre_push: smallvec::SmallVec<[HeldRecvItem; 4]> = + smallvec::SmallVec::with_capacity(list.len() + 2); + let mut held_buffers: smallvec::SmallVec<[HeldRecvBuffer; 4]> = Default::default(); + let mut split_bufferlist = false; + let mut previous_jb = None; + let list_mut = list.make_mut(); + let mut ret = Ok(()); + list_mut.foreach_mut(|buffer, _i| { + match self.handle_buffer_locked( + pad, + session, + buffer, + now, + &mut items_to_pre_push, + &mut held_buffers, + ) { + Ok(RecvRtpBuffer::SsrcCollision(ssrc)) => { + ssrc_collision.push(ssrc); + ControlFlow::Continue(None) + } + Ok(RecvRtpBuffer::IsRtcp(buffer)) => { + match Self::rtcp_sink_chain(self, id, buffer) { + Ok(_buf) => ControlFlow::Continue(None), + Err(e) => { + ret = Err(e); + ControlFlow::Break(None) + } + } + } + Ok(RecvRtpBuffer::Drop) => ControlFlow::Continue(None), + Ok(RecvRtpBuffer::Forward((buffer, jb))) => { + // if all the buffers do not end up in the same jitterbuffer, then we need to + // split + if !split_bufferlist + && previous_jb + .as_ref() + .map_or(false, |previous| !Arc::ptr_eq(previous, &jb)) + { + split_bufferlist = true; + } + previous_jb = Some(jb); + ControlFlow::Continue(Some(buffer)) + } + Err(e) => { + ret = Err(e); + ControlFlow::Break(None) + } + } + }); + ret?; + session + .recv_store + .extend(held_buffers.into_iter().map(HeldRecvItem::Buffer)); + + self.handle_ssrc_collision(session, ssrc_collision)?; + state = self.handle_push_jitterbuffer(state, id, items_to_pre_push, now)?; + if split_bufferlist { + // this abomination is to work around passing state through handle_push_jitterbuffer + // inside a closure + let mut maybe_state = Some(state); + list_mut.foreach_mut({ + let maybe_state = &mut maybe_state; + |buffer, _i| match self.handle_push_jitterbuffer( + maybe_state.take().unwrap(), + id, + [HeldRecvItem::Buffer(HeldRecvBuffer { + hold_id: None, + buffer, + jb: previous_jb.clone().unwrap(), + })], + now, + ) { + Ok(state) => { + *maybe_state = Some(state); + ControlFlow::Continue(None) + } + Err(e) => { + ret = Err(e); + ControlFlow::Break(None) + } + } + }); + state = maybe_state.unwrap(); + ret?; + } else { + state = self.handle_push_jitterbuffer( + state, + id, + [HeldRecvItem::BufferList(HeldRecvBufferList { + list, + jb: previous_jb.unwrap(), + })], + now, + )?; + } + drop(state); + Ok(gst::FlowSuccess::Ok) } - fn rtcp_recv_sink_chain( + fn rtp_sink_chain( + &self, + pad: &gst::Pad, + id: usize, + buffer: gst::Buffer, + ) -> Result { + let mut state = self.state.lock().unwrap(); + let Some(session) = state.mut_session_by_id(id) else { + return Err(gst::FlowError::Error); + }; + + let now = Instant::now(); + let mut items_to_pre_push: smallvec::SmallVec<[HeldRecvItem; 4]> = Default::default(); + let mut held_buffers: smallvec::SmallVec<[HeldRecvBuffer; 4]> = Default::default(); + let forward = match self.handle_buffer_locked( + pad, + session, + buffer, + now, + &mut items_to_pre_push, + &mut held_buffers, + )? { + RecvRtpBuffer::SsrcCollision(ssrc) => { + return self.handle_ssrc_collision(session, [ssrc]) + } + RecvRtpBuffer::IsRtcp(buffer) => return Self::rtcp_sink_chain(self, id, buffer), + RecvRtpBuffer::Drop => None, + RecvRtpBuffer::Forward((buffer, jb)) => Some((buffer, jb)), + }; + session + .recv_store + .extend(held_buffers.into_iter().map(HeldRecvItem::Buffer)); + + state = self.handle_push_jitterbuffer(state, id, items_to_pre_push, now)?; + if let Some((buffer, jb)) = forward { + state = self.handle_push_jitterbuffer( + state, + id, + [HeldRecvItem::Buffer(HeldRecvBuffer { + hold_id: None, + buffer, + jb, + })], + now, + )?; + } + drop(state); + + Ok(gst::FlowSuccess::Ok) + } + + fn rtcp_sink_chain( &self, id: usize, buffer: gst::Buffer, @@ -881,15 +1138,14 @@ impl RtpRecv { } } RtcpRecvReply::NewCName((cname, ssrc)) => { - let mut state = self.state.lock().unwrap(); + let mut sync_context = self.sync_context.lock().unwrap(); - state.sync_context.as_mut().unwrap().associate(ssrc, &cname); + sync_context.as_mut().unwrap().associate(ssrc, &cname); } RtcpRecvReply::NewRtpNtp((ssrc, rtp, ntp)) => { - let mut state = self.state.lock().unwrap(); + let mut sync_context = self.sync_context.lock().unwrap(); - state - .sync_context + sync_context .as_mut() .unwrap() .add_sender_report(ssrc, rtp, ntp); @@ -904,12 +1160,7 @@ impl RtpRecv { Ok(gst::FlowSuccess::Ok) } - pub fn rtp_recv_sink_query( - &self, - pad: &gst::Pad, - query: &mut gst::QueryRef, - id: usize, - ) -> bool { + pub fn rtp_sink_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef, id: usize) -> bool { gst::log!(CAT, obj: pad, "Handling query {query:?}"); if query.is_serialized() { @@ -985,7 +1236,7 @@ impl RtpRecv { // through the relevant jitterbuffers in order to remain (reasonably) // consistently ordered with the RTP packets once output on our source // pads - fn rtp_recv_sink_queue_serialized_event(&self, id: usize, event: gst::Event) -> bool { + fn rtp_sink_queue_serialized_event(&self, id: usize, event: gst::Event) -> bool { let state = self.state.lock().unwrap(); if let Some(session) = state.session_by_id(id) { for srcpad in session @@ -1013,7 +1264,7 @@ impl RtpRecv { true } - fn rtp_recv_sink_event(&self, pad: &gst::Pad, mut event: gst::Event, id: usize) -> bool { + fn rtp_sink_event(&self, pad: &gst::Pad, mut event: gst::Event, id: usize) -> bool { match event.view() { gst::EventView::StreamStart(stream_start) => { let mut state = self.state.lock().unwrap(); @@ -1070,7 +1321,7 @@ impl RtpRecv { drop(state); - self.rtp_recv_sink_queue_serialized_event(id, event) + self.rtp_sink_queue_serialized_event(id, event) } gst::EventView::Eos(_eos) => { let now = Instant::now(); @@ -1105,7 +1356,7 @@ impl RtpRecv { } drop(state); // FIXME: may need to delay sending eos under some circumstances - self.rtp_recv_sink_queue_serialized_event(id, event); + self.rtp_sink_queue_serialized_event(id, event); true } gst::EventView::FlushStart(_fs) => { @@ -1137,15 +1388,15 @@ impl RtpRecv { .collect::>(); for pad in pads { // Will reset flushing to false and ensure task is woken up - let _ = session.start_rtp_recv_task(&pad); + let _ = session.start_rtp_task(&pad); } } drop(state); - self.rtp_recv_sink_queue_serialized_event(id, event) + self.rtp_sink_queue_serialized_event(id, event) } _ => { if event.is_serialized() { - self.rtp_recv_sink_queue_serialized_event(id, event) + self.rtp_sink_queue_serialized_event(id, event) } else { gst::Pad::event_default(pad, Some(&*self.obj()), event) } @@ -1153,7 +1404,7 @@ impl RtpRecv { } } - fn rtp_recv_src_event( + fn rtp_src_event( &self, pad: &gst::Pad, event: gst::Event, @@ -1220,6 +1471,7 @@ impl ObjectSubclass for RtpRecv { Self { settings: Default::default(), state: Default::default(), + sync_context: Default::default(), } } } @@ -1417,11 +1669,18 @@ impl ElementImpl for RtpRecv { Vec, )> { let sinkpad = gst::Pad::builder_from_template(templ) + .chain_list_function(move |pad, parent, buffer_list| { + RtpRecv::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |this| this.rtp_sink_chain_list(pad, id, buffer_list), + ) + }) .chain_function(move |pad, parent, buffer| { RtpRecv::catch_panic_pad_function( parent, || Err(gst::FlowError::Error), - |this| this.rtp_recv_sink_chain(pad, id, buffer), + |this| this.rtp_sink_chain(pad, id, buffer), ) }) .iterate_internal_links_function(|pad, parent| { @@ -1435,14 +1694,14 @@ impl ElementImpl for RtpRecv { RtpRecv::catch_panic_pad_function( parent, || false, - |this| this.rtp_recv_sink_event(pad, event, id), + |this| this.rtp_sink_event(pad, event, id), ) }) .query_function(move |pad, parent, query| { RtpRecv::catch_panic_pad_function( parent, || false, - |this| this.rtp_recv_sink_query(pad, query, id), + |this| this.rtp_sink_query(pad, query, id), ) }) .name(format!("rtp_sink_{}", id)) @@ -1480,7 +1739,7 @@ impl ElementImpl for RtpRecv { RtpRecv::catch_panic_pad_function( parent, || Err(gst::FlowError::Error), - |this| this.rtcp_recv_sink_chain(id, buffer), + |this| this.rtcp_sink_chain(id, buffer), ) }) .iterate_internal_links_function(|pad, parent| { @@ -1619,9 +1878,9 @@ impl ElementImpl for RtpRecv { } gst::StateChange::ReadyToPaused => { let settings = self.settings.lock().unwrap(); - let mut state = self.state.lock().unwrap(); + let mut sync_context = self.sync_context.lock().unwrap(); - state.sync_context = Some(sync::Context::new(settings.timestamping_mode)); + *sync_context = Some(sync::Context::new(settings.timestamping_mode)); } _ => (), } @@ -1647,7 +1906,9 @@ impl ElementImpl for RtpRecv { session.rtp_recv_sink_seqnum = None; session.rtp_recv_sink_group_id = None; } - state.sync_context = None; + let mut sync_context = self.sync_context.lock().unwrap(); + *sync_context = None; + drop(sync_context); drop(state); for pad in removed_pads.iter() { diff --git a/net/rtp/src/rtpbin2/sync.rs b/net/rtp/src/rtpbin2/sync.rs index 38424d6f..6cc68fc7 100644 --- a/net/rtp/src/rtpbin2/sync.rs +++ b/net/rtp/src/rtpbin2/sync.rs @@ -118,8 +118,8 @@ impl Context { } } - pub fn clock_rate(&self, ssrc_val: u32) -> Option { - self.ssrcs.get(&ssrc_val).and_then(|ssrc| ssrc.clock_rate) + pub fn has_clock_rate(&self, ssrc_val: u32) -> bool { + self.ssrcs.contains_key(&ssrc_val) } fn disassociate(&mut self, ssrc_val: u32, cname: &str) { diff --git a/net/rtp/tests/rtpbin2.rs b/net/rtp/tests/rtpbin2.rs index 7abddcec..bf4cc098 100644 --- a/net/rtp/tests/rtpbin2.rs +++ b/net/rtp/tests/rtpbin2.rs @@ -60,19 +60,6 @@ impl PacketInfo { } } -static PACKETS_TEST_1: [PacketInfo; 2] = [ - PacketInfo { - seq_no: 500, - rtp_ts: 20, - payload_len: 7, - }, - PacketInfo { - seq_no: 501, - rtp_ts: 30, - payload_len: 23, - }, -]; - fn send_init() -> gst_check::Harness { init(); @@ -283,9 +270,7 @@ fn test_send_list_benchmark() { ); } -#[test] -fn test_receive() { - init(); +fn receive_init() -> Arc> { let id = next_element_counter(); let elem = gst::ElementFactory::make("rtprecv") @@ -311,7 +296,6 @@ fn test_receive() { .add_element_src_pad(pad) }); inner.play(); - let caps = Caps::builder("application/x-rtp") .field("media", "audio") .field("payload", TEST_PT as i32) @@ -320,6 +304,16 @@ fn test_receive() { .build(); inner.set_src_caps(caps); + drop(inner); + + h +} + +fn receive_push(h: Arc>, packets: I, buffer_list: bool) +where + I: IntoIterator, +{ + let inner = h.lock().unwrap(); // Cannot push with harness lock as the 'pad-added' handler needs to add the newly created pad to // the harness and needs to also take the harness lock. Workaround by pushing from the // internal harness pad directly. @@ -331,21 +325,48 @@ fn test_receive() { .peer() .unwrap(); drop(inner); - push_pad.push(generate_rtp_buffer(500, 20, 9)).unwrap(); - push_pad.push(generate_rtp_buffer(501, 30, 11)).unwrap(); + + if buffer_list { + let mut list = gst::BufferList::new(); + let list_mut = list.make_mut(); + for packet in packets { + list_mut.add(packet.generate_buffer()); + } + push_pad.push_list(list).unwrap(); + } else { + for packet in packets { + push_pad.push(packet.generate_buffer()).unwrap(); + } + } +} + +fn receive_pull(h: Arc>, packets: I) +where + I: IntoIterator, +{ let mut inner = h.lock().unwrap(); + for packet in packets { + let buffer = inner.pull().unwrap(); + let mapped = buffer.map_readable().unwrap(); + let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); + assert_eq!(rtp.sequence_number(), packet.seq_no); + } +} - let buffer = inner.pull().unwrap(); - let mapped = buffer.map_readable().unwrap(); - let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); - assert_eq!(rtp.sequence_number(), 500); - - let buffer = inner.pull().unwrap(); - let mapped = buffer.map_readable().unwrap(); - let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); - assert_eq!(rtp.sequence_number(), 501); +fn receive_check_stats(h: Arc>, packets: I) +where + I: IntoIterator, +{ + let mut n_packets = 0; + let mut n_bytes = 0; + for packet in packets { + n_packets += 1; + n_bytes += packet.payload_len; + } + let inner = h.lock().unwrap(); let stats = inner.element().unwrap().property::("stats"); + drop(inner); let session_stats = stats.get::("0").unwrap(); let source_stats = session_stats @@ -367,12 +388,21 @@ fn test_receive() { ); assert!(source_stats.get::("sender").unwrap()); assert!(!source_stats.get::("local").unwrap()); - assert_eq!(source_stats.get::("packets-received").unwrap(), 2); - assert_eq!(source_stats.get::("octets-received").unwrap(), 20); + assert_eq!( + source_stats.get::("packets-received").unwrap(), + n_packets + ); + assert_eq!( + source_stats.get::("octets-received").unwrap(), + n_bytes as u64 + ); assert_eq!(jitterbuffer_stats.get::("num-late").unwrap(), 0); assert_eq!(jitterbuffer_stats.get::("num-lost").unwrap(), 0); assert_eq!(jitterbuffer_stats.get::("num-duplicates").unwrap(), 0); - assert_eq!(jitterbuffer_stats.get::("num-pushed").unwrap(), 2); + assert_eq!( + jitterbuffer_stats.get::("num-pushed").unwrap(), + n_packets + ); assert_eq!(jitterbuffer_stats.get::("pt").unwrap(), TEST_PT as i32); assert_eq!( jitterbuffer_stats.get::("ssrc").unwrap(), @@ -380,56 +410,46 @@ fn test_receive() { ); } +static PACKETS_TEST_1: [PacketInfo; 2] = [ + PacketInfo { + seq_no: 500, + rtp_ts: 20, + payload_len: 13, + }, + PacketInfo { + seq_no: 501, + rtp_ts: 30, + payload_len: 7, + }, +]; + +#[test] +fn test_receive() { + init(); + + let h = receive_init(); + receive_push(h.clone(), PACKETS_TEST_1, false); + receive_pull(h.clone(), PACKETS_TEST_1); + receive_check_stats(h, PACKETS_TEST_1); +} + +#[test] +fn test_receive_list() { + init(); + + let h = receive_init(); + receive_push(h.clone(), PACKETS_TEST_1, true); + receive_pull(h.clone(), PACKETS_TEST_1); + receive_check_stats(h, PACKETS_TEST_1); +} + #[test] fn test_receive_flush() { init(); - let id = next_element_counter(); + let h = receive_init(); - let elem = gst::ElementFactory::make("rtprecv") - .property("rtp-id", id.to_string()) - .build() - .unwrap(); - let h = Arc::new(Mutex::new(Harness::with_element( - &elem, - Some("rtp_sink_0"), - None, - ))); - let weak_h = Arc::downgrade(&h); - let mut inner = h.lock().unwrap(); - inner - .element() - .unwrap() - .connect_pad_added(move |_elem, pad| { - weak_h - .upgrade() - .unwrap() - .lock() - .unwrap() - .add_element_src_pad(pad) - }); - inner.play(); + receive_push(h.clone(), PACKETS_TEST_1, false); - let caps = Caps::builder("application/x-rtp") - .field("media", "audio") - .field("payload", TEST_PT as i32) - .field("clock-rate", TEST_CLOCK_RATE as i32) - .field("encoding-name", "custom-test") - .build(); - inner.set_src_caps(caps); - - // Cannot push with harness lock as the 'pad-added' handler needs to add the newly created pad to - // the harness and needs to also take the harness lock. Workaround by pushing from the - // internal harness pad directly. - let push_pad = inner - .element() - .unwrap() - .static_pad("rtp_sink_0") - .unwrap() - .peer() - .unwrap(); - drop(inner); - push_pad.push(generate_rtp_buffer(500, 20, 9)).unwrap(); - push_pad.push(generate_rtp_buffer(501, 30, 11)).unwrap(); let mut inner = h.lock().unwrap(); let seqnum = gst::Seqnum::next(); inner.push_event(gst::event::FlushStart::builder().seqnum(seqnum).build()); @@ -446,3 +466,128 @@ fn test_receive_flush() { }; assert_eq!(fs.seqnum(), seqnum); } + +#[test] +fn test_receive_benchmark() { + init(); + let clock = gst::SystemClock::obtain(); + const N_PACKETS: usize = 1024 * 1024; + let mut packets = Vec::with_capacity(N_PACKETS); + for i in 0..N_PACKETS { + packets.push( + PacketInfo { + seq_no: (i % u16::MAX as usize) as u16, + rtp_ts: i as u32, + payload_len: 8, + } + .generate_buffer(), + ) + } + + let h = receive_init(); + let inner = h.lock().unwrap(); + let push_pad = inner + .element() + .unwrap() + .static_pad("rtp_sink_0") + .unwrap() + .peer() + .unwrap(); + drop(inner); + + let start = clock.time(); + + for packet in packets { + push_pad.push(packet).unwrap(); + } + + let pushed = clock.time(); + + let mut inner = h.lock().unwrap(); + for i in 0..N_PACKETS { + let buffer = inner.pull().unwrap(); + let mapped = buffer.map_readable().unwrap(); + let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); + assert_eq!(rtp.sequence_number(), (i % u16::MAX as usize) as u16); + } + + let end = clock.time(); + drop(inner); + + let test_time = end.opt_sub(start); + let pull_time = end.opt_sub(pushed); + let push_time = pushed.opt_sub(start); + println!( + "test took {} (push {}, pull {})", + test_time.display(), + push_time.display(), + pull_time.display() + ); +} + +#[test] +fn test_receive_list_benchmark() { + init(); + let clock = gst::SystemClock::obtain(); + const N_PACKETS: usize = 32 * 1024; + const N_PUSHES: usize = 1024 / 32; + let h = receive_init(); + let inner = h.lock().unwrap(); + + let push_pad = inner + .element() + .unwrap() + .static_pad("rtp_sink_0") + .unwrap() + .peer() + .unwrap(); + drop(inner); + + let mut lists = Vec::with_capacity(N_PUSHES); + for p in 0..N_PUSHES { + let mut list = gst::BufferList::new(); + let list_mut = list.make_mut(); + for i in 0..N_PACKETS { + list_mut.add( + PacketInfo { + seq_no: ((p * N_PACKETS + i) % u16::MAX as usize) as u16, + rtp_ts: (p * N_PACKETS + i) as u32, + payload_len: 8, + } + .generate_buffer(), + ); + } + lists.push(list); + } + + let start = clock.time(); + + for list in lists { + push_pad.push_list(list).unwrap(); + } + + let pushed = clock.time(); + + let mut inner = h.lock().unwrap(); + for p in 0..N_PUSHES { + for i in 0..N_PACKETS { + let buffer = inner.pull().unwrap(); + let mapped = buffer.map_readable().unwrap(); + let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); + assert_eq!(rtp.sequence_number(), ((p * N_PACKETS + i) % u16::MAX as usize) as u16); + } + } + + let end = clock.time(); + drop(inner); + + let test_time = end.opt_sub(start); + let pull_time = end.opt_sub(pushed); + let push_time = pushed.opt_sub(start); + println!( + "test took {} (push {}, pull {})", + test_time.display(), + push_time.display(), + pull_time.display() + ); +}