jitterbuffer: handle flush-start/stop

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
This commit is contained in:
Matthew Waters 2024-01-16 16:52:21 +11:00
parent 66306e32f2
commit 48e7a2ed06
2 changed files with 108 additions and 2 deletions

View file

@ -167,7 +167,9 @@ impl futures::stream::Stream for JitterBufferStream {
let mut lowest_wait = None; let mut lowest_wait = None;
let mut jitterbuffer_store = self.store.lock().unwrap(); let mut jitterbuffer_store = self.store.lock().unwrap();
match jitterbuffer_store.jitterbuffer.poll(now) { let ret = jitterbuffer_store.jitterbuffer.poll(now);
gst::trace!(CAT, "jitterbuffer poll ret: {ret:?}");
match ret {
jitterbuffer::PollResult::Flushing => { jitterbuffer::PollResult::Flushing => {
return Poll::Ready(None); return Poll::Ready(None);
} }
@ -176,6 +178,7 @@ impl futures::stream::Stream for JitterBufferStream {
.store .store
.remove(&id) .remove(&id)
.unwrap_or_else(|| panic!("Buffer with id {id} not in store!")); .unwrap_or_else(|| panic!("Buffer with id {id} not in store!"));
cx.waker().wake_by_ref();
} }
jitterbuffer::PollResult::Forward { id, discont } => { jitterbuffer::PollResult::Forward { id, discont } => {
let mut item = jitterbuffer_store let mut item = jitterbuffer_store
@ -1397,6 +1400,9 @@ impl RtpBin2 {
jitterbuffer_store jitterbuffer_store
.store .store
.insert(id, JitterBufferItem::Event(event.clone())); .insert(id, JitterBufferItem::Event(event.clone()));
if let Some(waker) = jitterbuffer_store.waker.take() {
waker.wake();
}
} }
} }
@ -1498,10 +1504,48 @@ impl RtpBin2 {
waker.wake(); waker.wake();
} }
} }
drop(state);
// FIXME: may need to delay sending eos under some circumstances // FIXME: may need to delay sending eos under some circumstances
self.rtp_recv_sink_queue_serialized_event(id, event);
true true
} }
// TODO: need to handle FlushStart/FlushStop through the jitterbuffer queue gst::EventView::FlushStart(_fs) => {
let state = self.state.lock().unwrap();
let mut pause_tasks = vec![];
if let Some(session) = state.session_by_id(id) {
let session = session.inner.lock().unwrap();
for recv_pad in session.rtp_recv_srcpads.iter() {
let mut store = recv_pad.jitter_buffer_store.lock().unwrap();
store.jitterbuffer.set_flushing(true);
if let Some(waker) = store.waker.take() {
waker.wake();
}
pause_tasks.push(recv_pad.pad.clone());
}
}
drop(state);
for pad in pause_tasks {
let _ = pad.pause_task();
}
gst::Pad::event_default(pad, Some(&*self.obj()), event)
}
gst::EventView::FlushStop(_fs) => {
let state = self.state.lock().unwrap();
if let Some(session) = state.session_by_id(id) {
let mut session = session.inner.lock().unwrap();
let pads = session
.rtp_recv_srcpads
.iter()
.map(|r| r.pad.clone())
.collect::<Vec<_>>();
for pad in pads {
// Will reset flushing to false and ensure task is woken up
let _ = session.start_rtp_recv_task(&pad);
}
}
drop(state);
self.rtp_recv_sink_queue_serialized_event(id, event)
}
_ => { _ => {
if event.is_serialized() { if event.is_serialized() {
self.rtp_recv_sink_queue_serialized_event(id, event) self.rtp_recv_sink_queue_serialized_event(id, event)

View file

@ -177,3 +177,65 @@ fn test_receive() {
TEST_SSRC as i32 TEST_SSRC as i32
); );
} }
#[test]
fn test_receive_flush() {
init();
let h = Arc::new(Mutex::new(Harness::with_padnames(
"rtpbin2",
Some("rtp_recv_sink_0"),
None,
)));
let weak_h = Arc::downgrade(&h);
let mut inner = h.lock().unwrap();
inner
.element()
.unwrap()
.connect_pad_added(move |_elem, pad| {
weak_h
.upgrade()
.unwrap()
.lock()
.unwrap()
.add_element_src_pad(pad)
});
inner.play();
let caps = Caps::builder("application/x-rtp")
.field("media", "audio")
.field("payload", TEST_PT as i32)
.field("clock-rate", TEST_CLOCK_RATE as i32)
.field("encoding-name", "custom-test")
.build();
inner.set_src_caps(caps);
// Cannot push with harness lock as the 'pad-added' handler needs to add the newly created pad to
// the harness and needs to also take the harness lock. Workaround by pushing from the
// internal harness pad directly.
let push_pad = inner
.element()
.unwrap()
.static_pad("rtp_recv_sink_0")
.unwrap()
.peer()
.unwrap();
drop(inner);
push_pad.push(generate_rtp_buffer(500, 20, 9)).unwrap();
push_pad.push(generate_rtp_buffer(501, 30, 11)).unwrap();
let mut inner = h.lock().unwrap();
let seqnum = gst::Seqnum::next();
inner.push_event(gst::event::FlushStart::builder().seqnum(seqnum).build());
inner.push_event(gst::event::FlushStop::builder(false).seqnum(seqnum).build());
let event = inner.pull_event().unwrap();
let gst::EventView::FlushStart(fs) = event.view() else {
unreachable!();
};
assert_eq!(fs.seqnum(), seqnum);
let event = inner.pull_event().unwrap();
let gst::EventView::FlushStop(fs) = event.view() else {
unreachable!();
};
assert_eq!(fs.seqnum(), seqnum);
}