diff --git a/Cargo.lock b/Cargo.lock
index d8619b154..3e9c0f903 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3282,6 +3282,7 @@ dependencies = [
  "thiserror 2.0.12",
  "time",
  "tokio",
+ "tokio-util",
 ]

 [[package]]
diff --git a/net/rtp/Cargo.toml b/net/rtp/Cargo.toml
index 50dcad98d..8eca6eae4 100644
--- a/net/rtp/Cargo.toml
+++ b/net/rtp/Cargo.toml
@@ -33,6 +33,7 @@ thiserror = "2"
 time = { version = "0.3", default-features = false, features = ["std"] }
 # TODO: experiment with other async executors (mio, async-std, etc)
 tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "time", "sync"] }
+tokio-util = "0.7.15"

 [dev-dependencies]
 gst-check = { workspace = true, features = ["v1_20"] }
diff --git a/net/rtp/src/rtpbin2/jitterbuffer.rs b/net/rtp/src/rtpbin2/jitterbuffer.rs
index bdb0a4c40..541376a44 100644
--- a/net/rtp/src/rtpbin2/jitterbuffer.rs
+++ b/net/rtp/src/rtpbin2/jitterbuffer.rs
@@ -40,6 +40,7 @@ pub struct JitterBuffer {
     last_input_ts: Option<u64>,
     stats: Stats,
     flushing: bool,
+    can_forward_packets_when_empty: bool,
 }

 #[derive(Debug, PartialEq, Eq)]
@@ -53,6 +54,7 @@ pub enum PollResult {

 #[derive(Debug, PartialEq, Eq)]
 pub enum QueueResult {
+    Forward(usize),
     Queued(usize),
     Late,
     Duplicate,
@@ -109,10 +111,17 @@ impl JitterBuffer {
                 num_pushed: 0,
             },
             flushing: true,
+            can_forward_packets_when_empty: false,
         }
     }

     pub fn queue_serialized_item(&mut self) -> QueueResult {
+        if self.items.is_empty() {
+            let id = self.packet_counter;
+            self.packet_counter += 1;
+            return QueueResult::Forward(id);
+        }
+
         let id = self.packet_counter;
         self.packet_counter += 1;
         let item = Item {
@@ -130,6 +139,7 @@ impl JitterBuffer {
         trace!("Flush changed from {} to {flushing}", self.flushing);
         self.flushing = flushing;
         self.last_output_seqnum = None;
+        self.can_forward_packets_when_empty = false;
     }

     pub fn queue_packet(&mut self, rtp: &RtpPacket, mut pts: u64, now: Instant) -> QueueResult {
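Two conditions make the fast path in the next hunk safe: the buffer must be empty, and the incoming seqnum must be the direct successor of the last output one, computed with wrapping arithmetic so that the u16 RTP sequence space rolls over cleanly at 65535. A standalone sketch of just that continuity check (the function name is illustrative, not from the element):

```rust
/// Wrap-safe check: is `seqnum` the direct successor of `last`?
fn is_next_in_order(last: Option<u16>, seqnum: u16) -> bool {
    last.is_some_and(|last| seqnum == last.wrapping_add(1))
}

fn main() {
    assert!(is_next_in_order(Some(41), 42));
    // u16 wraparound at the end of the RTP seqnum space
    assert!(is_next_in_order(Some(u16::MAX), 0));
    // a gap (or a reordered packet) must go through the regular queue
    assert!(!is_next_in_order(Some(41), 43));
    // with no previous output there is no notion of "in order" yet
    assert!(!is_next_in_order(None, 0));
}
```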
@@ -182,6 +192,25 @@ impl JitterBuffer {
             }
         }

+        if self.items.is_empty()
+            // can forward after the first packet's deadline has been reached
+            && self.can_forward_packets_when_empty
+            && self
+                .last_output_seqnum
+                .is_some_and(|last_output_seqnum| seqnum == last_output_seqnum.wrapping_add(1))
+        {
+            // No packets enqueued & seqnum is in order => can forward it immediately
+            self.last_output_seqnum = Some(seqnum);
+            let id = self.packet_counter;
+            self.packet_counter += 1;
+            self.stats.num_pushed += 1;
+
+            return QueueResult::Forward(id);
+        }
+
+        // TODO: if the seqnum base is known (e.g. the first seqnum in RTSP)
+        // we could also Forward the initial packet.
+
         let id = self.packet_counter;
         self.packet_counter += 1;
         let item = Item {
@@ -258,6 +287,7 @@ impl JitterBuffer {

         let packet = self.items.pop_first().unwrap();
         self.stats.num_pushed += 1;
+        self.can_forward_packets_when_empty = true;

         PollResult::Forward {
             id: packet.id,
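Taken together, the two hunks above define the contract: the first packet always waits out the configured latency, and only after `poll()` has pushed something does `can_forward_packets_when_empty` let later in-order packets on an empty buffer skip the queue entirely. A toy model of that contract, assuming monotonically allocated ids (an illustration, not the element's code):

```rust
// Toy model of the fast-path contract: `Forward(id)` means "push now,
// nothing is stored", `Queued(id)` means "wait for poll()".
#[derive(Debug, PartialEq)]
enum QueueResult {
    Forward(usize),
    Queued(usize),
}

#[derive(Default)]
struct ToyBuffer {
    queued: Vec<(usize, u16)>, // (id, seqnum)
    counter: usize,
    can_forward_when_empty: bool,
    last_output: Option<u16>,
}

impl ToyBuffer {
    fn queue(&mut self, seqnum: u16) -> QueueResult {
        let id = self.counter;
        self.counter += 1;
        let in_order = self
            .last_output
            .is_some_and(|last| seqnum == last.wrapping_add(1));
        if self.queued.is_empty() && self.can_forward_when_empty && in_order {
            self.last_output = Some(seqnum);
            return QueueResult::Forward(id);
        }
        self.queued.push((id, seqnum));
        QueueResult::Queued(id)
    }

    /// Stand-in for the latency deadline expiring; enables the fast path.
    fn poll(&mut self) -> Option<usize> {
        let (id, seqnum) = self.queued.pop()?;
        self.last_output = Some(seqnum);
        self.can_forward_when_empty = true;
        Some(id)
    }
}

fn main() {
    let mut jb = ToyBuffer::default();
    assert_eq!(jb.queue(0), QueueResult::Queued(0)); // first packet always waits
    assert_eq!(jb.poll(), Some(0));                  // deadline reached, pushed
    assert_eq!(jb.queue(1), QueueResult::Forward(1)); // in order & empty: no added latency
    assert_eq!(jb.queue(3), QueueResult::Queued(2));  // gap: back to queueing
}
```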
@@ -584,33 +614,31 @@ mod tests {
         let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
         let packet = RtpPacket::parse(&rtp_data).unwrap();

-        let QueueResult::Queued(id_first_serialized_item) = jb.queue_serialized_item() else {
+        let QueueResult::Forward(id_first_serialized_item) = jb.queue_serialized_item() else {
             unreachable!()
         };
+        assert_eq!(id_first_serialized_item, 0);
+
+        // query has been forwarded immediately
+        assert_eq!(jb.poll(now), PollResult::Empty);
+
+        let QueueResult::Queued(id_first_packet) = jb.queue_packet(&packet, 0, now) else {
+            unreachable!()
+        };
+        assert_eq!(id_first_packet, id_first_serialized_item + 1);
+        let QueueResult::Queued(id_second_serialized_item) = jb.queue_serialized_item() else {
+            unreachable!()
+        };
+        assert_eq!(id_second_serialized_item, id_first_packet + 1);

-        // query should be forwarded immediately
         assert_eq!(
             jb.poll(now),
             PollResult::Forward {
-                id: id_first_serialized_item,
-                discont: false
-            }
-        );
-
-        let QueueResult::Queued(id_first) = jb.queue_packet(&packet, 0, now) else {
-            unreachable!()
-        };
-        assert_eq!(
-            jb.poll(now),
-            PollResult::Forward {
-                id: id_first,
+                id: id_first_packet,
                 discont: true
             }
         );

-        let QueueResult::Queued(id_second_serialized_item) = jb.queue_serialized_item() else {
-            unreachable!()
-        };
         assert_eq!(
             jb.poll(now),
             PollResult::Forward {
@@ -621,16 +649,11 @@ mod tests {
         let rtp_data = generate_rtp_packet(0x12345678, 1, 0, 4);
         let packet = RtpPacket::parse(&rtp_data).unwrap();

-        let QueueResult::Queued(id_second) = jb.queue_packet(&packet, 0, now) else {
+        let QueueResult::Forward(id_second_packet) = jb.queue_packet(&packet, 0, now) else {
             unreachable!()
         };
-        assert_eq!(
-            jb.poll(now),
-            PollResult::Forward {
-                id: id_second,
-                discont: false
-            }
-        );
+        assert_eq!(id_second_packet, id_second_serialized_item + 1);
+        assert_eq!(jb.poll(now), PollResult::Empty);
     }

     #[test]
@@ -643,7 +666,7 @@ mod tests {
         let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
         let packet = RtpPacket::parse(&rtp_data).unwrap();

-        let QueueResult::Queued(id_first_serialized_item) = jb.queue_serialized_item() else {
+        let QueueResult::Forward(_id_first_serialized_item) = jb.queue_serialized_item() else {
             unreachable!()
         };

@@ -651,13 +674,17 @@ mod tests {
             unreachable!()
         };

+        let QueueResult::Queued(id_second_serialized_item) = jb.queue_serialized_item() else {
+            unreachable!()
+        };
+
         // Everything after this should eventually return flushing, poll() will instruct to drop
         // everything stored and then return flushing indefinitely.
         jb.set_flushing(true);

         assert_eq!(jb.queue_packet(&packet, 0, now), QueueResult::Flushing);

-        assert_eq!(jb.poll(now), PollResult::Drop(id_first_serialized_item));
         assert_eq!(jb.poll(now), PollResult::Drop(id_first));
+        assert_eq!(jb.poll(now), PollResult::Drop(id_second_serialized_item));
         assert_eq!(jb.poll(now), PollResult::Flushing);
         assert_eq!(jb.poll(now), PollResult::Flushing);
diff --git a/net/rtp/src/rtpbin2/rtprecv.rs b/net/rtp/src/rtpbin2/rtprecv.rs
index 925ea3522..d9dc2b0c0 100644
--- a/net/rtp/src/rtpbin2/rtprecv.rs
+++ b/net/rtp/src/rtpbin2/rtprecv.rs
@@ -41,13 +41,14 @@ use std::collections::{BTreeMap, HashMap};
 use std::net::SocketAddr;
 use std::ops::{ControlFlow, Deref};
 use std::pin::Pin;
-use std::sync::{Arc, Mutex, MutexGuard};
+use std::sync::{Arc, LazyLock, Mutex, MutexGuard};
 use std::task::{Poll, Waker};
 use std::time::{Duration, Instant, SystemTime};

-use futures::StreamExt;
+use futures::channel::mpsc as async_mpsc;
+use futures::{stream, StreamExt};
 use gst::{glib, prelude::*, subclass::prelude::*};
-use std::sync::LazyLock;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};

 use super::internal::{pt_clock_rate_from_caps, GstRustLogger, SharedRtpState, SharedSession};
 use super::jitterbuffer::{self, JitterBuffer};
@@ -62,6 +63,10 @@ use crate::rtpbin2;

 const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200);

+/// Initial capacity for `SmallVec`s handling items related to src pads for
+/// a given RTP session. E.g.: `RtpRecvSrcPads`, `JitterBufferStreams`, ...
+const SRC_PAD_SMALL_VEC_CAPACITY: usize = 16;
+
 static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
     gst::DebugCategory::new(
         "rtprecv",
@@ -90,41 +95,53 @@ impl Default for Settings {

 #[derive(Debug)]
 #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
 struct JitterBufferStream {
-    store: Arc<Mutex<JitterBufferStore>>,
+    semaphore: tokio_util::sync::PollSemaphore,
+    recv_src_pad: RtpRecvSrcPad,
     sleep: Pin<Box<tokio::time::Sleep>>,
-    pending_item: Option<JitterBufferItem>,
 }

 impl JitterBufferStream {
-    fn new(store: Arc<Mutex<JitterBufferStore>>) -> Self {
+    fn new(recv_src_pad: &RtpRecvSrcPad) -> Self {
         Self {
-            store,
+            semaphore: tokio_util::sync::PollSemaphore::new(recv_src_pad.semaphore.clone()),
+            recv_src_pad: recv_src_pad.clone(),
             sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))),
-            pending_item: None,
         }
     }
 }

+/// Up to two pending items returned in one go by `JitterBufferStream`
+type JitterBufferPendingItems = smallvec::SmallVec<[JitterBufferItem; 2]>;
+
 impl futures::stream::Stream for JitterBufferStream {
-    type Item = JitterBufferItem;
+    type Item = (
+        gst::Pad,
+        Option<OwnedSemaphorePermit>,
+        JitterBufferPendingItems,
+    );

     fn poll_next(
         mut self: Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
+        let Poll::Ready(src_pad_permit) = self.semaphore.poll_acquire(cx) else {
+            return Poll::Pending;
+        };
+        let src_pad_permit = src_pad_permit.unwrap();
+
         let now = Instant::now();
         let mut lowest_wait = None;

-        if let Some(item) = self.pending_item.take() {
-            return Poll::Ready(Some(item));
-        }
+        let mut jitterbuffer_store = self.recv_src_pad.jitter_buffer_store.lock().unwrap();

-        let mut jitterbuffer_store = self.store.lock().unwrap();
-        let mut pending_item = None;
-        let mut next_pending_item = None;
+        let mut pending_items = JitterBufferPendingItems::new();

         loop {
             let ret = jitterbuffer_store.jitterbuffer.poll(now);
-            gst::trace!(CAT, "jitterbuffer poll ret: {ret:?}");
+            gst::trace!(
+                CAT,
+                obj = self.recv_src_pad.pad,
+                "JitterBuffer poll ret: {ret:?}",
+            );

             match ret {
                 jitterbuffer::PollResult::Flushing => {
                     return Poll::Ready(None);
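`JitterBufferStream::poll_next` above gates item production on the per-pad semaphore through `tokio_util::sync::PollSemaphore`, which adapts a `tokio::sync::Semaphore` to poll-based code and registers the waker when no permit is currently free. A minimal self-contained sketch of that pattern (the `Guarded` stream and its fields are hypothetical, not the element's code):

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::stream::{Stream, StreamExt};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;

// A stream that only produces an item while holding the (1-permit) semaphore,
// handing the permit to the consumer together with the item.
struct Guarded {
    semaphore: PollSemaphore,
    next: u32,
}

impl Stream for Guarded {
    type Item = (u32, OwnedSemaphorePermit);

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Registers `cx.waker()` and wakes us once a permit becomes available.
        let Poll::Ready(permit) = self.semaphore.poll_acquire(cx) else {
            return Poll::Pending;
        };
        // `None` only happens if the semaphore was closed.
        let Some(permit) = permit else { return Poll::Ready(None) };
        let item = self.next;
        self.next += 1;
        Poll::Ready(Some((item, permit)))
    }
}

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(1));
    let mut stream = Guarded { semaphore: PollSemaphore::new(semaphore.clone()), next: 0 };

    let (item, permit) = stream.next().await.unwrap();
    assert_eq!(item, 0);
    // While `permit` is alive the stream cannot yield item 1...
    drop(permit); // ...dropping it unblocks the next poll
    let (item, _permit) = stream.next().await.unwrap();
    assert_eq!(item, 1);
}
```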
@@ -142,25 +159,34 @@ impl futures::stream::Stream for JitterBufferStream {
                         .unwrap_or_else(|| panic!("Buffer with id {id} not in store!"));

                     if let JitterBufferItem::Packet(ref mut packet) = item {
                         if discont {
-                            gst::debug!(CAT, "Forwarding discont buffer");
+                            gst::debug!(
+                                CAT,
+                                obj = self.recv_src_pad.pad,
+                                "Forwarding discont buffer",
+                            );
                             let packet_mut = packet.make_mut();
                             packet_mut.set_flags(gst::BufferFlags::DISCONT);
                         }
                     }

+                    gst::trace!(
+                        CAT,
+                        obj = self.recv_src_pad.pad,
+                        "Handling item {id}: {item:?}, pending {}",
+                        pending_items.len()
+                    );
+
                     match item {
                         // we don't currently push packet lists into the jitterbuffer
                         JitterBufferItem::PacketList(_list) => unreachable!(),
                         // forward events and queries as-is
                         JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _) => {
-                            if pending_item.is_some() {
-                                // but only after sending the previous pending item
-                                next_pending_item = Some(item);
-                                break;
-                            }
+                            pending_items.push(item);
+                            break;
                         }
                         JitterBufferItem::Packet(ref packet) => {
-                            match pending_item {
+                            // Group consecutive buffers
+                            match pending_items.pop() {
                                 Some(
                                     JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _),
                                 ) => unreachable!(),
@@ -169,21 +195,20 @@ impl futures::stream::Stream for JitterBufferStream {
                                     let list_mut = list.make_mut();
                                     list_mut.add(pending_buffer);
                                     list_mut.add(packet.clone());
-                                    pending_item = Some(JitterBufferItem::PacketList(list));
+                                    pending_items.push(JitterBufferItem::PacketList(list));
                                 }
                                 Some(JitterBufferItem::PacketList(mut pending_list)) => {
                                     let list_mut = pending_list.make_mut();
                                     list_mut.add(packet.clone());
-                                    pending_item = Some(JitterBufferItem::PacketList(pending_list));
+                                    pending_items.push(JitterBufferItem::PacketList(pending_list));
                                 }
                                 None => {
-                                    pending_item = Some(item);
+                                    pending_items.push(item);
                                 }
                             }
                             continue;
                         }
                     }
-                    return Poll::Ready(Some(item));
                 }
                 jitterbuffer::PollResult::Timeout(timeout) => {
                     if lowest_wait.is_none_or(|lowest_wait| timeout < lowest_wait) {
@@ -197,20 +222,37 @@ impl futures::stream::Stream for JitterBufferStream {
         }

         jitterbuffer_store.waker = Some(cx.waker().clone());
+        let store_is_empty = jitterbuffer_store.store.is_empty();
         drop(jitterbuffer_store);

-        if next_pending_item.is_some() {
-            self.pending_item = next_pending_item;
+        if !pending_items.is_empty() {
+            gst::trace!(
+                CAT,
+                obj = self.recv_src_pad.pad,
+                "Returning {} items, store is empty: {store_is_empty}",
+                pending_items.len(),
+            );
+
+            // No need to hold the src pad semaphore if the JB store is not empty
+            return Poll::Ready(Some((
+                self.recv_src_pad.pad.clone(),
+                if store_is_empty {
+                    Some(src_pad_permit)
+                } else {
+                    None
+                },
+                pending_items,
+            )));
         }

-        if pending_item.is_some() {
-            return Poll::Ready(pending_item);
-        }
+        drop(src_pad_permit);
+        gst::trace!(CAT, obj = self.recv_src_pad.pad, "Will return Pending");

         if let Some(timeout) = lowest_wait {
             let this = self.get_mut();
             this.sleep.as_mut().reset(timeout.into());
             if !std::future::Future::poll(this.sleep.as_mut(), cx).is_pending() {
+                gst::trace!(CAT, obj = this.recv_src_pad.pad, "Waking up due to timeout");
                 cx.waker().wake_by_ref();
             }
         }
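The timeout arm above re-arms the single boxed `tokio::time::Sleep` with `reset()` instead of allocating a new timer per poll, and wakes itself with `wake_by_ref()` if the deadline already elapsed. A sketch of the same re-arming technique in a stripped-down stream (names are illustrative):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use futures::stream::{Stream, StreamExt};
use tokio::time::{sleep, Instant, Sleep};

// Re-arms one boxed `Sleep` instead of allocating a timer per item,
// mirroring how the jitterbuffer stream re-uses its `sleep` field.
struct Ticker {
    sleep: Pin<Box<Sleep>>,
    period: Duration,
    armed: bool,
}

impl Stream for Ticker {
    type Item = Instant;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if !self.armed {
            let deadline = Instant::now() + self.period;
            self.sleep.as_mut().reset(deadline); // re-arm the existing timer
            self.armed = true;
        }
        match self.sleep.as_mut().poll(cx) {
            Poll::Ready(()) => {
                self.armed = false;
                Poll::Ready(Some(Instant::now()))
            }
            Poll::Pending => Poll::Pending, // the timer registered `cx.waker()`
        }
    }
}

#[tokio::main]
async fn main() {
    let mut ticker = Ticker {
        sleep: Box::pin(sleep(Duration::ZERO)),
        period: Duration::from_millis(10),
        armed: false,
    };
    let t0 = Instant::now();
    ticker.next().await;
    ticker.next().await;
    assert!(t0.elapsed() >= Duration::from_millis(20));
}
```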
@@ -240,11 +282,14 @@ struct JitterBufferStore {
     jitterbuffer: JitterBuffer,
 }

+type RtpRecvSrcPads = smallvec::SmallVec<[RtpRecvSrcPad; SRC_PAD_SMALL_VEC_CAPACITY]>;
+
 #[derive(Debug)]
 struct RtpRecvSrcPadInner {
     pt: u8,
     ssrc: u32,
     pad: gst::Pad,
+    semaphore: Arc<Semaphore>,
     jitter_buffer_store: Arc<Mutex<JitterBufferStore>>,
 }

@@ -257,6 +302,7 @@ impl RtpRecvSrcPad {
             pt,
             ssrc,
             pad,
+            semaphore: Arc::new(Semaphore::new(1)),
             jitter_buffer_store: Arc::new(Mutex::new(jb_store)),
         }))
     }
@@ -335,6 +381,13 @@ impl HeldRecvItem {
     }
 }

+#[derive(Debug)]
+enum RecvSessionSrcTaskCommand {
+    AddRecvSrcPad(RtpRecvSrcPad),
+    RemoveRecvSrcPad(RtpRecvSrcPad),
+    Stop,
+}
+
 #[derive(Debug)]
 struct RecvSession {
     internal_session: SharedSession,
@@ -348,6 +401,9 @@ struct RecvSession {

     recv_store: Vec<HeldRecvItem>,

+    rtp_task_cmd_tx: async_mpsc::UnboundedSender<RecvSessionSrcTaskCommand>,
+    rtp_task_handle: Option<tokio::task::JoinHandle<()>>,
+
     rtp_recv_srcpads: Vec<RtpRecvSrcPad>,
     recv_flow_combiner: Arc<Mutex<gst_base::UniqueFlowCombiner>>,

@@ -359,6 +415,10 @@ impl RecvSession {
         let internal_session = shared_state.session_get_or_init(id, || {
             SharedSession::new(id, RtpProfile::Avp, RTCP_MIN_REPORT_INTERVAL, false)
         });
+
+        let recv_flow_combiner = Arc::new(Mutex::new(gst_base::UniqueFlowCombiner::new()));
+        let (task, rtp_task_cmd_tx) = RecvSessionSrcTask::new(recv_flow_combiner.clone());
+
         Self {
             internal_session,
             rtp_recv_sinkpad: None,
@@ -368,16 +428,22 @@ impl RecvSession {
             rtp_recv_sink_seqnum: None,

             recv_store: vec![],

+            rtp_task_cmd_tx,
+            rtp_task_handle: Some(
+                rtpbin2::get_or_init_runtime()
+                    .expect("initialized in change_state()")
+                    .spawn(task.start()),
+            ),
+
             rtp_recv_srcpads: vec![],
-            recv_flow_combiner: Arc::new(Mutex::new(gst_base::UniqueFlowCombiner::new())),
+            recv_flow_combiner,

             rtcp_recv_sinkpad: None,
         }
     }

-    fn start_rtp_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> {
-        gst::debug!(CAT, obj = pad, "Starting rtp recv src task");
+    fn activate_recv_src_pad(&mut self, pad: &gst::Pad) {
+        gst::debug!(CAT, obj = pad, "Activating rtp recv src pad");

         let recv_pad = self
@@ -385,93 +451,39 @@ impl RecvSession {
             .find(|recv| &recv.pad == pad)
             .unwrap();

-        let pad_weak = pad.downgrade();
-        let recv_flow_combiner = self.recv_flow_combiner.clone();
-        let store = recv_pad.jitter_buffer_store.clone();
+        recv_pad
+            .jitter_buffer_store
+            .lock()
+            .unwrap()
+            .jitterbuffer
+            .set_flushing(false);

-        {
-            let mut store = store.lock().unwrap();
-            store.jitterbuffer.set_flushing(false);
-            store.waker.take();
-        }
-
-        // A task per received ssrc may be a bit excessive.
-        // Other options are:
-        // - Single task per received input stream rather than per output ssrc/pt
-        // - somehow pool multiple recv tasks together (thread pool)
-        pad.start_task(move || {
-            let Some(pad) = pad_weak.upgrade() else {
-                return;
-            };
-
-            let recv_flow_combiner = recv_flow_combiner.clone();
-            let store = store.clone();
-
-            rtpbin2::get_or_init_runtime()
-                .expect("initialized in change_state()")
-                .block_on(async move {
-                    let mut stream = JitterBufferStream::new(store);
-                    while let Some(item) = stream.next().await {
-                        match item {
-                            JitterBufferItem::PacketList(list) => {
-                                let flow = pad.push_list(list);
-                                gst::trace!(
-                                    CAT,
-                                    obj = pad,
-                                    "Pushed buffer list, flow ret {:?}",
-                                    flow
-                                );
-                                let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap();
-                                let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow);
-                                // TODO: store flow, return only on session pads?
-                            }
-                            JitterBufferItem::Packet(buffer) => {
-                                let flow = pad.push(buffer);
-                                gst::trace!(CAT, obj = pad, "Pushed buffer, flow ret {:?}", flow);
-                                let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap();
-                                let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow);
-                                // TODO: store flow, return only on session pads?
-                            }
-                            JitterBufferItem::Event(event) => {
-                                let res = pad.push_event(event);
-                                gst::trace!(
-                                    CAT,
-                                    obj = pad,
-                                    "Pushed serialized event, result: {}",
-                                    res
-                                );
-                            }
-                            JitterBufferItem::Query(mut query, tx) => {
-                                // This is safe because the thread holding the original reference is waiting
-                                // for us exclusively
-                                let res = pad.peer_query(unsafe { query.as_mut() });
-                                let _ = tx.send(res);
-                            }
-                        }
-                    }
-                })
-        })?;
-
-        gst::debug!(CAT, obj = pad, "Task started");
-
-        Ok(())
+        self.rtp_task_cmd_tx
+            .unbounded_send(RecvSessionSrcTaskCommand::AddRecvSrcPad(recv_pad.clone()))
+            .expect("cmd chan valid until RecvSession is dropped");
     }

-    fn stop_rtp_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> {
-        gst::debug!(CAT, obj = pad, "Stopping rtp recv src task");
+    fn deactivate_recv_src_pad(&mut self, pad: &gst::Pad) {
+        gst::debug!(CAT, obj = pad, "Deactivating rtp recv src pad");
+
         let recv_pad = self
             .rtp_recv_srcpads
             .iter_mut()
             .find(|recv| &recv.pad == pad)
             .unwrap();

-        let mut store = recv_pad.jitter_buffer_store.lock().unwrap();
-        store.jitterbuffer.set_flushing(true);
-        if let Some(waker) = store.waker.take() {
-            waker.wake();
-        }
+        recv_pad
+            .jitter_buffer_store
+            .lock()
+            .unwrap()
+            .jitterbuffer
+            .set_flushing(true);

-        Ok(())
+        self.rtp_task_cmd_tx
+            .unbounded_send(RecvSessionSrcTaskCommand::RemoveRecvSrcPad(
+                recv_pad.clone(),
+            ))
+            .expect("cmd chan valid until RecvSession is dropped");
     }

     fn get_or_create_rtp_src(
@@ -550,6 +562,229 @@ impl RecvSession {
     }
 }

+impl Drop for RecvSession {
+    fn drop(&mut self) {
+        let Some(rtp_task_handle) = self.rtp_task_handle.take() else {
+            return;
+        };
+        if rtp_task_handle.is_finished() {
+            return;
+        }
+        if self
+            .rtp_task_cmd_tx
+            .unbounded_send(RecvSessionSrcTaskCommand::Stop)
+            .is_err()
+        {
+            return;
+        }
+        let _ = futures::executor::block_on(rtp_task_handle);
+    }
+}
+
+#[derive(Debug)]
+struct RecvSessionSrcTask {
+    cmd_rx: async_mpsc::UnboundedReceiver<RecvSessionSrcTaskCommand>,
+    recv_flow_combiner: Arc<Mutex<gst_base::UniqueFlowCombiner>>,
+}
+
+type JitterBufferStreams = smallvec::SmallVec<[JitterBufferStream; SRC_PAD_SMALL_VEC_CAPACITY]>;
+
+impl RecvSessionSrcTask {
+    fn new(
+        recv_flow_combiner: Arc<Mutex<gst_base::UniqueFlowCombiner>>,
+    ) -> (
+        RecvSessionSrcTask,
+        async_mpsc::UnboundedSender<RecvSessionSrcTaskCommand>,
+    ) {
+        // task commands are sent from sync contexts which can't await
+        // back pressure, so use an unbounded channel
+        let (cmd_tx, cmd_rx) = async_mpsc::unbounded();
+
+        (
+            RecvSessionSrcTask {
+                cmd_rx,
+                recv_flow_combiner,
+            },
+            cmd_tx,
+        )
+    }
+
+    fn combine_jb_streams(
+        jb_streams: &mut [JitterBufferStream],
+    ) -> impl stream::FusedStream<
+        Item = Vec<(
+            gst::Pad,
+            Option<OwnedSemaphorePermit>,
+            JitterBufferPendingItems,
+        )>,
+    > + use<'_> {
+        let len = jb_streams.len();
+        // + 1 so we don't end up with a 0 capacity when jb_streams is empty
+        stream::select_all(jb_streams.iter_mut()).ready_chunks(len + 1)
+    }
+
+    fn handle_cmd(
+        jb_streams: &mut JitterBufferStreams,
+        cmd: RecvSessionSrcTaskCommand,
+    ) -> ControlFlow<()> {
+        gst::trace!(CAT, "Handling {cmd:?}");
+
+        use RecvSessionSrcTaskCommand::*;
+        match cmd {
+            AddRecvSrcPad(recv_src_pad) => {
+                let jb_stream = JitterBufferStream::new(&recv_src_pad);
+                jb_streams.push(jb_stream);
+
+                gst::debug!(CAT, obj = recv_src_pad.pad, "activated");
+            }
+            RemoveRecvSrcPad(recv_src_pad) => {
+                if let Some(pos) = jb_streams
+                    .iter()
+                    .position(|jb| jb.recv_src_pad.pad == recv_src_pad.pad)
+                {
+                    let _ = jb_streams.remove(pos);
+                }
+
+                gst::debug!(CAT, obj = recv_src_pad.pad, "deactivated");
+            }
+            Stop => {
+                return ControlFlow::Break(());
+            }
+        }
+
+        ControlFlow::Continue(())
+    }
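`combine_jb_streams` above merges all per-pad streams with `futures::stream::select_all` and then batches whatever is ready in a single pass via `ready_chunks`, so one wakeup can service several pads; the combined stream is rebuilt whenever a pad is added or removed. A condensed, runnable illustration of that combinator pairing (the data is made up):

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Stand-ins for per-pad item streams
    let mut sources = vec![
        stream::iter(vec![("pad0", 1), ("pad0", 2)]),
        stream::iter(vec![("pad1", 10)]),
    ];

    let len = sources.len();
    // `select_all` merges the streams; `ready_chunks(n)` drains every item
    // that is ready right now (up to n) into one batch.
    // + 1 keeps the chunk capacity non-zero even with no sources.
    let mut combined = stream::select_all(sources.iter_mut()).ready_chunks(len + 1);

    let mut total = 0;
    while let Some(batch) = combined.next().await {
        // each `batch` is a Vec of items that were ready simultaneously
        total += batch.len();
    }
    assert_eq!(total, 3);
}
```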
+
+    async fn start(mut self) {
+        gst::debug!(CAT, "Entering rtp stream task");
+
+        let mut jb_streams = JitterBufferStreams::new();
+        let mut combined_jb_stream = Self::combine_jb_streams(&mut jb_streams);
+
+        loop {
+            gst::trace!(CAT, "RecvSessionSrcTask iter");
+
+            let all_pad_items = futures::select! {
+                cmd = self.cmd_rx.next() => {
+                    drop(combined_jb_stream);
+                    if Self::handle_cmd(
+                        &mut jb_streams,
+                        cmd.expect("cmd chan valid until RecvSession is dropped")).is_break() {
+                        break;
+                    }
+
+                    combined_jb_stream = Self::combine_jb_streams(&mut jb_streams);
+                    continue;
+                }
+                all_pad_items = combined_jb_stream.next() => {
+                    let Some(all_pad_items) = all_pad_items else {
+                        continue;
+                    };
+                    if all_pad_items.is_empty() {
+                        gst::debug!(CAT, "rtp stream task: all_pad_items is empty");
+                        continue;
+                    }
+                    all_pad_items
+                }
+            };
+
+            drop(combined_jb_stream);
+            match rtpbin2::get_or_init_runtime()
+                .expect("initialized in change_state()")
+                .spawn_blocking(move || {
+                    gst::log!(CAT, "Handling items for {} src pads", all_pad_items.len());
+                    self.push_all_pad_items_blocking(jb_streams, all_pad_items)
+                })
+                .await
+                .unwrap()
+            {
+                ControlFlow::Continue((this, src_pad_jb_list_)) => {
+                    self = this;
+                    jb_streams = src_pad_jb_list_;
+                    combined_jb_stream = Self::combine_jb_streams(&mut jb_streams);
+                }
+                ControlFlow::Break(_) => {
+                    break;
+                }
+            }
+        }
+
+        gst::debug!(CAT, "Leaving RecvSessionSrc task");
+    }
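`start()` above hands `self` and the stream set into `spawn_blocking` for the push section and takes ownership back through the returned `ControlFlow`, since pushing to GStreamer pads may block and must not stall the async executor. A minimal sketch of that ownership round-trip, assuming a hypothetical `Task` type:

```rust
use std::ops::ControlFlow;

// An async task temporarily moves its state into `spawn_blocking` for a
// section that may block (e.g. a downstream push), then takes it back.
struct Task {
    processed: u32,
}

impl Task {
    fn blocking_section(mut self, items: Vec<u32>) -> ControlFlow<Self, Self> {
        for item in items {
            // stand-in for a potentially blocking downstream push
            self.processed += item;
        }
        ControlFlow::Continue(self)
    }

    async fn run(mut self) -> u32 {
        loop {
            let items = vec![1, 2, 3]; // stand-in for a batch from the streams
            // Hand `self` over; the closure returns it so we can keep looping.
            match tokio::task::spawn_blocking(move || self.blocking_section(items))
                .await
                .expect("blocking task panicked")
            {
                ControlFlow::Continue(this) => {
                    self = this;
                    if self.processed >= 6 {
                        return self.processed;
                    }
                }
                ControlFlow::Break(this) => return this.processed,
            }
        }
    }
}

#[tokio::main]
async fn main() {
    assert_eq!(Task { processed: 0 }.run().await, 6);
}
```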
+
+    fn push_all_pad_items_blocking(
+        mut self,
+        mut jb_streams: JitterBufferStreams,
+        mut all_pad_items: Vec<(
+            gst::Pad,
+            Option<OwnedSemaphorePermit>,
+            JitterBufferPendingItems,
+        )>,
+    ) -> ControlFlow<Self, (Self, JitterBufferStreams)> {
+        use futures::task;
+
+        loop {
+            for (pad, _semaphore_permit, items) in all_pad_items.drain(..) {
+                for item in items {
+                    gst::log!(CAT, obj = pad, "Pushing item {item:?}");
+
+                    match item {
+                        JitterBufferItem::PacketList(list) => {
+                            let flow = pad.push_list(list);
+                            gst::trace!(CAT, obj = pad, "Pushed buffer list, flow ret {flow:?}");
+                            let mut recv_flow_combiner = self.recv_flow_combiner.lock().unwrap();
+                            let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow);
+                            // TODO: store flow, return only on session pads?
+                        }
+                        JitterBufferItem::Packet(buffer) => {
+                            let flow = pad.push(buffer);
+                            gst::trace!(CAT, obj = pad, "Pushed buffer, flow ret {flow:?}");
+                            let mut recv_flow_combiner = self.recv_flow_combiner.lock().unwrap();
+                            let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow);
+                            // TODO: store flow, return only on session pads?
+                        }
+                        JitterBufferItem::Event(event) => {
+                            let res = pad.push_event(event);
+                            gst::trace!(CAT, obj = pad, "Pushed serialized event, result: {res}");
+                        }
+                        JitterBufferItem::Query(mut query, tx) => {
+                            // This is safe because the thread holding the original reference is waiting
+                            // for us exclusively
+                            let res = pad.peer_query(unsafe { query.as_mut() });
+                            let _ = tx.send(res);
+                        }
+                    }
+                }
+            }
+
+            // Check whether there's anything else we can do before leaving this blocking task
+
+            if let Ok(Some(cmd)) = self.cmd_rx.try_next() {
+                if Self::handle_cmd(&mut jb_streams, cmd).is_break() {
+                    return ControlFlow::Break(self);
+                }
+            }
+            // else, let the async task deal with errors
+
+            let mut combined_jb_stream = Self::combine_jb_streams(&mut jb_streams);
+
+            // With MSRV >= 1.85.0, this could use `std::task` && `task::Waker::noop`
+            let mut cx = task::Context::from_waker(task::noop_waker_ref());
+            let Poll::Ready(Some(all_pad_items_)) = combined_jb_stream.poll_next_unpin(&mut cx)
+            else {
+                drop(combined_jb_stream);
+                gst::trace!(CAT, "Done processing items");
+
+                return ControlFlow::Continue((self, jb_streams));
+            };
+
+            all_pad_items = all_pad_items_;
+            gst::log!(CAT, "Got new items for {} src pads", all_pad_items.len());
+            // iterate and process the new batch
+        }
+    }
+}
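The blocking handler above cannot `.await`, so before returning it polls the combined stream once with a no-op waker: `Ready` yields another batch to process in place, while `Pending` simply hands control back to the async task. The same opportunistic-drain idea in isolation (the helper below is hypothetical):

```rust
use futures::stream::{self, StreamExt};
use futures::task::{self, Poll};

// Inside a blocking section we cannot `.await`, but we can still ask a
// stream for items that are ready *right now*: poll once with a no-op
// waker and treat `Pending` as "nothing to do, hand control back".
fn drain_ready<S: stream::Stream + Unpin>(stream: &mut S) -> Vec<S::Item> {
    let mut cx = task::Context::from_waker(task::noop_waker_ref());
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = stream.poll_next_unpin(&mut cx) {
        out.push(item);
    }
    out
}

fn main() {
    let mut ready = stream::iter([1, 2, 3]);
    assert_eq!(drain_ready(&mut ready), vec![1, 2, 3]);

    // A stream with nothing ready returns `Pending` without blocking us
    let mut never = stream::pending::<i32>();
    assert!(drain_ready(&mut never).is_empty());
}
```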

 #[derive(Debug, Default)]
 struct State {
     shared_state: Option<SharedRtpState>,
@@ -627,13 +862,9 @@ impl RtpRecv {
         };

         if active {
-            session.start_rtp_task(pad)?;
+            session.activate_recv_src_pad(pad);
         } else {
-            session.stop_rtp_task(pad)?;
-
-            gst::debug!(CAT, obj = pad, "Stopping task");
-
-            let _ = pad.stop_task();
+            session.deactivate_recv_src_pad(pad);
         }

         Ok(())
@@ -969,6 +1200,11 @@ impl RtpRecv {

                     // FIXME: Should block if too many packets are stored here because the source pad task
                     // is blocked
+
+                    let _src_pad_permit = rtpbin2::get_or_init_runtime()
+                        .expect("initialized in change_state()")
+                        .block_on(buffer.recv_src_pad.semaphore.acquire());
+
                     let mut jb_store = buffer.recv_src_pad.jitter_buffer_store.lock().unwrap();

                     let ret = jb_store.jitterbuffer.queue_packet(
@@ -978,8 +1214,8 @@ impl RtpRecv {
                     );
                     gst::trace!(
                         CAT,
-                        "{}: jb queue buffer pts {} rtp ts {} marker {}: {ret:?}",
-                        buffer.recv_src_pad.pad.name(),
+                        obj = buffer.recv_src_pad.pad,
+                        "jb queue buffer pts {} rtp ts {} marker {}: {ret:?}",
                         buffer.buffer.pts().display(),
                         rtp.timestamp(),
                         rtp.marker_bit(),
@@ -988,6 +1224,23 @@ impl RtpRecv {
                         jitterbuffer::QueueResult::Flushing => {
                             // TODO: return flushing result upstream
                         }
+                        jitterbuffer::QueueResult::Forward(_) => {
+                            drop(mapped);
+
+                            if let Some(session) = state.session_by_id(id) {
+                                let flow_combiner = session.recv_flow_combiner.clone();
+                                drop(state);
+
+                                let res = buffer.recv_src_pad.pad.push(buffer.buffer);
+                                let _ = flow_combiner
+                                    .lock()
+                                    .unwrap()
+                                    .update_pad_flow(&buffer.recv_src_pad.pad, res);
+                                // TODO: store flow, return only on session pads?
+
+                                state = self.state.lock().unwrap();
+                            }
+                        }
                         jitterbuffer::QueueResult::Queued(id) => {
                             drop(mapped);
@@ -999,19 +1252,33 @@ impl RtpRecv {
                             }
                         }
                         jitterbuffer::QueueResult::Late => {
-                            gst::warning!(CAT, "Late buffer was dropped");
+                            gst::warning!(
+                                CAT,
+                                obj = buffer.recv_src_pad.pad,
+                                "Late buffer was dropped"
+                            );
                         }
                         jitterbuffer::QueueResult::Duplicate => {
-                            gst::warning!(CAT, "Duplicate buffer was dropped");
+                            gst::warning!(
+                                CAT,
+                                obj = buffer.recv_src_pad.pad,
+                                "Duplicate buffer was dropped"
+                            );
                         }
                     }
                 }
-                HeldRecvItem::BufferList(list) => {
+                HeldRecvItem::BufferList(mut list) => {
                     // FIXME: Should block if too many packets are stored here because the source pad task
                     // is blocked
+
+                    let _src_pad_permit = rtpbin2::get_or_init_runtime()
+                        .expect("initialized in change_state()")
+                        .block_on(list.recv_src_pad.semaphore.acquire());
+
                     let mut jb_store = list.recv_src_pad.jitter_buffer_store.lock().unwrap();
-                    for buffer in list.list.iter_owned() {
+                    let buf_list = list.list.make_mut();
+                    for buffer in buf_list.drain(..) {
                         let mapped = buffer.map_readable().map_err(|e| {
                             gst::error!(CAT, imp = self, "Failed to map input buffer {e:?}");
                             gst::FlowError::Error
@@ -1033,11 +1300,35 @@ impl RtpRecv {
                             buffer.pts().unwrap().nseconds(),
                             now,
                         );
-                        gst::trace!(CAT, "jb queue buffer in list: {ret:?}");
+                        gst::trace!(
+                            CAT,
+                            obj = list.recv_src_pad.pad,
+                            "jb queue buffer in list: {ret:?}",
+                        );
                         match ret {
                             jitterbuffer::QueueResult::Flushing => {
                                 return Err(gst::FlowError::Flushing);
                             }
+                            jitterbuffer::QueueResult::Forward(_) => {
+                                // TODO: group consecutive buffers and push them as a list
+                                // See: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2346#note_2989287
+
+                                drop(mapped);
+
+                                if let Some(session) = state.session_by_id(id) {
+                                    let flow_combiner = session.recv_flow_combiner.clone();
+                                    drop(state);
+
+                                    let res = list.recv_src_pad.pad.push(buffer);
+                                    let _ = flow_combiner
+                                        .lock()
+                                        .unwrap()
+                                        .update_pad_flow(&list.recv_src_pad.pad, res);
+                                    // TODO: store flow, return only on session pads?
+
+                                    state = self.state.lock().unwrap();
+                                }
+                            }
                             jitterbuffer::QueueResult::Queued(id) => {
                                 drop(mapped);
@@ -1048,10 +1339,18 @@ impl RtpRecv {
                                 }
                             }
                             jitterbuffer::QueueResult::Late => {
-                                gst::warning!(CAT, "Late buffer was dropped");
+                                gst::warning!(
+                                    CAT,
+                                    obj = list.recv_src_pad.pad,
+                                    "Late buffer was dropped"
+                                );
                             }
                             jitterbuffer::QueueResult::Duplicate => {
-                                gst::warning!(CAT, "Duplicate buffer was dropped");
+                                gst::warning!(
+                                    CAT,
+                                    obj = list.recv_src_pad.pad,
+                                    "Duplicate buffer was dropped"
+                                );
                             }
                         }
                     }
@@ -1360,16 +1659,16 @@ impl RtpRecv {
         let mut ret = true;
         if let Some(session) = state.session_by_id(id) {
-            let jb_stores: Vec<Arc<Mutex<JitterBufferStore>>> = session
+            let recv_src_pads: RtpRecvSrcPads = session
                 .rtp_recv_srcpads
                 .iter()
                 .filter(|r| state.pads_session_id_map.contains_key(&r.pad))
-                .map(|p| p.jitter_buffer_store.clone())
+                .cloned()
                 .collect();

             drop(state);

-            let query = std::ptr::NonNull::from(query);
+            let mut query = std::ptr::NonNull::from(query);

             // The idea here is to reproduce the default behavior of GstPad, where
             // queries will run sequentially on each internally linked source pad
@@ -1385,39 +1684,65 @@ impl RtpRecv {
             //
             // Also note that if there were no internally linked pads, GstPad's
             // behavior is to return TRUE, we do this here too.
-            for jb_store in jb_stores {
-                let mut jitterbuffer_store = jb_store.lock().unwrap();
+            for recv_src_pad in recv_src_pads {
+                let src_pad_permit = rtpbin2::get_or_init_runtime()
+                    .expect("initialized in change_state()")
+                    .block_on(recv_src_pad.semaphore.acquire());

-                let jitterbuffer::QueueResult::Queued(id) =
-                    jitterbuffer_store.jitterbuffer.queue_serialized_item()
-                else {
-                    unreachable!()
-                };
+                let mut jb_store = recv_src_pad.jitter_buffer_store.lock().unwrap();

-                let (query_tx, query_rx) = std::sync::mpsc::sync_channel(1);
+                match jb_store.jitterbuffer.queue_serialized_item() {
+                    jitterbuffer::QueueResult::Forward(id) => {
+                        // SAFETY: the `query` `ptr::NonNull` was built above from
+                        // the `query` argument with type `&mut gst::QueryRef`.
+                        let query = unsafe { query.as_mut() };

-                jitterbuffer_store
-                    .store
-                    .insert(id, JitterBufferItem::Query(query, query_tx));
+                        gst::trace!(
+                            CAT,
+                            obj = recv_src_pad.pad,
+                            "querying ({id}) peer: {query:?}",
+                        );

-                if let Some(waker) = jitterbuffer_store.waker.take() {
-                    waker.wake();
-                }
-
-                drop(jitterbuffer_store);
-
-                // Now block until the jitterbuffer has processed the query
-                match query_rx.recv() {
-                    Ok(res) => {
-                        ret |= res;
+                        ret |= recv_src_pad.pad.peer_query(query);
                         if ret {
                             break;
                         }
                     }
-                    _ => {
-                        // The sender was closed because of a state change
-                        break;
+                    jitterbuffer::QueueResult::Queued(id) => {
+                        gst::trace!(
+                            CAT,
+                            obj = recv_src_pad.pad,
+                            "jb queuing serialized query ({id}): {query:?}",
+                        );
+
+                        let (query_tx, query_rx) = std::sync::mpsc::sync_channel(1);
+
+                        jb_store
+                            .store
+                            .insert(id, JitterBufferItem::Query(query, query_tx));
+
+                        if let Some(waker) = jb_store.waker.take() {
+                            waker.wake();
+                        }
+
+                        drop(jb_store);
+                        drop(src_pad_permit);
+
+                        // Now block until the jitterbuffer has processed the query
+                        match query_rx.recv() {
+                            Ok(res) => {
+                                ret |= res;
+                                if ret {
+                                    break;
+                                }
+                            }
+                            _ => {
+                                // The sender was closed because of a state change
+                                break;
+                            }
+                        }
                     }
+                    _ => unreachable!(),
                 }
             }
         }
@@ -1433,26 +1758,48 @@ impl RtpRecv {
     // consistently ordered with the RTP packets once output on our source
     // pads
     fn rtp_sink_queue_serialized_event(&self, id: usize, event: gst::Event) -> bool {
-        let state = self.state.lock().unwrap();
-        if let Some(session) = state.session_by_id(id) {
-            for srcpad in session
+        let recv_src_pads: RtpRecvSrcPads = {
+            let state = self.state.lock().unwrap();
+
+            let Some(session) = state.session_by_id(id) else {
+                return true;
+            };
+
+            session
                 .rtp_recv_srcpads
                 .iter()
                 .filter(|r| state.pads_session_id_map.contains_key(&r.pad))
-            {
-                let mut jitterbuffer_store = srcpad.jitter_buffer_store.lock().unwrap();
+                .cloned()
+                .collect()
+        };

-                let jitterbuffer::QueueResult::Queued(id) =
-                    jitterbuffer_store.jitterbuffer.queue_serialized_item()
-                else {
-                    unreachable!()
-                };
+        for recv_src_pad in recv_src_pads {
+            let _src_pad_permit = rtpbin2::get_or_init_runtime()
+                .expect("initialized in change_state()")
+                .block_on(recv_src_pad.semaphore.acquire());

-                jitterbuffer_store
-                    .store
-                    .insert(id, JitterBufferItem::Event(event.clone()));
-                if let Some(waker) = jitterbuffer_store.waker.take() {
-                    waker.wake();
+            let mut jb_store = recv_src_pad.jitter_buffer_store.lock().unwrap();
+
+            match jb_store.jitterbuffer.queue_serialized_item() {
+                jitterbuffer::QueueResult::Forward(id) => {
+                    gst::trace!(CAT, obj = recv_src_pad.pad, "Forwarding {id}: {event:?}");
+
+                    if !recv_src_pad.pad.push_event(event.clone()) {
+                        gst::warning!(CAT, obj = recv_src_pad.pad, "Failed to push event");
+                    }
+                }
+                jitterbuffer::QueueResult::Queued(id) => {
+                    gst::trace!(CAT, obj = recv_src_pad.pad, "Queuing as {id}: {event:?}");
+
+                    jb_store
+                        .store
+                        .insert(id, JitterBufferItem::Event(event.clone()));
+                    if let Some(waker) = jb_store.waker.take() {
+                        waker.wake();
+                    }
+                }
+                _ => {
+                    unreachable!();
                 }
             }
         }
@@ -1560,22 +1907,18 @@ impl RtpRecv {
                 true
             }
             gst::EventView::FlushStart(_fs) => {
-                let state = self.state.lock().unwrap();
-                let mut pause_tasks = vec![];
-                if let Some(session) = state.session_by_id(id) {
-                    for recv_pad in session.rtp_recv_srcpads.iter() {
-                        let mut store = recv_pad.jitter_buffer_store.lock().unwrap();
-                        store.jitterbuffer.set_flushing(true);
-                        if let Some(waker) = store.waker.take() {
-                            waker.wake();
-                        }
-                        pause_tasks.push(recv_pad.pad.clone());
+                let mut state = self.state.lock().unwrap();
+                if let Some(session) = state.mut_session_by_id(id) {
+                    let pads = session
+                        .rtp_recv_srcpads
+                        .iter()
+                        .map(|r| r.pad.clone())
+                        .collect::<Vec<_>>();
+                    for pad in pads {
+                        session.deactivate_recv_src_pad(&pad);
                     }
                 }
                 drop(state);
-                for pad in pause_tasks {
-                    let _ = pad.pause_task();
-                }
                 gst::Pad::event_default(pad, Some(&*self.obj()), event)
             }
             gst::EventView::FlushStop(_fs) => {
@@ -1587,8 +1930,7 @@ impl RtpRecv {
                         .map(|r| r.pad.clone())
                         .collect::<Vec<_>>();
                     for pad in pads {
-                        // Will reset flushing to false and ensure task is woken up
-                        let _ = session.start_rtp_task(&pad);
+                        session.activate_recv_src_pad(&pad);
                     }
                 }
                 drop(state);
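For the `Queued` path above (and in `rtp_sink_query` earlier), the sink-side thread parks on a rendezvous `std::sync::mpsc::sync_channel` until the task that owns the src pad has actually run the query, which is what keeps serialized queries ordered with the packets. A self-contained sketch of that handshake with a stand-in worker thread (all names here are illustrative):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // (query, reply channel) pairs travel to the worker that owns the pad
    let (query_tx, query_rx) = mpsc::sync_channel::<(String, mpsc::SyncSender<bool>)>(1);

    let worker = thread::spawn(move || {
        while let Ok((query, reply_tx)) = query_rx.recv() {
            // stand-in for `pad.peer_query(...)`
            let _ = reply_tx.send(query.contains("latency"));
        }
    });

    let (reply_tx, reply_rx) = mpsc::sync_channel(1);
    query_tx
        .send(("latency query".to_string(), reply_tx))
        .unwrap();

    // Blocks until the worker has processed the query
    match reply_rx.recv() {
        Ok(res) => assert!(res),
        // A closed sender means the worker went away (e.g. a state change)
        Err(_) => unreachable!(),
    }

    drop(query_tx); // lets the worker loop end
    worker.join().unwrap();
}
```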
@@ -2153,3 +2495,313 @@ impl Drop for RtpRecv {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::rtpbin2::{self, jitterbuffer::QueueResult};
+    use rtp_types::RtpPacket;
+    use std::{sync::mpsc, thread::sleep};
+    use RecvSessionSrcTaskCommand::*;
+
+    const LATENCY: Duration = Duration::from_millis(20);
+    const PACKET_DURATION: Duration = Duration::from_millis(10);
+
+    enum BufferOrList {
+        Buffer(gst::Buffer),
+        BufferList(gst::BufferList),
+    }
+
+    fn init() {
+        use std::sync::Once;
+        static INIT: Once = Once::new();
+
+        INIT.call_once(|| {
+            gst::init().unwrap();
+            rtpbin2::get_or_init_runtime().unwrap();
+        });
+    }
+
+    fn runtime<'a>() -> &'a tokio::runtime::Runtime {
+        rtpbin2::get_or_init_runtime().unwrap()
+    }
+
+    fn make_link_recv_src_pad(
+        session_id: usize,
+        pt: u8,
+        ssrc: u32,
+    ) -> (RtpRecvSrcPad, gst::Pad, mpsc::Receiver<BufferOrList>) {
+        let (buf_tx, buf_rx) = mpsc::sync_channel(0);
+        let peer = gst::Pad::builder(gst::PadDirection::Sink)
+            .chain_function({
+                let buf_tx = buf_tx.clone();
+                move |_, _, buf| {
+                    buf_tx.send(BufferOrList::Buffer(buf)).unwrap();
+                    Ok(gst::FlowSuccess::Ok)
+                }
+            })
+            .chain_list_function(move |_, _, list| {
+                buf_tx.send(BufferOrList::BufferList(list)).unwrap();
+                Ok(gst::FlowSuccess::Ok)
+            })
+            .build();
+
+        let rspad = RtpRecvSrcPad::new(
+            pt,
+            ssrc,
+            gst::Pad::builder(gst::PadDirection::Src)
+                .name(format!("rtp_src_{session_id}_{pt}_{ssrc}"))
+                .build(),
+            JitterBufferStore {
+                waker: None,
+                store: BTreeMap::new(),
+                jitterbuffer: JitterBuffer::new(LATENCY),
+            },
+        );
+
+        rspad.pad.link(&peer).unwrap();
+        rspad.pad.set_active(true).unwrap();
+        peer.set_active(true).unwrap();
+
+        rspad
+            .jitter_buffer_store
+            .lock()
+            .unwrap()
+            .jitterbuffer
+            .set_flushing(false);
+
+        (rspad, peer, buf_rx)
+    }
+
+    fn push_initial_events(session_id: usize, rspad: &RtpRecvSrcPad) {
+        let mut jb_store = rspad.jitter_buffer_store.lock().unwrap();
+
+        assert_eq!(
+            jb_store.jitterbuffer.queue_serialized_item(),
+            QueueResult::Forward(0)
+        );
+        assert!(rspad.pad.push_event(
+            gst::event::StreamStart::builder(&format!("{session_id}_{}_{}", rspad.pt, rspad.ssrc))
+                .build()
+        ));
+        assert_eq!(
+            jb_store.jitterbuffer.queue_serialized_item(),
+            QueueResult::Forward(1)
+        );
+        assert!(rspad
+            .0
+            .pad
+            .push_event(gst::event::Segment::new(&gst::FormattedSegment::<
+                gst::format::Time,
+            >::new())));
+
+        if let Some(waker) = jb_store.waker.take() {
+            waker.wake()
+        }
+    }
+
+    #[track_caller]
+    fn queue_packet(rspad: &RtpRecvSrcPad, seq_no: u16, now: Instant) -> u64 {
+        let mut jb_store = rspad.jitter_buffer_store.lock().unwrap();
+
+        let pts = PACKET_DURATION.as_nanos() as u64 * seq_no as u64;
+        let rtp_ts = pts.mul_div_floor(90_000, *gst::ClockTime::SECOND).unwrap() as u32;
+        let rtp_data =
+            crate::rtpbin2::session::tests::generate_rtp_packet(rspad.ssrc, seq_no, rtp_ts, 4);
+        let packet = RtpPacket::parse(&rtp_data).unwrap();
+        let QueueResult::Queued(id) = jb_store.jitterbuffer.queue_packet(&packet, pts, now) else {
+            unreachable!()
+        };
+
+        let mut buf = gst::Buffer::from_mut_slice(rtp_data);
+        let buf_mut = buf.make_mut();
+        buf_mut.set_pts(gst::ClockTime::from_nseconds(pts));
+
+        jb_store.store.insert(id, JitterBufferItem::Packet(buf));
+        if let Some(waker) = jb_store.waker.take() {
+            waker.wake()
+        }
+
+        pts
+    }
+
+    #[test]
+    fn recv_session_src_task_add_push_remove_stop() {
+        const SESSION_ID: usize = 0;
+
+        init();
+
+        let (task, cmd_tx) =
+            RecvSessionSrcTask::new(Arc::new(Mutex::new(gst_base::UniqueFlowCombiner::new())));
+
+        let task_hdl = runtime().spawn(task.start());
+
+        let (rspad, _peer, buf_rx) = make_link_recv_src_pad(SESSION_ID, 96, 1234);
+        assert_eq!(Arc::strong_count(&rspad.0), 1);
+        cmd_tx.unbounded_send(AddRecvSrcPad(rspad.clone())).unwrap();
+
+        push_initial_events(SESSION_ID, &rspad);
+
+        let mut now = Instant::now();
+        let pts0 = queue_packet(&rspad, 0, now);
+        now += PACKET_DURATION;
+        let pts1 = queue_packet(&rspad, 1, now);
+
+        match buf_rx.recv().unwrap() {
+            BufferOrList::Buffer(buf) => {
+                assert_eq!(buf.pts().unwrap().nseconds(), pts0);
+                let BufferOrList::Buffer(buf) = buf_rx.recv().unwrap() else {
+                    unreachable!();
+                };
+                assert_eq!(buf.pts().unwrap().nseconds(), pts1);
+            }
+            BufferOrList::BufferList(list) => {
+                assert_eq!(list.len(), 2);
+                assert_eq!(list.get(0).unwrap().pts().unwrap().nseconds(), pts0);
+                assert_eq!(list.get(1).unwrap().pts().unwrap().nseconds(), pts1);
+            }
+        }
+        let Err(mpsc::TryRecvError::Empty) = buf_rx.try_recv() else {
+            unreachable!();
+        };
+
+        assert_eq!(Arc::strong_count(&rspad.0), 2);
+
+        cmd_tx
+            .unbounded_send(RemoveRecvSrcPad(rspad.clone()))
+            .unwrap();
+
+        while Arc::strong_count(&rspad.0) > 1 {
+            sleep(Duration::from_millis(10));
+        }
+
+        cmd_tx.unbounded_send(Stop).unwrap();
+
+        futures::executor::block_on(task_hdl).unwrap();
+    }
+
+    #[test]
+    #[ignore] // See FIXME below
+    fn recv_session_src_task_two_pads() {
+        const SESSION_ID: usize = 1;
+
+        init();
+
+        let (task, cmd_tx) =
+            RecvSessionSrcTask::new(Arc::new(Mutex::new(gst_base::UniqueFlowCombiner::new())));
+
+        let task_hdl = runtime().spawn(task.start());
+
+        let (rspad1, _peer1, buf_rx1) = make_link_recv_src_pad(SESSION_ID, 96, 2345);
+        cmd_tx
+            .unbounded_send(AddRecvSrcPad(rspad1.clone()))
+            .unwrap();
+        push_initial_events(SESSION_ID, &rspad1);
+
+        let (rspad2, _peer2, buf_rx2) = make_link_recv_src_pad(SESSION_ID, 97, 3456);
+        cmd_tx
+            .unbounded_send(AddRecvSrcPad(rspad2.clone()))
+            .unwrap();
+        push_initial_events(SESSION_ID, &rspad2);
+
+        let mut now = Instant::now();
+        let pts10 = queue_packet(&rspad1, 0, now);
+        let pts20 = queue_packet(&rspad2, 0, now);
+
+        now += PACKET_DURATION;
+        let pts11 = queue_packet(&rspad1, 1, now);
+        let pts21 = queue_packet(&rspad2, 1, now);
+
+        match buf_rx1.recv().unwrap() {
+            BufferOrList::Buffer(buf) => {
+                assert_eq!(buf.pts().unwrap().nseconds(), pts10);
+
+                let BufferOrList::Buffer(buf) = buf_rx2.recv().unwrap() else {
+                    unreachable!();
+                };
+                assert_eq!(buf.pts().unwrap().nseconds(), pts20);
+
+                let BufferOrList::Buffer(buf) = buf_rx1.recv().unwrap() else {
+                    unreachable!();
+                };
+                assert_eq!(buf.pts().unwrap().nseconds(), pts11);
+            }
+            BufferOrList::BufferList(list) => {
+                assert_eq!(list.len(), 2);
+                assert_eq!(list.get(0).unwrap().pts().unwrap().nseconds(), pts10);
+                assert_eq!(list.get(1).unwrap().pts().unwrap().nseconds(), pts11);
+
+                let BufferOrList::Buffer(buf) = buf_rx2.recv().unwrap() else {
+                    unreachable!();
+                };
+                assert_eq!(buf.pts().unwrap().nseconds(), pts20);
+            }
+        }
+
+        // packets 1 have reached their deadlines
+        // rspad2 is blocked => the recv session task is stuck in the blocking handler
+
+        // rspad1's jb is empty => skip one packet so that packets get queued again,
+        // otherwise the jitter buffer would consider that packet 2 can be forwarded
+        now += 2 * PACKET_DURATION;
+        let pts13 = queue_packet(&rspad1, 3, now);
+        let pts23 = queue_packet(&rspad2, 3, now);
+
+        now += PACKET_DURATION;
+        let _pts14 = queue_packet(&rspad1, 4, now);
+        let pts24 = queue_packet(&rspad2, 4, now);
+
+        // wait for packets 3 to reach their deadlines
+        sleep(LATENCY);
+
+        // Unlock rspad2
+
+        let BufferOrList::Buffer(buf) = buf_rx2.recv().unwrap() else {
+            unreachable!();
+        };
+        assert_eq!(buf.pts().unwrap().nseconds(), pts21);
+
+        // Blocking handler can pull packets 3
+
+        // FIXME: on slow hardware with a low-precision clock (e.g. the Windows CI runner)
+        // we can get a BufferList here. Ignoring this test for now.
+        let BufferOrList::Buffer(buf) = buf_rx1.recv().unwrap() else {
+            unreachable!();
+        };
+        assert_eq!(buf.pts().unwrap().nseconds(), pts13);
+
+        // Blocking handler is stuck waiting for rspad2 to complete pushing packet 3.
+        // Remove rspad1 before its packet 3 could be handled
+        cmd_tx
+            .unbounded_send(RemoveRecvSrcPad(rspad1.clone()))
+            .unwrap();
+
+        let BufferOrList::Buffer(buf) = buf_rx2.recv().unwrap() else {
+            unreachable!();
+        };
+        assert_eq!(buf.pts().unwrap().nseconds(), pts23);
+
+        // Back to the async handler
+
+        let BufferOrList::Buffer(buf) = buf_rx2.recv().unwrap() else {
+            unreachable!();
+        };
+        assert_eq!(buf.pts().unwrap().nseconds(), pts24);
+
+        cmd_tx
+            .unbounded_send(RemoveRecvSrcPad(rspad2.clone()))
+            .unwrap();
+
+        sleep(LATENCY);
+        let Err(mpsc::TryRecvError::Empty) = buf_rx1.try_recv() else {
+            unreachable!();
+        };
+        let Err(mpsc::TryRecvError::Empty) = buf_rx2.try_recv() else {
+            unreachable!();
+        };
+
+        cmd_tx.unbounded_send(Stop).unwrap();
+
+        futures::executor::block_on(task_hdl).unwrap();
+    }
+}