From a15d60105b72c782dc24b5dd2e4cf90321c8653b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Mon, 6 Jan 2020 13:19:11 +0100 Subject: [PATCH] ts: fix FlushStart / FlushStop events handling --- gst-plugin-threadshare/src/appsrc.rs | 157 +++-- .../src/jitterbuffer/jitterbuffer.rs | 134 ++-- gst-plugin-threadshare/src/proxy.rs | 619 ++++++++++++------ gst-plugin-threadshare/src/queue.rs | 232 ++++--- .../src/runtime/executor.rs | 20 +- gst-plugin-threadshare/src/runtime/mod.rs | 2 +- gst-plugin-threadshare/src/runtime/pad.rs | 378 +++++++---- .../src/runtime/pad_context.rs | 15 +- gst-plugin-threadshare/src/runtime/task.rs | 6 +- gst-plugin-threadshare/src/tcpclientsrc.rs | 172 +++-- gst-plugin-threadshare/src/udpsrc.rs | 168 +++-- gst-plugin-threadshare/tests/pad.rs | 485 ++++++++++---- 12 files changed, 1604 insertions(+), 784 deletions(-) diff --git a/gst-plugin-threadshare/src/appsrc.rs b/gst-plugin-threadshare/src/appsrc.rs index 8bb007fd..ae621c05 100644 --- a/gst-plugin-threadshare/src/appsrc.rs +++ b/gst-plugin-threadshare/src/appsrc.rs @@ -19,7 +19,7 @@ use either::Either; use futures::channel::mpsc; use futures::future::BoxFuture; -use futures::lock::{Mutex, MutexGuard}; +use futures::lock::Mutex; use futures::prelude::*; use glib; @@ -32,18 +32,17 @@ use gst; use gst::prelude::*; use gst::subclass::prelude::*; use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace}; -use gst::{EventView, QueryView}; use lazy_static::lazy_static; use rand; use std::convert::TryInto; -use std::sync::Arc; +use std::sync::{self, Arc}; use std::u32; use crate::runtime::prelude::*; -use crate::runtime::{self, Context, PadSrc, PadSrcRef}; +use crate::runtime::{self, Context, JoinHandle, PadSrc, PadSrcRef}; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: u32 = 0; @@ -139,33 +138,32 @@ enum StreamItem { } #[derive(Debug)] -struct AppSrcPadHandlerInner { +struct AppSrcPadHandlerState { need_initial_events: bool, + caps: Option, configured_caps: Option, } -impl Default for AppSrcPadHandlerInner { +impl Default for AppSrcPadHandlerState { fn default() -> Self { - AppSrcPadHandlerInner { + AppSrcPadHandlerState { need_initial_events: true, + caps: None, configured_caps: None, } } } -#[derive(Clone, Debug)] -struct AppSrcPadHandler(Arc>); +#[derive(Debug, Default)] +struct AppSrcPadHandlerInner { + state: sync::RwLock, + flush_join_handle: sync::Mutex>>>, +} + +#[derive(Clone, Debug, Default)] +struct AppSrcPadHandler(Arc); impl AppSrcPadHandler { - fn new() -> Self { - AppSrcPadHandler(Arc::new(Mutex::new(AppSrcPadHandlerInner::default()))) - } - - #[inline] - async fn lock(&self) -> MutexGuard<'_, AppSrcPadHandlerInner> { - self.0.lock().await - } - async fn start_task( &self, pad: PadSrcRef<'_>, @@ -206,8 +204,13 @@ impl AppSrcPadHandler { let mut events = Vec::new(); { - let mut inner = self.lock().await; - if inner.need_initial_events { + // Only `read` the state in the hot path + if self.0.state.read().unwrap().need_initial_events { + // We will need to `write` and we also want to prevent + // any changes on the state while we are handling initial events + let mut state = self.0.state.write().unwrap(); + assert!(state.need_initial_events); + gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); let stream_id = @@ -217,17 +220,17 @@ impl AppSrcPadHandler { .group_id(gst::util_group_id_next()) .build(), ); - let appsrc = AppSrc::from_instance(element); - if let Some(ref caps) = appsrc.settings.lock().await.caps { + + if let Some(ref caps) = state.caps { events.push(gst::Event::new_caps(&caps).build()); - inner.configured_caps = Some(caps.clone()); + state.configured_caps = Some(caps.clone()); } events.push( gst::Event::new_segment(&gst::FormattedSegment::::new()) .build(), ); - inner.need_initial_events = false; + state.need_initial_events = false; } } @@ -238,11 +241,11 @@ impl AppSrcPadHandler { let res = match item { StreamItem::Buffer(buffer) => { - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding buffer {:?}", buffer); + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer); pad.push(buffer).await } StreamItem::Event(event) => { - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding event {:?}", event); + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); pad.push_event(event).await; Ok(gst::FlowSuccess::Ok) } @@ -270,26 +273,69 @@ impl PadSrcHandler for AppSrcPadHandler { fn src_event( &self, - pad: PadSrcRef, - app_src: &AppSrc, + pad: &PadSrcRef, + _app_src: &AppSrc, element: &gst::Element, event: gst::Event, ) -> Either> { - gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + use gst::EventView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); let ret = match event.view() { EventView::FlushStart(..) => { - let _ = runtime::executor::block_on(app_src.pause(element)); + let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); + if flush_join_handle.is_none() { + let element = element.clone(); + let pad_weak = pad.downgrade(); + + *flush_join_handle = Some(pad.spawn(async move { + let res = AppSrc::from_instance(&element).pause(&element).await; + let pad = pad_weak.upgrade().unwrap(); + if res.is_ok() { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart complete"); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart failed"); + } + + res + })); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress"); + } + true } EventView::FlushStop(..) => { - let (res, state, pending) = element.get_state(0.into()); - if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing - || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing - { - let _ = runtime::executor::block_on(app_src.start(element)); + let element = element.clone(); + let inner_weak = Arc::downgrade(&self.0); + let pad_weak = pad.downgrade(); + + let fut = async move { + let mut ret = false; + + let pad = pad_weak.upgrade().unwrap(); + let inner_weak = inner_weak.upgrade().unwrap(); + let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take(); + if let Some(flush_join_handle) = flush_join_handle { + if let Ok(Ok(())) = flush_join_handle.await { + ret = AppSrc::from_instance(&element) + .start(&element) + .await + .is_ok(); + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop complete"); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed"); + } + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress"); + } + + ret } - true + .boxed(); + + return Either::Right(fut); } EventView::Reconfigure(..) => true, EventView::Latency(..) => true, @@ -297,9 +343,9 @@ impl PadSrcHandler for AppSrcPadHandler { }; if ret { - gst_log!(CAT, obj: pad.gst_pad(), "Handled event {:?}", event); + gst_log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event); } else { - gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle event {:?}", event); + gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); } Either::Left(ret) @@ -307,12 +353,14 @@ impl PadSrcHandler for AppSrcPadHandler { fn src_query( &self, - pad: PadSrcRef, + pad: &PadSrcRef, _app_src: &AppSrc, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { - gst_log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query); + use gst::QueryView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); let ret = match query.view_mut() { QueryView::Latency(ref mut q) => { q.set(true, 0.into(), 0.into()); @@ -324,8 +372,8 @@ impl PadSrcHandler for AppSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let inner = runtime::executor::block_on(self.lock()); - let caps = if let Some(ref caps) = inner.configured_caps { + let state = self.0.state.read().unwrap(); + let caps = if let Some(ref caps) = state.configured_caps { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -343,9 +391,9 @@ impl PadSrcHandler for AppSrcPadHandler { }; if ret { - gst_log!(CAT, obj: pad.gst_pad(), "Handled query {:?}", query); + gst_log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query); } else { - gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle query {:?}", query); + gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query); } ret } @@ -420,6 +468,9 @@ impl AppSrc { let context = { let settings = self.settings.lock().await; + + self.src_pad_handler.0.state.write().unwrap().caps = settings.caps.clone(); + Context::acquire(&settings.context, settings.context_wait).map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -449,7 +500,12 @@ impl AppSrc { self.src_pad.stop_task().await; let _ = self.src_pad.unprepare().await; - self.src_pad_handler.lock().await.configured_caps = None; + self.src_pad_handler + .0 + .state + .write() + .unwrap() + .configured_caps = None; gst_debug!(CAT, obj: element, "Unprepared"); @@ -565,7 +621,7 @@ impl ObjectSubclass for AppSrc { Self { src_pad, - src_pad_handler: AppSrcPadHandler::new(), + src_pad_handler: AppSrcPadHandler::default(), state: Mutex::new(State::default()), settings: Mutex::new(Settings::default()), } @@ -663,9 +719,12 @@ impl ElementImpl for AppSrc { .map_err(|_| gst::StateChangeError)?; } gst::StateChange::PausedToReady => { - runtime::executor::block_on(async { - self.src_pad_handler.lock().await.need_initial_events = true; - }); + self.src_pad_handler + .0 + .state + .write() + .unwrap() + .need_initial_events = true; } _ => (), } diff --git a/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs b/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs index 33f6e290..62273942 100644 --- a/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs +++ b/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs @@ -32,7 +32,6 @@ use gst; use gst::prelude::*; use gst::subclass::prelude::*; use gst::{gst_debug, gst_error_msg, gst_info, gst_log, gst_trace}; -use gst::{EventView, QueryView}; use gst_rtp::RTPBuffer; use lazy_static::lazy_static; @@ -43,7 +42,7 @@ use std::time::Duration; use crate::runtime::prelude::*; use crate::runtime::{ - self, Context, JoinHandle, PadContext, PadContextRef, PadSink, PadSinkRef, PadSrc, PadSrcRef, + self, Context, JoinHandle, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak, }; use super::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx}; @@ -160,7 +159,7 @@ impl PadSinkHandler for JitterBufferPadSinkHandler { fn sink_chain( &self, - pad: PadSinkRef, + pad: &PadSinkRef, _jitterbuffer: &JitterBuffer, element: &gst::Element, buffer: gst::Buffer, @@ -170,7 +169,7 @@ impl PadSinkHandler for JitterBufferPadSinkHandler { async move { let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - gst_debug!(CAT, obj: pad.gst_pad(), "Handling buffer {:?}", buffer); + gst_debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); let jitterbuffer = JitterBuffer::from_instance(&element); jitterbuffer .enqueue_item(pad.gst_pad(), &element, Some(buffer)) @@ -181,69 +180,74 @@ impl PadSinkHandler for JitterBufferPadSinkHandler { fn sink_event( &self, - pad: PadSinkRef, + pad: &PadSinkRef, jitterbuffer: &JitterBuffer, element: &gst::Element, event: gst::Event, ) -> Either> { + use gst::EventView; + if event.is_serialized() { let pad_weak = pad.downgrade(); let element = element.clone(); - Either::Right(async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + Either::Right( + async move { + let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - let mut forward = true; + let mut forward = true; - gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); - let jitterbuffer = JitterBuffer::from_instance(&element); - match event.view() { - EventView::Segment(e) => { - let mut state = jitterbuffer.state.lock().await; - state.segment = e - .get_segment() - .clone() - .downcast::() - .unwrap(); - } - EventView::Eos(..) => { - let mut state = jitterbuffer.state.lock().await; - jitterbuffer.drain(&mut state, &element).await; - } - EventView::CustomDownstreamSticky(e) => { - if PadContext::is_pad_context_sticky_event(&e) { - forward = false; + let jitterbuffer = JitterBuffer::from_instance(&element); + match event.view() { + EventView::FlushStop(..) => { + jitterbuffer.flush(&element).await; } + EventView::Segment(e) => { + let mut state = jitterbuffer.state.lock().await; + state.segment = e + .get_segment() + .clone() + .downcast::() + .unwrap(); + } + EventView::Eos(..) => { + let mut state = jitterbuffer.state.lock().await; + jitterbuffer.drain(&mut state, &element).await; + } + EventView::CustomDownstreamSticky(e) => { + if PadContext::is_pad_context_sticky_event(&e) { + forward = false; + } + } + _ => (), + }; + + if forward { + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event); + jitterbuffer.src_pad.push_event(event).await + } else { + true } - _ => (), - }; - - if forward { - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized event {:?}", event); - jitterbuffer.src_pad.push_event(event).await - } else { - true } - }.boxed()) + .boxed(), + ) } else { - if let EventView::FlushStop(..) = event.view() { - runtime::executor::block_on(jitterbuffer.flush(element)); - } - - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized event {:?}", event); - + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); Either::Left(jitterbuffer.src_pad.gst_pad().push_event(event)) } } fn sink_query( &self, - pad: PadSinkRef, + pad: &PadSinkRef, jitterbuffer: &JitterBuffer, element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query); + use gst::QueryView; + + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); match query.view_mut() { QueryView::Drain(..) => { @@ -264,12 +268,14 @@ impl PadSrcHandler for JitterBufferPadSrcHandler { fn src_query( &self, - pad: PadSrcRef, + pad: &PadSrcRef, jitterbuffer: &JitterBuffer, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query); + use gst::QueryView; + + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); match query.view_mut() { QueryView::Latency(ref mut q) => { @@ -433,7 +439,7 @@ impl JitterBuffer { ) -> Result { let s = caps.get_structure(0).ok_or(gst::FlowError::Error)?; - gst_info!(CAT, obj: element, "Parsing caps: {:?}", caps); + gst_info!(CAT, obj: element, "Parsing {:?}", caps); let payload = s .get_some::("payload") @@ -907,7 +913,7 @@ impl JitterBuffer { state.num_pushed += 1; - gst_debug!(CAT, obj: self.src_pad.gst_pad(), "Pushing buffer {:?} with seq {}", buffer, seq); + gst_debug!(CAT, obj: self.src_pad.gst_pad(), "Pushing {:?} with seq {}", buffer, seq); self.src_pad.push(buffer.to_owned()).await } @@ -954,9 +960,6 @@ impl JitterBuffer { let _ = wakeup_join_handle.await; } - let pad_src_state = self.src_pad.lock_state().await; - let pad_ctx = pad_src_state.pad_context().unwrap(); - gst_debug!(CAT, obj: element, "Scheduling wakeup in {}", delay); let (wakeup_fut, abort_handle) = abortable(Self::wakeup_fut( @@ -964,9 +967,9 @@ impl JitterBuffer { latency_ns, context_wait_ns, &element, - &pad_ctx, + self.src_pad.downgrade(), )); - state.wakeup_join_handle = Some(pad_ctx.spawn(wakeup_fut)); + state.wakeup_join_handle = Some(self.src_pad.spawn(wakeup_fut)); state.wakeup_abort_handle = Some(abort_handle); } @@ -975,17 +978,22 @@ impl JitterBuffer { latency_ns: gst::ClockTime, context_wait_ns: gst::ClockTime, element: &gst::Element, - pad_ctx: &PadContextRef, + pad_src_weak: PadSrcWeak, ) -> BoxFuture<'static, ()> { let element = element.clone(); - let pad_ctx_weak = pad_ctx.downgrade(); async move { runtime::time::delay_for(delay).await; let jb = Self::from_instance(&element); let mut state = jb.state.lock().await; - let pad_ctx = match pad_ctx_weak.upgrade() { + let pad_src = match pad_src_weak.upgrade() { + Some(pad_src) => pad_src, + None => return, + }; + + let pad_ctx = pad_src.pad_context(); + let pad_ctx = match pad_ctx.upgrade() { Some(pad_ctx) => pad_ctx, None => return, }; @@ -1013,7 +1021,7 @@ impl JitterBuffer { let (abortable_drain, abort_handle) = abortable(drain_fut); state.task_queue_abort_handle = Some(abort_handle); - pad_ctx.spawn(abortable_drain.map(drop)); + pad_src.spawn(abortable_drain.map(drop)); } else { state.task_queue_abort_handle = None; } @@ -1190,34 +1198,42 @@ impl ObjectImpl for JitterBuffer { fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; - let mut settings = runtime::executor::block_on(self.settings.lock()); match *prop { subclass::Property("latency", ..) => { - settings.latency_ms = value.get_some().expect("type checked upstream"); + let latency_ms = { + let mut settings = runtime::executor::block_on(self.settings.lock()); + settings.latency_ms = value.get_some().expect("type checked upstream"); + settings.latency_ms as u64 + }; runtime::executor::block_on(self.state.lock()) .jbuf .borrow() - .set_delay(settings.latency_ms as u64 * gst::MSECOND); + .set_delay(latency_ms * gst::MSECOND); /* TODO: post message */ } subclass::Property("do-lost", ..) => { + let mut settings = runtime::executor::block_on(self.settings.lock()); settings.do_lost = value.get_some().expect("type checked upstream"); } subclass::Property("max-dropout-time", ..) => { + let mut settings = runtime::executor::block_on(self.settings.lock()); settings.max_dropout_time = value.get_some().expect("type checked upstream"); } subclass::Property("max-misorder-time", ..) => { + let mut settings = runtime::executor::block_on(self.settings.lock()); settings.max_misorder_time = value.get_some().expect("type checked upstream"); } subclass::Property("context", ..) => { + let mut settings = runtime::executor::block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { + let mut settings = runtime::executor::block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), diff --git a/gst-plugin-threadshare/src/proxy.rs b/gst-plugin-threadshare/src/proxy.rs index 43cdf2ff..68d4e140 100644 --- a/gst-plugin-threadshare/src/proxy.rs +++ b/gst-plugin-threadshare/src/proxy.rs @@ -32,21 +32,22 @@ use gst; use gst::prelude::*; use gst::subclass::prelude::*; use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace}; -use gst::{EventView, QueryView}; use lazy_static::lazy_static; use std::collections::{HashMap, VecDeque}; -use std::sync::{Arc, Weak}; +use std::sync::{self, Arc, Weak}; use std::{u32, u64}; use crate::runtime::prelude::*; -use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use crate::runtime::{ + self, Context, JoinHandle, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak, +}; use super::dataqueue::{DataQueue, DataQueueItem}; lazy_static! { - static ref CONTEXTS: Mutex>>> = + static ref PROXY_CONTEXTS: Mutex>>> = Mutex::new(HashMap::new()); } @@ -184,114 +185,156 @@ impl PendingQueue { } } +/// ProxySrc fields which are necessary for ProxySink #[derive(Debug)] -struct SharedQueueInner { +struct SharedSrc { + ts_ctx: Context, + src_pad: PadSrcWeak, +} + +/// ProxySink fields which are necessary for ProxySrc +#[derive(Debug)] +struct SharedSink { + sink_pad: PadSinkWeak, +} + +#[derive(Debug)] +struct ProxyContextInner { name: String, dataqueue: Option, last_res: Result, pending_queue: Option, have_sink: bool, have_src: bool, + shared_src_tx: Option>, + shared_src_rx: Option>, + shared_sink_tx: Option>, + shared_sink_rx: Option>, } -impl SharedQueueInner { +impl ProxyContextInner { async fn unprepare(&self) { - let mut contexts = CONTEXTS.lock().await; - contexts.remove(&self.name); + let mut proxy_ctxs = PROXY_CONTEXTS.lock().await; + proxy_ctxs.remove(&self.name); } } -impl Drop for SharedQueueInner { +impl Drop for ProxyContextInner { fn drop(&mut self) { - // Check invariants which can't be held automatically in `SharedQueue` + // Check invariants which can't be held automatically in `ProxyContext` // because `drop` can't be `async` if self.pending_queue.is_some() || self.dataqueue.is_some() { - panic!("Missing call to `SharedQueue::unprepare`"); + panic!("Missing call to `ProxyContext::unprepare`"); } } } #[derive(Debug)] -struct SharedQueue { - context: Arc>, +struct ProxyContext { + shared: Arc>, as_sink: bool, } -impl SharedQueue { +impl ProxyContext { #[inline] - async fn lock(&self) -> MutexGuard<'_, SharedQueueInner> { - self.context.lock().await + async fn lock_shared(&self) -> MutexGuard<'_, ProxyContextInner> { + self.shared.lock().await } async fn get(name: &str, as_sink: bool) -> Option { - let mut contexts = CONTEXTS.lock().await; + let mut proxy_ctxs = PROXY_CONTEXTS.lock().await; - let mut queue = None; - if let Some(context) = contexts.get(name) { - if let Some(context) = context.upgrade() { + let mut proxy_ctx = None; + if let Some(shared_weak) = proxy_ctxs.get(name) { + if let Some(shared) = shared_weak.upgrade() { { - let inner = context.lock().await; - if (inner.have_sink && as_sink) || (inner.have_src && !as_sink) { + let shared = shared.lock().await; + if (shared.have_sink && as_sink) || (shared.have_src && !as_sink) { return None; } } - let share_queue = SharedQueue { context, as_sink }; - { - let mut inner = share_queue.context.lock().await; - if as_sink { - inner.have_sink = true; - } else { - inner.have_src = true; + proxy_ctx = Some({ + let proxy_ctx = ProxyContext { shared, as_sink }; + { + let mut shared = proxy_ctx.lock_shared().await; + if as_sink { + shared.have_sink = true; + } else { + shared.have_src = true; + } } - } - queue = Some(share_queue); + proxy_ctx + }); } } - if queue.is_none() { - let context = Arc::new(Mutex::new(SharedQueueInner { + if proxy_ctx.is_none() { + let (shared_src_tx, shared_src_rx) = oneshot::channel(); + let (shared_sink_tx, shared_sink_rx) = oneshot::channel(); + let shared = Arc::new(Mutex::new(ProxyContextInner { name: name.into(), dataqueue: None, last_res: Err(gst::FlowError::Flushing), pending_queue: None, have_sink: as_sink, have_src: !as_sink, + shared_src_tx: Some(shared_src_tx), + shared_src_rx: Some(shared_src_rx), + shared_sink_tx: Some(shared_sink_tx), + shared_sink_rx: Some(shared_sink_rx), })); - contexts.insert(name.into(), Arc::downgrade(&context)); + proxy_ctxs.insert(name.into(), Arc::downgrade(&shared)); - queue = Some(SharedQueue { context, as_sink }); + proxy_ctx = Some(ProxyContext { shared, as_sink }); } - queue + proxy_ctx } async fn unprepare(&self) { - let mut inner = self.context.lock().await; + let mut shared_ctx = self.lock_shared().await; if self.as_sink { - assert!(inner.have_sink); - inner.have_sink = false; - let _ = inner.pending_queue.take(); + assert!(shared_ctx.have_sink); + shared_ctx.have_sink = false; + let _ = shared_ctx.pending_queue.take(); } else { - assert!(inner.have_src); - inner.have_src = false; - let _ = inner.dataqueue.take(); + assert!(shared_ctx.have_src); + shared_ctx.have_src = false; + let _ = shared_ctx.dataqueue.take(); } - inner.unprepare().await; + shared_ctx.unprepare().await; } } +#[derive(Debug)] +struct ProxySinkPadHandlerInner { + flush_join_handle: sync::Mutex>>>, + ts_ctx: Context, + src_pad: PadSrcWeak, +} + #[derive(Clone, Debug)] -struct ProxySinkPadHandler; +struct ProxySinkPadHandler(Arc); + +impl ProxySinkPadHandler { + fn new(ts_ctx: Context, src_pad: PadSrcWeak) -> Self { + ProxySinkPadHandler(Arc::new(ProxySinkPadHandlerInner { + flush_join_handle: sync::Mutex::new(None), + ts_ctx, + src_pad, + })) + } +} impl PadSinkHandler for ProxySinkPadHandler { type ElementImpl = ProxySink; fn sink_chain( &self, - pad: PadSinkRef, + pad: &PadSinkRef, _proxysink: &ProxySink, element: &gst::Element, buffer: gst::Buffer, @@ -301,7 +344,7 @@ impl PadSinkHandler for ProxySinkPadHandler { async move { let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling buffer {:?}", buffer); + gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); let proxysink = ProxySink::from_instance(&element); proxysink .enqueue_item(&element, DataQueueItem::Buffer(buffer)) @@ -312,7 +355,7 @@ impl PadSinkHandler for ProxySinkPadHandler { fn sink_chain_list( &self, - pad: PadSinkRef, + pad: &PadSinkRef, _proxysink: &ProxySink, element: &gst::Element, list: gst::BufferList, @@ -321,7 +364,7 @@ impl PadSinkHandler for ProxySinkPadHandler { let element = element.clone(); async move { let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list); + gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", list); let proxysink = ProxySink::from_instance(&element); proxysink .enqueue_item(&element, DataQueueItem::BufferList(list)) @@ -332,26 +375,49 @@ impl PadSinkHandler for ProxySinkPadHandler { fn sink_event( &self, - pad: PadSinkRef, - proxysink: &ProxySink, + pad: &PadSinkRef, + _proxysink: &ProxySink, element: &gst::Element, event: gst::Event, ) -> Either> { + use gst::EventView; + if event.is_serialized() { let pad_weak = pad.downgrade(); let element = element.clone(); + let inner_weak = Arc::downgrade(&self.0); + Either::Right( async move { let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", event); let proxysink = ProxySink::from_instance(&element); - if let EventView::Eos(..) = event.view() { - let _ = element - .post_message(&gst::Message::new_eos().src(Some(&element)).build()); + + match event.view() { + EventView::Eos(..) => { + let _ = element + .post_message(&gst::Message::new_eos().src(Some(&element)).build()); + } + EventView::FlushStop(..) => { + let inner = inner_weak.upgrade().unwrap(); + + let flush_join_handle = inner.flush_join_handle.lock().unwrap().take(); + if let Some(flush_join_handle) = flush_join_handle { + gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete"); + if let Ok(Ok(())) = flush_join_handle.await { + let _ = proxysink.start(&element).await; + } else { + gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStop ignored: FlushStart failed to complete"); + } + } else { + gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress"); + } + } + _ => (), } - gst_log!(SINK_CAT, obj: pad.gst_pad(), "Queuing event {:?}", event); + gst_log!(SINK_CAT, obj: pad.gst_pad(), "Queuing {:?}", event); let _ = proxysink .enqueue_item(&element, DataQueueItem::Event(event)) .await; @@ -361,48 +427,48 @@ impl PadSinkHandler for ProxySinkPadHandler { .boxed(), ) } else { - match event.view() { - EventView::FlushStart(..) => { - let _ = runtime::executor::block_on(proxysink.stop(element)); + if let EventView::FlushStart(..) = event.view() { + let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); + if flush_join_handle.is_none() { + gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", event); + let element = element.clone(); + *flush_join_handle = Some(self.0.ts_ctx.spawn(async move { + ProxySink::from_instance(&element).stop(&element).await + })); + } else { + gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress"); } - EventView::FlushStop(..) => { - let (res, state, pending) = element.get_state(0.into()); - if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Paused - || res == Ok(gst::StateChangeSuccess::Async) - && pending == gst::State::Paused - { - let _ = runtime::executor::block_on(proxysink.start(&element)); - } - } - _ => (), } - gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event); - // FIXME proxysink can't forward directly to the src_pad of the proxysrc - let _ = runtime::executor::block_on( - proxysink.enqueue_item(&element, DataQueueItem::Event(event)), - ); + gst_log!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized {:?}", event); - Either::Left(true) + Either::Left( + self.0 + .src_pad + .upgrade() + .expect("PadSrc no longer available") + .gst_pad() + .push_event(event), + ) } } } #[derive(Debug)] struct StateSink { - queue: Option, + proxy_ctx: Option, } impl StateSink { #[inline] - async fn lock_queue(&self) -> MutexGuard<'_, SharedQueueInner> { - self.queue.as_ref().unwrap().lock().await + fn proxy_ctx(&self) -> &ProxyContext { + self.proxy_ctx.as_ref().unwrap() } } impl Default for StateSink { - fn default() -> StateSink { - StateSink { queue: None } + fn default() -> Self { + StateSink { proxy_ctx: None } } } @@ -439,19 +505,19 @@ impl ProxySink { let more_queue_space_receiver = { let state = sink.state.lock().await; - let queue_opt = state.queue.as_ref(); - if queue_opt.is_none() { + let proxy_ctx = state.proxy_ctx.as_ref(); + if proxy_ctx.is_none() { return; } - let mut queue = queue_opt.unwrap().context.lock().await; + let mut shared_ctx = proxy_ctx.unwrap().lock_shared().await; gst_log!(SINK_CAT, obj: &element, "Trying to empty pending queue"); - let SharedQueueInner { + let ProxyContextInner { pending_queue: ref mut pq, ref dataqueue, .. - } = *queue; + } = *shared_ctx; if let Some(ref mut pending_queue) = *pq { if let Some(ref dataqueue) = dataqueue { @@ -500,15 +566,15 @@ impl ProxySink { let wait_fut = { let state = self.state.lock().await; - let queue = state.queue.as_ref().ok_or(gst::FlowError::Error)?; - let mut queue = queue.lock().await; + let proxy_ctx = state.proxy_ctx.as_ref().ok_or(gst::FlowError::Error)?; + let mut shared_ctx = proxy_ctx.lock_shared().await; let item = { - let SharedQueueInner { + let ProxyContextInner { ref mut pending_queue, ref dataqueue, .. - } = *queue; + } = *shared_ctx; match (pending_queue, dataqueue) { (None, Some(ref dataqueue)) => dataqueue.push(item).await, @@ -537,17 +603,17 @@ impl ProxySink { }; if let Err(item) = item { - if queue + if shared_ctx .pending_queue .as_ref() .map(|pending_queue| !pending_queue.scheduled) .unwrap_or(true) { - if queue.pending_queue.is_none() { - queue.pending_queue = Some(PendingQueue::default()); + if shared_ctx.pending_queue.is_none() { + shared_ctx.pending_queue = Some(PendingQueue::default()); } - let pending_queue = queue.pending_queue.as_mut().unwrap(); + let pending_queue = shared_ctx.pending_queue.as_mut().unwrap(); let schedule_now = match item { DataQueueItem::Event(ref ev) if ev.get_type() != gst::EventType::Eos => { @@ -573,7 +639,12 @@ impl ProxySink { None } } else { - queue.pending_queue.as_mut().unwrap().items.push_back(item); + shared_ctx + .pending_queue + .as_mut() + .unwrap() + .items + .push_back(item); None } @@ -592,41 +663,86 @@ impl ProxySink { } let state = self.state.lock().await; - let queue = state.queue.as_ref().unwrap().lock().await; - queue.last_res + let shared_ctx = state.proxy_ctx().lock_shared().await; + shared_ctx.last_res } async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().await; gst_debug!(SINK_CAT, obj: element, "Preparing"); - state.queue = match SharedQueue::get(&self.settings.lock().await.proxy_context, true).await - { - Some(queue) => Some(queue), - None => { - return Err(gst_error_msg!( + let proxy_ctx = ProxyContext::get(&self.settings.lock().await.proxy_context, true) + .await + .ok_or_else(|| { + gst_error_msg!( gst::ResourceError::OpenRead, - ["Failed to create or get queue"] - )); - } - }; + ["Failed to create or get ProxyContext"] + ) + })?; - self.sink_pad.prepare(&ProxySinkPadHandler {}).await; + { + let mut shared_ctx = proxy_ctx.lock_shared().await; + assert!(shared_ctx.shared_src_rx.is_some()); + shared_ctx + .shared_sink_tx + .take() + .unwrap() + .send(SharedSink { + sink_pad: self.sink_pad.downgrade(), + }) + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to send SharedSink: {:?}", err] + ) + })?; + } + + state.proxy_ctx = Some(proxy_ctx); gst_debug!(SINK_CAT, obj: element, "Prepared"); Ok(()) } + async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + let state = self.state.lock().await; + + let shared_src_rx = state.proxy_ctx().lock_shared().await.shared_src_rx.take(); + if shared_src_rx.is_none() { + gst_log!(SINK_CAT, obj: element, "Preparation already completed"); + return Ok(()); + } + + gst_debug!(SINK_CAT, obj: element, "Completing preparation"); + + let SharedSrc { ts_ctx, src_pad } = shared_src_rx.unwrap().await.map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to receive SharedSrc: {:?}", err] + ) + })?; + + self.sink_pad + .prepare(&ProxySinkPadHandler::new(ts_ctx, src_pad)) + .await; + + gst_debug!(SINK_CAT, obj: element, "Preparation completed"); + + Ok(()) + } + async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { let mut state = self.state.lock().await; gst_debug!(SINK_CAT, obj: element, "Unpreparing"); - state.queue.as_ref().unwrap().unprepare().await; - *state = StateSink::default(); + let proxy_ctx = state.proxy_ctx.take().unwrap(); + proxy_ctx.unprepare().await; self.sink_pad.unprepare().await; + *state = StateSink::default(); + gst_debug!(SINK_CAT, obj: element, "Unprepared"); Ok(()) } @@ -635,8 +751,8 @@ impl ProxySink { let state = self.state.lock().await; gst_debug!(SINK_CAT, obj: element, "Starting"); - let mut queue = state.lock_queue().await; - queue.last_res = Ok(gst::FlowSuccess::Ok); + let mut shared_ctx = state.proxy_ctx().lock_shared().await; + shared_ctx.last_res = Ok(gst::FlowSuccess::Ok); gst_debug!(SINK_CAT, obj: element, "Started"); @@ -647,9 +763,9 @@ impl ProxySink { let state = self.state.lock().await; gst_debug!(SINK_CAT, obj: element, "Stopping"); - let mut queue = state.lock_queue().await; - let _ = queue.pending_queue.take(); - queue.last_res = Err(gst::FlowError::Flushing); + let mut shared_ctx = state.proxy_ctx().lock_shared().await; + let _ = shared_ctx.pending_queue.take(); + shared_ctx.last_res = Err(gst::FlowError::Flushing); gst_debug!(SINK_CAT, obj: element, "Stopped"); @@ -705,9 +821,9 @@ impl ObjectImpl for ProxySink { fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { let prop = &PROPERTIES_SINK[id]; + let mut settings = runtime::executor::block_on(self.settings.lock()); match *prop { subclass::Property("proxy-context", ..) => { - let mut settings = runtime::executor::block_on(self.settings.lock()); settings.proxy_context = value .get() .expect("type checked upstream") @@ -720,11 +836,9 @@ impl ObjectImpl for ProxySink { fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { let prop = &PROPERTIES_SINK[id]; + let settings = runtime::executor::block_on(self.settings.lock()); match *prop { - subclass::Property("proxy-context", ..) => { - let settings = runtime::executor::block_on(self.settings.lock()); - Ok(settings.proxy_context.to_value()) - } + subclass::Property("proxy-context", ..) => Ok(settings.proxy_context.to_value()), _ => unimplemented!(), } } @@ -754,6 +868,10 @@ impl ElementImpl for ProxySink { gst::StateChangeError })?; } + gst::StateChange::ReadyToPaused => { + runtime::executor::block_on(self.complete_preparation(element)) + .map_err(|_| gst::StateChangeError)?; + } gst::StateChange::PausedToReady => { runtime::executor::block_on(self.stop(element)) .map_err(|_| gst::StateChangeError)?; @@ -775,10 +893,23 @@ impl ElementImpl for ProxySink { } } +#[derive(Debug)] +struct ProxySrcPadHandlerInner { + flush_join_handle: sync::Mutex>>>, + sink_pad: PadSinkWeak, +} + #[derive(Clone, Debug)] -struct ProxySrcPadHandler; +struct ProxySrcPadHandler(Arc); impl ProxySrcPadHandler { + fn new(sink_pad: PadSinkWeak) -> Self { + ProxySrcPadHandler(Arc::new(ProxySrcPadHandlerInner { + flush_join_handle: sync::Mutex::new(None), + sink_pad, + })) + } + async fn start_task(pad: PadSrcRef<'_>, element: &gst::Element, dataqueue: DataQueue) { let pad_weak = pad.downgrade(); let element = element.clone(); @@ -811,23 +942,23 @@ impl ProxySrcPadHandler { { let state = proxysrc.state.lock().await; - let mut queue = state.queue.as_ref().unwrap().lock().await; - if let Some(ref mut pending_queue) = queue.pending_queue { + let mut shared_ctx = state.proxy_ctx().lock_shared().await; + if let Some(ref mut pending_queue) = shared_ctx.pending_queue { pending_queue.notify_more_queue_space(); } } let res = match item { DataQueueItem::Buffer(buffer) => { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding buffer {:?}", buffer); + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer); pad.push(buffer).await.map(drop) } DataQueueItem::BufferList(list) => { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding buffer list {:?}", list); + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding {:?}", list); pad.push_list(list).await.map(drop) } DataQueueItem::Event(event) => { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding event {:?}", event); + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); pad.push_event(event).await; Ok(()) } @@ -837,22 +968,22 @@ impl ProxySrcPadHandler { Ok(_) => { gst_log!(SRC_CAT, obj: pad.gst_pad(), "Successfully pushed item"); let state = proxysrc.state.lock().await; - let mut queue = state.queue.as_ref().unwrap().lock().await; - queue.last_res = Ok(gst::FlowSuccess::Ok); + let mut shared_ctx = state.proxy_ctx().lock_shared().await; + shared_ctx.last_res = Ok(gst::FlowSuccess::Ok); } Err(gst::FlowError::Flushing) => { gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing"); let state = proxysrc.state.lock().await; pad.pause_task().await; - let mut queue = state.queue.as_ref().unwrap().lock().await; - queue.last_res = Err(gst::FlowError::Flushing); + let mut shared_ctx = state.proxy_ctx().lock_shared().await; + shared_ctx.last_res = Err(gst::FlowError::Flushing); } Err(gst::FlowError::Eos) => { gst_debug!(SRC_CAT, obj: pad.gst_pad(), "EOS"); let state = proxysrc.state.lock().await; pad.pause_task().await; - let mut queue = state.queue.as_ref().unwrap().lock().await; - queue.last_res = Err(gst::FlowError::Eos); + let mut shared_ctx = state.proxy_ctx().lock_shared().await; + shared_ctx.last_res = Err(gst::FlowError::Eos); } Err(err) => { gst_error!(SRC_CAT, obj: pad.gst_pad(), "Got error {}", err); @@ -863,8 +994,8 @@ impl ProxySrcPadHandler { ["streaming stopped, reason {}", err] ); let state = proxysrc.state.lock().await; - let mut queue = state.queue.as_ref().unwrap().lock().await; - queue.last_res = Err(err); + let mut shared_ctx = state.proxy_ctx().lock_shared().await; + shared_ctx.last_res = Err(err); } } } @@ -875,50 +1006,104 @@ impl PadSrcHandler for ProxySrcPadHandler { fn src_event( &self, - pad: PadSrcRef, - proxysrc: &ProxySrc, + pad: &PadSrcRef, + _proxysrc: &ProxySrc, element: &gst::Element, event: gst::Event, ) -> Either> { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + use gst::EventView; - let ret = match event.view() { - EventView::FlushStart(..) => { - let _ = runtime::executor::block_on(proxysrc.pause(element)); - true - } - EventView::FlushStop(..) => { - let (res, state, pending) = element.get_state(0.into()); - if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing - || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing - { - let _ = runtime::executor::block_on(proxysrc.start(element)); + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); + + if event.is_serialized() { + let element = element.clone(); + let inner_weak = Arc::downgrade(&self.0); + let src_pad_weak = pad.downgrade(); + let sink_pad_weak = self.0.sink_pad.clone(); + + Either::Right( + async move { + let ret = if let EventView::FlushStop(..) = event.view() { + let mut ret = false; + + let src_pad = src_pad_weak.upgrade().unwrap(); + let inner_weak = inner_weak.upgrade().unwrap(); + let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take(); + if let Some(flush_join_handle) = flush_join_handle { + gst_debug!(SRC_CAT, obj: src_pad.gst_pad(), "Waiting for FlushStart to complete"); + if let Ok(Ok(())) = flush_join_handle.await { + ret = ProxySrc::from_instance(&element) + .start(&element) + .await + .is_ok(); + gst_log!(SRC_CAT, obj: src_pad.gst_pad(), "FlushStop complete"); + } else { + gst_debug!(SRC_CAT, obj: src_pad.gst_pad(), "FlushStop aborted: FlushStart failed"); + } + } else { + gst_debug!(SRC_CAT, obj: src_pad.gst_pad(), "FlushStop ignored: no Flush in progress"); + } + + ret + } else { + true + }; + + if ret { + let src_pad = src_pad_weak.upgrade().expect("PadSrc no longer exists"); + gst_log!(SRC_CAT, obj: src_pad.gst_pad(), "Forwarding serialized {:?}", event); + sink_pad_weak.upgrade().expect("PadSink no longer available").gst_pad().push_event(event) + } else { + false + } } - true - } - EventView::Reconfigure(..) => true, - EventView::Latency(..) => true, - _ => false, - }; - - if ret { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled event {:?}", event); + .boxed(), + ) } else { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle event {:?}", event); - } + if let EventView::FlushStart(..) = event.view() { + let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); + if flush_join_handle.is_none() { + let element = element.clone(); + let pad_weak = pad.downgrade(); - // FIXME can't forward to sink_pad - Either::Left(ret) + *flush_join_handle = Some(pad.spawn(async move { + let res = ProxySrc::from_instance(&element).pause(&element).await; + let pad = pad_weak.upgrade().unwrap(); + if res.is_ok() { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStart complete"); + } else { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed"); + } + + res + })); + } else { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress"); + } + } + + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); + Either::Left( + self.0 + .sink_pad + .upgrade() + .expect("PadSink no longer available") + .gst_pad() + .push_event(event), + ) + } } fn src_query( &self, - pad: PadSrcRef, + pad: &PadSrcRef, _proxysrc: &ProxySrc, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling query {:?}", query); + use gst::QueryView; + + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", query); let ret = match query.view_mut() { QueryView::Latency(ref mut q) => { q.set(true, 0.into(), 0.into()); @@ -948,9 +1133,9 @@ impl PadSrcHandler for ProxySrcPadHandler { }; if ret { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled query {:?}", query); + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", query); } else { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle query {:?}", query); + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query); } ret @@ -959,19 +1144,23 @@ impl PadSrcHandler for ProxySrcPadHandler { #[derive(Debug)] struct StateSrc { - queue: Option, + proxy_ctx: Option, + ts_ctx: Option, } impl StateSrc { #[inline] - async fn lock_queue(&self) -> MutexGuard<'_, SharedQueueInner> { - self.queue.as_ref().unwrap().lock().await + fn proxy_ctx(&self) -> &ProxyContext { + self.proxy_ctx.as_ref().unwrap() } } impl Default for StateSrc { - fn default() -> StateSrc { - StateSrc { queue: None } + fn default() -> Self { + StateSrc { + proxy_ctx: None, + ts_ctx: None, + } } } @@ -997,12 +1186,22 @@ impl ProxySrc { let settings = self.settings.lock().await; - let queue = SharedQueue::get(&settings.proxy_context, false) + let proxy_ctx = ProxyContext::get(&settings.proxy_context, false) .await .ok_or_else(|| { - gst_error_msg!(gst::ResourceError::OpenRead, ["Failed to create get queue"]) + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to create get shared_state"] + ) })?; + let ts_ctx = Context::acquire(&settings.context, settings.context_wait).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to acquire Context: {}", err] + ) + })?; + let dataqueue = DataQueue::new( &element.clone().upcast(), self.src_pad.gst_pad(), @@ -1023,19 +1222,58 @@ impl ProxySrc { }, ); - queue.lock().await.dataqueue = Some(dataqueue); - state.queue = Some(queue); + { + let mut shared_ctx = proxy_ctx.lock_shared().await; + assert!(shared_ctx.shared_sink_rx.is_some()); + shared_ctx + .shared_src_tx + .take() + .unwrap() + .send(SharedSrc { + ts_ctx: ts_ctx.clone(), + src_pad: self.src_pad.downgrade(), + }) + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to send SharedSrc: {:?}", err] + ) + })?; - let context = - Context::acquire(&settings.context, settings.context_wait).map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to acquire Context: {}", err] - ) - })?; + shared_ctx.dataqueue = Some(dataqueue); + } + + state.ts_ctx = Some(ts_ctx); + state.proxy_ctx = Some(proxy_ctx); + + gst_debug!(SRC_CAT, obj: element, "Prepared"); + + Ok(()) + } + + async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.lock().await; + + let shared_sink_rx = state.proxy_ctx().lock_shared().await.shared_sink_rx.take(); + if shared_sink_rx.is_none() { + gst_log!(SRC_CAT, obj: element, "Preparation already completed"); + return Ok(()); + } + + gst_debug!(SRC_CAT, obj: element, "Completing preparation"); + + let SharedSink { sink_pad } = shared_sink_rx.unwrap().await.map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to receive SharedSink: {:?}", err] + ) + })?; self.src_pad - .prepare(context, &ProxySrcPadHandler) + .prepare( + state.ts_ctx.take().unwrap(), + &ProxySrcPadHandler::new(sink_pad), + ) .await .map_err(|err| { gst_error_msg!( @@ -1044,7 +1282,7 @@ impl ProxySrc { ) })?; - gst_debug!(SRC_CAT, obj: element, "Prepared"); + gst_debug!(SRC_CAT, obj: element, "Preparation completed"); Ok(()) } @@ -1056,8 +1294,8 @@ impl ProxySrc { self.src_pad.stop_task().await; let _ = self.src_pad.unprepare().await; - if let Some(queue) = state.queue.take() { - queue.unprepare().await; + if let Some(proxy_ctx) = state.proxy_ctx.take() { + proxy_ctx.unprepare().await; } *state = StateSrc::default(); @@ -1071,7 +1309,14 @@ impl ProxySrc { let state = self.state.lock().await; gst_debug!(SRC_CAT, obj: element, "Starting"); - let dataqueue = state.lock_queue().await.dataqueue.as_ref().unwrap().clone(); + let dataqueue = state + .proxy_ctx() + .lock_shared() + .await + .dataqueue + .as_ref() + .unwrap() + .clone(); dataqueue.start().await; ProxySrcPadHandler::start_task(self.src_pad.as_ref(), element, dataqueue).await; @@ -1088,7 +1333,7 @@ impl ProxySrc { let pause_completion = self.src_pad.pause_task().await; - if let Some(dataqueue) = state.lock_queue().await.dataqueue.as_ref() { + if let Some(dataqueue) = state.proxy_ctx().lock_shared().await.dataqueue.as_ref() { dataqueue.pause().await; dataqueue.clear().await; dataqueue.stop().await; @@ -1228,6 +1473,10 @@ impl ElementImpl for ProxySrc { gst::StateChangeError })?; } + gst::StateChange::ReadyToPaused => { + runtime::executor::block_on(self.complete_preparation(element)) + .map_err(|_| gst::StateChangeError)?; + } gst::StateChange::PlayingToPaused => { runtime::executor::block_on(self.pause(element)) .map_err(|_| gst::StateChangeError)?; diff --git a/gst-plugin-threadshare/src/queue.rs b/gst-plugin-threadshare/src/queue.rs index 7523bab1..03eb0993 100644 --- a/gst-plugin-threadshare/src/queue.rs +++ b/gst-plugin-threadshare/src/queue.rs @@ -32,15 +32,15 @@ use gst; use gst::prelude::*; use gst::subclass::prelude::*; use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace}; -use gst::{EventView, QueryView}; use lazy_static::lazy_static; use std::collections::VecDeque; +use std::sync::{self, Arc}; use std::{u32, u64}; use crate::runtime::prelude::*; -use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use crate::runtime::{self, Context, JoinHandle, PadSink, PadSinkRef, PadSrc, PadSrcRef}; use super::dataqueue::{DataQueue, DataQueueItem}; @@ -140,15 +140,30 @@ impl PendingQueue { } } +#[derive(Debug)] +struct QueuePadSinkHandlerInner { + flush_join_handle: sync::Mutex>>>, + context: Context, +} + #[derive(Clone, Debug)] -struct QueuePadSinkHandler; +struct QueuePadSinkHandler(Arc); + +impl QueuePadSinkHandler { + fn new(context: Context) -> Self { + QueuePadSinkHandler(Arc::new(QueuePadSinkHandlerInner { + flush_join_handle: sync::Mutex::new(None), + context, + })) + } +} impl PadSinkHandler for QueuePadSinkHandler { type ElementImpl = Queue; fn sink_chain( &self, - pad: PadSinkRef, + pad: &PadSinkRef, _queue: &Queue, element: &gst::Element, buffer: gst::Buffer, @@ -157,7 +172,7 @@ impl PadSinkHandler for QueuePadSinkHandler { let element = element.clone(); async move { let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - gst_log!(CAT, obj: pad.gst_pad(), "Handling buffer {:?}", buffer); + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); let queue = Queue::from_instance(&element); queue .enqueue_item(&element, DataQueueItem::Buffer(buffer)) @@ -168,7 +183,7 @@ impl PadSinkHandler for QueuePadSinkHandler { fn sink_chain_list( &self, - pad: PadSinkRef, + pad: &PadSinkRef, _queue: &Queue, element: &gst::Element, list: gst::BufferList, @@ -177,7 +192,7 @@ impl PadSinkHandler for QueuePadSinkHandler { let element = element.clone(); async move { let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - gst_log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list); + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list); let queue = Queue::from_instance(&element); queue .enqueue_item(&element, DataQueueItem::BufferList(list)) @@ -188,39 +203,42 @@ impl PadSinkHandler for QueuePadSinkHandler { fn sink_event( &self, - pad: PadSinkRef, + pad: &PadSinkRef, queue: &Queue, element: &gst::Element, event: gst::Event, ) -> Either> { + use gst::EventView; + if event.is_serialized() { let pad_weak = pad.downgrade(); let element = element.clone(); + let inner_weak = Arc::downgrade(&self.0); Either::Right( async move { let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); let queue = Queue::from_instance(&element); - match event.view() { - EventView::FlushStart(..) => { - let _ = queue.stop(&element).await; - } - EventView::FlushStop(..) => { - let (res, state, pending) = element.get_state(0.into()); - if res == Ok(gst::StateChangeSuccess::Success) - && state == gst::State::Paused - || res == Ok(gst::StateChangeSuccess::Async) - && pending == gst::State::Paused - { + + if let EventView::FlushStop(..) = event.view() { + let inner = inner_weak.upgrade().unwrap(); + + let flush_join_handle = inner.flush_join_handle.lock().unwrap().take(); + if let Some(flush_join_handle) = flush_join_handle { + gst_debug!(CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete"); + if let Ok(Ok(())) = flush_join_handle.await { let _ = queue.start(&element).await; + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: FlushStart failed to complete"); } + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress"); } - _ => (), } - gst_log!(CAT, obj: pad.gst_pad(), "Queuing serialized event {:?}", event); + gst_log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event); queue .enqueue_item(&element, DataQueueItem::Event(event)) .await @@ -229,7 +247,21 @@ impl PadSinkHandler for QueuePadSinkHandler { .boxed(), ) } else { - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized event {:?}", event); + if let EventView::FlushStart(..) = event.view() { + let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); + if flush_join_handle.is_none() { + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + let element = element.clone(); + *flush_join_handle = + Some(self.0.context.spawn(async move { + Queue::from_instance(&element).stop(&element).await + })); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress"); + } + } + + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); Either::Left(queue.src_pad.gst_pad().push_event(event)) } @@ -237,26 +269,31 @@ impl PadSinkHandler for QueuePadSinkHandler { fn sink_query( &self, - pad: PadSinkRef, + pad: &PadSinkRef, queue: &Queue, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { - gst_log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query); + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); if query.is_serialized() { // FIXME: How can we do this? - gst_log!(CAT, obj: pad.gst_pad(), "Dropping serialized query {:?}", query); + gst_log!(CAT, obj: pad.gst_pad(), "Dropping serialized {:?}", query); false } else { - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query); + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); queue.src_pad.gst_pad().peer_query(query) } } } -#[derive(Clone, Debug)] -struct QueuePadSrcHandler; +#[derive(Debug, Default)] +struct QueuePadSrcHandlerInner { + flush_join_handle: sync::Mutex>>>, +} + +#[derive(Clone, Debug, Default)] +struct QueuePadSrcHandler(Arc); impl QueuePadSrcHandler { async fn start_task(pad: PadSrcRef<'_>, element: &gst::Element, dataqueue: DataQueue) { @@ -295,15 +332,15 @@ impl QueuePadSrcHandler { let res = match item { DataQueueItem::Buffer(buffer) => { - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding buffer {:?}", buffer); + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer); pad.push(buffer).await.map(drop) } DataQueueItem::BufferList(list) => { - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding buffer list {:?}", list); + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", list); pad.push_list(list).await.map(drop) } DataQueueItem::Event(event) => { - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding event {:?}", event); + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); pad.push_event(event).await; Ok(()) } @@ -347,57 +384,98 @@ impl PadSrcHandler for QueuePadSrcHandler { fn src_event( &self, - pad: PadSrcRef, + pad: &PadSrcRef, queue: &Queue, element: &gst::Element, event: gst::Event, ) -> Either> { - gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + use gst::EventView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); if event.is_serialized() { - let pad_weak = pad.downgrade(); let element = element.clone(); + let inner_weak = Arc::downgrade(&self.0); + let pad_weak = pad.downgrade(); Either::Right( async move { - let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized event {:?}", event); + let ret = if let EventView::FlushStop(..) = event.view() { + let mut ret = false; - let queue = Queue::from_instance(&element); - queue.sink_pad.gst_pad().push_event(event) + let pad = pad_weak.upgrade().unwrap(); + let inner_weak = inner_weak.upgrade().unwrap(); + let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take(); + if let Some(flush_join_handle) = flush_join_handle { + gst_debug!(CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete"); + if let Ok(Ok(())) = flush_join_handle.await { + ret = Queue::from_instance(&element) + .start(&element) + .await + .is_ok(); + gst_log!(CAT, obj: pad.gst_pad(), "FlushStop complete"); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed"); + } + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress"); + } + + ret + } else { + true + }; + + if ret { + let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event); + + let queue = Queue::from_instance(&element); + queue.sink_pad.gst_pad().push_event(event) + } else { + false + } } .boxed(), ) } else { - match event.view() { - EventView::FlushStart(..) => { - let _ = runtime::executor::block_on(queue.stop(element)); - } - EventView::FlushStop(..) => { - let (res, state, pending) = element.get_state(0.into()); - if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing - || res == Ok(gst::StateChangeSuccess::Async) - && pending == gst::State::Playing - { - let _ = runtime::executor::block_on(queue.start(element)); - } - } - _ => (), - }; + if let EventView::FlushStart(..) = event.view() { + let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); + if flush_join_handle.is_none() { + let element = element.clone(); + let pad_weak = pad.downgrade(); - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized event {:?}", event); + *flush_join_handle = Some(pad.spawn(async move { + let res = Queue::from_instance(&element).stop(&element).await; + let pad = pad_weak.upgrade().unwrap(); + if res.is_ok() { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart complete"); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart failed"); + } + + res + })); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress"); + } + } + + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); Either::Left(queue.sink_pad.gst_pad().push_event(event)) } } fn src_query( &self, - pad: PadSrcRef, + pad: &PadSrcRef, queue: &Queue, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { - gst_log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query); + use gst::QueryView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); if let QueryView::Scheduling(ref mut q) = query.view_mut() { let mut new_query = gst::Query::new_scheduling(); @@ -422,7 +500,7 @@ impl PadSrcHandler for QueuePadSrcHandler { return true; } - gst_log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query); + gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); queue.sink_pad.gst_pad().peer_query(query) } } @@ -670,7 +748,7 @@ impl Queue { })?; self.src_pad - .prepare(context, &QueuePadSrcHandler) + .prepare(context.clone(), &QueuePadSrcHandler::default()) .await .map_err(|err| { gst_error_msg!( @@ -678,7 +756,9 @@ impl Queue { ["Error joining Context: {:?}", err] ) })?; - self.sink_pad.prepare(&QueuePadSinkHandler).await; + self.sink_pad + .prepare(&QueuePadSinkHandler::new(context)) + .await; gst_debug!(CAT, obj: element, "Prepared"); @@ -809,28 +889,24 @@ impl ObjectImpl for Queue { fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; + let mut settings = runtime::executor::block_on(self.settings.lock()); match *prop { subclass::Property("max-size-buffers", ..) => { - let mut settings = runtime::executor::block_on(self.settings.lock()); settings.max_size_buffers = value.get_some().expect("type checked upstream"); } subclass::Property("max-size-bytes", ..) => { - let mut settings = runtime::executor::block_on(self.settings.lock()); settings.max_size_bytes = value.get_some().expect("type checked upstream"); } subclass::Property("max-size-time", ..) => { - let mut settings = runtime::executor::block_on(self.settings.lock()); settings.max_size_time = value.get_some().expect("type checked upstream"); } subclass::Property("context", ..) => { - let mut settings = runtime::executor::block_on(self.settings.lock()); settings.context = value .get() .expect("type checked upstream") .unwrap_or_else(|| "".into()); } subclass::Property("context-wait", ..) => { - let mut settings = runtime::executor::block_on(self.settings.lock()); settings.context_wait = value.get_some().expect("type checked upstream"); } _ => unimplemented!(), @@ -840,27 +916,13 @@ impl ObjectImpl for Queue { fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { let prop = &PROPERTIES[id]; + let settings = runtime::executor::block_on(self.settings.lock()); match *prop { - subclass::Property("max-size-buffers", ..) => { - let settings = runtime::executor::block_on(self.settings.lock()); - Ok(settings.max_size_buffers.to_value()) - } - subclass::Property("max-size-bytes", ..) => { - let settings = runtime::executor::block_on(self.settings.lock()); - Ok(settings.max_size_bytes.to_value()) - } - subclass::Property("max-size-time", ..) => { - let settings = runtime::executor::block_on(self.settings.lock()); - Ok(settings.max_size_time.to_value()) - } - subclass::Property("context", ..) => { - let settings = runtime::executor::block_on(self.settings.lock()); - Ok(settings.context.to_value()) - } - subclass::Property("context-wait", ..) => { - let settings = runtime::executor::block_on(self.settings.lock()); - Ok(settings.context_wait.to_value()) - } + subclass::Property("max-size-buffers", ..) => Ok(settings.max_size_buffers.to_value()), + subclass::Property("max-size-bytes", ..) => Ok(settings.max_size_bytes.to_value()), + subclass::Property("max-size-time", ..) => Ok(settings.max_size_time.to_value()), + subclass::Property("context", ..) => Ok(settings.context.to_value()), + subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()), _ => unimplemented!(), } } diff --git a/gst-plugin-threadshare/src/runtime/executor.rs b/gst-plugin-threadshare/src/runtime/executor.rs index 25446ca4..0b6c79c1 100644 --- a/gst-plugin-threadshare/src/runtime/executor.rs +++ b/gst-plugin-threadshare/src/runtime/executor.rs @@ -25,17 +25,14 @@ //! //! * Waiting for an incoming packet on a Socket. //! * Waiting for an asynchronous `Mutex` `lock` to succeed. -//! * Waiting for a `Timeout` to be elapsed. -//! -//! [`Context`]s instantiators define the minimum time between two iterations of the [`Context`] -//! loop, which acts as a throttle, saving CPU usage when no operations are to be executed. +//! * Waiting for a time related `Future`. //! //! `Element` implementations should use [`PadSrc`] & [`PadSink`] which provides high-level features. //! -//! [`threadshare`]: ../index.html +//! [`threadshare`]: ../../index.html //! [`Context`]: struct.Context.html -//! [`PadSrc`]: struct.PadSrc.html -//! [`PadSink`]: struct.PadSink.html +//! [`PadSrc`]: ../pad/struct.PadSrc.html +//! [`PadSink`]: ../pad/struct.PadSink.html use futures::channel::oneshot; use futures::future::BoxFuture; @@ -82,8 +79,10 @@ thread_local!(static CURRENT_THREAD_CONTEXT: RefCell> = RefC /// Blocks on `future`. /// -/// This function must NOT be called within a [`Context`] thread. +/// IO & time related `Future`s must be handled within their own [`Context`]. +/// Wait for the result using a [`JoinHandle`] or a `channel`. /// +/// This function must NOT be called within a [`Context`] thread. /// The reason is this would prevent any task operating on the /// [`Context`] from making progress. /// @@ -92,6 +91,7 @@ thread_local!(static CURRENT_THREAD_CONTEXT: RefCell> = RefC /// This function panics if called within a [`Context`] thread. /// /// [`Context`]: struct.Context.html +/// [`JoinHandle`]: enum.JoinHandle.html pub fn block_on(future: Fut) -> Fut::Output { if Context::is_context_thread() { panic!("Attempt to `block_on` within a `Context` thread"); @@ -290,8 +290,8 @@ impl ContextWeak { /// /// See the [module-level documentation](index.html) for more. /// -/// [`PadSrc`]: ../struct.PadSrc.html -/// [`PadSink`]: ../struct.PadSink.html +/// [`PadSrc`]: ../pad/struct.PadSrc.html +/// [`PadSink`]: ../pad/struct.PadSink.html #[derive(Clone, Debug)] pub struct Context(Arc); diff --git a/gst-plugin-threadshare/src/runtime/mod.rs b/gst-plugin-threadshare/src/runtime/mod.rs index cd835a10..8cae1e04 100644 --- a/gst-plugin-threadshare/src/runtime/mod.rs +++ b/gst-plugin-threadshare/src/runtime/mod.rs @@ -47,7 +47,7 @@ pub mod executor; pub use executor::{Context, JoinHandle, TaskOutput}; pub mod pad; -pub use pad::{PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak}; +pub use pad::{PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak}; pub mod pad_context; pub use pad_context::{PadContext, PadContextRef, PadContextWeak}; diff --git a/gst-plugin-threadshare/src/runtime/pad.rs b/gst-plugin-threadshare/src/runtime/pad.rs index c6ec95a7..d11dcc1b 100644 --- a/gst-plugin-threadshare/src/runtime/pad.rs +++ b/gst-plugin-threadshare/src/runtime/pad.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2019 François Laignel +// Copyright (C) 2019-2020 François Laignel // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Library General Public @@ -81,8 +81,8 @@ use std::marker::PhantomData; use std::sync; use std::sync::{Arc, Weak}; -use super::executor::{self, Context}; -use super::pad_context::{PadContext, PadContextRef, PadContextWeak}; +use super::executor::{self, Context, JoinHandle, TaskOutput}; +use super::pad_context::{PadContext, PadContextWeak}; use super::task::Task; use super::RUNTIME_CAT; @@ -144,7 +144,7 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { fn src_activate( &self, - pad: PadSrcRef, + pad: &PadSrcRef, _imp: &Self::ElementImpl, _element: &gst::Element, ) -> Result<(), gst::LoggableError> { @@ -174,7 +174,7 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { fn src_activatemode( &self, - _pad: PadSrcRef, + _pad: &PadSrcRef, _imp: &Self::ElementImpl, _element: &gst::Element, _mode: gst::PadMode, @@ -185,7 +185,7 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { fn src_event( &self, - pad: PadSrcRef, + pad: &PadSrcRef, _imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, @@ -197,21 +197,21 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { Either::Right( async move { let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); - gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); pad.gst_pad().event_default(Some(&element), event) } .boxed(), ) } else { - gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); Either::Left(pad.gst_pad().event_default(Some(element), event)) } } fn src_event_full( &self, - pad: PadSrcRef, + pad: &PadSrcRef, imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, @@ -225,12 +225,12 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { fn src_query( &self, - pad: PadSrcRef, + pad: &PadSrcRef, _imp: &Self::ElementImpl, element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { - gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling query {:?}", query); + gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query); if query.is_serialized() { // FIXME serialized queries should be handled with the dataflow // but we can't return a `Future` because we couldn't honor QueryRef's lifetime @@ -244,37 +244,13 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { #[derive(Default, Debug)] pub struct PadSrcState { is_initialized: bool, - pad_context: Option, -} - -impl PadSrcState { - fn pad_context_priv(&self) -> &PadContext { - self.pad_context - .as_ref() - .expect("PadContext not initialized") - } - - pub fn pad_context(&self) -> Option> { - self.pad_context - .as_ref() - .map(|pad_context| pad_context.as_ref()) - } -} - -impl Drop for PadSrcState { - fn drop(&mut self) { - // Check invariant which can't be held automatically in `PadSrc` - // because `drop` can't be `async` - if self.pad_context.is_some() { - panic!("Missing call to `PadSrc::unprepare`"); - } - } } #[derive(Debug)] struct PadSrcInner { state: Mutex, gst_pad: gst::Pad, + pad_context: sync::RwLock>, task: Task, } @@ -287,9 +263,24 @@ impl PadSrcInner { PadSrcInner { state: Mutex::new(PadSrcState::default()), gst_pad, + pad_context: sync::RwLock::new(None), task: Task::default(), } } + + fn has_pad_context(&self) -> bool { + self.pad_context.read().unwrap().as_ref().is_some() + } +} + +impl Drop for PadSrcInner { + fn drop(&mut self) { + // Check invariant which can't be held automatically in `PadSrc` + // because `drop` can't be `async` + if self.has_pad_context() { + panic!("Missing call to `PadSrc::unprepare`"); + } + } } /// A [`PadSrc`] which can be moved in [`handler`]s functions and `Future`s. @@ -343,6 +334,25 @@ impl<'a> PadSrcRef<'a> { self.strong.lock_state().await } + pub fn pad_context(&self) -> PadContextWeak { + self.strong.pad_context() + } + + /// Spawns `future` using current [`PadContext`]. + /// + /// # Panics + /// + /// This function panics if the `PadSrc` is not prepared. + /// + /// [`PadContext`]: ../struct.PadContext.html + pub fn spawn(&self, future: Fut) -> JoinHandle + where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, + { + self.strong.spawn(future) + } + pub fn downgrade(&self) -> PadSrcWeak { self.strong.downgrade() } @@ -421,45 +431,96 @@ impl PadSrcStrong { } #[inline] - pub async fn lock_state(&self) -> MutexGuard<'_, PadSrcState> { + async fn lock_state(&self) -> MutexGuard<'_, PadSrcState> { self.0.state.lock().await } + #[inline] + fn pad_context_priv(&self) -> sync::RwLockReadGuard<'_, Option> { + self.0.pad_context.read().unwrap() + } + + #[inline] + fn pad_context(&self) -> PadContextWeak { + self.pad_context_priv() + .as_ref() + .expect("PadContext not initialized") + .downgrade() + } + + #[inline] + pub fn spawn(&self, future: Fut) -> JoinHandle + where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, + { + let pad_ctx = self.pad_context_priv(); + pad_ctx + .as_ref() + .expect("PadContext not initialized") + .spawn(future) + } + #[inline] fn downgrade(&self) -> PadSrcWeak { PadSrcWeak(Arc::downgrade(&self.0)) } - fn push_prelude(&self, state: &mut MutexGuard<'_, PadSrcState>) { - let must_send_task_context = if state.is_initialized { - self.gst_pad().check_reconfigure() - } else { - // Get rid of reconfigure flag - self.gst_pad().check_reconfigure(); - state.is_initialized = true; + fn push_prelude( + &self, + state: &mut MutexGuard<'_, PadSrcState>, + ) -> Result { + if !state.is_initialized || self.gst_pad().check_reconfigure() { + if !self.push_pad_context_event() { + return Err(FlowError::Error); + } - true - }; - - if must_send_task_context { - let pad_ctx = state.pad_context_priv(); - gst_log!( - RUNTIME_CAT, - obj: self.gst_pad(), - "Pushing PadContext Event {}", - pad_ctx, - ); - - self.gst_pad().push_event(pad_ctx.new_sticky_event()); + if !state.is_initialized { + // Get rid of reconfigure flag + self.gst_pad().check_reconfigure(); + state.is_initialized = true; + } } + + Ok(FlowSuccess::Ok) + } + + #[inline] + fn push_pad_context_event(&self) -> bool { + let pad_ctx = self.pad_context_priv(); + let pad_ctx = pad_ctx.as_ref().unwrap(); + gst_log!( + RUNTIME_CAT, + obj: self.gst_pad(), + "Pushing PadContext Event {}", + pad_ctx, + ); + + let ret = self.gst_pad().push_event(pad_ctx.new_sticky_event()); + if !ret { + gst_error!(RUNTIME_CAT, + obj: self.gst_pad(), + "Failed to push PadContext sticky event to PadSrc", + ); + } + + ret + } + + fn drain_pending_tasks(&self) -> Option> { + self.pad_context_priv() + .as_ref() + .unwrap() + .drain_pending_tasks() } #[inline] async fn push(&self, buffer: gst::Buffer) -> Result { let mut state = self.lock_state().await; - self.push_prelude(&mut state); + gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer); + + self.push_prelude(&mut state)?; - gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing buffer"); let success = self.gst_pad().push(buffer).map_err(|err| { gst_error!(RUNTIME_CAT, obj: self.gst_pad(), @@ -469,7 +530,7 @@ impl PadSrcStrong { err })?; - if let Some(pending_tasks) = state.pad_context_priv().drain_pending_tasks() { + if let Some(pending_tasks) = self.drain_pending_tasks() { gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing pending tasks (push)"); pending_tasks.await?; } @@ -480,21 +541,22 @@ impl PadSrcStrong { #[inline] async fn push_list(&self, list: gst::BufferList) -> Result { let mut state = self.lock_state().await; - self.push_prelude(&mut state); + gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list); + + self.push_prelude(&mut state)?; - gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing buffer_list"); let success = self.gst_pad().push_list(list).map_err(|err| { gst_error!( RUNTIME_CAT, obj: self.gst_pad(), "Failed to push BufferList to PadSrc: {:?} ({})", err, - state.pad_context_priv(), + self.pad_context_priv().as_ref().unwrap(), ); err })?; - if let Some(pending_tasks) = state.pad_context_priv().drain_pending_tasks() { + if let Some(pending_tasks) = self.drain_pending_tasks() { gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing pending tasks (push_list)"); pending_tasks.await?; } @@ -505,12 +567,28 @@ impl PadSrcStrong { #[inline] async fn push_event(&self, event: gst::Event) -> bool { let mut state = self.lock_state().await; - self.push_prelude(&mut state); - gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing event"); - let was_handled = self.gst_pad().push_event(event); + let was_handled = if PadContext::is_pad_context_event(&event) { + // Push our own PadContext + if !self.push_pad_context_event() { + return false; + } - if let Some(pending_tasks) = state.pad_context_priv().drain_pending_tasks() { + // Get rid of reconfigure flag + self.gst_pad().check_reconfigure(); + state.is_initialized = true; + + true + } else { + gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", event); + + if self.push_prelude(&mut state).is_err() { + return false; + } + self.gst_pad().push_event(event) + }; + + if let Some(pending_tasks) = self.drain_pending_tasks() { gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Processing pending tasks (push_event)"); if pending_tasks.await.is_err() { return false; @@ -596,6 +674,25 @@ impl PadSrc { self.0.lock_state().await } + pub fn pad_context(&self) -> PadContextWeak { + self.0.pad_context() + } + + /// Spawns `future` using current [`PadContext`]. + /// + /// # Panics + /// + /// This function panics if the `PadSrc` is not prepared. + /// + /// [`PadContext`]: ../struct.PadContext.html + pub fn spawn(&self, future: Fut) -> JoinHandle + where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, + { + self.0.spawn(future) + } + fn init_pad_functions(&self, handler: &H) { let handler_clone = handler.clone(); let this_weak = self.downgrade(); @@ -611,7 +708,7 @@ impl PadSrc { }, move |imp, element| { let this_ref = this_weak.upgrade().expect("PadSrc no longer exists"); - handler.src_activate(this_ref, imp, element) + handler.src_activate(&this_ref, imp, element) }, ) }); @@ -634,8 +731,7 @@ impl PadSrc { move |imp, element| { let this_ref = this_weak.upgrade().expect("PadSrc no longer exists"); this_ref.activate_mode_hook(mode, active)?; - - handler.src_activatemode(this_ref, imp, element, mode, active) + handler.src_activatemode(&this_ref, imp, element, mode, active) }, ) }); @@ -653,7 +749,7 @@ impl PadSrc { || Err(FlowError::Error), move |imp, element| { let this_ref = this_weak.upgrade().expect("PadSrc no longer exists"); - match handler.src_event_full(this_ref, imp, &element, event) { + match handler.src_event_full(&this_ref, imp, &element, event) { Either::Left(res) => res, Either::Right(_fut) => { // See these threads: @@ -678,7 +774,7 @@ impl PadSrc { move |imp, element| { let this_ref = this_weak.upgrade().expect("PadSrc no longer exists"); if !query.is_serialized() { - handler.src_query(this_ref, imp, &element, query) + handler.src_query(&this_ref, imp, &element, query) } else { gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported"); false @@ -693,10 +789,10 @@ impl PadSrc { context: Context, handler: &H, ) -> Result<(), PadContextError> { - let mut state = self.lock_state().await; + let _state = self.lock_state().await; gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Preparing"); - if state.pad_context.is_some() { + if (self.0).0.has_pad_context() { return Err(PadContextError::ActiveContext); } @@ -707,7 +803,7 @@ impl PadSrc { .await .map_err(|_| PadContextError::ActiveTask)?; - state.pad_context = Some(PadContext::new(context)); + *(self.0).0.pad_context.write().unwrap() = Some(PadContext::new(context.clone())); self.init_pad_functions(handler); @@ -716,7 +812,7 @@ impl PadSrc { /// Releases the resources held by this `PadSrc`. pub async fn unprepare(&self) -> Result<(), PadContextError> { - let mut state = self.lock_state().await; + let _state = self.lock_state().await; gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Unpreparing"); (self.0) @@ -737,7 +833,7 @@ impl PadSrc { self.gst_pad() .set_query_function(move |_gst_pad, _parent, _query| false); - state.pad_context = None; + *(self.0).0.pad_context.write().unwrap() = None; Ok(()) } @@ -787,7 +883,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { fn sink_activate( &self, - pad: PadSinkRef, + pad: &PadSinkRef, _imp: &Self::ElementImpl, _element: &gst::Element, ) -> Result<(), gst::LoggableError> { @@ -817,7 +913,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { fn sink_activatemode( &self, - _pad: PadSinkRef, + _pad: &PadSinkRef, _imp: &Self::ElementImpl, _element: &gst::Element, _mode: gst::PadMode, @@ -828,7 +924,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { fn sink_chain( &self, - _pad: PadSinkRef, + _pad: &PadSinkRef, _imp: &Self::ElementImpl, _element: &gst::Element, _buffer: gst::Buffer, @@ -838,7 +934,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { fn sink_chain_list( &self, - _pad: PadSinkRef, + _pad: &PadSinkRef, _imp: &Self::ElementImpl, _element: &gst::Element, _buffer_list: gst::BufferList, @@ -848,7 +944,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { fn sink_event( &self, - pad: PadSinkRef, + pad: &PadSinkRef, _imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, @@ -860,21 +956,21 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { Either::Right( async move { let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); pad.gst_pad().event_default(Some(&element), event) } .boxed(), ) } else { - gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); Either::Left(pad.gst_pad().event_default(Some(element), event)) } } fn sink_event_full( &self, - pad: PadSinkRef, + pad: &PadSinkRef, imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, @@ -888,12 +984,12 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { fn sink_query( &self, - pad: PadSinkRef, + pad: &PadSinkRef, _imp: &Self::ElementImpl, element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { - gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling query {:?}", query); + gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query); if query.is_serialized() { // FIXME serialized queries should be handled with the dataflow // but we can't return a `Future` because we couldn't honor QueryRef's lifetime @@ -907,6 +1003,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { #[derive(Debug)] struct PadSinkInner { gst_pad: gst::Pad, + pad_context: sync::RwLock>, } impl PadSinkInner { @@ -915,7 +1012,10 @@ impl PadSinkInner { panic!("Wrong pad direction for PadSink"); } - PadSinkInner { gst_pad } + PadSinkInner { + gst_pad, + pad_context: sync::RwLock::new(None), + } } } @@ -965,6 +1065,10 @@ impl<'a> PadSinkRef<'a> { self.strong.gst_pad() } + pub fn pad_context(&self) -> Option { + self.strong.pad_context() + } + pub fn downgrade(&self) -> PadSinkWeak { self.strong.downgrade() } @@ -988,6 +1092,33 @@ impl<'a> PadSinkRef<'a> { Ok(()) } + + fn handle_future( + &self, + fut: impl Future> + Send + 'static, + ) -> Result { + if Context::is_context_thread() { + self.pad_context() + .as_ref() + .and_then(|pad_ctx_weak| pad_ctx_weak.upgrade()) + .expect("Operating on a Context without a valid PadContext") + .add_pending_task(fut.map(|res| res.map(drop))); + Ok(FlowSuccess::Ok) + } else { + // Not on a context thread: execute the Future immediately. + // + // - If there is no PadContext, we don't have any other options. + // - If there is a PadContext, it means that we received it from + // an upstream element, but there is at least one non-ts element + // operating on another thread in between, so we can't take + // advantage of the task queue. + // + // Note: we don't use `crate::runtime::executor::block_on` here + // because `Context::is_context_thread()` is checked in the `if` + // statement above. + futures::executor::block_on(fut) + } + } } #[derive(Debug)] @@ -1002,6 +1133,10 @@ impl PadSinkStrong { &self.0.gst_pad } + fn pad_context(&self) -> Option { + self.0.pad_context.read().unwrap().clone() + } + fn downgrade(&self) -> PadSinkWeak { PadSinkWeak(Arc::downgrade(&self.0)) } @@ -1047,6 +1182,10 @@ impl PadSink { self.0.gst_pad() } + pub fn pad_context(&self) -> Option { + self.0.pad_context() + } + pub fn downgrade(&self) -> PadSinkWeak { self.0.downgrade() } @@ -1069,7 +1208,7 @@ impl PadSink { }, move |imp, element| { let this_ref = this_weak.upgrade().expect("PadSink no longer exists"); - handler.sink_activate(this_ref, imp, element) + handler.sink_activate(&this_ref, imp, element) }, ) }); @@ -1093,48 +1232,42 @@ impl PadSink { let this_ref = this_weak.upgrade().expect("PadSink no longer exists"); this_ref.activate_mode_hook(mode, active)?; - handler.sink_activatemode(this_ref, imp, element, mode, active) + handler.sink_activatemode(&this_ref, imp, element, mode, active) }, ) }); - // Functions depending on the `PadContext` - let pad_ctx = Arc::new(sync::Mutex::new(None)); - let handler_clone = handler.clone(); let this_weak = self.downgrade(); - let pad_ctx_clone = pad_ctx.clone(); self.gst_pad() .set_chain_function(move |_gst_pad, parent, buffer| { let handler = handler_clone.clone(); let this_weak = this_weak.clone(); - let pad_ctx = pad_ctx_clone.clone(); H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), move |imp, element| { let this_ref = this_weak.upgrade().expect("PadSink no longer exists"); - let chain_fut = handler.sink_chain(this_ref, imp, &element, buffer); - Self::handle_future(pad_ctx, chain_fut) + let chain_fut = handler.sink_chain(&this_ref, imp, &element, buffer); + this_ref.handle_future(chain_fut) }, ) }); let handler_clone = handler.clone(); let this_weak = self.downgrade(); - let pad_ctx_clone = pad_ctx.clone(); self.gst_pad() .set_chain_list_function(move |_gst_pad, parent, list| { let handler = handler_clone.clone(); let this_weak = this_weak.clone(); - let pad_ctx = pad_ctx_clone.clone(); H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), move |imp, element| { let this_ref = this_weak.upgrade().expect("PadSink no longer exists"); - let chain_list_fut = handler.sink_chain_list(this_ref, imp, &element, list); - Self::handle_future(pad_ctx, chain_list_fut) + let chain_list_fut = + handler.sink_chain_list(&this_ref, imp, &element, list); + this_ref.handle_future(chain_list_fut) }, ) }); @@ -1147,25 +1280,19 @@ impl PadSink { .set_event_full_function(move |gst_pad, parent, event| { let handler = handler_clone.clone(); let this_weak = this_weak.clone(); - let pad_ctx = pad_ctx.clone(); H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), move |imp, element| { let this_ref = this_weak.upgrade().expect("PadSink no longer exists"); if let Some(received_pc) = PadContext::check_pad_context_event(&event) { - gst_log!( - RUNTIME_CAT, - obj: gst_pad, - "Received PadContext Event {:?}", - received_pc - ); - *pad_ctx.lock().unwrap() = Some(received_pc); + gst_log!(RUNTIME_CAT, obj: gst_pad, "Received {:?}", received_pc); + *this_ref.strong.0.pad_context.write().unwrap() = Some(received_pc); } - match handler.sink_event_full(this_ref, imp, &element, event) { + match handler.sink_event_full(&this_ref, imp, &element, event) { Either::Left(ret) => ret, - Either::Right(fut) => Self::handle_future(pad_ctx, fut), + Either::Right(fut) => this_ref.handle_future(fut), } }, ) @@ -1183,7 +1310,7 @@ impl PadSink { move |imp, element| { let this_ref = this_weak.upgrade().expect("PadSink no longer exists"); if !query.is_serialized() { - handler.sink_query(this_ref, imp, &element, query) + handler.sink_query(&this_ref, imp, &element, query) } else { gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported"); false @@ -1193,31 +1320,6 @@ impl PadSink { }); } - fn handle_future( - pad_ctx: Arc>>, - fut: impl Future> + Send + 'static, - ) -> Result { - match pad_ctx - .lock() - .unwrap() - .as_ref() - .and_then(|pad_ctx_weak| pad_ctx_weak.upgrade()) - { - Some(pad_ctx) => { - pad_ctx.add_pending_task(fut.map(|res| res.map(drop))); - Ok(FlowSuccess::Ok) - } - None => match Context::current() { - None => executor::block_on(fut), - Some(context) => { - // Don't block the Context thread - context.spawn(fut); - Ok(FlowSuccess::Ok) - } - }, - } - } - pub async fn prepare(&self, handler: &H) { gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Preparing"); self.init_pad_functions(handler); diff --git a/gst-plugin-threadshare/src/runtime/pad_context.rs b/gst-plugin-threadshare/src/runtime/pad_context.rs index 26522297..438cb28d 100644 --- a/gst-plugin-threadshare/src/runtime/pad_context.rs +++ b/gst-plugin-threadshare/src/runtime/pad_context.rs @@ -15,11 +15,7 @@ // Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Boston, MA 02110-1335, USA. -//! A wrapper on a [`Context`] with additional features for [`PadSrc`] & [`PadSink`]. -//! -//! [`Context`]: ../executor/struct.Context.html -//! [`PadSrc`]: ../pad/struct.PadSrc.html -//! [`PadSink`]: ../pad/struct.PadSink.html +//! Types that allow `Pad`s to operate within the threadshare runtime. use futures::prelude::*; @@ -216,6 +212,15 @@ impl PadContext { event.get_structure().unwrap().get_name() == "ts-pad-context" } + #[inline] + pub fn is_pad_context_event(event: &gst::Event) -> bool { + if let gst::EventView::CustomDownstreamSticky(e) = event.view() { + return Self::is_pad_context_sticky_event(&e); + } + + false + } + pub fn check_pad_context_event(event: &gst::Event) -> Option { if let gst::EventView::CustomDownstreamSticky(e) = event.view() { if Self::is_pad_context_sticky_event(&e) { diff --git a/gst-plugin-threadshare/src/runtime/task.rs b/gst-plugin-threadshare/src/runtime/task.rs index dcee39fa..fd580d7c 100644 --- a/gst-plugin-threadshare/src/runtime/task.rs +++ b/gst-plugin-threadshare/src/runtime/task.rs @@ -15,9 +15,7 @@ // Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Boston, MA 02110-1335, USA. -//! An execution loop to run asynchronous processing on a [`Context`]. -//! -//! [`Context`]: ../executor/struct.Context.html +//! An execution loop to run asynchronous processing. use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture}; use futures::lock::Mutex; @@ -77,7 +75,7 @@ impl Drop for TaskInner { /// A `Task` operating on a `threadshare` [`Context`]. /// -/// [`Context`]: struct.Context.html +/// [`Context`]: ../executor/struct.Context.html #[derive(Debug)] pub struct Task(Arc>); diff --git a/gst-plugin-threadshare/src/tcpclientsrc.rs b/gst-plugin-threadshare/src/tcpclientsrc.rs index fea4aef4..de183f40 100644 --- a/gst-plugin-threadshare/src/tcpclientsrc.rs +++ b/gst-plugin-threadshare/src/tcpclientsrc.rs @@ -18,7 +18,7 @@ use either::Either; use futures::future::BoxFuture; -use futures::lock::{Mutex, MutexGuard}; +use futures::lock::Mutex; use futures::prelude::*; use glib; @@ -31,7 +31,6 @@ use gst; use gst::prelude::*; use gst::subclass::prelude::*; use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace}; -use gst::{EventView, QueryView}; use lazy_static::lazy_static; @@ -39,7 +38,7 @@ use rand; use std::io; use std::net::{IpAddr, SocketAddr}; -use std::sync::Arc; +use std::sync::{self, Arc}; use std::u16; use tokio::io::AsyncReadExt; @@ -175,35 +174,34 @@ impl SocketRead for TcpClientReader { } } -struct TcpClientSrcPadHandlerInner { - socket_stream: Option>, +#[derive(Debug)] +struct TcpClientSrcPadHandlerState { need_initial_events: bool, + caps: Option, configured_caps: Option, } -impl Default for TcpClientSrcPadHandlerInner { +impl Default for TcpClientSrcPadHandlerState { fn default() -> Self { - TcpClientSrcPadHandlerInner { - socket_stream: None, + TcpClientSrcPadHandlerState { need_initial_events: true, + caps: None, configured_caps: None, } } } -#[derive(Clone)] -struct TcpClientSrcPadHandler(Arc>); +#[derive(Debug, Default)] +struct TcpClientSrcPadHandlerInner { + state: sync::RwLock, + socket_stream: Mutex>>, + flush_join_handle: sync::Mutex>>>, +} + +#[derive(Clone, Debug, Default)] +struct TcpClientSrcPadHandler(Arc); impl TcpClientSrcPadHandler { - fn new() -> Self { - TcpClientSrcPadHandler(Arc::new(Mutex::new(TcpClientSrcPadHandlerInner::default()))) - } - - #[inline] - async fn lock(&self) -> MutexGuard<'_, TcpClientSrcPadHandlerInner> { - self.0.lock().await - } - async fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) { let this = self.clone(); let pad_weak = pad.downgrade(); @@ -214,9 +212,10 @@ impl TcpClientSrcPadHandler { let element = element.clone(); async move { let item = this + .0 + .socket_stream .lock() .await - .socket_stream .as_mut() .expect("Missing SocketStream") .next() @@ -265,8 +264,13 @@ impl TcpClientSrcPadHandler { { let mut events = Vec::new(); { - let mut inner = self.lock().await; - if inner.need_initial_events { + // Only `read` the state in the hot path + if self.0.state.read().unwrap().need_initial_events { + // We will need to `write` and we also want to prevent + // any changes on the state while we are handling initial events + let mut state = self.0.state.write().unwrap(); + assert!(state.need_initial_events); + gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); let stream_id = @@ -276,17 +280,17 @@ impl TcpClientSrcPadHandler { .group_id(gst::util_group_id_next()) .build(), ); - let tcpclientsrc = TcpClientSrc::from_instance(element); - if let Some(ref caps) = tcpclientsrc.settings.lock().await.caps { + + if let Some(ref caps) = state.caps { events.push(gst::Event::new_caps(&caps).build()); - inner.configured_caps = Some(caps.clone()); + state.configured_caps = Some(caps.clone()); } events.push( gst::Event::new_segment(&gst::FormattedSegment::::new()) .build(), ); - inner.need_initial_events = false; + state.need_initial_events = false; } if buffer.get_size() == 0 { @@ -329,26 +333,69 @@ impl PadSrcHandler for TcpClientSrcPadHandler { fn src_event( &self, - pad: PadSrcRef, - tcpclientsrc: &TcpClientSrc, + pad: &PadSrcRef, + _tcpclientsrc: &TcpClientSrc, element: &gst::Element, event: gst::Event, ) -> Either> { - gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + use gst::EventView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); let ret = match event.view() { EventView::FlushStart(..) => { - let _ = runtime::executor::block_on(tcpclientsrc.pause(element)); + let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); + if flush_join_handle.is_none() { + let element = element.clone(); + let pad_weak = pad.downgrade(); + + *flush_join_handle = Some(pad.spawn(async move { + let res = TcpClientSrc::from_instance(&element).pause(&element).await; + let pad = pad_weak.upgrade().unwrap(); + if res.is_ok() { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart complete"); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart failed"); + } + + res + })); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress"); + } + true } EventView::FlushStop(..) => { - let (res, state, pending) = element.get_state(0.into()); - if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing - || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing - { - let _ = runtime::executor::block_on(tcpclientsrc.start(element)); + let element = element.clone(); + let inner_weak = Arc::downgrade(&self.0); + let pad_weak = pad.downgrade(); + + let fut = async move { + let mut ret = false; + + let pad = pad_weak.upgrade().unwrap(); + let inner_weak = inner_weak.upgrade().unwrap(); + let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take(); + if let Some(flush_join_handle) = flush_join_handle { + if let Ok(Ok(())) = flush_join_handle.await { + ret = TcpClientSrc::from_instance(&element) + .start(&element) + .await + .is_ok(); + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop complete"); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed"); + } + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress"); + } + + ret } - true + .boxed(); + + return Either::Right(fut); } EventView::Reconfigure(..) => true, EventView::Latency(..) => true, @@ -356,9 +403,9 @@ impl PadSrcHandler for TcpClientSrcPadHandler { }; if ret { - gst_log!(CAT, obj: pad.gst_pad(), "Handled event {:?}", event); + gst_log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event); } else { - gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle event {:?}", event); + gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); } Either::Left(ret) @@ -366,12 +413,14 @@ impl PadSrcHandler for TcpClientSrcPadHandler { fn src_query( &self, - pad: PadSrcRef, + pad: &PadSrcRef, _tcpclientsrc: &TcpClientSrc, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { - gst_log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query); + use gst::QueryView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); let ret = match query.view_mut() { QueryView::Latency(ref mut q) => { q.set(false, 0.into(), 0.into()); @@ -383,8 +432,8 @@ impl PadSrcHandler for TcpClientSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let inner = runtime::executor::block_on(self.lock()); - let caps = if let Some(ref caps) = inner.configured_caps { + let state = self.0.state.read().unwrap(); + let caps = if let Some(ref caps) = state.configured_caps { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -402,9 +451,9 @@ impl PadSrcHandler for TcpClientSrcPadHandler { }; if ret { - gst_log!(CAT, obj: pad.gst_pad(), "Handled query {:?}", query); + gst_log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query); } else { - gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle query {:?}", query); + gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query); } ret @@ -450,6 +499,9 @@ impl TcpClientSrc { let context = { let settings = self.settings.lock().await; + + self.src_pad_handler.0.state.write().unwrap().caps = settings.caps.clone(); + Context::acquire(&settings.context, settings.context_wait).map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -526,7 +578,7 @@ impl TcpClientSrc { ) })?; - this.src_pad_handler.lock().await.socket_stream = Some(socket_stream); + *this.src_pad_handler.0.socket_stream.lock().await = Some(socket_stream); this.state.lock().await.socket = Some(socket); @@ -536,17 +588,18 @@ impl TcpClientSrc { } async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + let preparation_set = self.preparation_set.lock().await.take(); + if preparation_set.is_none() { + gst_log!(CAT, obj: element, "Preparation already completed"); + return Ok(()); + } + gst_debug!(CAT, obj: element, "Completing preparation"); let PreparationSet { join_handle, context, - } = self - .preparation_set - .lock() - .await - .take() - .expect("preparation_set already taken"); + } = preparation_set.unwrap(); join_handle .await @@ -579,7 +632,12 @@ impl TcpClientSrc { } let _ = self.src_pad.unprepare().await; - self.src_pad_handler.lock().await.configured_caps = None; + self.src_pad_handler + .0 + .state + .write() + .unwrap() + .configured_caps = None; gst_debug!(CAT, obj: element, "Unprepared"); Ok(()) @@ -659,7 +717,7 @@ impl ObjectSubclass for TcpClientSrc { Self { src_pad, - src_pad_handler: TcpClientSrcPadHandler::new(), + src_pad_handler: TcpClientSrcPadHandler::default(), state: Mutex::new(State::default()), settings: Mutex::new(Settings::default()), preparation_set: Mutex::new(None), @@ -768,8 +826,12 @@ impl ElementImpl for TcpClientSrc { .map_err(|_| gst::StateChangeError)?; } gst::StateChange::PausedToReady => { - let mut src_pad_handler = runtime::executor::block_on(self.src_pad_handler.lock()); - src_pad_handler.need_initial_events = true; + self.src_pad_handler + .0 + .state + .write() + .unwrap() + .need_initial_events = true; } _ => (), } diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index c96bfb02..9931992c 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -18,7 +18,7 @@ use either::Either; use futures::future::BoxFuture; -use futures::lock::{Mutex, MutexGuard}; +use futures::lock::Mutex; use futures::prelude::*; use gio; @@ -36,7 +36,6 @@ use gst; use gst::prelude::*; use gst::subclass::prelude::*; use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_trace}; -use gst::{EventView, QueryView}; use gst_net::*; use lazy_static::lazy_static; @@ -45,7 +44,7 @@ use rand; use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::sync::Arc; +use std::sync::{self, Arc}; use std::u16; #[cfg(unix)] @@ -328,37 +327,35 @@ impl SocketRead for UdpReader { } #[derive(Debug)] -struct UdpSrcPadHandlerInner { +struct UdpSrcPadHandlerState { retrieve_sender_address: bool, - socket_stream: Option>, need_initial_events: bool, + caps: Option, configured_caps: Option, } -impl Default for UdpSrcPadHandlerInner { +impl Default for UdpSrcPadHandlerState { fn default() -> Self { - UdpSrcPadHandlerInner { + UdpSrcPadHandlerState { retrieve_sender_address: true, - socket_stream: None, need_initial_events: true, + caps: None, configured_caps: None, } } } -#[derive(Clone, Debug)] -struct UdpSrcPadHandler(Arc>); +#[derive(Debug, Default)] +struct UdpSrcPadHandlerInner { + state: sync::RwLock, + socket_stream: Mutex>>, + flush_join_handle: sync::Mutex>>>, +} + +#[derive(Clone, Debug, Default)] +struct UdpSrcPadHandler(Arc); impl UdpSrcPadHandler { - fn new() -> Self { - UdpSrcPadHandler(Arc::new(Mutex::new(UdpSrcPadHandlerInner::default()))) - } - - #[inline] - async fn lock(&self) -> MutexGuard<'_, UdpSrcPadHandlerInner> { - self.0.lock().await - } - async fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) { let this = self.clone(); let pad_weak = pad.downgrade(); @@ -369,9 +366,10 @@ impl UdpSrcPadHandler { let element = element.clone(); async move { let item = this + .0 + .socket_stream .lock() .await - .socket_stream .as_mut() .expect("Missing SocketStream") .next() @@ -411,7 +409,7 @@ impl UdpSrcPadHandler { }; if let Some(saddr) = saddr { - if this.lock().await.retrieve_sender_address { + if this.0.state.read().unwrap().retrieve_sender_address { let inet_addr = match saddr.ip() { IpAddr::V4(ip) => gio::InetAddress::new_from_bytes( gio::InetAddressBytes::V4(&ip.octets()), @@ -436,8 +434,13 @@ impl UdpSrcPadHandler { { let mut events = Vec::new(); { - let mut inner = self.lock().await; - if inner.need_initial_events { + // Only `read` the state in the hot path + if self.0.state.read().unwrap().need_initial_events { + // We will need to `write` and we also want to prevent + // any changes on the state while we are handling initial events + let mut state = self.0.state.write().unwrap(); + assert!(state.need_initial_events); + gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); let stream_id = @@ -447,17 +450,17 @@ impl UdpSrcPadHandler { .group_id(gst::util_group_id_next()) .build(), ); - let udpsrc = UdpSrc::from_instance(element); - if let Some(ref caps) = udpsrc.settings.lock().await.caps { + + if let Some(ref caps) = state.caps { events.push(gst::Event::new_caps(&caps).build()); - inner.configured_caps = Some(caps.clone()); + state.configured_caps = Some(caps.clone()); } events.push( gst::Event::new_segment(&gst::FormattedSegment::::new()) .build(), ); - inner.need_initial_events = false; + state.need_initial_events = false; } } @@ -496,26 +499,69 @@ impl PadSrcHandler for UdpSrcPadHandler { fn src_event( &self, - pad: PadSrcRef, - udpsrc: &UdpSrc, + pad: &PadSrcRef, + _udpsrc: &UdpSrc, element: &gst::Element, event: gst::Event, ) -> Either> { - gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + use gst::EventView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); let ret = match event.view() { EventView::FlushStart(..) => { - let _ = runtime::executor::block_on(udpsrc.pause(element)); + let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); + if flush_join_handle.is_none() { + let element = element.clone(); + let pad_weak = pad.downgrade(); + + *flush_join_handle = Some(pad.spawn(async move { + let res = UdpSrc::from_instance(&element).pause(&element).await; + let pad = pad_weak.upgrade().unwrap(); + if res.is_ok() { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart complete"); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart failed"); + } + + res + })); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress"); + } + true } EventView::FlushStop(..) => { - let (res, state, pending) = element.get_state(0.into()); - if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing - || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing - { - let _ = runtime::executor::block_on(udpsrc.start(element)); + let element = element.clone(); + let inner_weak = Arc::downgrade(&self.0); + let pad_weak = pad.downgrade(); + + let fut = async move { + let mut ret = false; + + let pad = pad_weak.upgrade().unwrap(); + let inner_weak = inner_weak.upgrade().unwrap(); + let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take(); + if let Some(flush_join_handle) = flush_join_handle { + if let Ok(Ok(())) = flush_join_handle.await { + ret = UdpSrc::from_instance(&element) + .start(&element) + .await + .is_ok(); + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop complete"); + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed"); + } + } else { + gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress"); + } + + ret } - true + .boxed(); + + return Either::Right(fut); } EventView::Reconfigure(..) => true, EventView::Latency(..) => true, @@ -523,9 +569,9 @@ impl PadSrcHandler for UdpSrcPadHandler { }; if ret { - gst_log!(CAT, obj: pad.gst_pad(), "Handled event {:?}", event); + gst_log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event); } else { - gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle event {:?}", event); + gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); } Either::Left(ret) @@ -533,12 +579,14 @@ impl PadSrcHandler for UdpSrcPadHandler { fn src_query( &self, - pad: PadSrcRef, + pad: &PadSrcRef, _udpsrc: &UdpSrc, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { - gst_log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query); + use gst::QueryView; + + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); let ret = match query.view_mut() { QueryView::Latency(ref mut q) => { @@ -551,8 +599,8 @@ impl PadSrcHandler for UdpSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let inner = runtime::executor::block_on(self.lock()); - let caps = if let Some(ref caps) = inner.configured_caps { + let state = self.0.state.read().unwrap(); + let caps = if let Some(ref caps) = state.configured_caps { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -570,9 +618,9 @@ impl PadSrcHandler for UdpSrcPadHandler { }; if ret { - gst_log!(CAT, obj: pad.gst_pad(), "Handled query {:?}", query); + gst_log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query); } else { - gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle query {:?}", query); + gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query); } ret @@ -621,6 +669,12 @@ impl UdpSrc { let context = { let settings = self.settings.lock().await.clone(); + { + let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap(); + src_pad_handler_state.retrieve_sender_address = settings.retrieve_sender_address; + src_pad_handler_state.caps = settings.caps.clone(); + } + Context::acquire(&settings.context, settings.context_wait).map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -857,11 +911,7 @@ impl UdpSrc { ) })?; - { - let mut src_pad_handler = this.src_pad_handler.lock().await; - src_pad_handler.retrieve_sender_address = settings.retrieve_sender_address; - src_pad_handler.socket_stream = Some(socket_stream); - } + *this.src_pad_handler.0.socket_stream.lock().await = Some(socket_stream); this.state.lock().await.socket = Some(socket); @@ -919,7 +969,12 @@ impl UdpSrc { } let _ = self.src_pad.unprepare().await; - self.src_pad_handler.lock().await.configured_caps = None; + self.src_pad_handler + .0 + .state + .write() + .unwrap() + .configured_caps = None; gst_debug!(CAT, obj: element, "Unprepared"); Ok(()) @@ -1015,7 +1070,7 @@ impl ObjectSubclass for UdpSrc { Self { src_pad, - src_pad_handler: UdpSrcPadHandler::new(), + src_pad_handler: UdpSrcPadHandler::default(), state: Mutex::new(State::default()), settings: Mutex::new(Settings::default()), preparation_set: Mutex::new(None), @@ -1152,9 +1207,12 @@ impl ElementImpl for UdpSrc { .map_err(|_| gst::StateChangeError)?; } gst::StateChange::PausedToReady => { - runtime::executor::block_on(async { - self.src_pad_handler.lock().await.need_initial_events = true; - }); + self.src_pad_handler + .0 + .state + .write() + .unwrap() + .need_initial_events = true; } _ => (), } diff --git a/gst-plugin-threadshare/tests/pad.rs b/gst-plugin-threadshare/tests/pad.rs index 0113e261..8da7a9f3 100644 --- a/gst-plugin-threadshare/tests/pad.rs +++ b/gst-plugin-threadshare/tests/pad.rs @@ -34,13 +34,16 @@ use gst::{gst_debug, gst_error_msg, gst_log}; use lazy_static::lazy_static; use std::boxed::Box; -use std::sync::Arc; +use std::sync::{self, Arc}; +use gstthreadshare::runtime::executor::block_on; use gstthreadshare::runtime::prelude::*; -use gstthreadshare::runtime::{self, Context, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use gstthreadshare::runtime::{ + self, Context, JoinHandle, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef, +}; const DEFAULT_CONTEXT: &str = ""; -const SLEEP_DURATION: u32 = 2; +const THROTTLING_DURATION: u32 = 2; fn init() { use std::sync::Once; @@ -78,8 +81,10 @@ lazy_static! { ); } -#[derive(Clone, Debug)] -struct PadSrcHandlerTest; +#[derive(Clone, Debug, Default)] +struct PadSrcHandlerTest { + flush_join_handle: Arc>>>>, +} impl PadSrcHandlerTest { async fn start_task(&self, pad: PadSrcRef<'_>, receiver: mpsc::Receiver) { @@ -128,7 +133,7 @@ impl PadSrcHandler for PadSrcHandlerTest { fn src_activatemode( &self, - _pad: PadSrcRef, + _pad: &PadSrcRef, _elem_src_test: &ElementSrcTest, _element: &gst::Element, mode: gst::PadMode, @@ -141,34 +146,79 @@ impl PadSrcHandler for PadSrcHandlerTest { fn src_event( &self, - pad: PadSrcRef, - elem_src_test: &ElementSrcTest, + pad: &PadSrcRef, + _elem_src_test: &ElementSrcTest, element: &gst::Element, event: gst::Event, ) -> Either> { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); let ret = match event.view() { EventView::FlushStart(..) => { - let _ = runtime::executor::block_on(elem_src_test.pause(element)); + let mut flush_join_handle = self.flush_join_handle.lock().unwrap(); + if flush_join_handle.is_none() { + let element = element.clone(); + let pad_weak = pad.downgrade(); + + *flush_join_handle = Some(pad.spawn(async move { + let res = ElementSrcTest::from_instance(&element) + .pause(&element) + .await; + let pad = pad_weak.upgrade().unwrap(); + if res.is_ok() { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStart complete"); + } else { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed"); + } + + res + })); + } else { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress"); + } + true } EventView::FlushStop(..) => { - let (res, state, pending) = element.get_state(0.into()); - if res == Ok(gst::StateChangeSuccess::Success) && state == gst::State::Playing - || res == Ok(gst::StateChangeSuccess::Async) && pending == gst::State::Playing - { - let _ = runtime::executor::block_on(elem_src_test.start(element)); + let element = element.clone(); + let flush_join_handle_weak = Arc::downgrade(&self.flush_join_handle); + let pad_weak = pad.downgrade(); + + let fut = async move { + let mut ret = false; + + let pad = pad_weak.upgrade().unwrap(); + + let flush_join_handle = flush_join_handle_weak.upgrade().unwrap(); + let flush_join_handle = flush_join_handle.lock().unwrap().take(); + if let Some(flush_join_handle) = flush_join_handle { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete"); + if let Ok(Ok(())) = flush_join_handle.await { + ret = ElementSrcTest::from_instance(&element) + .start(&element) + .await + .is_ok(); + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStop complete"); + } else { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed"); + } + } else { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress"); + } + + ret } - true + .boxed(); + + return Either::Right(fut); } _ => false, }; if ret { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled event {:?}", event); + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", event); } else { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle event {:?}", event); + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); } Either::Left(ret) @@ -208,7 +258,7 @@ impl ElementSrcTest { let _state = self.state.lock().await; gst_debug!(SRC_CAT, obj: element, "Preparing"); - let context = Context::acquire(&self.settings.lock().await.context, SLEEP_DURATION) + let context = Context::acquire(&self.settings.lock().await.context, THROTTLING_DURATION) .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -280,7 +330,7 @@ impl ElementSrcTest { } impl ObjectSubclass for ElementSrcTest { - const NAME: &'static str = "RsTsElementSrcTest"; + const NAME: &'static str = "TsElementSrcTest"; type ParentType = gst::Element; type Instance = gst::subclass::ElementInstanceStruct; type Class = glib::subclass::simple::ClassStruct; @@ -318,7 +368,7 @@ impl ObjectSubclass for ElementSrcTest { ElementSrcTest { src_pad, - src_pad_handler: PadSrcHandlerTest, + src_pad_handler: PadSrcHandlerTest::default(), state: Mutex::new(ElementSrcState::default()), settings: Mutex::new(settings), } @@ -426,111 +476,180 @@ static SINK_PROPERTIES: [glib::subclass::Property; 1] = ) })]; +#[derive(Debug)] +struct PadSinkHandlerTestInner { + flush_join_handle: sync::Mutex>>, + context: Context, +} + #[derive(Clone, Debug)] -struct PadSinkHandlerTest; +struct PadSinkHandlerTest(Arc); + +impl Default for PadSinkHandlerTest { + fn default() -> Self { + PadSinkHandlerTest(Arc::new(PadSinkHandlerTestInner { + flush_join_handle: sync::Mutex::new(None), + context: Context::acquire("PadSinkHandlerTest", THROTTLING_DURATION).unwrap(), + })) + } +} impl PadSinkHandler for PadSinkHandlerTest { type ElementImpl = ElementSinkTest; fn sink_chain( &self, - pad: PadSinkRef, - elem_sink_test: &ElementSinkTest, - _element: &gst::Element, + _pad: &PadSinkRef, + _elem_sink_test: &ElementSinkTest, + element: &gst::Element, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { - let pad_weak = pad.downgrade(); - let sender = Arc::clone(&elem_sink_test.sender); + let element = element.clone(); async move { - let pad = pad_weak - .upgrade() - .expect("PadSink no longer exists in sink_chain"); - - gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding {:?}", buffer); - sender - .lock() + let elem_sink_test = ElementSinkTest::from_instance(&element); + elem_sink_test + .forward_item(&element, Item::Buffer(buffer)) .await - .as_mut() - .expect("ItemSender not set") - .send(Item::Buffer(buffer)) - .await - .map(|_| gst::FlowSuccess::Ok) - .map_err(|_| gst::FlowError::CustomError) } .boxed() } fn sink_chain_list( &self, - pad: PadSinkRef, - elem_sink_test: &ElementSinkTest, - _element: &gst::Element, + _pad: &PadSinkRef, + _elem_sink_test: &ElementSinkTest, + element: &gst::Element, list: gst::BufferList, ) -> BoxFuture<'static, Result> { - let pad_weak = pad.downgrade(); - let sender = Arc::clone(&elem_sink_test.sender); + let element = element.clone(); async move { - let pad = pad_weak - .upgrade() - .expect("PadSink no longer exists in sink_chain_list"); - - gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding {:?}", list); - sender - .lock() + let elem_sink_test = ElementSinkTest::from_instance(&element); + elem_sink_test + .forward_item(&element, Item::BufferList(list)) .await - .as_mut() - .expect("ItemSender not set") - .send(Item::BufferList(list)) - .await - .map(|_| gst::FlowSuccess::Ok) - .map_err(|_| gst::FlowError::CustomError) } .boxed() } fn sink_event( &self, - pad: PadSinkRef, - elem_sink_test: &ElementSinkTest, - _element: &gst::Element, + pad: &PadSinkRef, + _elem_sink_test: &ElementSinkTest, + element: &gst::Element, event: gst::Event, ) -> Either> { if event.is_serialized() { + gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); + let pad_weak = pad.downgrade(); - let sender = Arc::clone(&elem_sink_test.sender); + let element = element.clone(); + let inner_weak = Arc::downgrade(&self.0); Either::Right(async move { - let pad = pad_weak - .upgrade() - .expect("PadSink no longer exists in sink_event"); + let elem_sink_test = ElementSinkTest::from_instance(&element); - gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding serialized event {:?}", event); - sender - .lock() - .await - .as_mut() - .expect("ItemSender not set") - .send(Item::Event(event)) - .await - .is_ok() + if let EventView::FlushStop(..) = event.view() { + let pad = pad_weak + .upgrade() + .expect("PadSink no longer exists in sink_event"); + + let inner = inner_weak.upgrade().unwrap(); + let flush_join_handle = inner.flush_join_handle.lock().unwrap().take(); + if let Some(flush_join_handle) = flush_join_handle { + gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete"); + if let Ok(()) = flush_join_handle.await { + elem_sink_test.start(&element).await; + } else { + gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStop ignored: FlushStart failed to complete"); + } + } else { + gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress"); + } + } + + elem_sink_test.forward_item(&element, Item::Event(event)).await.is_ok() }.boxed()) } else { - gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event); - Either::Left( - runtime::executor::block_on(elem_sink_test.sender.lock()) - .as_mut() - .expect("ItemSender not set") - .try_send(Item::Event(event)) - .is_ok(), - ) + gst_debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); + + let ret = match event.view() { + EventView::FlushStart(..) => { + let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); + if flush_join_handle.is_none() { + gst_log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", event); + let element = element.clone(); + *flush_join_handle = Some(self.0.context.spawn(async move { + ElementSinkTest::from_instance(&element) + .stop(&element) + .await; + })); + } else { + gst_debug!(SINK_CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress"); + } + + true + } + _ => false, + }; + + // Should forward item here + Either::Left(ret) } } } +#[derive(Debug, Default)] +struct ElementSinkState { + flushing: bool, + sender: Option>, +} + #[derive(Debug)] struct ElementSinkTest { sink_pad: PadSink, - sender: Arc>>>, + state: Mutex, +} + +impl ElementSinkTest { + async fn forward_item( + &self, + element: &gst::Element, + item: Item, + ) -> Result { + let mut state = self.state.lock().await; + if !state.flushing { + gst_debug!(SINK_CAT, obj: element, "Fowarding {:?}", item); + state + .sender + .as_mut() + .expect("Item Sender not set") + .send(item) + .await + .map(|_| gst::FlowSuccess::Ok) + .map_err(|_| gst::FlowError::CustomError) + } else { + gst_debug!( + SINK_CAT, + obj: element, + "Not fowarding {:?} due to flushing", + item + ); + Err(gst::FlowError::Flushing) + } + } + + async fn start(&self, element: &gst::Element) { + gst_debug!(SINK_CAT, obj: element, "Starting"); + let mut state = self.state.lock().await; + state.flushing = false; + gst_debug!(SINK_CAT, obj: element, "Started"); + } + + async fn stop(&self, element: &gst::Element) { + gst_debug!(SINK_CAT, obj: element, "Stopping"); + self.state.lock().await.flushing = true; + gst_debug!(SINK_CAT, obj: element, "Stopped"); + } } lazy_static! { @@ -542,7 +661,7 @@ lazy_static! { } impl ObjectSubclass for ElementSinkTest { - const NAME: &'static str = "RsTsElementSinkTest"; + const NAME: &'static str = "TsElementSinkTest"; type ParentType = gst::Element; type Instance = gst::subclass::ElementInstanceStruct; type Class = glib::subclass::simple::ClassStruct; @@ -580,6 +699,7 @@ impl ObjectSubclass for ElementSinkTest { .expect("signal arg") .expect("missing signal arg"); let this = Self::from_instance(&element); + gst_debug!(SINK_CAT, obj: &element, "Pushing FlushStart"); Some( this.sink_pad .gst_pad() @@ -600,6 +720,7 @@ impl ObjectSubclass for ElementSinkTest { .expect("signal arg") .expect("missing signal arg"); let this = Self::from_instance(&element); + gst_debug!(SINK_CAT, obj: &element, "Pushing FlushStop"); Some( this.sink_pad .gst_pad() @@ -616,7 +737,7 @@ impl ObjectSubclass for ElementSinkTest { ElementSinkTest { sink_pad, - sender: Arc::new(Mutex::new(None)), + state: Mutex::new(ElementSinkState::default()), } } } @@ -634,7 +755,7 @@ impl ObjectImpl for ElementSinkTest { .expect("type checked upstream") .expect("ItemSender not found") .clone(); - *runtime::executor::block_on(self.sender.lock()) = Some(sender); + runtime::executor::block_on(self.state.lock()).sender = Some(sender); } _ => unimplemented!(), } @@ -658,7 +779,10 @@ impl ElementImpl for ElementSinkTest { match transition { gst::StateChange::NullToReady => { - runtime::executor::block_on(self.sink_pad.prepare(&PadSinkHandlerTest)); + runtime::executor::block_on(self.sink_pad.prepare(&PadSinkHandlerTest::default())); + } + gst::StateChange::PausedToReady => { + runtime::executor::block_on(self.stop(element)); } gst::StateChange::ReadyToNull => { runtime::executor::block_on(self.sink_pad.unprepare()); @@ -666,12 +790,20 @@ impl ElementImpl for ElementSinkTest { _ => (), } - self.parent_change_state(element, transition) + let success = self.parent_change_state(element, transition)?; + + if transition == gst::StateChange::ReadyToPaused { + runtime::executor::block_on(self.start(element)); + } + + Ok(success) } } fn setup( context_name: &str, + mut middle_element_1: Option, + mut middle_element_2: Option, ) -> ( gst::Pipeline, gst::Element, @@ -680,53 +812,69 @@ fn setup( ) { init(); + let pipeline = gst::Pipeline::new(None); + // Src let src_element = glib::Object::new(ElementSrcTest::get_type(), &[]) .unwrap() .downcast::() .unwrap(); src_element.set_property("context", &context_name).unwrap(); + pipeline.add(&src_element).unwrap(); + + let mut last_element = src_element.clone(); + + if let Some(middle_element) = middle_element_1.take() { + pipeline.add(&middle_element).unwrap(); + last_element.link(&middle_element).unwrap(); + last_element = middle_element; + } + + if let Some(middle_element) = middle_element_2.take() { + // Don't link the 2 middle elements: this is used for ts-proxy + pipeline.add(&middle_element).unwrap(); + last_element = middle_element; + } // Sink let sink_element = glib::Object::new(ElementSinkTest::get_type(), &[]) .unwrap() .downcast::() .unwrap(); + pipeline.add(&sink_element).unwrap(); + last_element.link(&sink_element).unwrap(); let (sender, receiver) = mpsc::channel::(10); sink_element .set_property("sender", &ItemSender { sender }) .unwrap(); - let pipeline = gst::Pipeline::new(None); - pipeline.add_many(&[&src_element, &sink_element]).unwrap(); - - src_element.link(&sink_element).unwrap(); - (pipeline, src_element, sink_element, receiver) } -#[test] -fn task() { - let (pipeline, src_element, sink_element, mut receiver) = setup("task"); - +fn nominal_scenario( + scenario_name: &str, + pipeline: gst::Pipeline, + src_element: gst::Element, + mut receiver: mpsc::Receiver, +) { let elem_src_test = ElementSrcTest::from_instance(&src_element); pipeline.set_state(gst::State::Playing).unwrap(); // Initial events - runtime::executor::block_on( + block_on( elem_src_test.try_push(Item::Event( - gst::Event::new_stream_start("stream_id_task_test") + gst::Event::new_stream_start(scenario_name) .group_id(gst::util_group_id_next()) .build(), )), ) .unwrap(); - match runtime::executor::block_on(receiver.next()).unwrap() { + match block_on(receiver.next()).unwrap() { Item::Event(event) => match event.view() { - gst::EventView::CustomDownstreamSticky(e) => { + EventView::CustomDownstreamSticky(e) => { assert!(PadContext::is_pad_context_sticky_event(&e)) } other => panic!("Unexpected event {:?}", other), @@ -734,34 +882,32 @@ fn task() { other => panic!("Unexpected item {:?}", other), } - match runtime::executor::block_on(receiver.next()).unwrap() { + match block_on(receiver.next()).unwrap() { Item::Event(event) => match event.view() { - gst::EventView::StreamStart(_) => (), + EventView::StreamStart(_) => (), other => panic!("Unexpected event {:?}", other), }, other => panic!("Unexpected item {:?}", other), } - runtime::executor::block_on(elem_src_test.try_push(Item::Event( + block_on(elem_src_test.try_push(Item::Event( gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), ))) .unwrap(); - match runtime::executor::block_on(receiver.next()).unwrap() { + match block_on(receiver.next()).unwrap() { Item::Event(event) => match event.view() { - gst::EventView::Segment(_) => (), + EventView::Segment(_) => (), other => panic!("Unexpected event {:?}", other), }, other => panic!("Unexpected item {:?}", other), } // Buffer - runtime::executor::block_on( - elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))), - ) - .unwrap(); + block_on(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4])))) + .unwrap(); - match runtime::executor::block_on(receiver.next()).unwrap() { + match block_on(receiver.next()).unwrap() { Item::Buffer(buffer) => { let data = buffer.map_readable().unwrap(); assert_eq!(data.as_slice(), vec![1, 2, 3, 4].as_slice()); @@ -774,9 +920,9 @@ fn task() { list.get_mut() .unwrap() .add(gst::Buffer::from_slice(vec![1, 2, 3, 4])); - runtime::executor::block_on(elem_src_test.try_push(Item::BufferList(list))).unwrap(); + block_on(elem_src_test.try_push(Item::BufferList(list))).unwrap(); - match runtime::executor::block_on(receiver.next()).unwrap() { + match block_on(receiver.next()).unwrap() { Item::BufferList(_) => (), other => panic!("Unexpected item {:?}", other), } @@ -785,10 +931,8 @@ fn task() { pipeline.set_state(gst::State::Paused).unwrap(); // Items not longer accepted - runtime::executor::block_on( - elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))), - ) - .unwrap_err(); + block_on(elem_src_test.try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4])))) + .unwrap_err(); // Nothing forwarded receiver.try_next().unwrap_err(); @@ -800,41 +944,106 @@ fn task() { receiver.try_next().unwrap_err(); // Flush - assert!(sink_element - .emit("flush-start", &[]) - .unwrap() - .unwrap() - .get_some::() - .unwrap()); - - assert!(sink_element - .emit("flush-stop", &[]) - .unwrap() - .unwrap() - .get_some::() - .unwrap()); - - // EOS - runtime::executor::block_on(elem_src_test.try_push(Item::Event(gst::Event::new_eos().build()))) + block_on(elem_src_test.try_push(Item::Event(gst::Event::new_flush_start().build()))).unwrap(); + block_on(elem_src_test.try_push(Item::Event(gst::Event::new_flush_stop(false).build()))) .unwrap(); - match runtime::executor::block_on(receiver.next()).unwrap() { + + match block_on(receiver.next()).unwrap() { Item::Event(event) => match event.view() { - gst::EventView::Eos(_) => (), + EventView::FlushStop(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + // EOS + block_on(elem_src_test.try_push(Item::Event(gst::Event::new_eos().build()))).unwrap(); + + match block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Eos(_) => (), other => panic!("Unexpected event {:?}", other), }, other => panic!("Unexpected item {:?}", other), } - // Stop the Pad task pipeline.set_state(gst::State::Ready).unwrap(); // Receiver was dropped when stopping => can't send anymore - runtime::executor::block_on( + block_on( elem_src_test.try_push(Item::Event( - gst::Event::new_stream_start("stream_id_task_test_past_stop") + gst::Event::new_stream_start(&format!("{}_past_stop", scenario_name)) .group_id(gst::util_group_id_next()) .build(), )), ) .unwrap_err(); } + +#[test] +fn src_sink_nominal() { + let name = "src_sink_nominal"; + + let (pipeline, src_element, _sink_element, receiver) = setup(&name, None, None); + + nominal_scenario(&name, pipeline, src_element, receiver); +} + +#[test] +fn src_tsqueue_sink_nominal() { + init(); + + let name = "src_tsqueue_sink"; + + let ts_queue = gst::ElementFactory::make("ts-queue", Some("ts-queue")).unwrap(); + ts_queue + .set_property("context", &format!("{}_queue", name)) + .unwrap(); + ts_queue + .set_property("context-wait", &THROTTLING_DURATION) + .unwrap(); + + let (pipeline, src_element, _sink_element, receiver) = setup(name, Some(ts_queue), None); + + nominal_scenario(&name, pipeline, src_element, receiver); +} + +#[test] +fn src_queue_sink_nominal() { + init(); + + let name = "src_queue_sink"; + + let queue = gst::ElementFactory::make("queue", Some("queue")).unwrap(); + let (pipeline, src_element, _sink_element, receiver) = setup(name, Some(queue), None); + + nominal_scenario(&name, pipeline, src_element, receiver); +} + +#[test] +fn src_tsproxy_sink_nominal() { + init(); + + let name = "src_tsproxy_sink"; + + let ts_proxy_sink = gst::ElementFactory::make("ts-proxysink", Some("ts-proxysink")).unwrap(); + ts_proxy_sink + .set_property("proxy-context", &format!("{}_proxy_context", name)) + .unwrap(); + + let ts_proxy_src = gst::ElementFactory::make("ts-proxysrc", Some("ts-proxysrc")).unwrap(); + ts_proxy_src + .set_property("proxy-context", &format!("{}_proxy_context", name)) + .unwrap(); + ts_proxy_src + .set_property("context", &format!("{}_context", name)) + .unwrap(); + ts_proxy_src + .set_property("context-wait", &THROTTLING_DURATION) + .unwrap(); + + let (pipeline, src_element, _sink_element, receiver) = + setup(name, Some(ts_proxy_sink), Some(ts_proxy_src)); + + nominal_scenario(&name, pipeline, src_element, receiver); +}