rtp/recv: support pushing buffer lists from the jitterbuffer

Multiple concurrent buffers produced by the jitterbuffer will be
combined into a single buffer list which will be sent downstream.

Events or queries that interrupt the buffer flow will cause a split in
the output buffer list.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1618>
This commit is contained in:
Matthew Waters 2024-06-14 16:07:50 +10:00 committed by GStreamer Marge Bot
parent d036abb7d2
commit 10a31a397e

View file

@ -55,6 +55,7 @@ impl Default for Settings {
struct JitterBufferStream { struct JitterBufferStream {
store: Arc<Mutex<JitterBufferStore>>, store: Arc<Mutex<JitterBufferStore>>,
sleep: Pin<Box<tokio::time::Sleep>>, sleep: Pin<Box<tokio::time::Sleep>>,
pending_item: Option<JitterBufferItem>,
} }
impl JitterBufferStream { impl JitterBufferStream {
@ -62,6 +63,7 @@ impl JitterBufferStream {
Self { Self {
store, store,
sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))), sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))),
pending_item: None,
} }
} }
} }
@ -70,13 +72,20 @@ impl futures::stream::Stream for JitterBufferStream {
type Item = JitterBufferItem; type Item = JitterBufferItem;
fn poll_next( fn poll_next(
self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> { ) -> std::task::Poll<Option<Self::Item>> {
let now = Instant::now(); let now = Instant::now();
let mut lowest_wait = None; let mut lowest_wait = None;
if let Some(item) = self.pending_item.take() {
return Poll::Ready(Some(item));
}
let mut jitterbuffer_store = self.store.lock().unwrap(); let mut jitterbuffer_store = self.store.lock().unwrap();
let mut pending_item = None;
let mut next_pending_item = None;
loop {
let ret = jitterbuffer_store.jitterbuffer.poll(now); let ret = jitterbuffer_store.jitterbuffer.poll(now);
gst::trace!(CAT, "jitterbuffer poll ret: {ret:?}"); gst::trace!(CAT, "jitterbuffer poll ret: {ret:?}");
match ret { match ret {
@ -88,7 +97,6 @@ impl futures::stream::Stream for JitterBufferStream {
.store .store
.remove(&id) .remove(&id)
.unwrap_or_else(|| panic!("Buffer with id {id} not in store!")); .unwrap_or_else(|| panic!("Buffer with id {id} not in store!"));
cx.waker().wake_by_ref();
} }
jitterbuffer::PollResult::Forward { id, discont } => { jitterbuffer::PollResult::Forward { id, discont } => {
let mut item = jitterbuffer_store let mut item = jitterbuffer_store
@ -102,20 +110,64 @@ impl futures::stream::Stream for JitterBufferStream {
packet_mut.set_flags(gst::BufferFlags::DISCONT); packet_mut.set_flags(gst::BufferFlags::DISCONT);
} }
} }
match item {
// we don't currently push packet lists into the jitterbuffer
JitterBufferItem::PacketList(_list) => unreachable!(),
// forward events and queries as-is
JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _) => {
if pending_item.is_some() {
// but only after sending the previous pending item
next_pending_item = Some(item);
break
}
}
JitterBufferItem::Packet(ref packet) => {
match pending_item {
Some(JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _)) => unreachable!(),
Some(JitterBufferItem::Packet(pending_buffer)) => {
let mut list = gst::BufferList::new();
let list_mut = list.make_mut();
list_mut.add(pending_buffer);
list_mut.add(packet.clone());
pending_item = Some(JitterBufferItem::PacketList(list));
}
Some(JitterBufferItem::PacketList(mut pending_list)) => {
let list_mut = pending_list.make_mut();
list_mut.add(packet.clone());
pending_item = Some(JitterBufferItem::PacketList(pending_list));
}
None => {
pending_item = Some(item);
}
}
continue;
}
}
return Poll::Ready(Some(item)); return Poll::Ready(Some(item));
} }
jitterbuffer::PollResult::Timeout(timeout) => { jitterbuffer::PollResult::Timeout(timeout) => {
if lowest_wait.map_or(true, |lowest_wait| timeout < lowest_wait) { if lowest_wait.map_or(true, |lowest_wait| timeout < lowest_wait) {
lowest_wait = Some(timeout); lowest_wait = Some(timeout);
} }
break;
} }
// Will be woken up when necessary // Will be woken up when necessary
jitterbuffer::PollResult::Empty => (), jitterbuffer::PollResult::Empty => break,
}
} }
jitterbuffer_store.waker = Some(cx.waker().clone()); jitterbuffer_store.waker = Some(cx.waker().clone());
drop(jitterbuffer_store); drop(jitterbuffer_store);
if next_pending_item.is_some() {
self.pending_item = next_pending_item;
}
if pending_item.is_some() {
return Poll::Ready(pending_item);
}
if let Some(timeout) = lowest_wait { if let Some(timeout) = lowest_wait {
let this = self.get_mut(); let this = self.get_mut();
this.sleep.as_mut().reset(timeout.into()); this.sleep.as_mut().reset(timeout.into());
@ -131,6 +183,7 @@ impl futures::stream::Stream for JitterBufferStream {
#[derive(Debug)] #[derive(Debug)]
enum JitterBufferItem { enum JitterBufferItem {
Packet(gst::Buffer), Packet(gst::Buffer),
PacketList(gst::BufferList),
Event(gst::Event), Event(gst::Event),
Query( Query(
std::ptr::NonNull<gst::QueryRef>, std::ptr::NonNull<gst::QueryRef>,
@ -297,6 +350,13 @@ impl RecvSession {
let mut stream = JitterBufferStream::new(store); let mut stream = JitterBufferStream::new(store);
while let Some(item) = stream.next().await { while let Some(item) = stream.next().await {
match item { match item {
JitterBufferItem::PacketList(list) => {
let flow = pad.push_list(list);
gst::trace!(CAT, obj: pad, "Pushed buffer list, flow ret {:?}", flow);
let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap();
let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow);
// TODO: store flow, return only on session pads?
}
JitterBufferItem::Packet(buffer) => { JitterBufferItem::Packet(buffer) => {
let flow = pad.push(buffer); let flow = pad.push(buffer);
gst::trace!(CAT, obj: pad, "Pushed buffer, flow ret {:?}", flow); gst::trace!(CAT, obj: pad, "Pushed buffer, flow ret {:?}", flow);