diff --git a/net/rtp/src/rtpbin2/rtprecv.rs b/net/rtp/src/rtpbin2/rtprecv.rs index 866ec0fc..2642039f 100644 --- a/net/rtp/src/rtpbin2/rtprecv.rs +++ b/net/rtp/src/rtpbin2/rtprecv.rs @@ -55,6 +55,7 @@ impl Default for Settings { struct JitterBufferStream { store: Arc>, sleep: Pin>, + pending_item: Option, } impl JitterBufferStream { @@ -62,6 +63,7 @@ impl JitterBufferStream { Self { store, sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))), + pending_item: None, } } } @@ -70,52 +72,102 @@ impl futures::stream::Stream for JitterBufferStream { type Item = JitterBufferItem; fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { let now = Instant::now(); let mut lowest_wait = None; + if let Some(item) = self.pending_item.take() { + return Poll::Ready(Some(item)); + } + let mut jitterbuffer_store = self.store.lock().unwrap(); - let ret = jitterbuffer_store.jitterbuffer.poll(now); - gst::trace!(CAT, "jitterbuffer poll ret: {ret:?}"); - match ret { - jitterbuffer::PollResult::Flushing => { - return Poll::Ready(None); - } - jitterbuffer::PollResult::Drop(id) => { - jitterbuffer_store - .store - .remove(&id) - .unwrap_or_else(|| panic!("Buffer with id {id} not in store!")); - cx.waker().wake_by_ref(); - } - jitterbuffer::PollResult::Forward { id, discont } => { - let mut item = jitterbuffer_store - .store - .remove(&id) - .unwrap_or_else(|| panic!("Buffer with id {id} not in store!")); - if let JitterBufferItem::Packet(ref mut packet) = item { - if discont { - gst::debug!(CAT, "Forwarding discont buffer"); - let packet_mut = packet.make_mut(); - packet_mut.set_flags(gst::BufferFlags::DISCONT); + let mut pending_item = None; + let mut next_pending_item = None; + loop { + let ret = jitterbuffer_store.jitterbuffer.poll(now); + gst::trace!(CAT, "jitterbuffer poll ret: {ret:?}"); + match ret { + jitterbuffer::PollResult::Flushing => { + return Poll::Ready(None); + } + jitterbuffer::PollResult::Drop(id) => { + jitterbuffer_store + .store + .remove(&id) + .unwrap_or_else(|| panic!("Buffer with id {id} not in store!")); + } + jitterbuffer::PollResult::Forward { id, discont } => { + let mut item = jitterbuffer_store + .store + .remove(&id) + .unwrap_or_else(|| panic!("Buffer with id {id} not in store!")); + if let JitterBufferItem::Packet(ref mut packet) = item { + if discont { + gst::debug!(CAT, "Forwarding discont buffer"); + let packet_mut = packet.make_mut(); + packet_mut.set_flags(gst::BufferFlags::DISCONT); + } } + + match item { + // we don't currently push packet lists into the jitterbuffer + JitterBufferItem::PacketList(_list) => unreachable!(), + // forward events and queries as-is + JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _) => { + if pending_item.is_some() { + // but only after sending the previous pending item + next_pending_item = Some(item); + break + } + } + JitterBufferItem::Packet(ref packet) => { + match pending_item { + Some(JitterBufferItem::Event(_) | JitterBufferItem::Query(_, _)) => unreachable!(), + Some(JitterBufferItem::Packet(pending_buffer)) => { + let mut list = gst::BufferList::new(); + let list_mut = list.make_mut(); + list_mut.add(pending_buffer); + list_mut.add(packet.clone()); + pending_item = Some(JitterBufferItem::PacketList(list)); + } + Some(JitterBufferItem::PacketList(mut pending_list)) => { + let list_mut = pending_list.make_mut(); + list_mut.add(packet.clone()); + pending_item = Some(JitterBufferItem::PacketList(pending_list)); + } + None => { + pending_item = Some(item); + } + } + continue; + } + } + return Poll::Ready(Some(item)); } - return Poll::Ready(Some(item)); - } - jitterbuffer::PollResult::Timeout(timeout) => { - if lowest_wait.map_or(true, |lowest_wait| timeout < lowest_wait) { - lowest_wait = Some(timeout); + jitterbuffer::PollResult::Timeout(timeout) => { + if lowest_wait.map_or(true, |lowest_wait| timeout < lowest_wait) { + lowest_wait = Some(timeout); + } + break; } + // Will be woken up when necessary + jitterbuffer::PollResult::Empty => break, } - // Will be woken up when necessary - jitterbuffer::PollResult::Empty => (), } jitterbuffer_store.waker = Some(cx.waker().clone()); drop(jitterbuffer_store); + if next_pending_item.is_some() { + self.pending_item = next_pending_item; + } + + if pending_item.is_some() { + return Poll::Ready(pending_item); + } + if let Some(timeout) = lowest_wait { let this = self.get_mut(); this.sleep.as_mut().reset(timeout.into()); @@ -131,6 +183,7 @@ impl futures::stream::Stream for JitterBufferStream { #[derive(Debug)] enum JitterBufferItem { Packet(gst::Buffer), + PacketList(gst::BufferList), Event(gst::Event), Query( std::ptr::NonNull, @@ -297,6 +350,13 @@ impl RecvSession { let mut stream = JitterBufferStream::new(store); while let Some(item) = stream.next().await { match item { + JitterBufferItem::PacketList(list) => { + let flow = pad.push_list(list); + gst::trace!(CAT, obj: pad, "Pushed buffer list, flow ret {:?}", flow); + let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap(); + let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow); + // TODO: store flow, return only on session pads? + } JitterBufferItem::Packet(buffer) => { let flow = pad.push(buffer); gst::trace!(CAT, obj: pad, "Pushed buffer, flow ret {:?}", flow);