From 1823ca525e79b4f361b046e341626716da3de65e Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Tue, 21 Jan 2020 18:26:07 +0100 Subject: [PATCH] jitterbuffer: don't try to lock in query handlers Instead, add position and latency fields to the PadSrcHandler Fixes #93 --- .../src/jitterbuffer/jitterbuffer.rs | 54 +++++++++++++------ 1 file changed, 39 insertions(+), 15 deletions(-) diff --git a/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs b/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs index 622739427..b9bf436d4 100644 --- a/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs +++ b/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs @@ -38,6 +38,9 @@ use lazy_static::lazy_static; use std::cmp::{max, min, Ordering}; use std::collections::{BTreeSet, VecDeque}; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::Arc; use std::time::Duration; use crate::runtime::prelude::*; @@ -260,8 +263,23 @@ impl PadSinkHandler for JitterBufferPadSinkHandler { } } +#[derive(Debug)] +struct JitterBufferPadSrcHandlerInner { + latency: gst::ClockTime, + position: AtomicU64, +} + #[derive(Clone, Debug)] -struct JitterBufferPadSrcHandler; +struct JitterBufferPadSrcHandler(Arc); + +impl JitterBufferPadSrcHandler { + fn new(latency: gst::ClockTime) -> Self { + JitterBufferPadSrcHandler(Arc::new(JitterBufferPadSrcHandlerInner { + latency, + position: AtomicU64::new(std::u64::MAX), + })) + } +} impl PadSrcHandler for JitterBufferPadSrcHandler { type ElementImpl = JitterBuffer; @@ -285,11 +303,7 @@ impl PadSrcHandler for JitterBufferPadSrcHandler { if ret { let (_, mut min_latency, _) = peer_query.get_result(); - let our_latency = runtime::executor::block_on(jitterbuffer.settings.lock()) - .latency_ms as u64 - * gst::MSECOND; - - min_latency += our_latency; + min_latency += self.0.latency; let max_latency = gst::CLOCK_TIME_NONE; q.set(true, min_latency, max_latency); @@ -301,11 +315,8 @@ impl PadSrcHandler for JitterBufferPadSrcHandler { if q.get_format() != gst::Format::Time { jitterbuffer.sink_pad.gst_pad().peer_query(query) } else { - q.set( - runtime::executor::block_on(jitterbuffer.state.lock()) - .segment - .get_position(), - ); + let position = self.0.position.load(Relaxed); + q.set(gst::ClockTime(Some(position))); true } } @@ -369,6 +380,7 @@ struct State { task_queue_abort_handle: Option, wakeup_abort_handle: Option, wakeup_join_handle: Option>>, + src_pad_handler: JitterBufferPadSrcHandler, } impl Default for State { @@ -398,6 +410,9 @@ impl Default for State { task_queue_abort_handle: None, wakeup_abort_handle: None, wakeup_join_handle: None, + src_pad_handler: JitterBufferPadSrcHandler::new( + DEFAULT_LATENCY_MS as u64 * gst::MSECOND, + ), } } } @@ -905,6 +920,9 @@ impl JitterBuffer { } state.last_popped_pts = buffer.get_pts(); + if let Some(pts) = state.last_popped_pts.nseconds() { + state.src_pad_handler.0.position.store(pts, Relaxed); + } state.last_popped_seqnum = seq; if discont { @@ -1303,15 +1321,21 @@ impl ElementImpl for JitterBuffer { match transition { gst::StateChange::NullToReady => runtime::executor::block_on(async { - let _state = self.state.lock().await; + let mut state = self.state.lock().await; - let context = { + let (context, latency) = { let settings = self.settings.lock().await; - Context::acquire(&settings.context, settings.context_wait).unwrap() + let context = + Context::acquire(&settings.context, settings.context_wait).unwrap(); + let latency = settings.latency_ms as u64 * gst::MSECOND; + (context, latency) }; + + state.src_pad_handler = JitterBufferPadSrcHandler::new(latency); + let _ = self .src_pad - .prepare(context, &JitterBufferPadSrcHandler) + .prepare(context, &state.src_pad_handler) .await .map_err(|err| { gst_error_msg!(