mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-10 11:15:33 +00:00
jitterbuffer: remove mpsc channel for every packet
It is very slow. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
parent
327f563e80
commit
66306e32f2
2 changed files with 196 additions and 151 deletions
net/rtp/src/rtpbin2
|
@ -11,7 +11,6 @@ use futures::future::{AbortHandle, Abortable};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use gst::{glib, prelude::*, subclass::prelude::*};
|
use gst::{glib, prelude::*, subclass::prelude::*};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
use super::jitterbuffer::{self, JitterBuffer};
|
use super::jitterbuffer::{self, JitterBuffer};
|
||||||
use super::session::{
|
use super::session::{
|
||||||
|
@ -144,21 +143,21 @@ impl futures::stream::Stream for RtcpSendStream {
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
|
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
|
||||||
struct JitterBufferStream {
|
struct JitterBufferStream {
|
||||||
session: Arc<Mutex<BinSessionInner>>,
|
store: Arc<Mutex<JitterBufferStore>>,
|
||||||
sleep: Pin<Box<tokio::time::Sleep>>,
|
sleep: Pin<Box<tokio::time::Sleep>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl JitterBufferStream {
|
impl JitterBufferStream {
|
||||||
fn new(session: Arc<Mutex<BinSessionInner>>) -> Self {
|
fn new(store: Arc<Mutex<JitterBufferStore>>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
session,
|
store,
|
||||||
sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))),
|
sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl futures::stream::Stream for JitterBufferStream {
|
impl futures::stream::Stream for JitterBufferStream {
|
||||||
type Item = (JitterBufferItem, mpsc::Sender<JitterBufferItem>);
|
type Item = JitterBufferItem;
|
||||||
|
|
||||||
fn poll_next(
|
fn poll_next(
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
|
@ -167,40 +166,42 @@ impl futures::stream::Stream for JitterBufferStream {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let mut lowest_wait = None;
|
let mut lowest_wait = None;
|
||||||
|
|
||||||
let mut session = self.session.lock().unwrap();
|
let mut jitterbuffer_store = self.store.lock().unwrap();
|
||||||
for pad in session.rtp_recv_srcpads.iter_mut() {
|
match jitterbuffer_store.jitterbuffer.poll(now) {
|
||||||
let mut jitterbuffer_store = pad.jitter_buffer_store.lock().unwrap();
|
jitterbuffer::PollResult::Flushing => {
|
||||||
match jitterbuffer_store.jitterbuffer.poll(now) {
|
return Poll::Ready(None);
|
||||||
jitterbuffer::PollResult::Forward { id, discont } => {
|
}
|
||||||
if let Some(ref tx) = pad.tx {
|
jitterbuffer::PollResult::Drop(id) => {
|
||||||
let mut item = jitterbuffer_store
|
jitterbuffer_store
|
||||||
.store
|
.store
|
||||||
.remove(&id)
|
.remove(&id)
|
||||||
.unwrap_or_else(|| panic!("Buffer with id {id} not in store!"));
|
.unwrap_or_else(|| panic!("Buffer with id {id} not in store!"));
|
||||||
|
}
|
||||||
if let JitterBufferItem::Packet(ref mut packet) = item {
|
jitterbuffer::PollResult::Forward { id, discont } => {
|
||||||
if discont {
|
let mut item = jitterbuffer_store
|
||||||
gst::debug!(CAT, obj: pad.pad, "Forwarding discont buffer");
|
.store
|
||||||
let packet_mut = packet.make_mut();
|
.remove(&id)
|
||||||
packet_mut.set_flags(gst::BufferFlags::DISCONT);
|
.unwrap_or_else(|| panic!("Buffer with id {id} not in store!"));
|
||||||
}
|
if let JitterBufferItem::Packet(ref mut packet) = item {
|
||||||
}
|
if discont {
|
||||||
return Poll::Ready(Some((item, tx.clone())));
|
gst::debug!(CAT, "Forwarding discont buffer");
|
||||||
|
let packet_mut = packet.make_mut();
|
||||||
|
packet_mut.set_flags(gst::BufferFlags::DISCONT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
jitterbuffer::PollResult::Timeout(timeout) => {
|
return Poll::Ready(Some(item));
|
||||||
if lowest_wait.map_or(true, |lowest_wait| timeout < lowest_wait) {
|
}
|
||||||
lowest_wait = Some(timeout);
|
jitterbuffer::PollResult::Timeout(timeout) => {
|
||||||
}
|
if lowest_wait.map_or(true, |lowest_wait| timeout < lowest_wait) {
|
||||||
}
|
lowest_wait = Some(timeout);
|
||||||
jitterbuffer::PollResult::Empty => {
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Will be woken up when necessary
|
||||||
|
jitterbuffer::PollResult::Empty => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
session.jitterbuffer_waker = Some(cx.waker().clone());
|
jitterbuffer_store.waker = Some(cx.waker().clone());
|
||||||
drop(session);
|
drop(jitterbuffer_store);
|
||||||
|
|
||||||
if let Some(timeout) = lowest_wait {
|
if let Some(timeout) = lowest_wait {
|
||||||
let this = self.get_mut();
|
let this = self.get_mut();
|
||||||
|
@ -218,7 +219,10 @@ impl futures::stream::Stream for JitterBufferStream {
|
||||||
enum JitterBufferItem {
|
enum JitterBufferItem {
|
||||||
Packet(gst::Buffer),
|
Packet(gst::Buffer),
|
||||||
Event(gst::Event),
|
Event(gst::Event),
|
||||||
Query(std::ptr::NonNull<gst::QueryRef>),
|
Query(
|
||||||
|
std::ptr::NonNull<gst::QueryRef>,
|
||||||
|
std::sync::mpsc::SyncSender<bool>,
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
// SAFETY: Need to be able to pass *mut gst::QueryRef
|
// SAFETY: Need to be able to pass *mut gst::QueryRef
|
||||||
|
@ -227,6 +231,7 @@ unsafe impl Send for JitterBufferItem {}
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct JitterBufferStore {
|
struct JitterBufferStore {
|
||||||
store: BTreeMap<usize, JitterBufferItem>,
|
store: BTreeMap<usize, JitterBufferItem>,
|
||||||
|
waker: Option<Waker>,
|
||||||
jitterbuffer: JitterBuffer,
|
jitterbuffer: JitterBuffer,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -235,7 +240,6 @@ struct RtpRecvSrcPad {
|
||||||
pt: u8,
|
pt: u8,
|
||||||
ssrc: u32,
|
ssrc: u32,
|
||||||
pad: gst::Pad,
|
pad: gst::Pad,
|
||||||
tx: Option<mpsc::Sender<JitterBufferItem>>,
|
|
||||||
jitter_buffer_store: Arc<Mutex<JitterBufferStore>>,
|
jitter_buffer_store: Arc<Mutex<JitterBufferStore>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -318,13 +322,8 @@ struct BinSessionInner {
|
||||||
rtp_recv_sink_segment: Option<gst::FormattedSegment<gst::ClockTime>>,
|
rtp_recv_sink_segment: Option<gst::FormattedSegment<gst::ClockTime>>,
|
||||||
rtp_recv_sink_seqnum: Option<gst::Seqnum>,
|
rtp_recv_sink_seqnum: Option<gst::Seqnum>,
|
||||||
|
|
||||||
// For replying to synchronized queries on the sink pad
|
|
||||||
query_tx: Arc<Mutex<Option<std::sync::mpsc::Sender<bool>>>>,
|
|
||||||
|
|
||||||
caps_map: HashMap<u8, HashMap<u32, gst::Caps>>,
|
caps_map: HashMap<u8, HashMap<u32, gst::Caps>>,
|
||||||
recv_store: Vec<HeldRecvBuffer>,
|
recv_store: Vec<HeldRecvBuffer>,
|
||||||
jitterbuffer_task: Option<JitterBufferTask>,
|
|
||||||
jitterbuffer_waker: Option<Waker>,
|
|
||||||
|
|
||||||
rtp_recv_srcpads: Vec<RtpRecvSrcPad>,
|
rtp_recv_srcpads: Vec<RtpRecvSrcPad>,
|
||||||
recv_flow_combiner: Arc<Mutex<gst_base::UniqueFlowCombiner>>,
|
recv_flow_combiner: Arc<Mutex<gst_base::UniqueFlowCombiner>>,
|
||||||
|
@ -350,12 +349,8 @@ impl BinSessionInner {
|
||||||
rtp_recv_sink_segment: None,
|
rtp_recv_sink_segment: None,
|
||||||
rtp_recv_sink_seqnum: None,
|
rtp_recv_sink_seqnum: None,
|
||||||
|
|
||||||
query_tx: Arc::new(Mutex::new(None)),
|
|
||||||
|
|
||||||
caps_map: HashMap::default(),
|
caps_map: HashMap::default(),
|
||||||
recv_store: vec![],
|
recv_store: vec![],
|
||||||
jitterbuffer_task: None,
|
|
||||||
jitterbuffer_waker: None,
|
|
||||||
|
|
||||||
rtp_recv_srcpads: vec![],
|
rtp_recv_srcpads: vec![],
|
||||||
recv_flow_combiner: Arc::new(Mutex::new(gst_base::UniqueFlowCombiner::new())),
|
recv_flow_combiner: Arc::new(Mutex::new(gst_base::UniqueFlowCombiner::new())),
|
||||||
|
@ -383,19 +378,22 @@ impl BinSessionInner {
|
||||||
fn start_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> {
|
fn start_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> {
|
||||||
gst::debug!(CAT, obj: pad, "Starting rtp recv src task");
|
gst::debug!(CAT, obj: pad, "Starting rtp recv src task");
|
||||||
|
|
||||||
let (tx, mut rx) = mpsc::channel(1);
|
|
||||||
|
|
||||||
let recv_pad = self
|
let recv_pad = self
|
||||||
.rtp_recv_srcpads
|
.rtp_recv_srcpads
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.find(|recv| &recv.pad == pad)
|
.find(|recv| &recv.pad == pad)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
recv_pad.tx = Some(tx);
|
|
||||||
|
|
||||||
let pad_weak = pad.downgrade();
|
let pad_weak = pad.downgrade();
|
||||||
|
|
||||||
let recv_flow_combiner = self.recv_flow_combiner.clone();
|
let recv_flow_combiner = self.recv_flow_combiner.clone();
|
||||||
let query_tx = Arc::downgrade(&self.query_tx);
|
let store = recv_pad.jitter_buffer_store.clone();
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut store = store.lock().unwrap();
|
||||||
|
store.jitterbuffer.set_flushing(false);
|
||||||
|
store.waker.take();
|
||||||
|
}
|
||||||
|
|
||||||
// A task per received ssrc may be a bit excessive.
|
// A task per received ssrc may be a bit excessive.
|
||||||
// Other options are:
|
// Other options are:
|
||||||
// - Single task per received input stream rather than per output ssrc/pt
|
// - Single task per received input stream rather than per output ssrc/pt
|
||||||
|
@ -405,33 +403,33 @@ impl BinSessionInner {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
let Some(item) = rx.blocking_recv() else {
|
let recv_flow_combiner = recv_flow_combiner.clone();
|
||||||
gst::debug!(CAT, obj: pad, "Pad channel was closed, pausing");
|
let store = store.clone();
|
||||||
let _ = pad.pause_task();
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
match item {
|
RUNTIME.block_on(async move {
|
||||||
JitterBufferItem::Packet(buffer) => {
|
let mut stream = JitterBufferStream::new(store);
|
||||||
let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap();
|
while let Some(item) = stream.next().await {
|
||||||
let flow = pad.push(buffer);
|
match item {
|
||||||
gst::trace!(CAT, obj: pad, "Pushed buffer, flow ret {:?}", flow);
|
JitterBufferItem::Packet(buffer) => {
|
||||||
let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow);
|
let flow = pad.push(buffer);
|
||||||
// TODO: store flow, return only on session pads?
|
gst::trace!(CAT, obj: pad, "Pushed buffer, flow ret {:?}", flow);
|
||||||
}
|
let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap();
|
||||||
JitterBufferItem::Event(event) => {
|
let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow);
|
||||||
let res = pad.push_event(event);
|
// TODO: store flow, return only on session pads?
|
||||||
gst::trace!(CAT, obj: pad, "Pushed serialized event, result: {}", res);
|
}
|
||||||
}
|
JitterBufferItem::Event(event) => {
|
||||||
JitterBufferItem::Query(mut query) => {
|
let res = pad.push_event(event);
|
||||||
// This is safe because the thread holding the original reference is waiting
|
gst::trace!(CAT, obj: pad, "Pushed serialized event, result: {}", res);
|
||||||
// for us exclusively
|
}
|
||||||
let res = pad.query(unsafe { query.as_mut() });
|
JitterBufferItem::Query(mut query, tx) => {
|
||||||
if let Some(query_tx) = query_tx.upgrade() {
|
// This is safe because the thread holding the original reference is waiting
|
||||||
let _ = query_tx.lock().unwrap().as_mut().unwrap().send(res);
|
// for us exclusively
|
||||||
|
let res = pad.query(unsafe { query.as_mut() });
|
||||||
|
let _ = tx.send(res);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
gst::debug!(CAT, obj: pad, "Task started");
|
gst::debug!(CAT, obj: pad, "Task started");
|
||||||
|
@ -439,18 +437,21 @@ impl BinSessionInner {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop_rtp_recv_task(&mut self, pad: &gst::Pad) {
|
fn stop_rtp_recv_task(&mut self, pad: &gst::Pad) -> Result<(), glib::BoolError> {
|
||||||
|
gst::debug!(CAT, obj: pad, "Stopping rtp recv src task");
|
||||||
let recv_pad = self
|
let recv_pad = self
|
||||||
.rtp_recv_srcpads
|
.rtp_recv_srcpads
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.find(|recv| &recv.pad == pad)
|
.find(|recv| &recv.pad == pad)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// Drop the sender so the task is unblocked and we don't deadlock below
|
|
||||||
drop(recv_pad.tx.take());
|
|
||||||
|
|
||||||
gst::debug!(CAT, obj: pad, "Stopping task");
|
let mut store = recv_pad.jitter_buffer_store.lock().unwrap();
|
||||||
|
store.jitterbuffer.set_flushing(true);
|
||||||
|
if let Some(waker) = store.waker.take() {
|
||||||
|
waker.wake();
|
||||||
|
}
|
||||||
|
|
||||||
let _ = pad.stop_task();
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_or_create_rtp_recv_src(
|
fn get_or_create_rtp_recv_src(
|
||||||
|
@ -513,8 +514,8 @@ impl BinSessionInner {
|
||||||
pt,
|
pt,
|
||||||
ssrc,
|
ssrc,
|
||||||
pad: srcpad.clone(),
|
pad: srcpad.clone(),
|
||||||
tx: None,
|
|
||||||
jitter_buffer_store: Arc::new(Mutex::new(JitterBufferStore {
|
jitter_buffer_store: Arc::new(Mutex::new(JitterBufferStore {
|
||||||
|
waker: None,
|
||||||
store: BTreeMap::new(),
|
store: BTreeMap::new(),
|
||||||
jitterbuffer: JitterBuffer::new(settings.latency.into()),
|
jitterbuffer: JitterBuffer::new(settings.latency.into()),
|
||||||
})),
|
})),
|
||||||
|
@ -717,11 +718,6 @@ struct RtcpTask {
|
||||||
abort_handle: AbortHandle,
|
abort_handle: AbortHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct JitterBufferTask {
|
|
||||||
abort_handle: AbortHandle,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RtpBin2 {
|
impl RtpBin2 {
|
||||||
fn rtp_recv_src_activatemode(
|
fn rtp_recv_src_activatemode(
|
||||||
&self,
|
&self,
|
||||||
|
@ -747,7 +743,12 @@ impl RtpBin2 {
|
||||||
if active {
|
if active {
|
||||||
session.start_rtp_recv_task(pad)?;
|
session.start_rtp_recv_task(pad)?;
|
||||||
} else {
|
} else {
|
||||||
session.stop_rtp_recv_task(pad);
|
session.stop_rtp_recv_task(pad)?;
|
||||||
|
drop(session);
|
||||||
|
|
||||||
|
gst::debug!(CAT, obj: pad, "Stopping task");
|
||||||
|
|
||||||
|
let _ = pad.stop_task();
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -805,32 +806,6 @@ impl RtpBin2 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start_jitterbuffer_task(&self, session: &BinSession, inner: &mut BinSessionInner) {
|
|
||||||
if inner.jitterbuffer_task.is_some() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// run the runtime from another task to prevent the "start a runtime from within a runtime" panic
|
|
||||||
// when the plugin is statically linked.
|
|
||||||
let (abort_handle, abort_registration) = AbortHandle::new_pair();
|
|
||||||
RUNTIME.spawn({
|
|
||||||
let inner = session.inner.clone();
|
|
||||||
async move {
|
|
||||||
let future = Abortable::new(Self::jitterbuffer_task(inner), abort_registration);
|
|
||||||
future.await
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
inner.jitterbuffer_task = Some(JitterBufferTask { abort_handle });
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn jitterbuffer_task(state: Arc<Mutex<BinSessionInner>>) {
|
|
||||||
let mut stream = JitterBufferStream::new(state);
|
|
||||||
while let Some((item, tx)) = stream.next().await {
|
|
||||||
let _ = tx.send(item).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
|
pub fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
|
||||||
gst::log!(CAT, obj: pad, "Handling query {query:?}");
|
gst::log!(CAT, obj: pad, "Handling query {query:?}");
|
||||||
|
|
||||||
|
@ -1007,9 +982,6 @@ impl RtpBin2 {
|
||||||
|
|
||||||
drop(state);
|
drop(state);
|
||||||
|
|
||||||
// Start jitterbuffer task now if not started yet
|
|
||||||
self.start_jitterbuffer_task(&session, &mut session_inner);
|
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let mut buffers_to_push = vec![];
|
let mut buffers_to_push = vec![];
|
||||||
loop {
|
loop {
|
||||||
|
@ -1107,12 +1079,18 @@ impl RtpBin2 {
|
||||||
held.buffer.pts().unwrap().nseconds(),
|
held.buffer.pts().unwrap().nseconds(),
|
||||||
now,
|
now,
|
||||||
) {
|
) {
|
||||||
|
jitterbuffer::QueueResult::Flushing => {
|
||||||
|
// TODO: return flushing result upstream
|
||||||
|
}
|
||||||
jitterbuffer::QueueResult::Queued(id) => {
|
jitterbuffer::QueueResult::Queued(id) => {
|
||||||
drop(mapped);
|
drop(mapped);
|
||||||
|
|
||||||
jitterbuffer_store
|
jitterbuffer_store
|
||||||
.store
|
.store
|
||||||
.insert(id, JitterBufferItem::Packet(held.buffer));
|
.insert(id, JitterBufferItem::Packet(held.buffer));
|
||||||
|
if let Some(waker) = jitterbuffer_store.waker.take() {
|
||||||
|
waker.wake()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
jitterbuffer::QueueResult::Late => {
|
jitterbuffer::QueueResult::Late => {
|
||||||
gst::warning!(CAT, "Late buffer was dropped");
|
gst::warning!(CAT, "Late buffer was dropped");
|
||||||
|
@ -1123,12 +1101,6 @@ impl RtpBin2 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let session_inner = session.inner.lock().unwrap();
|
|
||||||
if let Some(ref waker) = session_inner.jitterbuffer_waker {
|
|
||||||
waker.wake_by_ref();
|
|
||||||
}
|
|
||||||
drop(session_inner);
|
|
||||||
|
|
||||||
Ok(gst::FlowSuccess::Ok)
|
Ok(gst::FlowSuccess::Ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1339,10 +1311,6 @@ impl RtpBin2 {
|
||||||
if let Some(session) = state.session_by_id(id) {
|
if let Some(session) = state.session_by_id(id) {
|
||||||
let session = session.inner.lock().unwrap();
|
let session = session.inner.lock().unwrap();
|
||||||
|
|
||||||
let (query_tx, query_rx) = std::sync::mpsc::channel();
|
|
||||||
|
|
||||||
assert!(session.query_tx.lock().unwrap().replace(query_tx).is_none());
|
|
||||||
|
|
||||||
let jb_stores: Vec<Arc<Mutex<JitterBufferStore>>> = session
|
let jb_stores: Vec<Arc<Mutex<JitterBufferStore>>> = session
|
||||||
.rtp_recv_srcpads
|
.rtp_recv_srcpads
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -1377,9 +1345,11 @@ impl RtpBin2 {
|
||||||
unreachable!()
|
unreachable!()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let (query_tx, query_rx) = std::sync::mpsc::sync_channel(1);
|
||||||
|
|
||||||
jitterbuffer_store
|
jitterbuffer_store
|
||||||
.store
|
.store
|
||||||
.insert(id, JitterBufferItem::Query(query));
|
.insert(id, JitterBufferItem::Query(query, query_tx));
|
||||||
|
|
||||||
// Now block until the jitterbuffer has processed the query
|
// Now block until the jitterbuffer has processed the query
|
||||||
match query_rx.recv() {
|
match query_rx.recv() {
|
||||||
|
@ -1531,6 +1501,7 @@ impl RtpBin2 {
|
||||||
// FIXME: may need to delay sending eos under some circumstances
|
// FIXME: may need to delay sending eos under some circumstances
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
// TODO: need to handle FlushStart/FlushStop through the jitterbuffer queue
|
||||||
_ => {
|
_ => {
|
||||||
if event.is_serialized() {
|
if event.is_serialized() {
|
||||||
self.rtp_recv_sink_queue_serialized_event(id, event)
|
self.rtp_recv_sink_queue_serialized_event(id, event)
|
||||||
|
@ -2084,8 +2055,9 @@ impl ElementImpl for RtpBin2 {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn release_pad(&self, pad: &gst::Pad) {
|
fn release_pad(&self, pad: &gst::Pad) {
|
||||||
let mut state = self.state.lock().unwrap();
|
let state = self.state.lock().unwrap();
|
||||||
let mut removed_pads = vec![];
|
let mut removed_pads = vec![];
|
||||||
|
let mut removed_session_ids = vec![];
|
||||||
if let Some(&id) = state.pads_session_id_map.get(pad) {
|
if let Some(&id) = state.pads_session_id_map.get(pad) {
|
||||||
removed_pads.push(pad.clone());
|
removed_pads.push(pad.clone());
|
||||||
if let Some(session) = state.session_by_id(id) {
|
if let Some(session) = state.session_by_id(id) {
|
||||||
|
@ -2097,11 +2069,6 @@ impl ElementImpl for RtpBin2 {
|
||||||
session.recv_flow_combiner.lock().unwrap().clear();
|
session.recv_flow_combiner.lock().unwrap().clear();
|
||||||
session.rtp_recv_srcpads.clear();
|
session.rtp_recv_srcpads.clear();
|
||||||
session.recv_store.clear();
|
session.recv_store.clear();
|
||||||
|
|
||||||
if let Some(jitterbuffer_task) = session.jitterbuffer_task.take() {
|
|
||||||
jitterbuffer_task.abort_handle.abort();
|
|
||||||
}
|
|
||||||
session.jitterbuffer_waker = None;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if Some(pad) == session.rtp_send_sinkpad.as_ref() {
|
if Some(pad) == session.rtp_send_sinkpad.as_ref() {
|
||||||
|
@ -2124,23 +2091,39 @@ impl ElementImpl for RtpBin2 {
|
||||||
&& session.rtcp_recv_sinkpad.is_none()
|
&& session.rtcp_recv_sinkpad.is_none()
|
||||||
&& session.rtcp_send_srcpad.is_none()
|
&& session.rtcp_send_srcpad.is_none()
|
||||||
{
|
{
|
||||||
let id = session.id;
|
removed_session_ids.push(session.id);
|
||||||
drop(session);
|
|
||||||
state.sessions.retain(|s| s.id != id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for pad in removed_pads.iter() {
|
|
||||||
state.pads_session_id_map.remove(pad);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
drop(state);
|
drop(state);
|
||||||
|
|
||||||
for pad in removed_pads {
|
for pad in removed_pads.iter() {
|
||||||
let _ = pad.set_active(false);
|
let _ = pad.set_active(false);
|
||||||
// Pad might not have been added yet if it's a RTP recv srcpad
|
// Pad might not have been added yet if it's a RTP recv srcpad
|
||||||
if pad.has_as_parent(&*self.obj()) {
|
if pad.has_as_parent(&*self.obj()) {
|
||||||
let _ = self.obj().remove_pad(&pad);
|
let _ = self.obj().remove_pad(pad);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
for pad in removed_pads.iter() {
|
||||||
|
state.pads_session_id_map.remove(pad);
|
||||||
|
}
|
||||||
|
for id in removed_session_ids {
|
||||||
|
if let Some(session) = state.session_by_id(id) {
|
||||||
|
let session = session.inner.lock().unwrap();
|
||||||
|
if session.rtp_recv_sinkpad.is_none()
|
||||||
|
&& session.rtp_send_sinkpad.is_none()
|
||||||
|
&& session.rtcp_recv_sinkpad.is_none()
|
||||||
|
&& session.rtcp_send_srcpad.is_none()
|
||||||
|
{
|
||||||
|
let id = session.id;
|
||||||
|
drop(session);
|
||||||
|
state.sessions.retain(|s| s.id != id);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2177,6 +2160,7 @@ impl ElementImpl for RtpBin2 {
|
||||||
for session in &state.sessions {
|
for session in &state.sessions {
|
||||||
let mut session = session.inner.lock().unwrap();
|
let mut session = session.inner.lock().unwrap();
|
||||||
removed_pads.extend(session.rtp_recv_srcpads.iter().map(|r| r.pad.clone()));
|
removed_pads.extend(session.rtp_recv_srcpads.iter().map(|r| r.pad.clone()));
|
||||||
|
|
||||||
session.recv_flow_combiner.lock().unwrap().clear();
|
session.recv_flow_combiner.lock().unwrap().clear();
|
||||||
session.rtp_recv_srcpads.clear();
|
session.rtp_recv_srcpads.clear();
|
||||||
session.recv_store.clear();
|
session.recv_store.clear();
|
||||||
|
@ -2187,26 +2171,23 @@ impl ElementImpl for RtpBin2 {
|
||||||
session.rtp_recv_sink_group_id = None;
|
session.rtp_recv_sink_group_id = None;
|
||||||
|
|
||||||
session.caps_map.clear();
|
session.caps_map.clear();
|
||||||
session.query_tx.lock().unwrap().take();
|
|
||||||
|
|
||||||
if let Some(jitterbuffer_task) = session.jitterbuffer_task.take() {
|
|
||||||
jitterbuffer_task.abort_handle.abort();
|
|
||||||
}
|
|
||||||
session.jitterbuffer_waker = None;
|
|
||||||
}
|
|
||||||
for pad in removed_pads.iter() {
|
|
||||||
state.pads_session_id_map.remove(pad);
|
|
||||||
}
|
}
|
||||||
state.sync_context = None;
|
state.sync_context = None;
|
||||||
drop(state);
|
drop(state);
|
||||||
|
|
||||||
for pad in removed_pads {
|
for pad in removed_pads.iter() {
|
||||||
let _ = pad.set_active(false);
|
let _ = pad.set_active(false);
|
||||||
// Pad might not have been added yet if it's a RTP recv srcpad
|
// Pad might not have been added yet if it's a RTP recv srcpad
|
||||||
if pad.has_as_parent(&*self.obj()) {
|
if pad.has_as_parent(&*self.obj()) {
|
||||||
let _ = self.obj().remove_pad(&pad);
|
let _ = self.obj().remove_pad(pad);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
for pad in removed_pads {
|
||||||
|
state.pads_session_id_map.remove(&pad);
|
||||||
|
}
|
||||||
|
drop(state);
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,13 +39,16 @@ pub struct JitterBuffer {
|
||||||
extended_seqnum: ExtendedSeqnum,
|
extended_seqnum: ExtendedSeqnum,
|
||||||
last_input_ts: Option<u64>,
|
last_input_ts: Option<u64>,
|
||||||
stats: Stats,
|
stats: Stats,
|
||||||
|
flushing: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub enum PollResult {
|
pub enum PollResult {
|
||||||
Forward { id: usize, discont: bool },
|
Forward { id: usize, discont: bool },
|
||||||
|
Drop(usize),
|
||||||
Timeout(Instant),
|
Timeout(Instant),
|
||||||
Empty,
|
Empty,
|
||||||
|
Flushing,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
|
@ -53,6 +56,7 @@ pub enum QueueResult {
|
||||||
Queued(usize),
|
Queued(usize),
|
||||||
Late,
|
Late,
|
||||||
Duplicate,
|
Duplicate,
|
||||||
|
Flushing,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Eq, Debug)]
|
#[derive(Eq, Debug)]
|
||||||
|
@ -104,6 +108,7 @@ impl JitterBuffer {
|
||||||
num_duplicates: 0,
|
num_duplicates: 0,
|
||||||
num_pushed: 0,
|
num_pushed: 0,
|
||||||
},
|
},
|
||||||
|
flushing: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,7 +126,17 @@ impl JitterBuffer {
|
||||||
QueueResult::Queued(id)
|
QueueResult::Queued(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_flushing(&mut self, flushing: bool) {
|
||||||
|
trace!("Flush changed from {} to {flushing}", self.flushing);
|
||||||
|
self.flushing = flushing;
|
||||||
|
self.last_output_seqnum = None;
|
||||||
|
}
|
||||||
|
|
||||||
pub fn queue_packet(&mut self, rtp: &RtpPacket, mut pts: u64, now: Instant) -> QueueResult {
|
pub fn queue_packet(&mut self, rtp: &RtpPacket, mut pts: u64, now: Instant) -> QueueResult {
|
||||||
|
if self.flushing {
|
||||||
|
return QueueResult::Flushing;
|
||||||
|
}
|
||||||
|
|
||||||
// From this point on we always work with extended sequence numbers
|
// From this point on we always work with extended sequence numbers
|
||||||
let seqnum = self.extended_seqnum.next(rtp.sequence_number());
|
let seqnum = self.extended_seqnum.next(rtp.sequence_number());
|
||||||
|
|
||||||
|
@ -185,6 +200,14 @@ impl JitterBuffer {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn poll(&mut self, now: Instant) -> PollResult {
|
pub fn poll(&mut self, now: Instant) -> PollResult {
|
||||||
|
if self.flushing {
|
||||||
|
if let Some(item) = self.items.pop_first() {
|
||||||
|
return PollResult::Drop(item.id);
|
||||||
|
} else {
|
||||||
|
return PollResult::Flushing;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
trace!("Polling at {:?}", now);
|
trace!("Polling at {:?}", now);
|
||||||
|
|
||||||
let Some((base_instant, base_ts)) = self.base_times else {
|
let Some((base_instant, base_ts)) = self.base_times else {
|
||||||
|
@ -262,6 +285,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn empty() {
|
fn empty() {
|
||||||
let mut jb = JitterBuffer::new(Duration::from_secs(1));
|
let mut jb = JitterBuffer::new(Duration::from_secs(1));
|
||||||
|
jb.set_flushing(false);
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
|
@ -271,6 +295,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn receive_one_packet_no_latency() {
|
fn receive_one_packet_no_latency() {
|
||||||
let mut jb = JitterBuffer::new(Duration::from_secs(0));
|
let mut jb = JitterBuffer::new(Duration::from_secs(0));
|
||||||
|
jb.set_flushing(false);
|
||||||
|
|
||||||
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
|
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
|
||||||
let packet = RtpPacket::parse(&rtp_data).unwrap();
|
let packet = RtpPacket::parse(&rtp_data).unwrap();
|
||||||
|
@ -287,6 +312,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn receive_one_packet_with_latency() {
|
fn receive_one_packet_with_latency() {
|
||||||
let mut jb = JitterBuffer::new(Duration::from_secs(1));
|
let mut jb = JitterBuffer::new(Duration::from_secs(1));
|
||||||
|
jb.set_flushing(false);
|
||||||
|
|
||||||
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
|
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
|
||||||
let packet = RtpPacket::parse(&rtp_data).unwrap();
|
let packet = RtpPacket::parse(&rtp_data).unwrap();
|
||||||
|
@ -318,6 +344,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn ordered_packets_no_latency() {
|
fn ordered_packets_no_latency() {
|
||||||
let mut jb = JitterBuffer::new(Duration::from_secs(0));
|
let mut jb = JitterBuffer::new(Duration::from_secs(0));
|
||||||
|
jb.set_flushing(false);
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
|
@ -353,6 +380,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn ordered_packets_no_latency_with_gap() {
|
fn ordered_packets_no_latency_with_gap() {
|
||||||
let mut jb = JitterBuffer::new(Duration::from_secs(0));
|
let mut jb = JitterBuffer::new(Duration::from_secs(0));
|
||||||
|
jb.set_flushing(false);
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
|
@ -387,6 +415,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn misordered_packets_no_latency() {
|
fn misordered_packets_no_latency() {
|
||||||
let mut jb = JitterBuffer::new(Duration::from_secs(0));
|
let mut jb = JitterBuffer::new(Duration::from_secs(0));
|
||||||
|
jb.set_flushing(false);
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
|
@ -425,6 +454,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn ordered_packets_with_latency() {
|
fn ordered_packets_with_latency() {
|
||||||
let mut jb = JitterBuffer::new(Duration::from_secs(1));
|
let mut jb = JitterBuffer::new(Duration::from_secs(1));
|
||||||
|
jb.set_flushing(false);
|
||||||
|
|
||||||
let mut now = Instant::now();
|
let mut now = Instant::now();
|
||||||
|
|
||||||
|
@ -494,6 +524,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn stats() {
|
fn stats() {
|
||||||
let mut jb = JitterBuffer::new(Duration::from_secs(1));
|
let mut jb = JitterBuffer::new(Duration::from_secs(1));
|
||||||
|
jb.set_flushing(false);
|
||||||
|
|
||||||
let mut now = Instant::now();
|
let mut now = Instant::now();
|
||||||
|
|
||||||
|
@ -549,6 +580,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn serialized_items() {
|
fn serialized_items() {
|
||||||
let mut jb = JitterBuffer::new(Duration::from_secs(0));
|
let mut jb = JitterBuffer::new(Duration::from_secs(0));
|
||||||
|
jb.set_flushing(false);
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
|
@ -602,4 +634,36 @@ mod tests {
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn flushing_queue() {
|
||||||
|
let mut jb = JitterBuffer::new(Duration::from_secs(0));
|
||||||
|
jb.set_flushing(false);
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
let rtp_data = generate_rtp_packet(0x12345678, 0, 0, 4);
|
||||||
|
let packet = RtpPacket::parse(&rtp_data).unwrap();
|
||||||
|
|
||||||
|
let QueueResult::Queued(id_first_serialized_item) = jb.queue_serialized_item() else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
let QueueResult::Queued(id_first) = jb.queue_packet(&packet, 0, now) else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Everything after this should eventually return flushing, poll() will instruct to drop
|
||||||
|
// everything stored and then return flushing indefinitely.
|
||||||
|
jb.set_flushing(true);
|
||||||
|
assert_eq!(jb.queue_packet(&packet, 0, now), QueueResult::Flushing);
|
||||||
|
|
||||||
|
assert_eq!(jb.poll(now), PollResult::Drop(id_first_serialized_item));
|
||||||
|
assert_eq!(jb.poll(now), PollResult::Drop(id_first));
|
||||||
|
assert_eq!(jb.poll(now), PollResult::Flushing);
|
||||||
|
assert_eq!(jb.poll(now), PollResult::Flushing);
|
||||||
|
|
||||||
|
jb.set_flushing(false);
|
||||||
|
assert_eq!(jb.poll(now), PollResult::Empty);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue