From 116cf9bd3c3ea2c809e20fd70049c425e0a1a238 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Thu, 19 Mar 2020 19:34:51 +0100 Subject: [PATCH] threadshare/*src: rework pause/flush_start/flush_stop This commit fixes several issues with the `Ts*Src` elements. The pause functions used cancel_task which breaks the Task loop at await points. For some elements, this implies making sure no item is being lost. Moreover, cancelling the Task also cancels downstream processing, which makes it difficult to ensure elements can handle all cases. This commit reimplements Task::pause which allows completing the running loop iteration before pausing the loop. See https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/277#note_439529 In the Paused state, incoming items were rejected by TsAppSrc and DataQueue. See https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/277#note_438455 - FlushStart must engage items rejection and cancel the Task. - FlushStop must purge the internal stream & accept items again. If the task was cancelled, `push_prelude` could set `need_initial_events` to `true` when the events weren't actually pushed yet. TsAppSrc used to renew its internal channel which could cause Buffer loss when transitionning Playing -> Paused -> Playing. See https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/issues/98 --- gst-plugin-threadshare/src/appsrc.rs | 274 +++++--- gst-plugin-threadshare/src/dataqueue.rs | 2 +- gst-plugin-threadshare/src/proxy.rs | 9 +- .../src/runtime/executor.rs | 6 + gst-plugin-threadshare/src/runtime/pad.rs | 14 + gst-plugin-threadshare/src/runtime/task.rs | 222 +++++- gst-plugin-threadshare/src/socket.rs | 2 +- gst-plugin-threadshare/src/tcpclientsrc.rs | 152 ++-- gst-plugin-threadshare/src/udpsink.rs | 7 + gst-plugin-threadshare/src/udpsrc.rs | 160 +++-- gst-plugin-threadshare/tests/appsrc.rs | 208 +++++- gst-plugin-threadshare/tests/pad.rs | 651 +++++++++++++++--- gst-plugin-threadshare/tests/pipeline.rs | 48 +- 13 files changed, 1412 insertions(+), 343 deletions(-) diff --git a/gst-plugin-threadshare/src/appsrc.rs b/gst-plugin-threadshare/src/appsrc.rs index eb4e251c..35ab8603 100644 --- a/gst-plugin-threadshare/src/appsrc.rs +++ b/gst-plugin-threadshare/src/appsrc.rs @@ -35,8 +35,8 @@ use lazy_static::lazy_static; use rand; use std::convert::TryInto; +use std::sync::Arc; use std::sync::Mutex as StdMutex; -use std::sync::{self, Arc}; use std::u32; use crate::runtime::prelude::*; @@ -138,48 +138,112 @@ enum StreamItem { #[derive(Debug)] struct AppSrcPadHandlerState { need_initial_events: bool, + need_segment: bool, caps: Option, - configured_caps: Option, } impl Default for AppSrcPadHandlerState { fn default() -> Self { AppSrcPadHandlerState { need_initial_events: true, + need_segment: true, caps: None, - configured_caps: None, } } } -#[derive(Debug, Default)] +#[derive(Debug)] struct AppSrcPadHandlerInner { - state: sync::RwLock, + state: FutMutex, + configured_caps: StdMutex>, + receiver: FutMutex>, } -#[derive(Clone, Debug, Default)] +impl AppSrcPadHandlerInner { + fn new(receiver: mpsc::Receiver, caps: Option) -> Self { + AppSrcPadHandlerInner { + state: FutMutex::new(AppSrcPadHandlerState { + caps, + ..Default::default() + }), + configured_caps: StdMutex::new(None), + receiver: FutMutex::new(receiver), + } + } +} + +#[derive(Clone, Debug)] struct AppSrcPadHandler(Arc); impl AppSrcPadHandler { - fn start_task( - &self, - pad: PadSrcRef<'_>, - element: &gst::Element, - receiver: mpsc::Receiver, - ) { + fn new(receiver: mpsc::Receiver, caps: Option) -> AppSrcPadHandler { + AppSrcPadHandler(Arc::new(AppSrcPadHandlerInner::new(receiver, caps))) + } + + fn reset(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(CAT, obj: pad.gst_pad(), "Resetting handler"); + + *self.0.state.try_lock().expect("State locked elsewhere") = Default::default(); + *self.0.configured_caps.lock().unwrap() = None; + + self.flush(pad); + + gst_debug!(CAT, obj: pad.gst_pad(), "Handler reset"); + } + + fn flush(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); + + // Purge the channel + let mut receiver = self + .0 + .receiver + .try_lock() + .expect("Channel receiver is locked elsewhere"); + loop { + match receiver.try_next() { + Ok(Some(_item)) => { + gst_log!(CAT, obj: pad.gst_pad(), "Dropping pending item"); + } + Err(_) => { + gst_log!(CAT, obj: pad.gst_pad(), "No more pending item"); + break; + } + Ok(None) => { + panic!("Channel sender dropped"); + } + } + } + + self.0 + .state + .try_lock() + .expect("state is locked elsewhere") + .need_segment = true; + + gst_debug!(CAT, obj: pad.gst_pad(), "Flushed"); + } + + fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) { let this = self.clone(); let pad_weak = pad.downgrade(); let element = element.clone(); - let receiver = Arc::new(FutMutex::new(receiver)); + pad.start_task(move || { let this = this.clone(); let pad_weak = pad_weak.clone(); let element = element.clone(); - let receiver = Arc::clone(&receiver); async move { - let item = receiver.lock().await.next().await; + let item = this.0.receiver.lock().await.next().await; let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); + let item = match item { Some(item) => item, None => { @@ -219,37 +283,31 @@ impl AppSrcPadHandler { } async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) { - let mut events = Vec::new(); - - // Only `read` the state in the hot path - if self.0.state.read().unwrap().need_initial_events { - // We will need to `write` and we also want to prevent - // any changes on the state while we are handling initial events - let mut state = self.0.state.write().unwrap(); - assert!(state.need_initial_events); - + let mut state = self.0.state.lock().await; + if state.need_initial_events { gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); - events.push( - gst::Event::new_stream_start(&stream_id) - .group_id(gst::GroupId::next()) - .build(), - ); + let stream_start_evt = gst::Event::new_stream_start(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + pad.push_event(stream_start_evt).await; if let Some(ref caps) = state.caps { - events.push(gst::Event::new_caps(&caps).build()); - state.configured_caps = Some(caps.clone()); + let caps_evt = gst::Event::new_caps(&caps).build(); + pad.push_event(caps_evt).await; + *self.0.configured_caps.lock().unwrap() = Some(caps.clone()); } - events.push( - gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), - ); state.need_initial_events = false; } - for event in events { - pad.push_event(event).await; + if state.need_segment { + let segment_evt = + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(); + pad.push_event(segment_evt).await; + + state.need_segment = false; } } @@ -259,6 +317,8 @@ impl AppSrcPadHandler { element: &gst::Element, item: StreamItem, ) -> Result { + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", item); + self.push_prelude(pad, element).await; match item { @@ -291,13 +351,11 @@ impl PadSrcHandler for AppSrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - let _ = appsrc.pause(element); - + appsrc.flush_start(element); true } EventView::FlushStop(..) => { appsrc.flush_stop(element); - true } EventView::Reconfigure(..) => true, @@ -335,8 +393,7 @@ impl PadSrcHandler for AppSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let state = self.0.state.read().unwrap(); - let caps = if let Some(ref caps) = state.configured_caps { + let caps = if let Some(caps) = self.0.configured_caps.lock().unwrap().as_ref() { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -362,20 +419,29 @@ impl PadSrcHandler for AppSrcPadHandler { } } +#[derive(Debug, Eq, PartialEq)] +enum AppSrcState { + Paused, + RejectBuffers, + Started, +} + +#[derive(Debug)] struct AppSrc { src_pad: PadSrc, - src_pad_handler: AppSrcPadHandler, + src_pad_handler: StdMutex>, + state: StdMutex, sender: StdMutex>>, settings: StdMutex, } impl AppSrc { fn push_buffer(&self, element: &gst::Element, mut buffer: gst::Buffer) -> bool { - let mut sender = self.sender.lock().unwrap(); - let sender = match sender.as_mut() { - Some(sender) => sender, - None => return false, - }; + let state = self.state.lock().unwrap(); + if *state == AppSrcState::RejectBuffers { + gst_debug!(CAT, obj: element, "Rejecting buffer due to element state"); + return false; + } let do_timestamp = self.settings.lock().unwrap().do_timestamp; if do_timestamp { @@ -392,7 +458,14 @@ impl AppSrc { } } - match sender.try_send(StreamItem::Buffer(buffer)) { + match self + .sender + .lock() + .unwrap() + .as_mut() + .unwrap() + .try_send(StreamItem::Buffer(buffer)) + { Ok(_) => true, Err(err) => { gst_error!(CAT, obj: element, "Failed to queue buffer: {}", err); @@ -407,6 +480,7 @@ impl AppSrc { Some(sender) => sender, None => return false, }; + let eos = StreamItem::Event(gst::Event::new_eos().build()); match sender.try_send(eos) { Ok(_) => true, @@ -421,8 +495,6 @@ impl AppSrc { let settings = self.settings.lock().unwrap(); gst_debug!(CAT, obj: element, "Preparing"); - self.src_pad_handler.0.state.write().unwrap().caps = settings.caps.clone(); - let context = Context::acquire(&settings.context, settings.context_wait).map_err(|err| { gst_error_msg!( @@ -431,8 +503,14 @@ impl AppSrc { ) })?; + let max_buffers = settings.max_buffers.try_into().unwrap(); + let (sender, receiver) = mpsc::channel(max_buffers); + *self.sender.lock().unwrap() = Some(sender); + + let src_pad_handler = AppSrcPadHandler::new(receiver, settings.caps.clone()); + self.src_pad - .prepare(context, &self.src_pad_handler) + .prepare(context, &src_pad_handler) .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -440,6 +518,8 @@ impl AppSrc { ) })?; + *self.src_pad_handler.lock().unwrap() = Some(src_pad_handler); + gst_debug!(CAT, obj: element, "Prepared"); Ok(()) @@ -449,8 +529,9 @@ impl AppSrc { gst_debug!(CAT, obj: element, "Unpreparing"); let _ = self.src_pad.unprepare(); + *self.src_pad_handler.lock().unwrap() = None; - *self.src_pad_handler.0.state.write().unwrap() = Default::default(); + *self.sender.lock().unwrap() = None; gst_debug!(CAT, obj: element, "Unprepared"); @@ -460,16 +541,18 @@ impl AppSrc { fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Stopping"); + *self.state.lock().unwrap() = AppSrcState::RejectBuffers; + // Now stop the task if it was still running, blocking // until this has actually happened self.src_pad.stop_task(); self.src_pad_handler - .0 - .state - .write() + .lock() .unwrap() - .need_initial_events = true; + .as_ref() + .unwrap() + .reset(&self.src_pad.as_ref()); gst_debug!(CAT, obj: element, "Stopped"); @@ -477,15 +560,15 @@ impl AppSrc { } fn start(&self, element: &gst::Element) -> Result<(), ()> { - let mut sender = self.sender.lock().unwrap(); - if sender.is_some() { + let mut state = self.state.lock().unwrap(); + if *state == AppSrcState::Started { gst_debug!(CAT, obj: element, "Already started"); return Ok(()); } gst_debug!(CAT, obj: element, "Starting"); - self.start_unchecked(element, &mut sender); + self.start_unchecked(element, &mut state); gst_debug!(CAT, obj: element, "Started"); @@ -493,12 +576,10 @@ impl AppSrc { } fn flush_stop(&self, element: &gst::Element) { - // Keep the lock on the `sender` until `flush_stop` is complete + // Keep the lock on the `state` until `flush_stop` is complete // so as to prevent race conditions due to concurrent state transitions. - // Note that this won't deadlock as `sender` is not used - // within the `src_pad`'s `Task`. - let mut sender = self.sender.lock().unwrap(); - if sender.is_some() { + let mut state = self.state.lock().unwrap(); + if *state == AppSrcState::Started { gst_debug!(CAT, obj: element, "Already started"); return; } @@ -506,38 +587,52 @@ impl AppSrc { gst_debug!(CAT, obj: element, "Stopping Flush"); self.src_pad.stop_task(); - self.start_unchecked(element, &mut sender); - - gst_debug!(CAT, obj: element, "Stopped Flush"); - } - - fn start_unchecked( - &self, - element: &gst::Element, - sender: &mut Option>, - ) { - let max_buffers = self - .settings - .lock() - .unwrap() - .max_buffers - .try_into() - .unwrap(); - let (new_sender, receiver) = mpsc::channel(max_buffers); - *sender = Some(new_sender); self.src_pad_handler - .start_task(self.src_pad.as_ref(), element, receiver); + .lock() + .unwrap() + .as_ref() + .unwrap() + .flush(&self.src_pad.as_ref()); + + self.start_unchecked(element, &mut state); + + gst_debug!(CAT, obj: element, "Flush Stopped"); + } + + fn start_unchecked(&self, element: &gst::Element, state: &mut AppSrcState) { + self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() + .start_task(self.src_pad.as_ref(), element); + + *state = AppSrcState::Started; + } + + fn flush_start(&self, element: &gst::Element) { + // Keep the lock on the `state` until `flush_start` is complete + // so as to prevent race conditions due to concurrent state transitions. + let mut state = self.state.lock().unwrap(); + + gst_debug!(CAT, obj: element, "Starting Flush"); + + *state = AppSrcState::RejectBuffers; + self.src_pad.cancel_task(); + + gst_debug!(CAT, obj: element, "Flush Started"); } fn pause(&self, element: &gst::Element) -> Result<(), ()> { - let mut sender = self.sender.lock().unwrap(); + // Lock the state to prevent race condition due to concurrent FlushStop + let mut state = self.state.lock().unwrap(); + gst_debug!(CAT, obj: element, "Pausing"); - self.src_pad.cancel_task(); + self.src_pad.pause_task(); - // Prevent subsequent items from being enqueued - *sender = None; + *state = AppSrcState::Paused; gst_debug!(CAT, obj: element, "Paused"); @@ -616,7 +711,8 @@ impl ObjectSubclass for AppSrc { Self { src_pad, - src_pad_handler: AppSrcPadHandler::default(), + src_pad_handler: StdMutex::new(None), + state: StdMutex::new(AppSrcState::RejectBuffers), sender: StdMutex::new(None), settings: StdMutex::new(Settings::default()), } diff --git a/gst-plugin-threadshare/src/dataqueue.rs b/gst-plugin-threadshare/src/dataqueue.rs index 8434dbd9..9f77d966 100644 --- a/gst-plugin-threadshare/src/dataqueue.rs +++ b/gst-plugin-threadshare/src/dataqueue.rs @@ -185,7 +185,7 @@ impl DataQueue { pub fn push(&self, item: DataQueueItem) -> Result<(), DataQueueItem> { let mut inner = self.0.lock().unwrap(); - if inner.state != DataQueueState::Started { + if inner.state == DataQueueState::Stopped { gst_debug!( DATA_QUEUE_CAT, obj: &inner.element, diff --git a/gst-plugin-threadshare/src/proxy.rs b/gst-plugin-threadshare/src/proxy.rs index 93a525d7..45d7b05e 100644 --- a/gst-plugin-threadshare/src/proxy.rs +++ b/gst-plugin-threadshare/src/proxy.rs @@ -838,7 +838,7 @@ impl ProxySrcPadHandler { let item = match item { Some(item) => item, None => { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "DataQueue Stopped"); + gst_log!(SRC_CAT, obj: pad.gst_pad(), "DataQueue Stopped or Paused"); return glib::Continue(false); } }; @@ -1109,6 +1109,7 @@ impl ProxySrc { self.src_pad.stop_task(); let dataqueue = dataqueue.as_ref().unwrap(); + dataqueue.clear(); dataqueue.stop(); gst_debug!(SRC_CAT, obj: element, "Stopped"); @@ -1171,11 +1172,9 @@ impl ProxySrc { let dataqueue = self.dataqueue.lock().unwrap(); gst_debug!(SRC_CAT, obj: element, "Pausing"); - self.src_pad.cancel_task(); + dataqueue.as_ref().unwrap().pause(); - let dataqueue = dataqueue.as_ref().unwrap(); - dataqueue.pause(); - dataqueue.clear(); + self.src_pad.pause_task(); gst_debug!(SRC_CAT, obj: element, "Paused"); diff --git a/gst-plugin-threadshare/src/runtime/executor.rs b/gst-plugin-threadshare/src/runtime/executor.rs index 2310485e..9ee0ce61 100644 --- a/gst-plugin-threadshare/src/runtime/executor.rs +++ b/gst-plugin-threadshare/src/runtime/executor.rs @@ -157,6 +157,12 @@ pub fn block_on(future: Fut) -> Fut::Output { }) } +/// Yields execution back to the runtime +#[inline] +pub async fn yield_now() { + tokio::task::yield_now().await; +} + struct ContextThread { name: String, } diff --git a/gst-plugin-threadshare/src/runtime/pad.rs b/gst-plugin-threadshare/src/runtime/pad.rs index 63ac597b..20216a5b 100644 --- a/gst-plugin-threadshare/src/runtime/pad.rs +++ b/gst-plugin-threadshare/src/runtime/pad.rs @@ -343,6 +343,11 @@ impl<'a> PadSrcRef<'a> { self.strong.start_task(func); } + /// Pauses the `Started` `Pad` `Task`. + pub fn pause_task(&self) { + self.strong.pause_task(); + } + /// Cancels the `Started` `Pad` `Task`. pub fn cancel_task(&self) { self.strong.cancel_task(); @@ -473,6 +478,11 @@ impl PadSrcStrong { self.0.task.start(func); } + #[inline] + fn pause_task(&self) { + self.0.task.pause(); + } + #[inline] fn cancel_task(&self) { self.0.task.cancel(); @@ -722,6 +732,10 @@ impl PadSrc { self.0.start_task(func); } + pub fn pause_task(&self) { + self.0.pause_task(); + } + pub fn cancel_task(&self) { self.0.cancel_task(); } diff --git a/gst-plugin-threadshare/src/runtime/task.rs b/gst-plugin-threadshare/src/runtime/task.rs index ad4dd0a1..62fe7ba7 100644 --- a/gst-plugin-threadshare/src/runtime/task.rs +++ b/gst-plugin-threadshare/src/runtime/task.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2019 François Laignel +// Copyright (C) 2019-2020 François Laignel // Copyright (C) 2020 Sebastian Dröge // // This library is free software; you can redistribute it and/or @@ -18,18 +18,29 @@ //! An execution loop to run asynchronous processing. +use futures::channel::oneshot; use futures::future::{abortable, AbortHandle, Aborted}; use futures::prelude::*; -use gst::TaskState; -use gst::{gst_debug, gst_log, gst_trace, gst_warning}; +use gst::{gst_debug, gst_error, gst_log, gst_trace, gst_warning}; use std::fmt; use std::sync::{Arc, Mutex}; -use super::executor::block_on; +use super::executor::{block_on, yield_now}; use super::{Context, JoinHandle, RUNTIME_CAT}; +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)] +pub enum TaskState { + Cancelled, + Started, + Stopped, + Paused, + Pausing, + Preparing, + Unprepared, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub enum TaskError { ActiveTask, @@ -53,27 +64,27 @@ struct TaskInner { prepare_abort_handle: Option, abort_handle: Option, loop_handle: Option>>, + resume_sender: Option>, } impl Default for TaskInner { fn default() -> Self { TaskInner { context: None, - state: TaskState::Stopped, + state: TaskState::Unprepared, prepare_handle: None, prepare_abort_handle: None, abort_handle: None, loop_handle: None, + resume_sender: None, } } } impl Drop for TaskInner { fn drop(&mut self) { - // Check invariant which can't be held automatically in `Task` - // because `drop` can't be `async` - if self.state != TaskState::Stopped { - panic!("Missing call to `Task::stop`"); + if self.state != TaskState::Unprepared { + panic!("Missing call to `Task::unprepared`"); } } } @@ -103,7 +114,7 @@ impl Task { gst_debug!(RUNTIME_CAT, "Preparing task"); let mut inner = self.0.lock().unwrap(); - if inner.state != TaskState::Stopped { + if inner.state != TaskState::Unprepared { return Err(TaskError::ActiveTask); } @@ -138,6 +149,7 @@ impl Task { inner.context = Some(context); + inner.state = TaskState::Preparing; gst_debug!(RUNTIME_CAT, "Task prepared"); Ok(()) @@ -147,7 +159,7 @@ impl Task { gst_debug!(RUNTIME_CAT, "Preparing task"); let mut inner = self.0.lock().unwrap(); - if inner.state != TaskState::Stopped { + if inner.state != TaskState::Unprepared { return Err(TaskError::ActiveTask); } @@ -156,19 +168,25 @@ impl Task { inner.context = Some(context); + inner.state = TaskState::Stopped; gst_debug!(RUNTIME_CAT, "Task prepared"); Ok(()) } pub fn unprepare(&self) -> Result<(), TaskError> { - gst_debug!(RUNTIME_CAT, "Unpreparing task"); - let mut inner = self.0.lock().unwrap(); if inner.state != TaskState::Stopped { + gst_error!( + RUNTIME_CAT, + "Attempt to Unprepare a task in state {:?}", + inner.state + ); return Err(TaskError::ActiveTask); } + gst_debug!(RUNTIME_CAT, "Unpreparing task"); + // Abort any pending preparation if let Some(abort_handle) = inner.prepare_abort_handle.take() { abort_handle.abort(); @@ -176,6 +194,9 @@ impl Task { let prepare_handle = inner.prepare_handle.take(); let context = inner.context.take().unwrap(); + + inner.state = TaskState::Unprepared; + drop(inner); if let Some(prepare_handle) = prepare_handle { @@ -251,8 +272,27 @@ impl Task { gst_log!(RUNTIME_CAT, "Task already Started"); return; } - TaskState::Paused | TaskState::Stopped => (), - other => unreachable!("Unexpected Task state {:?}", other), + TaskState::Pausing => { + gst_debug!(RUNTIME_CAT, "Re-starting a Pausing task"); + + assert!(inner.resume_sender.is_none()); + + inner.state = TaskState::Started; + return; + } + TaskState::Paused => { + inner + .resume_sender + .take() + .expect("Task Paused but the resume_sender is already taken") + .send(()) + .expect("Task Paused but the resume_receiver was dropped"); + + gst_log!(RUNTIME_CAT, "Resume requested"); + return; + } + TaskState::Stopped | TaskState::Cancelled | TaskState::Preparing => (), + TaskState::Unprepared => panic!("Attempt to start an unprepared Task"), } gst_debug!(RUNTIME_CAT, "Starting Task"); @@ -280,21 +320,50 @@ impl Task { let res = prepare_handle.await; if res.is_err() { gst_warning!(RUNTIME_CAT, "Preparing failed"); + inner_clone.lock().unwrap().state = TaskState::Unprepared; + return; } + + inner_clone.lock().unwrap().state = TaskState::Stopped; } gst_trace!(RUNTIME_CAT, "Starting task loop"); // Then loop as long as we're actually running loop { - match inner_clone.lock().unwrap().state { - TaskState::Started => (), - TaskState::Paused | TaskState::Stopped => { - gst_trace!(RUNTIME_CAT, "Stopping task loop"); - break; + let mut resume_receiver = { + let mut inner = inner_clone.lock().unwrap(); + match inner.state { + TaskState::Started => None, + TaskState::Pausing => { + let (sender, receiver) = oneshot::channel(); + inner.resume_sender = Some(sender); + + inner.state = TaskState::Paused; + + Some(receiver) + } + TaskState::Stopped | TaskState::Cancelled => { + gst_trace!(RUNTIME_CAT, "Stopping task loop"); + break; + } + TaskState::Paused => { + unreachable!("The Paused state is controlled by the loop"); + } + other => { + unreachable!("Task loop iteration in state {:?}", other); + } } - other => unreachable!("Unexpected Task state {:?}", other), + }; + + if let Some(resume_receiver) = resume_receiver.take() { + gst_trace!(RUNTIME_CAT, "Task loop paused"); + + let _ = resume_receiver.await; + + gst_trace!(RUNTIME_CAT, "Resuming task loop"); + inner_clone.lock().unwrap().state = TaskState::Started; } if func().await == glib::Continue(false) { @@ -309,12 +378,15 @@ impl Task { .map(|h| h.task_id() == task_id) .unwrap_or(false) { - gst_trace!(RUNTIME_CAT, "Pausing task loop"); - inner.state = TaskState::Paused; + gst_trace!(RUNTIME_CAT, "Exiting task loop"); + inner.state = TaskState::Cancelled; } break; } + + // Make sure the loop can be aborted even if `func` never goes `Pending`. + yield_now().await; } // Once the loop function is finished we can forget the corresponding @@ -332,6 +404,7 @@ impl Task { { inner.abort_handle = None; inner.loop_handle = None; + inner.state = TaskState::Stopped; } } @@ -351,11 +424,29 @@ impl Task { gst_debug!(RUNTIME_CAT, "Task Started"); } + /// Requests the `Task` loop to pause. + /// + /// If an iteration is in progress, it will run to completion, + /// then no more iteration will be executed before `start` is called again. + pub fn pause(&self) { + let mut inner = self.0.lock().unwrap(); + if inner.state != TaskState::Started { + gst_log!(RUNTIME_CAT, "Task not started"); + return; + } + + inner.state = TaskState::Pausing; + gst_debug!(RUNTIME_CAT, "Pause requested"); + } + /// Cancels the `Task` so that it stops running as soon as possible. pub fn cancel(&self) { let mut inner = self.0.lock().unwrap(); - if inner.state != TaskState::Started { - gst_log!(RUNTIME_CAT, "Task already paused or stopped"); + if inner.state != TaskState::Started + && inner.state != TaskState::Paused + && inner.state != TaskState::Pausing + { + gst_log!(RUNTIME_CAT, "Task not Started nor Paused"); return; } @@ -366,14 +457,16 @@ impl Task { abort_handle.abort(); } - inner.state = TaskState::Paused; + inner.resume_sender = None; + + inner.state = TaskState::Cancelled; } /// Stops the `Started` `Task` and wait for it to finish. pub fn stop(&self) { let mut inner = self.0.lock().unwrap(); - if inner.state == TaskState::Stopped { - gst_log!(RUNTIME_CAT, "Task already stopped"); + if inner.state == TaskState::Stopped || inner.state == TaskState::Preparing { + gst_log!(RUNTIME_CAT, "Task loop already stopped"); return; } @@ -388,6 +481,9 @@ impl Task { // And now wait for it to actually stop let loop_handle = inner.loop_handle.take(); + + inner.resume_sender = None; + let context = inner.context.as_ref().unwrap().clone(); drop(inner); @@ -550,4 +646,74 @@ mod tests { task.unprepare().unwrap(); gst_debug!(RUNTIME_CAT, "task test: unprepared"); } + + #[tokio::test] + async fn pause_start() { + use gst::gst_error; + + gst::init().unwrap(); + + let context = Context::acquire("task_pause_start", 2).unwrap(); + + let task = Task::default(); + task.prepare(context).unwrap(); + + let (iter_sender, mut iter_receiver) = mpsc::channel(0); + let iter_sender = Arc::new(Mutex::new(iter_sender)); + + let (mut complete_sender, complete_receiver) = mpsc::channel(0); + let complete_receiver = Arc::new(Mutex::new(complete_receiver)); + + gst_debug!(RUNTIME_CAT, "task_pause_start: starting"); + task.start(move || { + let iter_sender = Arc::clone(&iter_sender); + let complete_receiver = Arc::clone(&complete_receiver); + async move { + gst_debug!(RUNTIME_CAT, "task_pause_start: entering iteration"); + iter_sender.lock().await.send(()).await.unwrap(); + + gst_debug!( + RUNTIME_CAT, + "task_pause_start: iteration awaiting completion" + ); + complete_receiver.lock().await.next().await.unwrap(); + gst_debug!(RUNTIME_CAT, "task_pause_start: iteration complete"); + glib::Continue(true) + } + }); + + gst_debug!(RUNTIME_CAT, "task_pause_start: awaiting 1st iteration"); + iter_receiver.next().await.unwrap(); + + task.pause(); + + gst_debug!( + RUNTIME_CAT, + "task_pause_start: sending 1st iteration completion" + ); + complete_sender.send(()).await.unwrap(); + + // Loop held on + iter_receiver.try_next().unwrap_err(); + + task.start(|| { + gst_error!( + RUNTIME_CAT, + "task_pause_start: reached start to resume closure" + ); + future::pending() + }); + + gst_debug!(RUNTIME_CAT, "task_pause_start: awaiting 2d iteration"); + iter_receiver.next().await.unwrap(); + + gst_debug!( + RUNTIME_CAT, + "task_pause_start: sending 2d iteration completion" + ); + complete_sender.send(()).await.unwrap(); + + task.stop(); + task.unprepare().unwrap(); + } } diff --git a/gst-plugin-threadshare/src/socket.rs b/gst-plugin-threadshare/src/socket.rs index 54bb80b8..790e977b 100644 --- a/gst-plugin-threadshare/src/socket.rs +++ b/gst-plugin-threadshare/src/socket.rs @@ -246,7 +246,7 @@ impl SocketStream { let (read_fut, clock, base_time) = { let mut inner = self.socket.0.lock().unwrap(); if inner.state != SocketState::Started { - gst_debug!(SOCKET_CAT, obj: &inner.element, "DataQueue is not Started"); + gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket is not Started"); return None; } diff --git a/gst-plugin-threadshare/src/tcpclientsrc.rs b/gst-plugin-threadshare/src/tcpclientsrc.rs index ce68ce37..dfda2438 100644 --- a/gst-plugin-threadshare/src/tcpclientsrc.rs +++ b/gst-plugin-threadshare/src/tcpclientsrc.rs @@ -37,8 +37,8 @@ use rand; use std::io; use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; use std::sync::Mutex as StdMutex; -use std::sync::{self, Arc}; use std::u16; use tokio::io::AsyncReadExt; @@ -177,29 +177,73 @@ impl SocketRead for TcpClientReader { #[derive(Debug)] struct TcpClientSrcPadHandlerState { need_initial_events: bool, + need_segment: bool, caps: Option, - configured_caps: Option, } impl Default for TcpClientSrcPadHandlerState { fn default() -> Self { TcpClientSrcPadHandlerState { need_initial_events: true, + need_segment: true, caps: None, - configured_caps: None, } } } -#[derive(Debug, Default)] +#[derive(Debug)] struct TcpClientSrcPadHandlerInner { - state: sync::RwLock, + state: FutMutex, + configured_caps: StdMutex>, } -#[derive(Clone, Debug, Default)] +impl TcpClientSrcPadHandlerInner { + fn new(caps: Option) -> Self { + TcpClientSrcPadHandlerInner { + state: FutMutex::new(TcpClientSrcPadHandlerState { + caps, + ..Default::default() + }), + configured_caps: StdMutex::new(None), + } + } +} + +#[derive(Clone, Debug)] struct TcpClientSrcPadHandler(Arc); impl TcpClientSrcPadHandler { + fn new(caps: Option) -> Self { + TcpClientSrcPadHandler(Arc::new(TcpClientSrcPadHandlerInner::new(caps))) + } + + fn reset(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(CAT, obj: pad.gst_pad(), "Resetting handler"); + + *self.0.state.try_lock().expect("State locked elsewhere") = Default::default(); + *self.0.configured_caps.lock().unwrap() = None; + + gst_debug!(CAT, obj: pad.gst_pad(), "Handler reset"); + } + + fn flush(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); + + self.0 + .state + .try_lock() + .expect("state is locked elsewhere") + .need_segment = true; + + gst_debug!(CAT, obj: pad.gst_pad(), "Flushed"); + } + fn start_task( &self, pad: PadSrcRef<'_>, @@ -284,37 +328,31 @@ impl TcpClientSrcPadHandler { } async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) { - let mut events = Vec::new(); - - // Only `read` the state in the hot path - if self.0.state.read().unwrap().need_initial_events { - // We will need to `write` and we also want to prevent - // any changes on the state while we are handling initial events - let mut state = self.0.state.write().unwrap(); - assert!(state.need_initial_events); - + let mut state = self.0.state.lock().await; + if state.need_initial_events { gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); - events.push( - gst::Event::new_stream_start(&stream_id) - .group_id(gst::GroupId::next()) - .build(), - ); + let stream_start_evt = gst::Event::new_stream_start(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + pad.push_event(stream_start_evt).await; if let Some(ref caps) = state.caps { - events.push(gst::Event::new_caps(&caps).build()); - state.configured_caps = Some(caps.clone()); + let caps_evt = gst::Event::new_caps(&caps).build(); + pad.push_event(caps_evt).await; + *self.0.configured_caps.lock().unwrap() = Some(caps.clone()); } - events.push( - gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), - ); state.need_initial_events = false; } - for event in events { - pad.push_event(event).await; + if state.need_segment { + let segment_evt = + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(); + pad.push_event(segment_evt).await; + + state.need_segment = false; } } @@ -324,6 +362,8 @@ impl TcpClientSrcPadHandler { element: &gst::Element, buffer: gst::Buffer, ) -> Result { + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); + self.push_prelude(pad, element).await; if buffer.get_size() == 0 { @@ -352,7 +392,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - tcpclientsrc.pause(element).unwrap(); + tcpclientsrc.flush_start(element); true } @@ -396,8 +436,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let state = self.0.state.read().unwrap(); - let caps = if let Some(ref caps) = state.configured_caps { + let caps = if let Some(caps) = self.0.configured_caps.lock().unwrap().as_ref() { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -426,7 +465,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { struct TcpClientSrc { src_pad: PadSrc, - src_pad_handler: TcpClientSrcPadHandler, + src_pad_handler: StdMutex>, socket: StdMutex>>, settings: StdMutex, } @@ -499,16 +538,13 @@ impl TcpClientSrc { ) })?; - { - let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap(); - src_pad_handler_state.caps = settings.caps; - } - *socket_storage = Some(socket); drop(socket_storage); + let src_pad_handler = TcpClientSrcPadHandler::new(settings.caps); + self.src_pad - .prepare(context, &self.src_pad_handler) + .prepare(context, &src_pad_handler) .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -516,6 +552,8 @@ impl TcpClientSrc { ) })?; + *self.src_pad_handler.lock().unwrap() = Some(src_pad_handler); + gst_debug!(CAT, obj: element, "Prepared"); Ok(()) @@ -529,8 +567,7 @@ impl TcpClientSrc { } let _ = self.src_pad.unprepare(); - - *self.src_pad_handler.0.state.write().unwrap() = Default::default(); + *self.src_pad_handler.lock().unwrap() = None; gst_debug!(CAT, obj: element, "Unprepared"); @@ -545,11 +582,11 @@ impl TcpClientSrc { self.src_pad.stop_task(); self.src_pad_handler - .0 - .state - .write() + .lock() .unwrap() - .need_initial_events = true; + .as_ref() + .unwrap() + .reset(&self.src_pad.as_ref()); gst_debug!(CAT, obj: element, "Stopped"); @@ -591,6 +628,14 @@ impl TcpClientSrc { gst_debug!(CAT, obj: element, "Stopping Flush"); self.src_pad.stop_task(); + + self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() + .flush(&self.src_pad.as_ref()); + self.start_unchecked(element, socket); gst_debug!(CAT, obj: element, "Stopped Flush"); @@ -602,9 +647,26 @@ impl TcpClientSrc { .unwrap(); self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() .start_task(self.src_pad.as_ref(), element, socket_stream); } + fn flush_start(&self, element: &gst::Element) { + let socket = self.socket.lock().unwrap(); + gst_debug!(CAT, obj: element, "Starting Flush"); + + if let Some(socket) = socket.as_ref() { + socket.pause(); + } + + self.src_pad.cancel_task(); + + gst_debug!(CAT, obj: element, "Flush Started"); + } + fn pause(&self, element: &gst::Element) -> Result<(), ()> { let socket = self.socket.lock().unwrap(); gst_debug!(CAT, obj: element, "Pausing"); @@ -613,7 +675,7 @@ impl TcpClientSrc { socket.pause(); } - self.src_pad.cancel_task(); + self.src_pad.pause_task(); gst_debug!(CAT, obj: element, "Paused"); @@ -656,7 +718,7 @@ impl ObjectSubclass for TcpClientSrc { Self { src_pad, - src_pad_handler: TcpClientSrcPadHandler::default(), + src_pad_handler: StdMutex::new(None), socket: StdMutex::new(None), settings: StdMutex::new(Settings::default()), } diff --git a/gst-plugin-threadshare/src/udpsink.rs b/gst-plugin-threadshare/src/udpsink.rs index 785ef2cf..bb96ce68 100644 --- a/gst-plugin-threadshare/src/udpsink.rs +++ b/gst-plugin-threadshare/src/udpsink.rs @@ -649,6 +649,12 @@ impl UdpSinkPadHandler { } } + fn unprepare(&self) { + if let Some(task) = &self.0.read().unwrap().task { + task.unprepare().unwrap(); + } + } + fn stop_task(&self) { if let Some(task) = &self.0.read().unwrap().task { task.stop(); @@ -959,6 +965,7 @@ impl UdpSink { fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Unpreparing"); + self.sink_pad_handler.unprepare(); self.sink_pad.unprepare(); gst_debug!(CAT, obj: element, "Unprepared"); diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index a241deae..6fa58357 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -39,8 +39,8 @@ use rand; use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::sync::Arc; use std::sync::Mutex as StdMutex; -use std::sync::{self, Arc}; use std::u16; use crate::runtime::prelude::*; @@ -230,8 +230,8 @@ impl SocketRead for UdpReader { struct UdpSrcPadHandlerState { retrieve_sender_address: bool, need_initial_events: bool, + need_segment: bool, caps: Option, - configured_caps: Option, } impl Default for UdpSrcPadHandlerState { @@ -239,21 +239,69 @@ impl Default for UdpSrcPadHandlerState { UdpSrcPadHandlerState { retrieve_sender_address: true, need_initial_events: true, + need_segment: true, caps: None, - configured_caps: None, } } } -#[derive(Debug, Default)] +#[derive(Debug)] struct UdpSrcPadHandlerInner { - state: sync::RwLock, + state: FutMutex, + configured_caps: StdMutex>, } -#[derive(Clone, Debug, Default)] +impl UdpSrcPadHandlerInner { + fn new(caps: Option, retrieve_sender_address: bool) -> Self { + UdpSrcPadHandlerInner { + state: FutMutex::new(UdpSrcPadHandlerState { + retrieve_sender_address, + caps, + ..Default::default() + }), + configured_caps: StdMutex::new(None), + } + } +} + +#[derive(Clone, Debug)] struct UdpSrcPadHandler(Arc); impl UdpSrcPadHandler { + fn new(caps: Option, retrieve_sender_address: bool) -> UdpSrcPadHandler { + UdpSrcPadHandler(Arc::new(UdpSrcPadHandlerInner::new( + caps, + retrieve_sender_address, + ))) + } + + fn reset(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(CAT, obj: pad.gst_pad(), "Resetting handler"); + + *self.0.state.try_lock().expect("State locked elsewhere") = Default::default(); + *self.0.configured_caps.lock().unwrap() = None; + + gst_debug!(CAT, obj: pad.gst_pad(), "Handler reset"); + } + + fn flush(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); + + self.0 + .state + .try_lock() + .expect("state is locked elsewhere") + .need_segment = true; + + gst_debug!(CAT, obj: pad.gst_pad(), "Flushed"); + } + fn start_task( &self, pad: PadSrcRef<'_>, @@ -306,7 +354,7 @@ impl UdpSrcPadHandler { }; if let Some(saddr) = saddr { - if this.0.state.read().unwrap().retrieve_sender_address { + if this.0.state.lock().await.retrieve_sender_address { let inet_addr = match saddr.ip() { IpAddr::V4(ip) => gio::InetAddress::new_from_bytes( gio::InetAddressBytes::V4(&ip.octets()), @@ -354,37 +402,31 @@ impl UdpSrcPadHandler { } async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) { - let mut events = Vec::new(); - - // Only `read` the state in the hot path - if self.0.state.read().unwrap().need_initial_events { - // We will need to `write` and we also want to prevent - // any changes on the state while we are handling initial events - let mut state = self.0.state.write().unwrap(); - assert!(state.need_initial_events); - + let mut state = self.0.state.lock().await; + if state.need_initial_events { gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); - events.push( - gst::Event::new_stream_start(&stream_id) - .group_id(gst::GroupId::next()) - .build(), - ); + let stream_start_evt = gst::Event::new_stream_start(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + pad.push_event(stream_start_evt).await; if let Some(ref caps) = state.caps { - events.push(gst::Event::new_caps(&caps).build()); - state.configured_caps = Some(caps.clone()); + let caps_evt = gst::Event::new_caps(&caps).build(); + pad.push_event(caps_evt).await; + *self.0.configured_caps.lock().unwrap() = Some(caps.clone()); } - events.push( - gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), - ); state.need_initial_events = false; } - for event in events { - pad.push_event(event).await; + if state.need_segment { + let segment_evt = + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(); + pad.push_event(segment_evt).await; + + state.need_segment = false; } } @@ -394,6 +436,8 @@ impl UdpSrcPadHandler { element: &gst::Element, buffer: gst::Buffer, ) -> Result { + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); + self.push_prelude(pad, element).await; pad.push(buffer).await @@ -416,7 +460,7 @@ impl PadSrcHandler for UdpSrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - udpsrc.pause(element).unwrap(); + udpsrc.flush_start(element); true } @@ -461,8 +505,7 @@ impl PadSrcHandler for UdpSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let state = self.0.state.read().unwrap(); - let caps = if let Some(ref caps) = state.configured_caps { + let caps = if let Some(caps) = self.0.configured_caps.lock().unwrap().as_ref() { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -491,7 +534,7 @@ impl PadSrcHandler for UdpSrcPadHandler { struct UdpSrc { src_pad: PadSrc, - src_pad_handler: UdpSrcPadHandler, + src_pad_handler: StdMutex>, socket: StdMutex>>, settings: StdMutex, } @@ -686,19 +729,16 @@ impl UdpSrc { ) })?; - { - let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap(); - src_pad_handler_state.retrieve_sender_address = settings.retrieve_sender_address; - src_pad_handler_state.caps = settings.caps; - } - *socket_storage = Some(socket); drop(socket_storage); element.notify("used-socket"); + let src_pad_handler = + UdpSrcPadHandler::new(settings.caps, settings.retrieve_sender_address); + self.src_pad - .prepare(context, &self.src_pad_handler) + .prepare(context, &src_pad_handler) .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -706,6 +746,8 @@ impl UdpSrc { ) })?; + *self.src_pad_handler.lock().unwrap() = Some(src_pad_handler); + gst_debug!(CAT, obj: element, "Prepared"); Ok(()) @@ -722,8 +764,7 @@ impl UdpSrc { } let _ = self.src_pad.unprepare(); - - *self.src_pad_handler.0.state.write().unwrap() = Default::default(); + *self.src_pad_handler.lock().unwrap() = None; gst_debug!(CAT, obj: element, "Unprepared"); @@ -738,11 +779,11 @@ impl UdpSrc { self.src_pad.stop_task(); self.src_pad_handler - .0 - .state - .write() + .lock() .unwrap() - .need_initial_events = true; + .as_ref() + .unwrap() + .reset(&self.src_pad.as_ref()); gst_debug!(CAT, obj: element, "Stopped"); @@ -784,6 +825,14 @@ impl UdpSrc { gst_debug!(CAT, obj: element, "Stopping Flush"); self.src_pad.stop_task(); + + self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() + .flush(&self.src_pad.as_ref()); + self.start_unchecked(element, socket); gst_debug!(CAT, obj: element, "Stopped Flush"); @@ -795,9 +844,26 @@ impl UdpSrc { .unwrap(); self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() .start_task(self.src_pad.as_ref(), element, socket_stream); } + fn flush_start(&self, element: &gst::Element) { + let socket = self.socket.lock().unwrap(); + gst_debug!(CAT, obj: element, "Starting Flush"); + + if let Some(socket) = socket.as_ref() { + socket.pause(); + } + + self.src_pad.cancel_task(); + + gst_debug!(CAT, obj: element, "Flush Started"); + } + fn pause(&self, element: &gst::Element) -> Result<(), ()> { let socket = self.socket.lock().unwrap(); gst_debug!(CAT, obj: element, "Pausing"); @@ -806,7 +872,7 @@ impl UdpSrc { socket.pause(); } - self.src_pad.cancel_task(); + self.src_pad.pause_task(); gst_debug!(CAT, obj: element, "Paused"); @@ -865,7 +931,7 @@ impl ObjectSubclass for UdpSrc { Self { src_pad, - src_pad_handler: UdpSrcPadHandler::default(), + src_pad_handler: StdMutex::new(None), socket: StdMutex::new(None), settings: StdMutex::new(Settings::default()), } diff --git a/gst-plugin-threadshare/tests/appsrc.rs b/gst-plugin-threadshare/tests/appsrc.rs index cfe785f0..8610fcb4 100644 --- a/gst-plugin-threadshare/tests/appsrc.rs +++ b/gst-plugin-threadshare/tests/appsrc.rs @@ -18,6 +18,8 @@ use glib::prelude::*; use gst; +use gst::prelude::*; + use gst_check; use gstthreadshare; @@ -33,7 +35,7 @@ fn init() { } #[test] -fn test_push() { +fn push() { init(); let mut h = gst_check::Harness::new("ts-appsrc"); @@ -43,7 +45,7 @@ fn test_push() { let appsrc = h.get_element().unwrap(); appsrc.set_property("caps", &caps).unwrap(); appsrc.set_property("do-timestamp", &true).unwrap(); - appsrc.set_property("context", &"test-push").unwrap(); + appsrc.set_property("context", &"appsrc-push").unwrap(); } h.play(); @@ -97,3 +99,205 @@ fn test_push() { } assert!(n_events >= 2); } + +#[test] +fn pause() { + init(); + + let mut h = gst_check::Harness::new("ts-appsrc"); + + let caps = gst::Caps::new_simple("foo/bar", &[]); + { + let appsrc = h.get_element().unwrap(); + appsrc.set_property("caps", &caps).unwrap(); + appsrc.set_property("do-timestamp", &true).unwrap(); + appsrc.set_property("context", &"appsrc-pause").unwrap(); + } + + h.play(); + + let appsrc = h.get_element().unwrap(); + + // Initial buffer + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::from_slice(vec![1, 2, 3, 4])]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + let _ = h.pull().unwrap(); + + appsrc + .change_state(gst::StateChange::PlayingToPaused) + .unwrap(); + + // Pre-pause buffer + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::from_slice(vec![5, 6, 7])]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + appsrc + .change_state(gst::StateChange::PlayingToPaused) + .unwrap(); + + // Buffer is queued during Paused + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::from_slice(vec![8, 9])]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + appsrc + .change_state(gst::StateChange::PausedToPlaying) + .unwrap(); + + // Pull Pre-pause buffer + let _ = h.pull().unwrap(); + + // Pull buffer queued while Paused + let _ = h.pull().unwrap(); + + // Can push again + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::new()]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + let _ = h.pull().unwrap(); + assert!(h.try_pull().is_none()); +} + +#[test] +fn flush() { + init(); + + let mut h = gst_check::Harness::new("ts-appsrc"); + + let caps = gst::Caps::new_simple("foo/bar", &[]); + { + let appsrc = h.get_element().unwrap(); + appsrc.set_property("caps", &caps).unwrap(); + appsrc.set_property("do-timestamp", &true).unwrap(); + appsrc.set_property("context", &"appsrc-flush").unwrap(); + } + + h.play(); + + let appsrc = h.get_element().unwrap(); + + // Initial buffer + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::from_slice(vec![1, 2, 3, 4])]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + let _ = h.pull().unwrap(); + + // FlushStart + assert!(h.push_upstream_event(gst::Event::new_flush_start().build())); + + // Can't push buffer while flushing + assert!(!appsrc + .emit("push-buffer", &[&gst::Buffer::new()]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + assert!(h.try_pull().is_none()); + + // FlushStop + assert!(h.push_upstream_event(gst::Event::new_flush_stop(true).build())); + + // No buffer available due to flush + assert!(h.try_pull().is_none()); + + // Can push again + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::new()]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + let _ = h.pull().unwrap(); + assert!(h.try_pull().is_none()); +} + +#[test] +fn pause_flush() { + init(); + + let mut h = gst_check::Harness::new("ts-appsrc"); + + let caps = gst::Caps::new_simple("foo/bar", &[]); + { + let appsrc = h.get_element().unwrap(); + appsrc.set_property("caps", &caps).unwrap(); + appsrc.set_property("do-timestamp", &true).unwrap(); + appsrc + .set_property("context", &"appsrc-pause_flush") + .unwrap(); + } + + h.play(); + + let appsrc = h.get_element().unwrap(); + + // Initial buffer + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::from_slice(vec![1, 2, 3, 4])]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + let _ = h.pull().unwrap(); + + appsrc + .change_state(gst::StateChange::PlayingToPaused) + .unwrap(); + + // FlushStart + assert!(h.push_upstream_event(gst::Event::new_flush_start().build())); + + // Can't push buffer while flushing + assert!(!appsrc + .emit("push-buffer", &[&gst::Buffer::new()]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + assert!(h.try_pull().is_none()); + + // FlushStop + assert!(h.push_upstream_event(gst::Event::new_flush_stop(true).build())); + + appsrc + .change_state(gst::StateChange::PausedToPlaying) + .unwrap(); + + // No buffer available due to flush + assert!(h.try_pull().is_none()); + + // Can push again + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::new()]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + let _ = h.pull().unwrap(); + assert!(h.try_pull().is_none()); +} diff --git a/gst-plugin-threadshare/tests/pad.rs b/gst-plugin-threadshare/tests/pad.rs index 63d6a93c..ae06dddf 100644 --- a/gst-plugin-threadshare/tests/pad.rs +++ b/gst-plugin-threadshare/tests/pad.rs @@ -29,13 +29,14 @@ use gst; use gst::prelude::*; use gst::subclass::prelude::*; use gst::EventView; -use gst::{gst_debug, gst_error_msg, gst_log}; +use gst::{gst_debug, gst_error_msg, gst_info, gst_log}; use lazy_static::lazy_static; use std::boxed::Box; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use std::sync::Mutex as StdMutex; use gstthreadshare::runtime::prelude::*; use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; @@ -79,36 +80,93 @@ lazy_static! { ); } -#[derive(Clone, Debug, Default)] -struct PadSrcHandlerTest; +#[derive(Debug)] +struct PadSrcHandlerTestInner { + receiver: FutMutex>, +} + +impl PadSrcHandlerTestInner { + fn new(receiver: mpsc::Receiver) -> PadSrcHandlerTestInner { + PadSrcHandlerTestInner { + receiver: FutMutex::new(receiver), + } + } +} + +#[derive(Clone, Debug)] +struct PadSrcHandlerTest(Arc); impl PadSrcHandlerTest { - fn start_task(&self, pad: PadSrcRef<'_>, receiver: mpsc::Receiver) { + fn new(receiver: mpsc::Receiver) -> PadSrcHandlerTest { + PadSrcHandlerTest(Arc::new(PadSrcHandlerTestInner::new(receiver))) + } + + fn stop(&self, pad: &PadSrcRef<'_>) { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Stopping handler"); + + pad.stop_task(); + // From here on, the task is stopped so it can't hold resources anymore + + self.flush(pad); + + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handler stopped"); + } + + fn flush(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing"); + + // Purge the channel + let mut receiver = self + .0 + .receiver + .try_lock() + .expect("Channel receiver is locked elsewhere"); + loop { + match receiver.try_next() { + Ok(Some(_item)) => { + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Dropping pending item"); + } + Err(_) => { + gst_log!(SRC_CAT, obj: pad.gst_pad(), "No more pending item"); + break; + } + Ok(None) => { + panic!("Channel sender dropped"); + } + } + } + + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushed"); + } + + fn start_task(&self, pad: PadSrcRef<'_>) { gst_debug!(SRC_CAT, obj: pad.gst_pad(), "SrcPad task starting"); + + let this = self.clone(); let pad_weak = pad.downgrade(); - let receiver = Arc::new(FutMutex::new(receiver)); + pad.start_task(move || { let pad_weak = pad_weak.clone(); - let receiver = Arc::clone(&receiver); + let this = this.clone(); + async move { + let item = this.0.receiver.lock().await.next().await; + let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); - let item = { - let mut receiver = receiver.lock().await; - - match receiver.next().await { - Some(item) => item, - None => { - gst_debug!(SRC_CAT, obj: pad.gst_pad(), "SrcPad channel aborted"); - return glib::Continue(false); - } + let item = match item { + Some(item) => item, + None => { + gst_log!(SRC_CAT, obj: pad.gst_pad(), "SrcPad channel aborted"); + return glib::Continue(false); } }; - // We could also check here first if we're flushing but as we're not doing anything - // complicated below we can just defer that to the pushing function - - match Self::push_item(pad, item).await { + let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); + match this.push_item(pad, item).await { Ok(_) => glib::Continue(true), Err(gst::FlowError::Flushing) => glib::Continue(false), Err(err) => panic!("Got error {:?}", err), @@ -117,7 +175,13 @@ impl PadSrcHandlerTest { }); } - async fn push_item(pad: PadSrcRef<'_>, item: Item) -> Result { + async fn push_item( + self, + pad: PadSrcRef<'_>, + item: Item, + ) -> Result { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item); + match item { Item::Event(event) => { pad.push_event(event).await; @@ -144,9 +208,7 @@ impl PadSrcHandler for PadSrcHandlerTest { let ret = match event.view() { EventView::FlushStart(..) => { - // Cancel the task so that it finishes ASAP - // and clear the sender - elem_src_test.pause(element).unwrap(); + elem_src_test.flush_start(element); true } EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true, @@ -167,16 +229,34 @@ impl PadSrcHandler for PadSrcHandlerTest { } } +#[derive(Debug, Eq, PartialEq)] +enum ElementSrcTestState { + Paused, + RejectItems, + Started, +} + #[derive(Debug)] struct ElementSrcTest { src_pad: PadSrc, - src_pad_handler: PadSrcHandlerTest, - sender: Mutex>>, - settings: Mutex, + src_pad_handler: StdMutex>, + state: StdMutex, + sender: StdMutex>>, + settings: StdMutex, } impl ElementSrcTest { fn try_push(&self, item: Item) -> Result<(), Item> { + let state = self.state.lock().unwrap(); + if *state == ElementSrcTestState::RejectItems { + gst_debug!( + SRC_CAT, + "ElementSrcTest rejecting item due to element state" + ); + + return Err(item); + } + match self.sender.lock().unwrap().as_mut() { Some(sender) => sender .try_send(item) @@ -196,8 +276,13 @@ impl ElementSrcTest { ) })?; + let (sender, receiver) = mpsc::channel(1); + *self.sender.lock().unwrap() = Some(sender); + + let src_pad_handler = PadSrcHandlerTest::new(receiver); + self.src_pad - .prepare(context, &self.src_pad_handler) + .prepare(context, &src_pad_handler) .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -205,6 +290,8 @@ impl ElementSrcTest { ) })?; + *self.src_pad_handler.lock().unwrap() = Some(src_pad_handler); + gst_debug!(SRC_CAT, obj: element, "Prepared"); Ok(()) @@ -214,6 +301,7 @@ impl ElementSrcTest { gst_debug!(SRC_CAT, obj: element, "Unpreparing"); self.src_pad.unprepare().unwrap(); + *self.src_pad_handler.lock().unwrap() = None; gst_debug!(SRC_CAT, obj: element, "Unprepared"); @@ -221,15 +309,15 @@ impl ElementSrcTest { } fn start(&self, element: &gst::Element) -> Result<(), ()> { - let mut sender = self.sender.lock().unwrap(); - if sender.is_some() { + let mut state = self.state.lock().unwrap(); + if *state == ElementSrcTestState::Started { gst_debug!(SRC_CAT, obj: element, "Already started"); return Err(()); } gst_debug!(SRC_CAT, obj: element, "Starting"); - self.start_unchecked(&mut sender); + self.start_unchecked(&mut state); gst_debug!(SRC_CAT, obj: element, "Started"); @@ -237,12 +325,8 @@ impl ElementSrcTest { } fn flush_stop(&self, element: &gst::Element) { - // Keep the lock on the `sender` until `flush_stop` is complete - // so as to prevent race conditions due to concurrent state transitions. - // Note that this won't deadlock as `sender` is not used - // within the `src_pad`'s `Task`. - let mut sender = self.sender.lock().unwrap(); - if sender.is_some() { + let mut state = self.state.lock().unwrap(); + if *state == ElementSrcTestState::Started { gst_debug!(SRC_CAT, obj: element, "Already started"); return; } @@ -252,30 +336,52 @@ impl ElementSrcTest { // Stop it so we wait for it to actually finish self.src_pad.stop_task(); + self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() + .flush(&self.src_pad.as_ref()); + // And then start it again - self.start_unchecked(&mut sender); + self.start_unchecked(&mut state); gst_debug!(SRC_CAT, obj: element, "Stopped Flush"); } - fn start_unchecked(&self, sender: &mut Option>) { - // Start the task and set up the sender. We only accept - // data in Playing - let (sender_new, receiver) = mpsc::channel(1); - *sender = Some(sender_new); + fn start_unchecked(&self, state: &mut ElementSrcTestState) { self.src_pad_handler - .start_task(self.src_pad.as_ref(), receiver); + .lock() + .unwrap() + .as_ref() + .unwrap() + .start_task(self.src_pad.as_ref()); + + *state = ElementSrcTestState::Started; + } + + fn flush_start(&self, element: &gst::Element) { + // Keep the lock on the `state` until `flush_start` is complete + // so as to prevent race conditions due to concurrent state transitions. + let mut state = self.state.lock().unwrap(); + + gst_debug!(SRC_CAT, obj: element, "Starting Flush"); + + *state = ElementSrcTestState::RejectItems; + self.src_pad.cancel_task(); + + gst_debug!(SRC_CAT, obj: element, "Flush Started"); } fn pause(&self, element: &gst::Element) -> Result<(), ()> { - let mut sender = self.sender.lock().unwrap(); + // Lock the state to prevent race condition due to concurrent FlushStop + let mut state = self.state.lock().unwrap(); + gst_debug!(SRC_CAT, obj: element, "Pausing"); - // Cancel task, we only accept data in Playing - self.src_pad.cancel_task(); + self.src_pad.pause_task(); - // Prevent subsequent items from being enqueued - *sender = None; + *state = ElementSrcTestState::Paused; gst_debug!(SRC_CAT, obj: element, "Paused"); @@ -285,9 +391,14 @@ impl ElementSrcTest { fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(SRC_CAT, obj: element, "Stopping"); - // Now stop the task if it was still running, blocking - // until this has actually happened - self.src_pad.stop_task(); + *self.state.lock().unwrap() = ElementSrcTestState::RejectItems; + + self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() + .stop(&self.src_pad.as_ref()); gst_debug!(SRC_CAT, obj: element, "Stopped"); @@ -334,9 +445,10 @@ impl ObjectSubclass for ElementSrcTest { ElementSrcTest { src_pad, - src_pad_handler: PadSrcHandlerTest::default(), - sender: Mutex::new(None), - settings: Mutex::new(settings), + src_pad_handler: StdMutex::new(None), + state: StdMutex::new(ElementSrcTestState::RejectItems), + sender: StdMutex::new(None), + settings: StdMutex::new(settings), } } } @@ -595,6 +707,24 @@ impl ElementSinkTest { } } +impl ElementSinkTest { + fn push_flush_start(&self, element: &gst::Element) { + gst_debug!(SINK_CAT, obj: element, "Pushing FlushStart"); + self.sink_pad + .gst_pad() + .push_event(gst::Event::new_flush_start().build()); + gst_debug!(SINK_CAT, obj: element, "FlushStart pushed"); + } + + fn push_flush_stop(&self, element: &gst::Element) { + gst_debug!(SINK_CAT, obj: element, "Pushing FlushStop"); + self.sink_pad + .gst_pad() + .push_event(gst::Event::new_flush_stop(true).build()); + gst_debug!(SINK_CAT, obj: element, "FlushStop pushed"); + } +} + lazy_static! { static ref SINK_CAT: gst::DebugCategory = gst::DebugCategory::new( "ts-element-sink-test", @@ -823,10 +953,10 @@ fn nominal_scenario( // Pause the Pad task pipeline.set_state(gst::State::Paused).unwrap(); - // Items not longer accepted + // Item accepted, but not processed before switching to Playing again elem_src_test - .try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))) - .unwrap_err(); + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![5, 6, 7]))) + .unwrap(); // Nothing forwarded receiver.try_next().unwrap_err(); @@ -834,8 +964,13 @@ fn nominal_scenario( // Switch back the Pad task to Started pipeline.set_state(gst::State::Playing).unwrap(); - // Still nothing forwarded - receiver.try_next().unwrap_err(); + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(buffer) => { + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![5, 6, 7].as_slice()); + } + other => panic!("Unexpected item {:?}", other), + } // Flush src_element.send_event(gst::Event::new_flush_start().build()); @@ -849,6 +984,33 @@ fn nominal_scenario( other => panic!("Unexpected item {:?}", other), } + elem_src_test + .try_push(Item::Event( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + // Buffer + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![8, 9]))) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(buffer) => { + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![8, 9].as_slice()); + } + other => panic!("Unexpected item {:?}", other), + } + // EOS elem_src_test .try_push(Item::Event(gst::Event::new_eos().build())) @@ -885,24 +1047,24 @@ fn src_sink_nominal() { nominal_scenario(&name, pipeline, src_element, receiver); } -// #[test] -// fn src_tsqueue_sink_nominal() { -// init(); -// -// let name = "src_tsqueue_sink"; -// -// let ts_queue = gst::ElementFactory::make("ts-queue", Some("ts-queue")).unwrap(); -// ts_queue -// .set_property("context", &format!("{}_queue", name)) -// .unwrap(); -// ts_queue -// .set_property("context-wait", &THROTTLING_DURATION) -// .unwrap(); -// -// let (pipeline, src_element, _sink_element, receiver) = setup(name, Some(ts_queue), None); -// -// nominal_scenario(&name, pipeline, src_element, receiver); -// } +#[test] +fn src_tsqueue_sink_nominal() { + init(); + + let name = "src_tsqueue_sink"; + + let ts_queue = gst::ElementFactory::make("ts-queue", Some("ts-queue")).unwrap(); + ts_queue + .set_property("context", &format!("{}_queue", name)) + .unwrap(); + ts_queue + .set_property("context-wait", &THROTTLING_DURATION) + .unwrap(); + + let (pipeline, src_element, _sink_element, receiver) = setup(name, Some(ts_queue), None); + + nominal_scenario(&name, pipeline, src_element, receiver); +} #[test] fn src_queue_sink_nominal() { @@ -916,30 +1078,309 @@ fn src_queue_sink_nominal() { nominal_scenario(&name, pipeline, src_element, receiver); } -// #[test] -// fn src_tsproxy_sink_nominal() { -// init(); -// -// let name = "src_tsproxy_sink"; -// -// let ts_proxy_sink = gst::ElementFactory::make("ts-proxysink", Some("ts-proxysink")).unwrap(); -// ts_proxy_sink -// .set_property("proxy-context", &format!("{}_proxy_context", name)) -// .unwrap(); -// -// let ts_proxy_src = gst::ElementFactory::make("ts-proxysrc", Some("ts-proxysrc")).unwrap(); -// ts_proxy_src -// .set_property("proxy-context", &format!("{}_proxy_context", name)) -// .unwrap(); -// ts_proxy_src -// .set_property("context", &format!("{}_context", name)) -// .unwrap(); -// ts_proxy_src -// .set_property("context-wait", &THROTTLING_DURATION) -// .unwrap(); -// -// let (pipeline, src_element, _sink_element, receiver) = -// setup(name, Some(ts_proxy_sink), Some(ts_proxy_src)); -// -// nominal_scenario(&name, pipeline, src_element, receiver); -// } +#[test] +fn src_tsproxy_sink_nominal() { + init(); + + let name = "src_tsproxy_sink"; + + let ts_proxy_sink = gst::ElementFactory::make("ts-proxysink", Some("ts-proxysink")).unwrap(); + ts_proxy_sink + .set_property("proxy-context", &format!("{}_proxy_context", name)) + .unwrap(); + + let ts_proxy_src = gst::ElementFactory::make("ts-proxysrc", Some("ts-proxysrc")).unwrap(); + ts_proxy_src + .set_property("proxy-context", &format!("{}_proxy_context", name)) + .unwrap(); + ts_proxy_src + .set_property("context", &format!("{}_context", name)) + .unwrap(); + ts_proxy_src + .set_property("context-wait", &THROTTLING_DURATION) + .unwrap(); + + let (pipeline, src_element, _sink_element, receiver) = + setup(name, Some(ts_proxy_sink), Some(ts_proxy_src)); + + nominal_scenario(&name, pipeline, src_element, receiver); +} + +#[test] +fn start_pause_start() { + init(); + + let scenario_name = "start_pause_start"; + + let (pipeline, src_element, _sink_element, mut receiver) = setup(&scenario_name, None, None); + + let elem_src_test = ElementSrcTest::from_instance(&src_element); + + pipeline.set_state(gst::State::Playing).unwrap(); + + // Initial events + elem_src_test + .try_push(Item::Event( + gst::Event::new_stream_start(scenario_name) + .group_id(gst::GroupId::next()) + .build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::StreamStart(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + elem_src_test + .try_push(Item::Event( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + // Buffer + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))) + .unwrap(); + + pipeline.set_state(gst::State::Paused).unwrap(); + + pipeline.set_state(gst::State::Playing).unwrap(); + + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![5, 6, 7]))) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(buffer) => { + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![1, 2, 3, 4].as_slice()); + } + other => panic!("Unexpected item {:?}", other), + } + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(buffer) => { + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![5, 6, 7].as_slice()); + } + other => panic!("Unexpected item {:?}", other), + } + + // Nothing else forwarded + receiver.try_next().unwrap_err(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn start_stop_start() { + init(); + + let scenario_name = "start_stop_start"; + + let (pipeline, src_element, _sink_element, mut receiver) = setup(&scenario_name, None, None); + + let elem_src_test = ElementSrcTest::from_instance(&src_element); + + pipeline.set_state(gst::State::Playing).unwrap(); + + // Initial events + elem_src_test + .try_push(Item::Event( + gst::Event::new_stream_start(&format!("{}-after_stop", scenario_name)) + .group_id(gst::GroupId::next()) + .build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::StreamStart(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + elem_src_test + .try_push(Item::Event( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + // Buffer + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))) + .unwrap(); + + pipeline.set_state(gst::State::Ready).unwrap(); + + pipeline.set_state(gst::State::Playing).unwrap(); + + // Initial events again + elem_src_test + .try_push(Item::Event( + gst::Event::new_stream_start(scenario_name) + .group_id(gst::GroupId::next()) + .build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(_buffer) => { + gst_info!( + SRC_CAT, + "{}: initial buffer went through, don't expect any pending item to be dropped", + scenario_name + ); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::StreamStart(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + } + Item::Event(event) => match event.view() { + EventView::StreamStart(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + elem_src_test + .try_push(Item::Event( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![5, 6, 7]))) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(buffer) => { + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![5, 6, 7].as_slice()); + } + other => panic!("Unexpected item {:?}", other), + } + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn start_flush() { + init(); + + let scenario_name = "start_flush"; + + let (pipeline, src_element, sink_element, mut receiver) = setup(&scenario_name, None, None); + + let elem_src_test = ElementSrcTest::from_instance(&src_element); + + pipeline.set_state(gst::State::Playing).unwrap(); + + // Initial events + elem_src_test + .try_push(Item::Event( + gst::Event::new_stream_start(&format!("{}-after_stop", scenario_name)) + .group_id(gst::GroupId::next()) + .build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::StreamStart(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + elem_src_test + .try_push(Item::Event( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + // Buffer + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))) + .unwrap(); + + let elem_sink_test = ElementSinkTest::from_instance(&sink_element); + + elem_sink_test.push_flush_start(&sink_element); + + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![5, 6, 7]))) + .unwrap_err(); + + elem_sink_test.push_flush_stop(&sink_element); + + elem_src_test + .try_push(Item::Event( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + // Post flush buffer + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![8, 9]))) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(buffer) => { + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![8, 9].as_slice()); + } + other => panic!("Unexpected item {:?}", other), + } + + pipeline.set_state(gst::State::Null).unwrap(); +} diff --git a/gst-plugin-threadshare/tests/pipeline.rs b/gst-plugin-threadshare/tests/pipeline.rs index 1105d55a..61a7851a 100644 --- a/gst-plugin-threadshare/tests/pipeline.rs +++ b/gst-plugin-threadshare/tests/pipeline.rs @@ -504,7 +504,7 @@ fn premature_shutdown() { appsink.set_property("sync", &false).unwrap(); appsink.set_property("async", &false).unwrap(); - let (sender, receiver) = mpsc::channel(); + let (appsink_sender, appsink_receiver) = mpsc::channel(); let appsink = appsink.dynamic_cast::().unwrap(); appsink.connect_new_sample(move |appsink| { @@ -517,13 +517,18 @@ fn premature_shutdown() { .unwrap() .unwrap(); - sender.send(()).unwrap(); + appsink_sender.send(()).unwrap(); Ok(gst::FlowSuccess::Ok) }); - fn push_buffer(src: &gst::Element) -> bool { - gst_debug!(CAT, obj: src, "premature_shutdown: pushing buffer"); + fn push_buffer(src: &gst::Element, intent: &str) -> bool { + gst_debug!( + CAT, + obj: src, + "premature_shutdown: pushing buffer {}", + intent + ); src.emit("push-buffer", &[&gst::Buffer::from_slice(vec![0; 1024])]) .unwrap() .unwrap() @@ -536,38 +541,41 @@ fn premature_shutdown() { let mut scenario = Some(move || { gst_debug!(CAT, "premature_shutdown: STEP 1: Playing"); // Initialize the dataflow - assert!(push_buffer(&src)); + assert!(push_buffer(&src, "(initial)")); // Wait for the buffer to reach AppSink - receiver.recv().unwrap(); - assert_eq!(receiver.try_recv().unwrap_err(), mpsc::TryRecvError::Empty); + appsink_receiver.recv().unwrap(); + assert_eq!( + appsink_receiver.try_recv().unwrap_err(), + mpsc::TryRecvError::Empty + ); - assert!(push_buffer(&src)); + assert!(push_buffer(&src, "before Playing -> Paused")); + gst_debug!(CAT, "premature_shutdown: STEP 2: Playing -> Paused"); pipeline_clone.set_state(gst::State::Paused).unwrap(); - // Paused -> can't push_buffer - assert!(!push_buffer(&src)); - - gst_debug!(CAT, "premature_shutdown: STEP 2: Paused -> Playing"); + gst_debug!(CAT, "premature_shutdown: STEP 3: Paused -> Playing"); pipeline_clone.set_state(gst::State::Playing).unwrap(); - gst_debug!(CAT, "premature_shutdown: STEP 3: Playing"); + gst_debug!(CAT, "premature_shutdown: Playing again"); - receiver.recv().unwrap(); + gst_debug!(CAT, "Waiting for buffer sent before Playing -> Paused"); + appsink_receiver.recv().unwrap(); - assert!(push_buffer(&src)); - receiver.recv().unwrap(); + assert!(push_buffer(&src, "after Paused -> Playing")); + gst_debug!(CAT, "Waiting for buffer sent after Paused -> Playing"); + appsink_receiver.recv().unwrap(); // Fill up the (dataqueue) and abruptly shutdown - assert!(push_buffer(&src)); - assert!(push_buffer(&src)); + assert!(push_buffer(&src, "filling 1")); + assert!(push_buffer(&src, "filling 2")); - gst_debug!(CAT, "premature_shutdown: STEP 4: Shutdown"); + gst_debug!(CAT, "premature_shutdown: STEP 4: Playing -> Null"); pipeline_clone.set_state(gst::State::Null).unwrap(); - assert!(!push_buffer(&src)); + assert!(!push_buffer(&src, "after Null")); l_clone.quit(); });