From 327f563e8090d8eff5e601a1136076260600f83d Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Wed, 10 Jan 2024 11:03:40 +0100 Subject: [PATCH] jitterbuffer: implement support for serialized events / queries Part-of: --- net/rtp/src/rtpbin2/imp.rs | 202 +++++++++++++++++++++++++--- net/rtp/src/rtpbin2/jitterbuffer.rs | 119 +++++++++++++--- 2 files changed, 279 insertions(+), 42 deletions(-) diff --git a/net/rtp/src/rtpbin2/imp.rs b/net/rtp/src/rtpbin2/imp.rs index 4d983cd6..53e4f75e 100644 --- a/net/rtp/src/rtpbin2/imp.rs +++ b/net/rtp/src/rtpbin2/imp.rs @@ -158,7 +158,7 @@ impl JitterBufferStream { } impl futures::stream::Stream for JitterBufferStream { - type Item = (gst::Buffer, mpsc::Sender); + type Item = (JitterBufferItem, mpsc::Sender); fn poll_next( self: Pin<&mut Self>, @@ -173,16 +173,19 @@ impl futures::stream::Stream for JitterBufferStream { match jitterbuffer_store.jitterbuffer.poll(now) { jitterbuffer::PollResult::Forward { id, discont } => { if let Some(ref tx) = pad.tx { - let mut packet = jitterbuffer_store + let mut item = jitterbuffer_store .store .remove(&id) .unwrap_or_else(|| panic!("Buffer with id {id} not in store!")); - if discont { - gst::debug!(CAT, obj: pad.pad, "Forwarding discont buffer"); - let packet_mut = packet.make_mut(); - packet_mut.set_flags(gst::BufferFlags::DISCONT); + + if let JitterBufferItem::Packet(ref mut packet) = item { + if discont { + gst::debug!(CAT, obj: pad.pad, "Forwarding discont buffer"); + let packet_mut = packet.make_mut(); + packet_mut.set_flags(gst::BufferFlags::DISCONT); + } } - return Poll::Ready(Some((packet, tx.clone()))); + return Poll::Ready(Some((item, tx.clone()))); } } jitterbuffer::PollResult::Timeout(timeout) => { @@ -211,9 +214,19 @@ impl futures::stream::Stream for JitterBufferStream { } } +#[derive(Debug)] +enum JitterBufferItem { + Packet(gst::Buffer), + Event(gst::Event), + Query(std::ptr::NonNull), +} + +// SAFETY: Need to be able to pass *mut gst::QueryRef +unsafe impl Send for JitterBufferItem {} + #[derive(Debug)] struct JitterBufferStore { - store: BTreeMap, + store: BTreeMap, jitterbuffer: JitterBuffer, } @@ -222,7 +235,7 @@ struct RtpRecvSrcPad { pt: u8, ssrc: u32, pad: gst::Pad, - tx: Option>, + tx: Option>, jitter_buffer_store: Arc>, } @@ -305,6 +318,9 @@ struct BinSessionInner { rtp_recv_sink_segment: Option>, rtp_recv_sink_seqnum: Option, + // For replying to synchronized queries on the sink pad + query_tx: Arc>>>, + caps_map: HashMap>, recv_store: Vec, jitterbuffer_task: Option, @@ -334,6 +350,8 @@ impl BinSessionInner { rtp_recv_sink_segment: None, rtp_recv_sink_seqnum: None, + query_tx: Arc::new(Mutex::new(None)), + caps_map: HashMap::default(), recv_store: vec![], jitterbuffer_task: None, @@ -377,6 +395,7 @@ impl BinSessionInner { let pad_weak = pad.downgrade(); let recv_flow_combiner = self.recv_flow_combiner.clone(); + let query_tx = Arc::downgrade(&self.query_tx); // A task per received ssrc may be a bit excessive. // Other options are: // - Single task per received input stream rather than per output ssrc/pt @@ -386,17 +405,33 @@ impl BinSessionInner { return; }; - let Some(buffer) = rx.blocking_recv() else { + let Some(item) = rx.blocking_recv() else { gst::debug!(CAT, obj: pad, "Pad channel was closed, pausing"); let _ = pad.pause_task(); return; }; - let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap(); - let flow = pad.push(buffer); - gst::trace!(CAT, obj: pad, "Pushed buffer, flow ret {:?}", flow); - let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow); - // TODO: store flow, return only on session pads? + match item { + JitterBufferItem::Packet(buffer) => { + let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap(); + let flow = pad.push(buffer); + gst::trace!(CAT, obj: pad, "Pushed buffer, flow ret {:?}", flow); + let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow); + // TODO: store flow, return only on session pads? + } + JitterBufferItem::Event(event) => { + let res = pad.push_event(event); + gst::trace!(CAT, obj: pad, "Pushed serialized event, result: {}", res); + } + JitterBufferItem::Query(mut query) => { + // This is safe because the thread holding the original reference is waiting + // for us exclusively + let res = pad.query(unsafe { query.as_mut() }); + if let Some(query_tx) = query_tx.upgrade() { + let _ = query_tx.lock().unwrap().as_mut().unwrap().send(res); + } + } + } })?; gst::debug!(CAT, obj: pad, "Task started"); @@ -791,8 +826,8 @@ impl RtpBin2 { async fn jitterbuffer_task(state: Arc>) { let mut stream = JitterBufferStream::new(state); - while let Some((buffer, tx)) = stream.next().await { - let _ = tx.send(buffer).await; + while let Some((item, tx)) = stream.next().await { + let _ = tx.send(item).await; } } @@ -1067,7 +1102,7 @@ impl RtpBin2 { // is blocked let mut jitterbuffer_store = held.pad.jitter_buffer_store.lock().unwrap(); - match jitterbuffer_store.jitterbuffer.queue( + match jitterbuffer_store.jitterbuffer.queue_packet( &rtp, held.buffer.pts().unwrap().nseconds(), now, @@ -1075,7 +1110,9 @@ impl RtpBin2 { jitterbuffer::QueueResult::Queued(id) => { drop(mapped); - jitterbuffer_store.store.insert(id, held.buffer); + jitterbuffer_store + .store + .insert(id, JitterBufferItem::Packet(held.buffer)); } jitterbuffer::QueueResult::Late => { gst::warning!(CAT, "Late buffer was dropped"); @@ -1287,6 +1324,115 @@ impl RtpBin2 { } } + pub fn rtp_recv_sink_query( + &self, + pad: &gst::Pad, + query: &mut gst::QueryRef, + id: usize, + ) -> bool { + gst::log!(CAT, obj: pad, "Handling query {query:?}"); + + if query.is_serialized() { + let state = self.state.lock().unwrap(); + let mut ret = true; + + if let Some(session) = state.session_by_id(id) { + let session = session.inner.lock().unwrap(); + + let (query_tx, query_rx) = std::sync::mpsc::channel(); + + assert!(session.query_tx.lock().unwrap().replace(query_tx).is_none()); + + let jb_stores: Vec>> = session + .rtp_recv_srcpads + .iter() + .filter(|r| state.pads_session_id_map.contains_key(&r.pad)) + .map(|p| p.jitter_buffer_store.clone()) + .collect(); + + drop(session); + + let query = std::ptr::NonNull::from(query); + + // The idea here is to reproduce the default behavior of GstPad, where + // queries will run sequentially on each internally linked source pad + // until one succeeds. + // + // We however jump through hoops here in order to keep the query + // reasonably synchronized with the data flow. + // + // While the GstPad behavior makes complete sense for allocation + // queries (can't have it succeed for two downstream branches as they + // need to modify the query), we could in the future decide to have + // the drain query run on all relevant source pads no matter what. + // + // Also note that if there were no internally linked pads, GstPad's + // behavior is to return TRUE, we do this here too. + for jb_store in jb_stores { + let mut jitterbuffer_store = jb_store.lock().unwrap(); + + let jitterbuffer::QueueResult::Queued(id) = + jitterbuffer_store.jitterbuffer.queue_serialized_item() + else { + unreachable!() + }; + + jitterbuffer_store + .store + .insert(id, JitterBufferItem::Query(query)); + + // Now block until the jitterbuffer has processed the query + match query_rx.recv() { + Ok(res) => { + ret |= res; + if ret { + break; + } + } + _ => { + // The sender was closed because of a state change + break; + } + } + } + } + + ret + } else { + gst::Pad::query_default(pad, Some(pad), query) + } + } + + // Serialized events received on our sink pads have to navigate + // through the relevant jitterbuffers in order to remain (reasonably) + // consistently ordered with the RTP packets once output on our source + // pads + fn rtp_recv_sink_queue_serialized_event(&self, id: usize, event: gst::Event) -> bool { + let state = self.state.lock().unwrap(); + if let Some(session) = state.session_by_id(id) { + let session = session.inner.lock().unwrap(); + for srcpad in session + .rtp_recv_srcpads + .iter() + .filter(|r| state.pads_session_id_map.contains_key(&r.pad)) + { + let mut jitterbuffer_store = srcpad.jitter_buffer_store.lock().unwrap(); + + let jitterbuffer::QueueResult::Queued(id) = + jitterbuffer_store.jitterbuffer.queue_serialized_item() + else { + unreachable!() + }; + + jitterbuffer_store + .store + .insert(id, JitterBufferItem::Event(event.clone())); + } + } + + true + } + fn rtp_recv_sink_event(&self, pad: &gst::Pad, mut event: gst::Event, id: usize) -> bool { match event.view() { gst::EventView::StreamStart(stream_start) => { @@ -1346,7 +1492,7 @@ impl RtpBin2 { drop(state); - gst::Pad::event_default(pad, Some(&*self.obj()), event) + self.rtp_recv_sink_queue_serialized_event(id, event) } gst::EventView::Eos(_eos) => { let now = Instant::now(); @@ -1385,7 +1531,13 @@ impl RtpBin2 { // FIXME: may need to delay sending eos under some circumstances true } - _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), + _ => { + if event.is_serialized() { + self.rtp_recv_sink_queue_serialized_event(id, event) + } else { + gst::Pad::event_default(pad, Some(&*self.obj()), event) + } + } } } @@ -1808,6 +1960,13 @@ impl ElementImpl for RtpBin2 { |this| this.rtp_recv_sink_event(pad, event, id), ) }) + .query_function(move |pad, parent, query| { + RtpBin2::catch_panic_pad_function( + parent, + || false, + |this| this.rtp_recv_sink_query(pad, query, id), + ) + }) .name(format!("rtp_recv_sink_{}", id)) .build(); session.rtp_recv_sinkpad = Some(sinkpad.clone()); @@ -2028,6 +2187,7 @@ impl ElementImpl for RtpBin2 { session.rtp_recv_sink_group_id = None; session.caps_map.clear(); + session.query_tx.lock().unwrap().take(); if let Some(jitterbuffer_task) = session.jitterbuffer_task.take() { jitterbuffer_task.abort_handle.abort(); diff --git a/net/rtp/src/rtpbin2/jitterbuffer.rs b/net/rtp/src/rtpbin2/jitterbuffer.rs index c716dac9..1a626b43 100644 --- a/net/rtp/src/rtpbin2/jitterbuffer.rs +++ b/net/rtp/src/rtpbin2/jitterbuffer.rs @@ -65,7 +65,13 @@ struct Item { impl Ord for Item { fn cmp(&self, other: &Self) -> Ordering { - self.seqnum.cmp(&other.seqnum) + self.seqnum + .cmp(&other.seqnum) + .then(match (self.pts, other.pts) { + (None, Some(_)) => Ordering::Greater, + (Some(_), None) => Ordering::Less, + _ => Ordering::Equal, + }) } } @@ -101,7 +107,21 @@ impl JitterBuffer { } } - pub fn queue(&mut self, rtp: &RtpPacket, mut pts: u64, now: Instant) -> QueueResult { + pub fn queue_serialized_item(&mut self) -> QueueResult { + let id = self.packet_counter; + self.packet_counter += 1; + let item = Item { + id, + pts: None, + seqnum: (*self.seqnums.last().unwrap_or(&0)), + }; + self.items.insert(item); + trace!("Queued serialized item and assigned ID {id}"); + + QueueResult::Queued(id) + } + + pub fn queue_packet(&mut self, rtp: &RtpPacket, mut pts: u64, now: Instant) -> QueueResult { // From this point on we always work with extended sequence numbers let seqnum = self.extended_seqnum.next(rtp.sequence_number()); @@ -257,7 +277,7 @@ mod tests { let now = Instant::now(); - let QueueResult::Queued(id) = jb.queue(&packet, 0, now) else { + let QueueResult::Queued(id) = jb.queue_packet(&packet, 0, now) else { unreachable!() }; @@ -273,7 +293,7 @@ mod tests { let mut now = Instant::now(); - let QueueResult::Queued(id) = jb.queue(&packet, 0, now) else { + let QueueResult::Queued(id) = jb.queue_packet(&packet, 0, now) else { unreachable!() }; @@ -304,13 +324,13 @@ mod tests { let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - let QueueResult::Queued(id_first) = jb.queue(&packet, 0, now) else { + let QueueResult::Queued(id_first) = jb.queue_packet(&packet, 0, now) else { unreachable!() }; let rtp_data = generate_rtp_packet(0x12345678, 1, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - let QueueResult::Queued(id_second) = jb.queue(&packet, 0, now) else { + let QueueResult::Queued(id_second) = jb.queue_packet(&packet, 0, now) else { unreachable!() }; @@ -338,13 +358,13 @@ mod tests { let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - let QueueResult::Queued(id_first) = jb.queue(&packet, 0, now) else { + let QueueResult::Queued(id_first) = jb.queue_packet(&packet, 0, now) else { unreachable!() }; let rtp_data = generate_rtp_packet(0x12345678, 2, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - let QueueResult::Queued(id_second) = jb.queue(&packet, 0, now) else { + let QueueResult::Queued(id_second) = jb.queue_packet(&packet, 0, now) else { unreachable!() }; @@ -372,7 +392,7 @@ mod tests { let rtp_data = generate_rtp_packet(0x12345678, 1, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - let QueueResult::Queued(id) = jb.queue(&packet, 0, now) else { + let QueueResult::Queued(id) = jb.queue_packet(&packet, 0, now) else { unreachable!() }; @@ -380,17 +400,17 @@ mod tests { let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - assert_eq!(jb.queue(&packet, 0, now), QueueResult::Late); + assert_eq!(jb.queue_packet(&packet, 0, now), QueueResult::Late); // Try and push a duplicate let rtp_data = generate_rtp_packet(0x12345678, 1, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - assert_eq!(jb.queue(&packet, 0, now), QueueResult::Duplicate); + assert_eq!(jb.queue_packet(&packet, 0, now), QueueResult::Duplicate); // We do accept future sequence numbers up to a distance of at least std::i16::MAX let rtp_data = generate_rtp_packet(0x12345678, std::i16::MAX as u16 + 1, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - let QueueResult::Queued(id) = jb.queue(&packet, 0, now) else { + let QueueResult::Queued(id) = jb.queue_packet(&packet, 0, now) else { unreachable!() }; @@ -399,7 +419,7 @@ mod tests { // But no further let rtp_data = generate_rtp_packet(0x12345678, 2, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - assert_eq!(jb.queue(&packet, 0, now), QueueResult::Late); + assert_eq!(jb.queue_packet(&packet, 0, now), QueueResult::Late); } #[test] @@ -410,7 +430,7 @@ mod tests { let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - let QueueResult::Queued(id_first) = jb.queue(&packet, 0, now) else { + let QueueResult::Queued(id_first) = jb.queue_packet(&packet, 0, now) else { unreachable!() }; @@ -421,7 +441,7 @@ mod tests { let rtp_data = generate_rtp_packet(0x12345678, 1, 180000, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - let QueueResult::Queued(id_second) = jb.queue(&packet, 2_000_000_000, now) else { + let QueueResult::Queued(id_second) = jb.queue_packet(&packet, 2_000_000_000, now) else { unreachable!() }; @@ -479,13 +499,13 @@ mod tests { let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - jb.queue(&packet, 0, now); + jb.queue_packet(&packet, 0, now); assert_stats(&jb, 0, 0, 0, 0); // At this point pushing the same packet in before it gets output // results in an increment of the duplicate stat - jb.queue(&packet, 0, now); + jb.queue_packet(&packet, 0, now); assert_stats(&jb, 0, 0, 1, 0); now += Duration::from_secs(1); @@ -495,14 +515,14 @@ mod tests { // Pushing it after the first version got output also results in // an increment of the duplicate stat - jb.queue(&packet, 0, now); + jb.queue_packet(&packet, 0, now); assert_stats(&jb, 0, 0, 2, 1); // Then after a packet with seqnum 2 goes through, the lost // stat must be incremented by 1 (as packet with seqnum 1 went missing) let rtp_data = generate_rtp_packet(0x12345678, 2, 9000, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - jb.queue(&packet, 100_000_000, now); + jb.queue_packet(&packet, 100_000_000, now); now += Duration::from_millis(100); let _ = jb.poll(now); @@ -512,7 +532,7 @@ mod tests { // considered both late and lost let rtp_data = generate_rtp_packet(0x12345678, 1, 4500, 4); let packet = RtpPacket::parse(&rtp_data).unwrap(); - jb.queue(&packet, 50_000_000, now); + jb.queue_packet(&packet, 50_000_000, now); let _ = jb.poll(now); assert_stats(&jb, 1, 1, 2, 2); @@ -520,9 +540,66 @@ mod tests { // Finally if it arrives again it should be considered a duplicate, // and will have achieved the dubious honor of simultaneously being // lost, late and duplicated - jb.queue(&packet, 50_000_000, now); + jb.queue_packet(&packet, 50_000_000, now); let _ = jb.poll(now); assert_stats(&jb, 1, 1, 3, 2); } + + #[test] + fn serialized_items() { + let mut jb = JitterBuffer::new(Duration::from_secs(0)); + + let now = Instant::now(); + + let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4); + let packet = RtpPacket::parse(&rtp_data).unwrap(); + + let QueueResult::Queued(id_first_serialized_item) = jb.queue_serialized_item() else { + unreachable!() + }; + + let QueueResult::Queued(id_first) = jb.queue_packet(&packet, 0, now) else { + unreachable!() + }; + + let QueueResult::Queued(id_second_serialized_item) = jb.queue_serialized_item() else { + unreachable!() + }; + + let rtp_data = generate_rtp_packet(0x12345678, 1, 0, 4); + let packet = RtpPacket::parse(&rtp_data).unwrap(); + let QueueResult::Queued(id_second) = jb.queue_packet(&packet, 0, now) else { + unreachable!() + }; + + assert_eq!( + jb.poll(now), + PollResult::Forward { + id: id_first_serialized_item, + discont: false + } + ); + assert_eq!( + jb.poll(now), + PollResult::Forward { + id: id_first, + discont: true + } + ); + assert_eq!( + jb.poll(now), + PollResult::Forward { + id: id_second_serialized_item, + discont: false + } + ); + assert_eq!( + jb.poll(now), + PollResult::Forward { + id: id_second, + discont: false + } + ); + } }