jitterbuffer: don't try to lock in query handlers

Instead, add position and latency fields to the PadSrcHandler

Fixes #93
This commit is contained in:
Mathieu Duponchelle 2020-01-21 18:26:07 +01:00
parent 5047ed480d
commit 1823ca525e

View file

@ -38,6 +38,9 @@ use lazy_static::lazy_static;
use std::cmp::{max, min, Ordering}; use std::cmp::{max, min, Ordering};
use std::collections::{BTreeSet, VecDeque}; use std::collections::{BTreeSet, VecDeque};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
@ -260,8 +263,23 @@ impl PadSinkHandler for JitterBufferPadSinkHandler {
} }
} }
#[derive(Debug)]
struct JitterBufferPadSrcHandlerInner {
latency: gst::ClockTime,
position: AtomicU64,
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct JitterBufferPadSrcHandler; struct JitterBufferPadSrcHandler(Arc<JitterBufferPadSrcHandlerInner>);
impl JitterBufferPadSrcHandler {
fn new(latency: gst::ClockTime) -> Self {
JitterBufferPadSrcHandler(Arc::new(JitterBufferPadSrcHandlerInner {
latency,
position: AtomicU64::new(std::u64::MAX),
}))
}
}
impl PadSrcHandler for JitterBufferPadSrcHandler { impl PadSrcHandler for JitterBufferPadSrcHandler {
type ElementImpl = JitterBuffer; type ElementImpl = JitterBuffer;
@ -285,11 +303,7 @@ impl PadSrcHandler for JitterBufferPadSrcHandler {
if ret { if ret {
let (_, mut min_latency, _) = peer_query.get_result(); let (_, mut min_latency, _) = peer_query.get_result();
let our_latency = runtime::executor::block_on(jitterbuffer.settings.lock()) min_latency += self.0.latency;
.latency_ms as u64
* gst::MSECOND;
min_latency += our_latency;
let max_latency = gst::CLOCK_TIME_NONE; let max_latency = gst::CLOCK_TIME_NONE;
q.set(true, min_latency, max_latency); q.set(true, min_latency, max_latency);
@ -301,11 +315,8 @@ impl PadSrcHandler for JitterBufferPadSrcHandler {
if q.get_format() != gst::Format::Time { if q.get_format() != gst::Format::Time {
jitterbuffer.sink_pad.gst_pad().peer_query(query) jitterbuffer.sink_pad.gst_pad().peer_query(query)
} else { } else {
q.set( let position = self.0.position.load(Relaxed);
runtime::executor::block_on(jitterbuffer.state.lock()) q.set(gst::ClockTime(Some(position)));
.segment
.get_position(),
);
true true
} }
} }
@ -369,6 +380,7 @@ struct State {
task_queue_abort_handle: Option<AbortHandle>, task_queue_abort_handle: Option<AbortHandle>,
wakeup_abort_handle: Option<AbortHandle>, wakeup_abort_handle: Option<AbortHandle>,
wakeup_join_handle: Option<JoinHandle<Result<(), Aborted>>>, wakeup_join_handle: Option<JoinHandle<Result<(), Aborted>>>,
src_pad_handler: JitterBufferPadSrcHandler,
} }
impl Default for State { impl Default for State {
@ -398,6 +410,9 @@ impl Default for State {
task_queue_abort_handle: None, task_queue_abort_handle: None,
wakeup_abort_handle: None, wakeup_abort_handle: None,
wakeup_join_handle: None, wakeup_join_handle: None,
src_pad_handler: JitterBufferPadSrcHandler::new(
DEFAULT_LATENCY_MS as u64 * gst::MSECOND,
),
} }
} }
} }
@ -905,6 +920,9 @@ impl JitterBuffer {
} }
state.last_popped_pts = buffer.get_pts(); state.last_popped_pts = buffer.get_pts();
if let Some(pts) = state.last_popped_pts.nseconds() {
state.src_pad_handler.0.position.store(pts, Relaxed);
}
state.last_popped_seqnum = seq; state.last_popped_seqnum = seq;
if discont { if discont {
@ -1303,15 +1321,21 @@ impl ElementImpl for JitterBuffer {
match transition { match transition {
gst::StateChange::NullToReady => runtime::executor::block_on(async { gst::StateChange::NullToReady => runtime::executor::block_on(async {
let _state = self.state.lock().await; let mut state = self.state.lock().await;
let context = { let (context, latency) = {
let settings = self.settings.lock().await; let settings = self.settings.lock().await;
Context::acquire(&settings.context, settings.context_wait).unwrap() let context =
Context::acquire(&settings.context, settings.context_wait).unwrap();
let latency = settings.latency_ms as u64 * gst::MSECOND;
(context, latency)
}; };
state.src_pad_handler = JitterBufferPadSrcHandler::new(latency);
let _ = self let _ = self
.src_pad .src_pad
.prepare(context, &JitterBufferPadSrcHandler) .prepare(context, &state.src_pad_handler)
.await .await
.map_err(|err| { .map_err(|err| {
gst_error_msg!( gst_error_msg!(