From 1ce1a41ca7d4f2b825bb58c92cb33c42a1eaa580 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Tue, 15 Jul 2025 16:15:24 +0200 Subject: [PATCH] rtprecv: optimize src Pad scheduling This commit implements optimizations to reduce the number of threads and context switches necessary to handle items on src Pads. 1. A single task for all src pads in a session Instead of starting one pad task (thread) per src pad in a session, we can spawn a single async task that dequeues all ready items out of the jitter buffers and pushes them to their respective src pads. 2. Handle downstream-travelling items immediately when possible It is possible to immediately handle downstream-travelling items without queuing them in the jitter buffer: * for an RTP packet, if the packet is the next packet that we were expecting and the src Pad's jitter buffer is empty. * for a downstream query or event if the src Pad's jitter buffer is empty. Otherwise, the item needs to be enqueued in the jitter buffer. 3. Making sure the above strategies play well together If a jitter buffer is empty because the src pad task just dequeued items, the sink pad might push incoming items before the src pad task had time to push the dequeued items, which would break ordering. In order to avoid this situation, each src pad uses a semaphore which is held from the moment when the sink pad or the src pad makes decisions regarding the jitter buffer and hold it as long as necessary so as to guarantee ordering. Part-of: --- Cargo.lock | 1 + net/rtp/Cargo.toml | 1 + net/rtp/src/rtpbin2/jitterbuffer.rs | 81 ++- net/rtp/src/rtpbin2/rtprecv.rs | 1020 ++++++++++++++++++++++----- 4 files changed, 892 insertions(+), 211 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d8619b154..3e9c0f903 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3282,6 +3282,7 @@ dependencies = [ "thiserror 2.0.12", "time", "tokio", + "tokio-util", ] [[package]] diff --git a/net/rtp/Cargo.toml b/net/rtp/Cargo.toml index 50dcad98d..8eca6eae4 100644 --- a/net/rtp/Cargo.toml +++ b/net/rtp/Cargo.toml @@ -33,6 +33,7 @@ thiserror = "2" time = { version = "0.3", default-features = false, features = ["std"] } # TODO: experiment with other async executors (mio, async-std, etc) tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "time", "sync"] } +tokio-util = "0.7.15" [dev-dependencies] gst-check = { workspace = true, features = ["v1_20"] } diff --git a/net/rtp/src/rtpbin2/jitterbuffer.rs b/net/rtp/src/rtpbin2/jitterbuffer.rs index bdb0a4c40..541376a44 100644 --- a/net/rtp/src/rtpbin2/jitterbuffer.rs +++ b/net/rtp/src/rtpbin2/jitterbuffer.rs @@ -40,6 +40,7 @@ pub struct JitterBuffer { last_input_ts: Option, stats: Stats, flushing: bool, + can_forward_packets_when_empty: bool, } #[derive(Debug, PartialEq, Eq)] @@ -53,6 +54,7 @@ pub enum PollResult { #[derive(Debug, PartialEq, Eq)] pub enum QueueResult { + Forward(usize), Queued(usize), Late, Duplicate, @@ -109,10 +111,17 @@ impl JitterBuffer { num_pushed: 0, }, flushing: true, + can_forward_packets_when_empty: false, } } pub fn queue_serialized_item(&mut self) -> QueueResult { + if self.items.is_empty() { + let id = self.packet_counter; + self.packet_counter += 1; + return QueueResult::Forward(id); + } + let id = self.packet_counter; self.packet_counter += 1; let item = Item { @@ -130,6 +139,7 @@ impl JitterBuffer { trace!("Flush changed from {} to {flushing}", self.flushing); self.flushing = flushing; self.last_output_seqnum = None; + self.can_forward_packets_when_empty = false; } pub fn queue_packet(&mut self, rtp: &RtpPacket, mut pts: u64, now: Instant) -> QueueResult { @@ -182,6 +192,25 @@ impl JitterBuffer { } } + if self.items.is_empty() + // can forward after the first packet's deadline has been reached + && self.can_forward_packets_when_empty + && self + .last_output_seqnum + .is_some_and(|last_output_seqnum| seqnum == last_output_seqnum.wrapping_add(1)) + { + // No packets enqueued & seqnum is in order => can forward it immediately + self.last_output_seqnum = Some(seqnum); + let id = self.packet_counter; + self.packet_counter += 1; + self.stats.num_pushed += 1; + + return QueueResult::Forward(id); + } + + // TODO: if the segnum base is known (i.e. first seqnum in RTSP) + // we could also Forward the initial Packet. + let id = self.packet_counter; self.packet_counter += 1; let item = Item { @@ -258,6 +287,7 @@ impl JitterBuffer { let packet = self.items.pop_first().unwrap(); self.stats.num_pushed += 1; + self.can_forward_packets_when_empty = true; PollResult::Forward { id: packet.id, @@ -584,33 +614,31 @@ mod tests { let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - let QueueResult::Queued(id_first_serialized_item) = jb.queue_serialized_item() else { + let QueueResult::Forward(id_first_serialized_item) = jb.queue_serialized_item() else { unreachable!() }; + assert_eq!(id_first_serialized_item, 0); + + // query has been forwarded immediately + assert_eq!(jb.poll(now), PollResult::Empty); + + let QueueResult::Queued(id_first_packet) = jb.queue_packet(&packet, 0, now) else { + unreachable!() + }; + assert_eq!(id_first_packet, id_first_serialized_item + 1); + let QueueResult::Queued(id_second_serialized_item) = jb.queue_serialized_item() else { + unreachable!() + }; + assert_eq!(id_second_serialized_item, id_first_packet + 1); - // query should be forwarded immediately assert_eq!( jb.poll(now), PollResult::Forward { - id: id_first_serialized_item, - discont: false - } - ); - - let QueueResult::Queued(id_first) = jb.queue_packet(&packet, 0, now) else { - unreachable!() - }; - assert_eq!( - jb.poll(now), - PollResult::Forward { - id: id_first, + id: id_first_packet, discont: true } ); - let QueueResult::Queued(id_second_serialized_item) = jb.queue_serialized_item() else { - unreachable!() - }; assert_eq!( jb.poll(now), PollResult::Forward { @@ -621,16 +649,11 @@ mod tests { let rtp_data = generate_rtp_packet(0x12345678, 1, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - let QueueResult::Queued(id_second) = jb.queue_packet(&packet, 0, now) else { + let QueueResult::Forward(id_second_packet) = jb.queue_packet(&packet, 0, now) else { unreachable!() }; - assert_eq!( - jb.poll(now), - PollResult::Forward { - id: id_second, - discont: false - } - ); + assert_eq!(id_second_packet, id_second_serialized_item + 1); + assert_eq!(jb.poll(now), PollResult::Empty); } #[test] @@ -643,7 +666,7 @@ mod tests { let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - let QueueResult::Queued(id_first_serialized_item) = jb.queue_serialized_item() else { + let QueueResult::Forward(_id_first_serialized_item) = jb.queue_serialized_item() else { unreachable!() }; @@ -651,13 +674,17 @@ mod tests { unreachable!() }; + let QueueResult::Queued(id_second_serialized_item) = jb.queue_serialized_item() else { + unreachable!() + }; + // Everything after this should eventually return flushing, poll() will instruct to drop // everything stored and then return flushing indefinitely. jb.set_flushing(true); assert_eq!(jb.queue_packet(&packet, 0, now), QueueResult::Flushing); - assert_eq!(jb.poll(now), PollResult::Drop(id_first_serialized_item)); assert_eq!(jb.poll(now), PollResult::Drop(id_first)); + assert_eq!(jb.poll(now), PollResult::Drop(id_second_serialized_item)); assert_eq!(jb.poll(now), PollResult::Flushing); assert_eq!(jb.poll(now), PollResult::Flushing); diff --git a/net/rtp/src/rtpbin2/rtprecv.rs b/net/rtp/src/rtpbin2/rtprecv.rs index 925ea3522..d9dc2b0c0 100644 --- a/net/rtp/src/rtpbin2/rtprecv.rs +++ b/net/rtp/src/rtpbin2/rtprecv.rs @@ -41,13 +41,14 @@ use std::collections::{BTreeMap, HashMap}; use std::net::SocketAddr; use std::ops::{ControlFlow, Deref}; use std::pin::Pin; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::{Arc, LazyLock, Mutex, MutexGuard}; use std::task::{Poll, Waker}; use std::time::{Duration, Instant, SystemTime}; -use futures::StreamExt; +use futures::channel::mpsc as async_mpsc; +use futures::{stream, StreamExt}; use gst::{glib, prelude::*, subclass::prelude::*}; -use std::sync::LazyLock; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use super::internal::{pt_clock_rate_from_caps, GstRustLogger, SharedRtpState, SharedSession}; use super::jitterbuffer::{self, JitterBuffer}; @@ -62,6 +63,10 @@ use crate::rtpbin2; const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200); +/// Initial capacity for `SmallVec`s handling items related to src pads for +/// a given RTP seession. E.g.: `RtpRecvSrcPads`, `JitterBufferStreams`, ... +const SRC_PAD_SMALL_VEC_CAPACITY: usize = 16; + static CAT: LazyLock = LazyLock::new(|| { gst::DebugCategory::new( "rtprecv", @@ -90,41 +95,53 @@ impl Default for Settings { #[derive(Debug)] #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] struct JitterBufferStream { - store: Arc>, + semaphore: tokio_util::sync::PollSemaphore, + recv_src_pad: RtpRecvSrcPad, sleep: Pin>, - pending_item: Option, } impl JitterBufferStream { - fn new(store: Arc>) -> Self { + fn new(recv_src_pad: &RtpRecvSrcPad) -> Self { Self { - store, + semaphore: tokio_util::sync::PollSemaphore::new(recv_src_pad.semaphore.clone()), + recv_src_pad: recv_src_pad.clone(), sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))), - pending_item: None, } } } +/// Up to two pending items returned in one go by `JitterBufferStream` +type JitterBufferPendingItems = smallvec::SmallVec<[JitterBufferItem; 2]>; + impl futures::stream::Stream for JitterBufferStream { - type Item = JitterBufferItem; + type Item = ( + gst::Pad, + Option, + JitterBufferPendingItems, + ); fn poll_next( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { + let Poll::Ready(src_pad_permit) = self.semaphore.poll_acquire(cx) else { + return Poll::Pending; + }; + let src_pad_permit = src_pad_permit.unwrap(); + let now = Instant::now(); let mut lowest_wait = None; - if let Some(item) = self.pending_item.take() { - return Poll::Ready(Some(item)); - } + let mut jitterbuffer_store = self.recv_src_pad.jitter_buffer_store.lock().unwrap(); - let mut jitterbuffer_store = self.store.lock().unwrap(); - let mut pending_item = None; - let mut next_pending_item = None; + let mut pending_items = JitterBufferPendingItems::new(); loop { let ret = jitterbuffer_store.jitterbuffer.poll(now); - gst::trace!(CAT, "jitterbuffer poll ret: {ret:?}"); + gst::trace!( + CAT, + obj = self.recv_src_pad.pad, + "JitterBuffer poll ret: {ret:?}", + ); match ret { jitterbuffer::PollResult::Flushing => { return Poll::Ready(None); @@ -142,25 +159,34 @@ impl futures::stream::Stream for JitterBufferStream { .unwrap_or_else(|| panic!("Buffer with id {id} not in store!")); if let JitterBufferItem::Packet(ref mut packet) = item { if discont { - gst::debug!(CAT, "Forwarding discont buffer"); + gst::debug!( + CAT, + obj = self.recv_src_pad.pad, + "Forwarding discont buffer", + ); let packet_mut = packet.make_mut(); packet_mut.set_flags(gst::BufferFlags::DISCONT); } } + gst::trace!( + CAT, + obj = self.recv_src_pad.pad, + "Handling item {id}: {item:?}, pending {}", + pending_items.len() + ); + match item { // we don't currently push packet lists into the jitterbuffer JitterBufferItem::PacketList(_list) => unreachable!(), // forward events and queries as-is JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _) => { - if pending_item.is_some() { - // but only after sending the previous pending item - next_pending_item = Some(item); - break; - } + pending_items.push(item); + break; } JitterBufferItem::Packet(ref packet) => { - match pending_item { + // Group consecutive buffers + match pending_items.pop() { Some( JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _), ) => unreachable!(), @@ -169,21 +195,20 @@ impl futures::stream::Stream for JitterBufferStream { let list_mut = list.make_mut(); list_mut.add(pending_buffer); list_mut.add(packet.clone()); - pending_item = Some(JitterBufferItem::PacketList(list)); + pending_items.push(JitterBufferItem::PacketList(list)); } Some(JitterBufferItem::PacketList(mut pending_list)) => { let list_mut = pending_list.make_mut(); list_mut.add(packet.clone()); - pending_item = Some(JitterBufferItem::PacketList(pending_list)); + pending_items.push(JitterBufferItem::PacketList(pending_list)); } None => { - pending_item = Some(item); + pending_items.push(item); } } continue; } } - return Poll::Ready(Some(item)); } jitterbuffer::PollResult::Timeout(timeout) => { if lowest_wait.is_none_or(|lowest_wait| timeout < lowest_wait) { @@ -197,20 +222,37 @@ impl futures::stream::Stream for JitterBufferStream { } jitterbuffer_store.waker = Some(cx.waker().clone()); + let store_is_empty = jitterbuffer_store.store.is_empty(); drop(jitterbuffer_store); - if next_pending_item.is_some() { - self.pending_item = next_pending_item; + if !pending_items.is_empty() { + gst::trace!( + CAT, + obj = self.recv_src_pad.pad, + "Returning {} items, store is empty: {store_is_empty}", + pending_items.len(), + ); + + // No need to hold the src pad semaphore if the JB store is not empty + return Poll::Ready(Some(( + self.recv_src_pad.pad.clone(), + if store_is_empty { + Some(src_pad_permit) + } else { + None + }, + pending_items, + ))); } - if pending_item.is_some() { - return Poll::Ready(pending_item); - } + drop(src_pad_permit); + gst::trace!(CAT, obj = self.recv_src_pad.pad, "Will return Pending"); if let Some(timeout) = lowest_wait { let this = self.get_mut(); this.sleep.as_mut().reset(timeout.into()); if !std::future::Future::poll(this.sleep.as_mut(), cx).is_pending() { + gst::trace!(CAT, obj = this.recv_src_pad.pad, "Waking up due to timeout"); cx.waker().wake_by_ref(); } } @@ -240,11 +282,14 @@ struct JitterBufferStore { jitterbuffer: JitterBuffer, } +type RtpRecvSrcPads = smallvec::SmallVec<[RtpRecvSrcPad; SRC_PAD_SMALL_VEC_CAPACITY]>; + #[derive(Debug)] struct RtpRecvSrcPadInner { pt: u8, ssrc: u32, pad: gst::Pad, + semaphore: Arc, jitter_buffer_store: Arc>, } @@ -257,6 +302,7 @@ impl RtpRecvSrcPad { pt, ssrc, pad, + semaphore: Arc::new(Semaphore::new(1)), jitter_buffer_store: Arc::new(Mutex::new(jb_store)), })) } @@ -335,6 +381,13 @@ impl HeldRecvItem { } } +#[derive(Debug)] +enum RecvSessionSrcTaskCommand { + AddRecvSrcPad(RtpRecvSrcPad), + RemoveRecvSrcPad(RtpRecvSrcPad), + Stop, +} + #[derive(Debug)] struct RecvSession { internal_session: SharedSession, @@ -348,6 +401,9 @@ struct RecvSession { recv_store: Vec, + rtp_task_cmd_tx: async_mpsc::UnboundedSender, + rtp_task_handle: Option>, + rtp_recv_srcpads: Vec, recv_flow_combiner: Arc>, @@ -359,6 +415,10 @@ impl RecvSession { let internal_session = shared_state.session_get_or_init(id, || { SharedSession::new(id, RtpProfile::Avp, RTCP_MIN_REPORT_INTERVAL, false) }); + + let recv_flow_combiner = Arc::new(Mutex::new(gst_base::UniqueFlowCombiner::new())); + let (task, rtp_task_cmd_tx) = RecvSessionSrcTask::new(recv_flow_combiner.clone()); + Self { internal_session, rtp_recv_sinkpad: None, @@ -368,16 +428,22 @@ impl RecvSession { rtp_recv_sink_seqnum: None, recv_store: vec![], + rtp_task_cmd_tx, + rtp_task_handle: Some( + rtpbin2::get_or_init_runtime() + .expect("initialized in change_state()") + .spawn(task.start()), + ), rtp_recv_srcpads: vec![], - recv_flow_combiner: Arc::new(Mutex::new(gst_base::UniqueFlowCombiner::new())), + recv_flow_combiner, rtcp_recv_sinkpad: None, } } - fn start_rtp_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> { - gst::debug!(CAT, obj = pad, "Starting rtp recv src task"); + fn activate_recv_src_pad(&mut self, pad: &gst::Pad) { + gst::debug!(CAT, obj = pad, "Activating rtp recv src pad"); let recv_pad = self .rtp_recv_srcpads @@ -385,93 +451,39 @@ impl RecvSession { .find(|recv| &recv.pad == pad) .unwrap(); - let pad_weak = pad.downgrade(); - let recv_flow_combiner = self.recv_flow_combiner.clone(); - let store = recv_pad.jitter_buffer_store.clone(); + recv_pad + .jitter_buffer_store + .lock() + .unwrap() + .jitterbuffer + .set_flushing(false); - { - let mut store = store.lock().unwrap(); - store.jitterbuffer.set_flushing(false); - store.waker.take(); - } - - // A task per received ssrc may be a bit excessive. - // Other options are: - // - Single task per received input stream rather than per output ssrc/pt - // - somehow pool multiple recv tasks together (thread pool) - pad.start_task(move || { - let Some(pad) = pad_weak.upgrade() else { - return; - }; - - let recv_flow_combiner = recv_flow_combiner.clone(); - let store = store.clone(); - - rtpbin2::get_or_init_runtime() - .expect("initialized in change_state()") - .block_on(async move { - let mut stream = JitterBufferStream::new(store); - while let Some(item) = stream.next().await { - match item { - JitterBufferItem::PacketList(list) => { - let flow = pad.push_list(list); - gst::trace!( - CAT, - obj = pad, - "Pushed buffer list, flow ret {:?}", - flow - ); - let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap(); - let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow); - // TODO: store flow, return only on session pads? - } - JitterBufferItem::Packet(buffer) => { - let flow = pad.push(buffer); - gst::trace!(CAT, obj = pad, "Pushed buffer, flow ret {:?}", flow); - let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap(); - let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow); - // TODO: store flow, return only on session pads? - } - JitterBufferItem::Event(event) => { - let res = pad.push_event(event); - gst::trace!( - CAT, - obj = pad, - "Pushed serialized event, result: {}", - res - ); - } - JitterBufferItem::Query(mut query, tx) => { - // This is safe because the thread holding the original reference is waiting - // for us exclusively - let res = pad.peer_query(unsafe { query.as_mut() }); - let _ = tx.send(res); - } - } - } - }) - })?; - - gst::debug!(CAT, obj = pad, "Task started"); - - Ok(()) + self.rtp_task_cmd_tx + .unbounded_send(RecvSessionSrcTaskCommand::AddRecvSrcPad(recv_pad.clone())) + .expect("cmd chan valid until RecvSession is dropped"); } - fn stop_rtp_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> { - gst::debug!(CAT, obj = pad, "Stopping rtp recv src task"); + fn deactivate_recv_src_pad(&mut self, pad: &gst::Pad) { + gst::debug!(CAT, obj = pad, "Deactivating rtp recv src pad"); + let recv_pad = self .rtp_recv_srcpads .iter_mut() .find(|recv| &recv.pad == pad) .unwrap(); - let mut store = recv_pad.jitter_buffer_store.lock().unwrap(); - store.jitterbuffer.set_flushing(true); - if let Some(waker) = store.waker.take() { - waker.wake(); - } + recv_pad + .jitter_buffer_store + .lock() + .unwrap() + .jitterbuffer + .set_flushing(true); - Ok(()) + self.rtp_task_cmd_tx + .unbounded_send(RecvSessionSrcTaskCommand::RemoveRecvSrcPad( + recv_pad.clone(), + )) + .expect("cmd chan valid until RecvSession is dropped"); } fn get_or_create_rtp_src( @@ -550,6 +562,229 @@ impl RecvSession { } } +impl Drop for RecvSession { + fn drop(&mut self) { + let Some(rtp_task_handle) = self.rtp_task_handle.take() else { + return; + }; + if rtp_task_handle.is_finished() { + return; + } + if self + .rtp_task_cmd_tx + .unbounded_send(RecvSessionSrcTaskCommand::Stop) + .is_err() + { + return; + } + let _ = futures::executor::block_on(rtp_task_handle); + } +} + +#[derive(Debug)] +struct RecvSessionSrcTask { + cmd_rx: async_mpsc::UnboundedReceiver, + recv_flow_combiner: Arc>, +} + +type JitterBufferStreams = smallvec::SmallVec<[JitterBufferStream; SRC_PAD_SMALL_VEC_CAPACITY]>; + +impl RecvSessionSrcTask { + fn new( + recv_flow_combiner: Arc>, + ) -> ( + RecvSessionSrcTask, + async_mpsc::UnboundedSender, + ) { + // task commands will be sent from a sync context + // let's not rely on back pressure if the channel is full + let (cmd_tx, cmd_rx) = async_mpsc::unbounded(); + + ( + RecvSessionSrcTask { + cmd_rx, + recv_flow_combiner, + }, + cmd_tx, + ) + } + + fn combine_jb_streams( + jb_streams: &mut [JitterBufferStream], + ) -> impl stream::FusedStream< + Item = Vec<( + gst::Pad, + Option, + JitterBufferPendingItems, + )>, + > + use<'_> { + let len = jb_streams.len(); + // + 1 so we don't end up with a 0 capacity when jb_streams is empty + stream::select_all(jb_streams.iter_mut()).ready_chunks(len + 1) + } + + fn handle_cmd( + jb_streams: &mut JitterBufferStreams, + cmd: RecvSessionSrcTaskCommand, + ) -> ControlFlow<()> { + gst::trace!(CAT, "Handling {cmd:?}"); + + use RecvSessionSrcTaskCommand::*; + match cmd { + AddRecvSrcPad(recv_src_pad) => { + let jb_stream = JitterBufferStream::new(&recv_src_pad); + jb_streams.push(jb_stream); + + gst::debug!(CAT, obj = recv_src_pad.pad, "activated"); + } + RemoveRecvSrcPad(recv_src_pad) => { + if let Some(pos) = jb_streams + .iter() + .position(|jb| jb.recv_src_pad.pad == recv_src_pad.pad) + { + let _ = jb_streams.remove(pos); + } + + gst::debug!(CAT, obj = recv_src_pad.pad, "deactivated"); + } + Stop => { + return ControlFlow::Break(()); + } + } + + ControlFlow::Continue(()) + } + + async fn start(mut self) { + gst::debug!(CAT, "Entering rtp stream task"); + + let mut jb_streams = JitterBufferStreams::new(); + let mut combined_jb_stream = Self::combine_jb_streams(&mut jb_streams); + + loop { + gst::trace!(CAT, "RecvSessionSrcTask iter"); + + let all_pad_items = futures::select! { + cmd = self.cmd_rx.next() => { + drop(combined_jb_stream); + if Self::handle_cmd( + &mut jb_streams, + cmd.expect("cmd chan valid until RecvSession is dropped")).is_break() { + break; + } + + combined_jb_stream = Self::combine_jb_streams(&mut jb_streams); + continue; + } + all_pad_items = combined_jb_stream.next() => { + let Some(all_pad_items) = all_pad_items else { + continue; + }; + if all_pad_items.is_empty() { + gst::debug!(CAT, "rtp stream task: all pad items is empty"); + continue; + } + all_pad_items + } + }; + + drop(combined_jb_stream); + match rtpbin2::get_or_init_runtime() + .expect("initialized in change_state()") + .spawn_blocking(move || { + gst::log!(CAT, "Handling items for {} src pads", all_pad_items.len()); + self.push_all_pad_items_blocking(jb_streams, all_pad_items) + }) + .await + .unwrap() + { + ControlFlow::Continue((this, src_pad_jb_list_)) => { + self = this; + jb_streams = src_pad_jb_list_; + combined_jb_stream = Self::combine_jb_streams(&mut jb_streams); + } + ControlFlow::Break(_) => { + break; + } + } + } + + gst::debug!(CAT, "Leaving RecvSessionSrc task"); + } + + fn push_all_pad_items_blocking( + mut self, + mut jb_streams: JitterBufferStreams, + mut all_pad_items: Vec<( + gst::Pad, + Option, + JitterBufferPendingItems, + )>, + ) -> ControlFlow { + use futures::task; + + loop { + for (pad, _semaphore_permit, items) in all_pad_items.drain(..) { + for item in items { + gst::log!(CAT, obj = pad, "Pushing item {item:?}"); + + match item { + JitterBufferItem::PacketList(list) => { + let flow = pad.push_list(list); + gst::trace!(CAT, obj = pad, "Pushed buffer list, flow ret {flow:?}"); + let mut recv_flow_combiner = self.recv_flow_combiner.lock().unwrap(); + let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow); + // TODO: store flow, return only on session pads? + } + JitterBufferItem::Packet(buffer) => { + let flow = pad.push(buffer); + gst::trace!(CAT, obj = pad, "Pushed buffer, flow ret {flow:?}"); + let mut recv_flow_combiner = self.recv_flow_combiner.lock().unwrap(); + let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow); + // TODO: store flow, return only on session pads? + } + JitterBufferItem::Event(event) => { + let res = pad.push_event(event); + gst::trace!(CAT, obj = pad, "Pushed serialized event, result: {res}"); + } + JitterBufferItem::Query(mut query, tx) => { + // This is safe because the thread holding the original reference is waiting + // for us exclusively + let res = pad.peer_query(unsafe { query.as_mut() }); + let _ = tx.send(res); + } + } + } + } + + // Check whether there's anything else we can do before leaving this blocking task + + if let Ok(Some(cmd)) = self.cmd_rx.try_next() { + if Self::handle_cmd(&mut jb_streams, cmd).is_break() { + return ControlFlow::Break(self); + } + } + // else, let the async task deal with errors + + let mut combined_jb_stream = Self::combine_jb_streams(&mut jb_streams); + + // With MSRC >= 1.85.0, this could use `std::task` && `task::Waker::noop` + let mut cx = task::Context::from_waker(task::noop_waker_ref()); + let Poll::Ready(Some(all_pad_items_)) = combined_jb_stream.poll_next_unpin(&mut cx) + else { + drop(combined_jb_stream); + gst::trace!(CAT, "Done processing items"); + + return ControlFlow::Continue((self, jb_streams)); + }; + + all_pad_items = all_pad_items_; + gst::log!(CAT, "Got new items for {} src pads", all_pad_items.len()); + // iterate and process the new batch + } + } +} + #[derive(Debug, Default)] struct State { shared_state: Option, @@ -627,13 +862,9 @@ impl RtpRecv { }; if active { - session.start_rtp_task(pad)?; + session.activate_recv_src_pad(pad); } else { - session.stop_rtp_task(pad)?; - - gst::debug!(CAT, obj = pad, "Stopping task"); - - let _ = pad.stop_task(); + session.deactivate_recv_src_pad(pad); } Ok(()) @@ -969,6 +1200,11 @@ impl RtpRecv { // FIXME: Should block if too many packets are stored here because the source pad task // is blocked + + let _src_pad_permit = rtpbin2::get_or_init_runtime() + .expect("initialized in change_state()") + .block_on(buffer.recv_src_pad.semaphore.acquire()); + let mut jb_store = buffer.recv_src_pad.jitter_buffer_store.lock().unwrap(); let ret = jb_store.jitterbuffer.queue_packet( @@ -978,8 +1214,8 @@ impl RtpRecv { ); gst::trace!( CAT, - "{}: jb queue buffer pts {} rtp ts {} marker {}: {ret:?}", - buffer.recv_src_pad.pad.name(), + obj = buffer.recv_src_pad.pad, + "jb queue buffer pts {} rtp ts {} marker {}: {ret:?}", buffer.buffer.pts().display(), rtp.timestamp(), rtp.marker_bit(), @@ -988,6 +1224,23 @@ impl RtpRecv { jitterbuffer::QueueResult::Flushing => { // TODO: return flushing result upstream } + jitterbuffer::QueueResult::Forward(_) => { + drop(mapped); + + if let Some(session) = state.session_by_id(id) { + let flow_combiner = session.recv_flow_combiner.clone(); + drop(state); + + let res = buffer.recv_src_pad.pad.push(buffer.buffer); + let _ = flow_combiner + .lock() + .unwrap() + .update_pad_flow(&buffer.recv_src_pad.pad, res); + // TODO: store flow, return only on session pads? + + state = self.state.lock().unwrap(); + } + } jitterbuffer::QueueResult::Queued(id) => { drop(mapped); @@ -999,19 +1252,33 @@ impl RtpRecv { } } jitterbuffer::QueueResult::Late => { - gst::warning!(CAT, "Late buffer was dropped"); + gst::warning!( + CAT, + obj = buffer.recv_src_pad.pad, + "Late buffer was dropped" + ); } jitterbuffer::QueueResult::Duplicate => { - gst::warning!(CAT, "Duplicate buffer was dropped"); + gst::warning!( + CAT, + obj = buffer.recv_src_pad.pad, + "Duplicate buffer was dropped" + ); } } } - HeldRecvItem::BufferList(list) => { + HeldRecvItem::BufferList(mut list) => { // FIXME: Should block if too many packets are stored here because the source pad task // is blocked + + let _src_pad_permit = rtpbin2::get_or_init_runtime() + .expect("initialized in change_state()") + .block_on(list.recv_src_pad.semaphore.acquire()); + let mut jb_store = list.recv_src_pad.jitter_buffer_store.lock().unwrap(); - for buffer in list.list.iter_owned() { + let buf_list = list.list.make_mut(); + for buffer in buf_list.drain(..) { let mapped = buffer.map_readable().map_err(|e| { gst::error!(CAT, imp = self, "Failed to map input buffer {e:?}"); gst::FlowError::Error @@ -1033,11 +1300,35 @@ impl RtpRecv { buffer.pts().unwrap().nseconds(), now, ); - gst::trace!(CAT, "jb queue buffer in list: {ret:?}"); + gst::trace!( + CAT, + obj = list.recv_src_pad.pad, + "jb queue buffer in list: {ret:?}", + ); match ret { jitterbuffer::QueueResult::Flushing => { return Err(gst::FlowError::Flushing); } + jitterbuffer::QueueResult::Forward(_) => { + // TODO: group consecutive buffers and push them as a list + // See: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2346#note_2989287 + + drop(mapped); + + if let Some(session) = state.session_by_id(id) { + let flow_combiner = session.recv_flow_combiner.clone(); + drop(state); + + let res = list.recv_src_pad.pad.push(buffer); + let _ = flow_combiner + .lock() + .unwrap() + .update_pad_flow(&list.recv_src_pad.pad, res); + // TODO: store flow, return only on session pads? + + state = self.state.lock().unwrap(); + } + } jitterbuffer::QueueResult::Queued(id) => { drop(mapped); @@ -1048,10 +1339,18 @@ impl RtpRecv { } } jitterbuffer::QueueResult::Late => { - gst::warning!(CAT, "Late buffer was dropped"); + gst::warning!( + CAT, + obj = list.recv_src_pad.pad, + "Late buffer was dropped" + ); } jitterbuffer::QueueResult::Duplicate => { - gst::warning!(CAT, "Duplicate buffer was dropped"); + gst::warning!( + CAT, + obj = list.recv_src_pad.pad, + "Duplicate buffer was dropped" + ); } } } @@ -1360,16 +1659,16 @@ impl RtpRecv { let mut ret = true; if let Some(session) = state.session_by_id(id) { - let jb_stores: Vec>> = session + let recv_src_pads: RtpRecvSrcPads = session .rtp_recv_srcpads .iter() .filter(|r| state.pads_session_id_map.contains_key(&r.pad)) - .map(|p| p.jitter_buffer_store.clone()) + .cloned() .collect(); drop(state); - let query = std::ptr::NonNull::from(query); + let mut query = std::ptr::NonNull::from(query); // The idea here is to reproduce the default behavior of GstPad, where // queries will run sequentially on each internally linked source pad @@ -1385,39 +1684,65 @@ impl RtpRecv { // // Also note that if there were no internally linked pads, GstPad's // behavior is to return TRUE, we do this here too. - for jb_store in jb_stores { - let mut jitterbuffer_store = jb_store.lock().unwrap(); + for recv_src_pad in recv_src_pads { + let src_pad_permit = rtpbin2::get_or_init_runtime() + .expect("initialized in change_state()") + .block_on(recv_src_pad.semaphore.acquire()); - let jitterbuffer::QueueResult::Queued(id) = - jitterbuffer_store.jitterbuffer.queue_serialized_item() - else { - unreachable!() - }; + let mut jb_store = recv_src_pad.jitter_buffer_store.lock().unwrap(); - let (query_tx, query_rx) = std::sync::mpsc::sync_channel(1); + match jb_store.jitterbuffer.queue_serialized_item() { + jitterbuffer::QueueResult::Forward(id) => { + // SAFETY: the `query` `ptr::NonNull` was built above from + // the `query` argument with type `&ref mut gst::QueryRef`. + let query = unsafe { query.as_mut() }; - jitterbuffer_store - .store - .insert(id, JitterBufferItem::Query(query, query_tx)); + gst::trace!( + CAT, + obj = recv_src_pad.pad, + "querying ({id}) peer: {query:?}", + ); - if let Some(waker) = jitterbuffer_store.waker.take() { - waker.wake(); - } - - drop(jitterbuffer_store); - - // Now block until the jitterbuffer has processed the query - match query_rx.recv() { - Ok(res) => { - ret |= res; + ret |= recv_src_pad.pad.peer_query(query); if ret { break; } } - _ => { - // The sender was closed because of a state change - break; + jitterbuffer::QueueResult::Queued(id) => { + gst::trace!( + CAT, + obj = recv_src_pad.pad, + "jb queuing serialized query ({id}): {query:?}", + ); + + let (query_tx, query_rx) = std::sync::mpsc::sync_channel(1); + + jb_store + .store + .insert(id, JitterBufferItem::Query(query, query_tx)); + + if let Some(waker) = jb_store.waker.take() { + waker.wake(); + } + + drop(jb_store); + drop(src_pad_permit); + + // Now block until the jitterbuffer has processed the query + match query_rx.recv() { + Ok(res) => { + ret |= res; + if ret { + break; + } + } + _ => { + // The sender was closed because of a state change + break; + } + } } + _ => unreachable!(), } } } @@ -1433,26 +1758,48 @@ impl RtpRecv { // consistently ordered with the RTP packets once output on our source // pads fn rtp_sink_queue_serialized_event(&self, id: usize, event: gst::Event) -> bool { - let state = self.state.lock().unwrap(); - if let Some(session) = state.session_by_id(id) { - for srcpad in session + let recv_src_pads: RtpRecvSrcPads = { + let state = self.state.lock().unwrap(); + + let Some(session) = state.session_by_id(id) else { + return true; + }; + + session .rtp_recv_srcpads .iter() .filter(|r| state.pads_session_id_map.contains_key(&r.pad)) - { - let mut jitterbuffer_store = srcpad.jitter_buffer_store.lock().unwrap(); + .cloned() + .collect() + }; - let jitterbuffer::QueueResult::Queued(id) = - jitterbuffer_store.jitterbuffer.queue_serialized_item() - else { - unreachable!() - }; + for recv_src_pad in recv_src_pads { + let _src_pad_permit = rtpbin2::get_or_init_runtime() + .expect("initialized in change_state()") + .block_on(recv_src_pad.semaphore.acquire()); - jitterbuffer_store - .store - .insert(id, JitterBufferItem::Event(event.clone())); - if let Some(waker) = jitterbuffer_store.waker.take() { - waker.wake(); + let mut jb_store = recv_src_pad.jitter_buffer_store.lock().unwrap(); + + match jb_store.jitterbuffer.queue_serialized_item() { + jitterbuffer::QueueResult::Forward(id) => { + gst::trace!(CAT, obj = recv_src_pad.pad, "Forwarding {id}: {event:?}"); + + if !recv_src_pad.pad.push_event(event.clone()) { + gst::warning!(CAT, obj = recv_src_pad.pad, "Failed to push event"); + } + } + jitterbuffer::QueueResult::Queued(id) => { + gst::trace!(CAT, obj = recv_src_pad.pad, "Queuing as {id}: {event:?}"); + + jb_store + .store + .insert(id, JitterBufferItem::Event(event.clone())); + if let Some(waker) = jb_store.waker.take() { + waker.wake(); + } + } + _ => { + unreachable!(); } } } @@ -1560,22 +1907,18 @@ impl RtpRecv { true } gst::EventView::FlushStart(_fs) => { - let state = self.state.lock().unwrap(); - let mut pause_tasks = vec![]; - if let Some(session) = state.session_by_id(id) { - for recv_pad in session.rtp_recv_srcpads.iter() { - let mut store = recv_pad.jitter_buffer_store.lock().unwrap(); - store.jitterbuffer.set_flushing(true); - if let Some(waker) = store.waker.take() { - waker.wake(); - } - pause_tasks.push(recv_pad.pad.clone()); + let mut state = self.state.lock().unwrap(); + if let Some(session) = state.mut_session_by_id(id) { + let pads = session + .rtp_recv_srcpads + .iter() + .map(|r| r.pad.clone()) + .collect::>(); + for pad in pads { + session.deactivate_recv_src_pad(&pad); } } drop(state); - for pad in pause_tasks { - let _ = pad.pause_task(); - } gst::Pad::event_default(pad, Some(&*self.obj()), event) } gst::EventView::FlushStop(_fs) => { @@ -1587,8 +1930,7 @@ impl RtpRecv { .map(|r| r.pad.clone()) .collect::>(); for pad in pads { - // Will reset flushing to false and ensure task is woken up - let _ = session.start_rtp_task(&pad); + session.activate_recv_src_pad(&pad); } } drop(state); @@ -2153,3 +2495,313 @@ impl Drop for RtpRecv { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::rtpbin2::{self, jitterbuffer::QueueResult}; + use rtp_types::RtpPacket; + use std::{sync::mpsc, thread::sleep}; + use RecvSessionSrcTaskCommand::*; + + const LATENCY: Duration = Duration::from_millis(20); + const PACKET_DURATION: Duration = Duration::from_millis(10); + + enum BufferOrList { + Buffer(gst::Buffer), + BufferList(gst::BufferList), + } + + fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + rtpbin2::get_or_init_runtime().unwrap(); + }); + } + + fn runtime<'a>() -> &'a tokio::runtime::Runtime { + rtpbin2::get_or_init_runtime().unwrap() + } + + fn make_link_recv_src_pad( + session_id: usize, + pt: u8, + ssrc: u32, + ) -> (RtpRecvSrcPad, gst::Pad, mpsc::Receiver) { + let (buf_tx, buf_rx) = mpsc::sync_channel(0); + let peer = gst::Pad::builder(gst::PadDirection::Sink) + .chain_function({ + let buf_tx = buf_tx.clone(); + move |_, _, buf| { + buf_tx.send(BufferOrList::Buffer(buf)).unwrap(); + Ok(gst::FlowSuccess::Ok) + } + }) + .chain_list_function(move |_, _, list| { + buf_tx.send(BufferOrList::BufferList(list)).unwrap(); + Ok(gst::FlowSuccess::Ok) + }) + .build(); + + let rspad = RtpRecvSrcPad::new( + pt, + ssrc, + gst::Pad::builder(gst::PadDirection::Src) + .name(format!("rtp_src_{session_id}_{pt}_{ssrc}")) + .build(), + JitterBufferStore { + waker: None, + store: BTreeMap::new(), + jitterbuffer: JitterBuffer::new(LATENCY), + }, + ); + + rspad.pad.link(&peer).unwrap(); + rspad.pad.set_active(true).unwrap(); + peer.set_active(true).unwrap(); + + rspad + .jitter_buffer_store + .lock() + .unwrap() + .jitterbuffer + .set_flushing(false); + + (rspad, peer, buf_rx) + } + + fn push_initial_events(session_id: usize, rspad: &RtpRecvSrcPad) { + let mut jb_store = rspad.jitter_buffer_store.lock().unwrap(); + + assert_eq!( + jb_store.jitterbuffer.queue_serialized_item(), + QueueResult::Forward(0) + ); + assert!(rspad.pad.push_event( + gst::event::StreamStart::builder(&format!("{session_id}_{}_{}", rspad.pt, rspad.ssrc)) + .build() + )); + assert_eq!( + jb_store.jitterbuffer.queue_serialized_item(), + QueueResult::Forward(1) + ); + assert!(rspad + .0 + .pad + .push_event(gst::event::Segment::new(&gst::FormattedSegment::< + gst::format::Time, + >::new()))); + + if let Some(waker) = jb_store.waker.take() { + waker.wake() + } + } + + #[track_caller] + fn queue_packet(rspad: &RtpRecvSrcPad, seq_no: u16, now: Instant) -> u64 { + let mut jb_store = rspad.jitter_buffer_store.lock().unwrap(); + + let pts = PACKET_DURATION.as_nanos() as u64 * seq_no as u64; + let rtp_ts = pts.mul_div_floor(90_000, *gst::ClockTime::SECOND).unwrap() as u32; + let rtp_data = + crate::rtpbin2::session::tests::generate_rtp_packet(rspad.ssrc, seq_no, rtp_ts, 4); + let packet = RtpPacket::parse(&rtp_data).unwrap(); + let QueueResult::Queued(id) = jb_store.jitterbuffer.queue_packet(&packet, pts, now) else { + unreachable!() + }; + + let mut buf = gst::Buffer::from_mut_slice(rtp_data); + let buf_mut = buf.make_mut(); + buf_mut.set_pts(gst::ClockTime::from_nseconds(pts)); + + jb_store.store.insert(id, JitterBufferItem::Packet(buf)); + if let Some(waker) = jb_store.waker.take() { + waker.wake() + } + + pts + } + + #[test] + fn recv_session_src_task_add_push_remove_stop() { + const SESSION_ID: usize = 0; + + init(); + + let (task, cmd_tx) = + RecvSessionSrcTask::new(Arc::new(Mutex::new(gst_base::UniqueFlowCombiner::new()))); + + let task_hdl = runtime().spawn(task.start()); + + let (rspad, _peer, buf_rx) = make_link_recv_src_pad(SESSION_ID, 96, 1234); + assert_eq!(Arc::strong_count(&rspad.0), 1); + cmd_tx.unbounded_send(AddRecvSrcPad(rspad.clone())).unwrap(); + + push_initial_events(SESSION_ID, &rspad); + + let mut now = Instant::now(); + let pts0 = queue_packet(&rspad, 0, now); + now += PACKET_DURATION; + let pts1 = queue_packet(&rspad, 1, now); + + match buf_rx.recv().unwrap() { + BufferOrList::Buffer(buf) => { + assert_eq!(buf.pts().unwrap().nseconds(), pts0); + let BufferOrList::Buffer(buf) = buf_rx.recv().unwrap() else { + unreachable!(); + }; + assert_eq!(buf.pts().unwrap().nseconds(), pts1); + } + BufferOrList::BufferList(list) => { + assert_eq!(list.len(), 2); + assert_eq!(list.get(0).unwrap().pts().unwrap().nseconds(), pts0); + assert_eq!(list.get(1).unwrap().pts().unwrap().nseconds(), pts1); + } + } + let Err(mpsc::TryRecvError::Empty) = buf_rx.try_recv() else { + unreachable!(); + }; + + assert_eq!(Arc::strong_count(&rspad.0), 2); + + cmd_tx + .unbounded_send(RemoveRecvSrcPad(rspad.clone())) + .unwrap(); + + while Arc::strong_count(&rspad.0) > 1 { + sleep(Duration::from_millis(10)); + } + + cmd_tx.unbounded_send(Stop).unwrap(); + + futures::executor::block_on(task_hdl).unwrap(); + } + + #[test] + #[ignore] // See FIXME below + fn recv_session_src_task_two_pads() { + const SESSION_ID: usize = 1; + + init(); + + let (task, cmd_tx) = + RecvSessionSrcTask::new(Arc::new(Mutex::new(gst_base::UniqueFlowCombiner::new()))); + + let task_hdl = runtime().spawn(task.start()); + + let (rspad1, _peer1, buf_rx1) = make_link_recv_src_pad(SESSION_ID, 96, 2345); + cmd_tx + .unbounded_send(AddRecvSrcPad(rspad1.clone())) + .unwrap(); + push_initial_events(SESSION_ID, &rspad1); + + let (rspad2, _peer2, buf_rx2) = make_link_recv_src_pad(SESSION_ID, 97, 3456); + cmd_tx + .unbounded_send(AddRecvSrcPad(rspad2.clone())) + .unwrap(); + push_initial_events(SESSION_ID, &rspad2); + + let mut now = Instant::now(); + let pts10 = queue_packet(&rspad1, 0, now); + let pts20 = queue_packet(&rspad2, 0, now); + + now += PACKET_DURATION; + let pts11 = queue_packet(&rspad1, 1, now); + let pts21 = queue_packet(&rspad2, 1, now); + + match buf_rx1.recv().unwrap() { + BufferOrList::Buffer(buf) => { + assert_eq!(buf.pts().unwrap().nseconds(), pts10); + + let BufferOrList::Buffer(buf) = buf_rx2.recv().unwrap() else { + unreachable!(); + }; + assert_eq!(buf.pts().unwrap().nseconds(), pts20); + + let BufferOrList::Buffer(buf) = buf_rx1.recv().unwrap() else { + unreachable!(); + }; + assert_eq!(buf.pts().unwrap().nseconds(), pts11); + } + BufferOrList::BufferList(list) => { + assert_eq!(list.len(), 2); + assert_eq!(list.get(0).unwrap().pts().unwrap().nseconds(), pts10); + assert_eq!(list.get(1).unwrap().pts().unwrap().nseconds(), pts11); + + let BufferOrList::Buffer(buf) = buf_rx2.recv().unwrap() else { + unreachable!(); + }; + assert_eq!(buf.pts().unwrap().nseconds(), pts20); + } + } + + // packets 1 has reach its deadline + // rspad2 is blocked => the recv session task is stuck in the blocking handler + + // rspad1 jb is empty => skip one packet for packets to be queued again + // otherwise the jitter buffer will consider packet 2 can be forwarded + now += 2 * PACKET_DURATION; + let pts13 = queue_packet(&rspad1, 3, now); + let pts23 = queue_packet(&rspad2, 3, now); + + now += PACKET_DURATION; + let _pts14 = queue_packet(&rspad1, 4, now); + let pts24 = queue_packet(&rspad2, 4, now); + + // wait for packets 3 to reach their deadlines + sleep(LATENCY); + + // Unlock rspad2 + + let BufferOrList::Buffer(buf) = buf_rx2.recv().unwrap() else { + unreachable!(); + }; + assert_eq!(buf.pts().unwrap().nseconds(), pts21); + + // Blocking handler can pull packets 3 + + // FIXME with slow hardware with low precision clock (e.g. Windows CI runner) + // we can get a BufferList here. Ignoring this test for now. + let BufferOrList::Buffer(buf) = buf_rx1.recv().unwrap() else { + unreachable!(); + }; + assert_eq!(buf.pts().unwrap().nseconds(), pts13); + + // Blocking handler is stuck waiting for rspad2 to complete pushing packet 3 + // Remove rspad1 before its packet 3 could be handled + cmd_tx + .unbounded_send(RemoveRecvSrcPad(rspad1.clone())) + .unwrap(); + + let BufferOrList::Buffer(buf) = buf_rx2.recv().unwrap() else { + unreachable!(); + }; + assert_eq!(buf.pts().unwrap().nseconds(), pts23); + + // Back to the async handler + + let BufferOrList::Buffer(buf) = buf_rx2.recv().unwrap() else { + unreachable!(); + }; + assert_eq!(buf.pts().unwrap().nseconds(), pts24); + + cmd_tx + .unbounded_send(RemoveRecvSrcPad(rspad2.clone())) + .unwrap(); + + sleep(LATENCY); + let Err(mpsc::TryRecvError::Empty) = buf_rx1.try_recv() else { + unreachable!(); + }; + let Err(mpsc::TryRecvError::Empty) = buf_rx2.try_recv() else { + unreachable!(); + }; + + cmd_tx.unbounded_send(Stop).unwrap(); + + futures::executor::block_on(task_hdl).unwrap(); + } +}