diff --git a/gst-plugin-threadshare/src/appsrc.rs b/gst-plugin-threadshare/src/appsrc.rs index eb4e251c..35ab8603 100644 --- a/gst-plugin-threadshare/src/appsrc.rs +++ b/gst-plugin-threadshare/src/appsrc.rs @@ -35,8 +35,8 @@ use lazy_static::lazy_static; use rand; use std::convert::TryInto; +use std::sync::Arc; use std::sync::Mutex as StdMutex; -use std::sync::{self, Arc}; use std::u32; use crate::runtime::prelude::*; @@ -138,48 +138,112 @@ enum StreamItem { #[derive(Debug)] struct AppSrcPadHandlerState { need_initial_events: bool, + need_segment: bool, caps: Option, - configured_caps: Option, } impl Default for AppSrcPadHandlerState { fn default() -> Self { AppSrcPadHandlerState { need_initial_events: true, + need_segment: true, caps: None, - configured_caps: None, } } } -#[derive(Debug, Default)] +#[derive(Debug)] struct AppSrcPadHandlerInner { - state: sync::RwLock, + state: FutMutex, + configured_caps: StdMutex>, + receiver: FutMutex>, } -#[derive(Clone, Debug, Default)] +impl AppSrcPadHandlerInner { + fn new(receiver: mpsc::Receiver, caps: Option) -> Self { + AppSrcPadHandlerInner { + state: FutMutex::new(AppSrcPadHandlerState { + caps, + ..Default::default() + }), + configured_caps: StdMutex::new(None), + receiver: FutMutex::new(receiver), + } + } +} + +#[derive(Clone, Debug)] struct AppSrcPadHandler(Arc); impl AppSrcPadHandler { - fn start_task( - &self, - pad: PadSrcRef<'_>, - element: &gst::Element, - receiver: mpsc::Receiver, - ) { + fn new(receiver: mpsc::Receiver, caps: Option) -> AppSrcPadHandler { + AppSrcPadHandler(Arc::new(AppSrcPadHandlerInner::new(receiver, caps))) + } + + fn reset(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(CAT, obj: pad.gst_pad(), "Resetting handler"); + + *self.0.state.try_lock().expect("State locked elsewhere") = Default::default(); + *self.0.configured_caps.lock().unwrap() = None; + + self.flush(pad); + + gst_debug!(CAT, obj: pad.gst_pad(), "Handler reset"); + } + + fn flush(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); + + // Purge the channel + let mut receiver = self + .0 + .receiver + .try_lock() + .expect("Channel receiver is locked elsewhere"); + loop { + match receiver.try_next() { + Ok(Some(_item)) => { + gst_log!(CAT, obj: pad.gst_pad(), "Dropping pending item"); + } + Err(_) => { + gst_log!(CAT, obj: pad.gst_pad(), "No more pending item"); + break; + } + Ok(None) => { + panic!("Channel sender dropped"); + } + } + } + + self.0 + .state + .try_lock() + .expect("state is locked elsewhere") + .need_segment = true; + + gst_debug!(CAT, obj: pad.gst_pad(), "Flushed"); + } + + fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) { let this = self.clone(); let pad_weak = pad.downgrade(); let element = element.clone(); - let receiver = Arc::new(FutMutex::new(receiver)); + pad.start_task(move || { let this = this.clone(); let pad_weak = pad_weak.clone(); let element = element.clone(); - let receiver = Arc::clone(&receiver); async move { - let item = receiver.lock().await.next().await; + let item = this.0.receiver.lock().await.next().await; let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); + let item = match item { Some(item) => item, None => { @@ -219,37 +283,31 @@ impl AppSrcPadHandler { } async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) { - let mut events = Vec::new(); - - // Only `read` the state in the hot path - if self.0.state.read().unwrap().need_initial_events { - // We will need to `write` and we also want to prevent - // any changes on the state while we are handling initial events - let mut state = self.0.state.write().unwrap(); - assert!(state.need_initial_events); - + let mut state = self.0.state.lock().await; + if state.need_initial_events { gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); - events.push( - gst::Event::new_stream_start(&stream_id) - .group_id(gst::GroupId::next()) - .build(), - ); + let stream_start_evt = gst::Event::new_stream_start(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + pad.push_event(stream_start_evt).await; if let Some(ref caps) = state.caps { - events.push(gst::Event::new_caps(&caps).build()); - state.configured_caps = Some(caps.clone()); + let caps_evt = gst::Event::new_caps(&caps).build(); + pad.push_event(caps_evt).await; + *self.0.configured_caps.lock().unwrap() = Some(caps.clone()); } - events.push( - gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), - ); state.need_initial_events = false; } - for event in events { - pad.push_event(event).await; + if state.need_segment { + let segment_evt = + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(); + pad.push_event(segment_evt).await; + + state.need_segment = false; } } @@ -259,6 +317,8 @@ impl AppSrcPadHandler { element: &gst::Element, item: StreamItem, ) -> Result { + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", item); + self.push_prelude(pad, element).await; match item { @@ -291,13 +351,11 @@ impl PadSrcHandler for AppSrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - let _ = appsrc.pause(element); - + appsrc.flush_start(element); true } EventView::FlushStop(..) => { appsrc.flush_stop(element); - true } EventView::Reconfigure(..) => true, @@ -335,8 +393,7 @@ impl PadSrcHandler for AppSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let state = self.0.state.read().unwrap(); - let caps = if let Some(ref caps) = state.configured_caps { + let caps = if let Some(caps) = self.0.configured_caps.lock().unwrap().as_ref() { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -362,20 +419,29 @@ impl PadSrcHandler for AppSrcPadHandler { } } +#[derive(Debug, Eq, PartialEq)] +enum AppSrcState { + Paused, + RejectBuffers, + Started, +} + +#[derive(Debug)] struct AppSrc { src_pad: PadSrc, - src_pad_handler: AppSrcPadHandler, + src_pad_handler: StdMutex>, + state: StdMutex, sender: StdMutex>>, settings: StdMutex, } impl AppSrc { fn push_buffer(&self, element: &gst::Element, mut buffer: gst::Buffer) -> bool { - let mut sender = self.sender.lock().unwrap(); - let sender = match sender.as_mut() { - Some(sender) => sender, - None => return false, - }; + let state = self.state.lock().unwrap(); + if *state == AppSrcState::RejectBuffers { + gst_debug!(CAT, obj: element, "Rejecting buffer due to element state"); + return false; + } let do_timestamp = self.settings.lock().unwrap().do_timestamp; if do_timestamp { @@ -392,7 +458,14 @@ impl AppSrc { } } - match sender.try_send(StreamItem::Buffer(buffer)) { + match self + .sender + .lock() + .unwrap() + .as_mut() + .unwrap() + .try_send(StreamItem::Buffer(buffer)) + { Ok(_) => true, Err(err) => { gst_error!(CAT, obj: element, "Failed to queue buffer: {}", err); @@ -407,6 +480,7 @@ impl AppSrc { Some(sender) => sender, None => return false, }; + let eos = StreamItem::Event(gst::Event::new_eos().build()); match sender.try_send(eos) { Ok(_) => true, @@ -421,8 +495,6 @@ impl AppSrc { let settings = self.settings.lock().unwrap(); gst_debug!(CAT, obj: element, "Preparing"); - self.src_pad_handler.0.state.write().unwrap().caps = settings.caps.clone(); - let context = Context::acquire(&settings.context, settings.context_wait).map_err(|err| { gst_error_msg!( @@ -431,8 +503,14 @@ impl AppSrc { ) })?; + let max_buffers = settings.max_buffers.try_into().unwrap(); + let (sender, receiver) = mpsc::channel(max_buffers); + *self.sender.lock().unwrap() = Some(sender); + + let src_pad_handler = AppSrcPadHandler::new(receiver, settings.caps.clone()); + self.src_pad - .prepare(context, &self.src_pad_handler) + .prepare(context, &src_pad_handler) .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -440,6 +518,8 @@ impl AppSrc { ) })?; + *self.src_pad_handler.lock().unwrap() = Some(src_pad_handler); + gst_debug!(CAT, obj: element, "Prepared"); Ok(()) @@ -449,8 +529,9 @@ impl AppSrc { gst_debug!(CAT, obj: element, "Unpreparing"); let _ = self.src_pad.unprepare(); + *self.src_pad_handler.lock().unwrap() = None; - *self.src_pad_handler.0.state.write().unwrap() = Default::default(); + *self.sender.lock().unwrap() = None; gst_debug!(CAT, obj: element, "Unprepared"); @@ -460,16 +541,18 @@ impl AppSrc { fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Stopping"); + *self.state.lock().unwrap() = AppSrcState::RejectBuffers; + // Now stop the task if it was still running, blocking // until this has actually happened self.src_pad.stop_task(); self.src_pad_handler - .0 - .state - .write() + .lock() .unwrap() - .need_initial_events = true; + .as_ref() + .unwrap() + .reset(&self.src_pad.as_ref()); gst_debug!(CAT, obj: element, "Stopped"); @@ -477,15 +560,15 @@ impl AppSrc { } fn start(&self, element: &gst::Element) -> Result<(), ()> { - let mut sender = self.sender.lock().unwrap(); - if sender.is_some() { + let mut state = self.state.lock().unwrap(); + if *state == AppSrcState::Started { gst_debug!(CAT, obj: element, "Already started"); return Ok(()); } gst_debug!(CAT, obj: element, "Starting"); - self.start_unchecked(element, &mut sender); + self.start_unchecked(element, &mut state); gst_debug!(CAT, obj: element, "Started"); @@ -493,12 +576,10 @@ impl AppSrc { } fn flush_stop(&self, element: &gst::Element) { - // Keep the lock on the `sender` until `flush_stop` is complete + // Keep the lock on the `state` until `flush_stop` is complete // so as to prevent race conditions due to concurrent state transitions. - // Note that this won't deadlock as `sender` is not used - // within the `src_pad`'s `Task`. - let mut sender = self.sender.lock().unwrap(); - if sender.is_some() { + let mut state = self.state.lock().unwrap(); + if *state == AppSrcState::Started { gst_debug!(CAT, obj: element, "Already started"); return; } @@ -506,38 +587,52 @@ impl AppSrc { gst_debug!(CAT, obj: element, "Stopping Flush"); self.src_pad.stop_task(); - self.start_unchecked(element, &mut sender); - - gst_debug!(CAT, obj: element, "Stopped Flush"); - } - - fn start_unchecked( - &self, - element: &gst::Element, - sender: &mut Option>, - ) { - let max_buffers = self - .settings - .lock() - .unwrap() - .max_buffers - .try_into() - .unwrap(); - let (new_sender, receiver) = mpsc::channel(max_buffers); - *sender = Some(new_sender); self.src_pad_handler - .start_task(self.src_pad.as_ref(), element, receiver); + .lock() + .unwrap() + .as_ref() + .unwrap() + .flush(&self.src_pad.as_ref()); + + self.start_unchecked(element, &mut state); + + gst_debug!(CAT, obj: element, "Flush Stopped"); + } + + fn start_unchecked(&self, element: &gst::Element, state: &mut AppSrcState) { + self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() + .start_task(self.src_pad.as_ref(), element); + + *state = AppSrcState::Started; + } + + fn flush_start(&self, element: &gst::Element) { + // Keep the lock on the `state` until `flush_start` is complete + // so as to prevent race conditions due to concurrent state transitions. + let mut state = self.state.lock().unwrap(); + + gst_debug!(CAT, obj: element, "Starting Flush"); + + *state = AppSrcState::RejectBuffers; + self.src_pad.cancel_task(); + + gst_debug!(CAT, obj: element, "Flush Started"); } fn pause(&self, element: &gst::Element) -> Result<(), ()> { - let mut sender = self.sender.lock().unwrap(); + // Lock the state to prevent race condition due to concurrent FlushStop + let mut state = self.state.lock().unwrap(); + gst_debug!(CAT, obj: element, "Pausing"); - self.src_pad.cancel_task(); + self.src_pad.pause_task(); - // Prevent subsequent items from being enqueued - *sender = None; + *state = AppSrcState::Paused; gst_debug!(CAT, obj: element, "Paused"); @@ -616,7 +711,8 @@ impl ObjectSubclass for AppSrc { Self { src_pad, - src_pad_handler: AppSrcPadHandler::default(), + src_pad_handler: StdMutex::new(None), + state: StdMutex::new(AppSrcState::RejectBuffers), sender: StdMutex::new(None), settings: StdMutex::new(Settings::default()), } diff --git a/gst-plugin-threadshare/src/dataqueue.rs b/gst-plugin-threadshare/src/dataqueue.rs index 8434dbd9..9f77d966 100644 --- a/gst-plugin-threadshare/src/dataqueue.rs +++ b/gst-plugin-threadshare/src/dataqueue.rs @@ -185,7 +185,7 @@ impl DataQueue { pub fn push(&self, item: DataQueueItem) -> Result<(), DataQueueItem> { let mut inner = self.0.lock().unwrap(); - if inner.state != DataQueueState::Started { + if inner.state == DataQueueState::Stopped { gst_debug!( DATA_QUEUE_CAT, obj: &inner.element, diff --git a/gst-plugin-threadshare/src/proxy.rs b/gst-plugin-threadshare/src/proxy.rs index 93a525d7..45d7b05e 100644 --- a/gst-plugin-threadshare/src/proxy.rs +++ b/gst-plugin-threadshare/src/proxy.rs @@ -838,7 +838,7 @@ impl ProxySrcPadHandler { let item = match item { Some(item) => item, None => { - gst_log!(SRC_CAT, obj: pad.gst_pad(), "DataQueue Stopped"); + gst_log!(SRC_CAT, obj: pad.gst_pad(), "DataQueue Stopped or Paused"); return glib::Continue(false); } }; @@ -1109,6 +1109,7 @@ impl ProxySrc { self.src_pad.stop_task(); let dataqueue = dataqueue.as_ref().unwrap(); + dataqueue.clear(); dataqueue.stop(); gst_debug!(SRC_CAT, obj: element, "Stopped"); @@ -1171,11 +1172,9 @@ impl ProxySrc { let dataqueue = self.dataqueue.lock().unwrap(); gst_debug!(SRC_CAT, obj: element, "Pausing"); - self.src_pad.cancel_task(); + dataqueue.as_ref().unwrap().pause(); - let dataqueue = dataqueue.as_ref().unwrap(); - dataqueue.pause(); - dataqueue.clear(); + self.src_pad.pause_task(); gst_debug!(SRC_CAT, obj: element, "Paused"); diff --git a/gst-plugin-threadshare/src/runtime/executor.rs b/gst-plugin-threadshare/src/runtime/executor.rs index 2310485e..9ee0ce61 100644 --- a/gst-plugin-threadshare/src/runtime/executor.rs +++ b/gst-plugin-threadshare/src/runtime/executor.rs @@ -157,6 +157,12 @@ pub fn block_on(future: Fut) -> Fut::Output { }) } +/// Yields execution back to the runtime +#[inline] +pub async fn yield_now() { + tokio::task::yield_now().await; +} + struct ContextThread { name: String, } diff --git a/gst-plugin-threadshare/src/runtime/pad.rs b/gst-plugin-threadshare/src/runtime/pad.rs index 63ac597b..20216a5b 100644 --- a/gst-plugin-threadshare/src/runtime/pad.rs +++ b/gst-plugin-threadshare/src/runtime/pad.rs @@ -343,6 +343,11 @@ impl<'a> PadSrcRef<'a> { self.strong.start_task(func); } + /// Pauses the `Started` `Pad` `Task`. + pub fn pause_task(&self) { + self.strong.pause_task(); + } + /// Cancels the `Started` `Pad` `Task`. pub fn cancel_task(&self) { self.strong.cancel_task(); @@ -473,6 +478,11 @@ impl PadSrcStrong { self.0.task.start(func); } + #[inline] + fn pause_task(&self) { + self.0.task.pause(); + } + #[inline] fn cancel_task(&self) { self.0.task.cancel(); @@ -722,6 +732,10 @@ impl PadSrc { self.0.start_task(func); } + pub fn pause_task(&self) { + self.0.pause_task(); + } + pub fn cancel_task(&self) { self.0.cancel_task(); } diff --git a/gst-plugin-threadshare/src/runtime/task.rs b/gst-plugin-threadshare/src/runtime/task.rs index ad4dd0a1..62fe7ba7 100644 --- a/gst-plugin-threadshare/src/runtime/task.rs +++ b/gst-plugin-threadshare/src/runtime/task.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2019 François Laignel +// Copyright (C) 2019-2020 François Laignel // Copyright (C) 2020 Sebastian Dröge // // This library is free software; you can redistribute it and/or @@ -18,18 +18,29 @@ //! An execution loop to run asynchronous processing. +use futures::channel::oneshot; use futures::future::{abortable, AbortHandle, Aborted}; use futures::prelude::*; -use gst::TaskState; -use gst::{gst_debug, gst_log, gst_trace, gst_warning}; +use gst::{gst_debug, gst_error, gst_log, gst_trace, gst_warning}; use std::fmt; use std::sync::{Arc, Mutex}; -use super::executor::block_on; +use super::executor::{block_on, yield_now}; use super::{Context, JoinHandle, RUNTIME_CAT}; +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)] +pub enum TaskState { + Cancelled, + Started, + Stopped, + Paused, + Pausing, + Preparing, + Unprepared, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub enum TaskError { ActiveTask, @@ -53,27 +64,27 @@ struct TaskInner { prepare_abort_handle: Option, abort_handle: Option, loop_handle: Option>>, + resume_sender: Option>, } impl Default for TaskInner { fn default() -> Self { TaskInner { context: None, - state: TaskState::Stopped, + state: TaskState::Unprepared, prepare_handle: None, prepare_abort_handle: None, abort_handle: None, loop_handle: None, + resume_sender: None, } } } impl Drop for TaskInner { fn drop(&mut self) { - // Check invariant which can't be held automatically in `Task` - // because `drop` can't be `async` - if self.state != TaskState::Stopped { - panic!("Missing call to `Task::stop`"); + if self.state != TaskState::Unprepared { + panic!("Missing call to `Task::unprepared`"); } } } @@ -103,7 +114,7 @@ impl Task { gst_debug!(RUNTIME_CAT, "Preparing task"); let mut inner = self.0.lock().unwrap(); - if inner.state != TaskState::Stopped { + if inner.state != TaskState::Unprepared { return Err(TaskError::ActiveTask); } @@ -138,6 +149,7 @@ impl Task { inner.context = Some(context); + inner.state = TaskState::Preparing; gst_debug!(RUNTIME_CAT, "Task prepared"); Ok(()) @@ -147,7 +159,7 @@ impl Task { gst_debug!(RUNTIME_CAT, "Preparing task"); let mut inner = self.0.lock().unwrap(); - if inner.state != TaskState::Stopped { + if inner.state != TaskState::Unprepared { return Err(TaskError::ActiveTask); } @@ -156,19 +168,25 @@ impl Task { inner.context = Some(context); + inner.state = TaskState::Stopped; gst_debug!(RUNTIME_CAT, "Task prepared"); Ok(()) } pub fn unprepare(&self) -> Result<(), TaskError> { - gst_debug!(RUNTIME_CAT, "Unpreparing task"); - let mut inner = self.0.lock().unwrap(); if inner.state != TaskState::Stopped { + gst_error!( + RUNTIME_CAT, + "Attempt to Unprepare a task in state {:?}", + inner.state + ); return Err(TaskError::ActiveTask); } + gst_debug!(RUNTIME_CAT, "Unpreparing task"); + // Abort any pending preparation if let Some(abort_handle) = inner.prepare_abort_handle.take() { abort_handle.abort(); @@ -176,6 +194,9 @@ impl Task { let prepare_handle = inner.prepare_handle.take(); let context = inner.context.take().unwrap(); + + inner.state = TaskState::Unprepared; + drop(inner); if let Some(prepare_handle) = prepare_handle { @@ -251,8 +272,27 @@ impl Task { gst_log!(RUNTIME_CAT, "Task already Started"); return; } - TaskState::Paused | TaskState::Stopped => (), - other => unreachable!("Unexpected Task state {:?}", other), + TaskState::Pausing => { + gst_debug!(RUNTIME_CAT, "Re-starting a Pausing task"); + + assert!(inner.resume_sender.is_none()); + + inner.state = TaskState::Started; + return; + } + TaskState::Paused => { + inner + .resume_sender + .take() + .expect("Task Paused but the resume_sender is already taken") + .send(()) + .expect("Task Paused but the resume_receiver was dropped"); + + gst_log!(RUNTIME_CAT, "Resume requested"); + return; + } + TaskState::Stopped | TaskState::Cancelled | TaskState::Preparing => (), + TaskState::Unprepared => panic!("Attempt to start an unprepared Task"), } gst_debug!(RUNTIME_CAT, "Starting Task"); @@ -280,21 +320,50 @@ impl Task { let res = prepare_handle.await; if res.is_err() { gst_warning!(RUNTIME_CAT, "Preparing failed"); + inner_clone.lock().unwrap().state = TaskState::Unprepared; + return; } + + inner_clone.lock().unwrap().state = TaskState::Stopped; } gst_trace!(RUNTIME_CAT, "Starting task loop"); // Then loop as long as we're actually running loop { - match inner_clone.lock().unwrap().state { - TaskState::Started => (), - TaskState::Paused | TaskState::Stopped => { - gst_trace!(RUNTIME_CAT, "Stopping task loop"); - break; + let mut resume_receiver = { + let mut inner = inner_clone.lock().unwrap(); + match inner.state { + TaskState::Started => None, + TaskState::Pausing => { + let (sender, receiver) = oneshot::channel(); + inner.resume_sender = Some(sender); + + inner.state = TaskState::Paused; + + Some(receiver) + } + TaskState::Stopped | TaskState::Cancelled => { + gst_trace!(RUNTIME_CAT, "Stopping task loop"); + break; + } + TaskState::Paused => { + unreachable!("The Paused state is controlled by the loop"); + } + other => { + unreachable!("Task loop iteration in state {:?}", other); + } } - other => unreachable!("Unexpected Task state {:?}", other), + }; + + if let Some(resume_receiver) = resume_receiver.take() { + gst_trace!(RUNTIME_CAT, "Task loop paused"); + + let _ = resume_receiver.await; + + gst_trace!(RUNTIME_CAT, "Resuming task loop"); + inner_clone.lock().unwrap().state = TaskState::Started; } if func().await == glib::Continue(false) { @@ -309,12 +378,15 @@ impl Task { .map(|h| h.task_id() == task_id) .unwrap_or(false) { - gst_trace!(RUNTIME_CAT, "Pausing task loop"); - inner.state = TaskState::Paused; + gst_trace!(RUNTIME_CAT, "Exiting task loop"); + inner.state = TaskState::Cancelled; } break; } + + // Make sure the loop can be aborted even if `func` never goes `Pending`. + yield_now().await; } // Once the loop function is finished we can forget the corresponding @@ -332,6 +404,7 @@ impl Task { { inner.abort_handle = None; inner.loop_handle = None; + inner.state = TaskState::Stopped; } } @@ -351,11 +424,29 @@ impl Task { gst_debug!(RUNTIME_CAT, "Task Started"); } + /// Requests the `Task` loop to pause. + /// + /// If an iteration is in progress, it will run to completion, + /// then no more iteration will be executed before `start` is called again. + pub fn pause(&self) { + let mut inner = self.0.lock().unwrap(); + if inner.state != TaskState::Started { + gst_log!(RUNTIME_CAT, "Task not started"); + return; + } + + inner.state = TaskState::Pausing; + gst_debug!(RUNTIME_CAT, "Pause requested"); + } + /// Cancels the `Task` so that it stops running as soon as possible. pub fn cancel(&self) { let mut inner = self.0.lock().unwrap(); - if inner.state != TaskState::Started { - gst_log!(RUNTIME_CAT, "Task already paused or stopped"); + if inner.state != TaskState::Started + && inner.state != TaskState::Paused + && inner.state != TaskState::Pausing + { + gst_log!(RUNTIME_CAT, "Task not Started nor Paused"); return; } @@ -366,14 +457,16 @@ impl Task { abort_handle.abort(); } - inner.state = TaskState::Paused; + inner.resume_sender = None; + + inner.state = TaskState::Cancelled; } /// Stops the `Started` `Task` and wait for it to finish. pub fn stop(&self) { let mut inner = self.0.lock().unwrap(); - if inner.state == TaskState::Stopped { - gst_log!(RUNTIME_CAT, "Task already stopped"); + if inner.state == TaskState::Stopped || inner.state == TaskState::Preparing { + gst_log!(RUNTIME_CAT, "Task loop already stopped"); return; } @@ -388,6 +481,9 @@ impl Task { // And now wait for it to actually stop let loop_handle = inner.loop_handle.take(); + + inner.resume_sender = None; + let context = inner.context.as_ref().unwrap().clone(); drop(inner); @@ -550,4 +646,74 @@ mod tests { task.unprepare().unwrap(); gst_debug!(RUNTIME_CAT, "task test: unprepared"); } + + #[tokio::test] + async fn pause_start() { + use gst::gst_error; + + gst::init().unwrap(); + + let context = Context::acquire("task_pause_start", 2).unwrap(); + + let task = Task::default(); + task.prepare(context).unwrap(); + + let (iter_sender, mut iter_receiver) = mpsc::channel(0); + let iter_sender = Arc::new(Mutex::new(iter_sender)); + + let (mut complete_sender, complete_receiver) = mpsc::channel(0); + let complete_receiver = Arc::new(Mutex::new(complete_receiver)); + + gst_debug!(RUNTIME_CAT, "task_pause_start: starting"); + task.start(move || { + let iter_sender = Arc::clone(&iter_sender); + let complete_receiver = Arc::clone(&complete_receiver); + async move { + gst_debug!(RUNTIME_CAT, "task_pause_start: entering iteration"); + iter_sender.lock().await.send(()).await.unwrap(); + + gst_debug!( + RUNTIME_CAT, + "task_pause_start: iteration awaiting completion" + ); + complete_receiver.lock().await.next().await.unwrap(); + gst_debug!(RUNTIME_CAT, "task_pause_start: iteration complete"); + glib::Continue(true) + } + }); + + gst_debug!(RUNTIME_CAT, "task_pause_start: awaiting 1st iteration"); + iter_receiver.next().await.unwrap(); + + task.pause(); + + gst_debug!( + RUNTIME_CAT, + "task_pause_start: sending 1st iteration completion" + ); + complete_sender.send(()).await.unwrap(); + + // Loop held on + iter_receiver.try_next().unwrap_err(); + + task.start(|| { + gst_error!( + RUNTIME_CAT, + "task_pause_start: reached start to resume closure" + ); + future::pending() + }); + + gst_debug!(RUNTIME_CAT, "task_pause_start: awaiting 2d iteration"); + iter_receiver.next().await.unwrap(); + + gst_debug!( + RUNTIME_CAT, + "task_pause_start: sending 2d iteration completion" + ); + complete_sender.send(()).await.unwrap(); + + task.stop(); + task.unprepare().unwrap(); + } } diff --git a/gst-plugin-threadshare/src/socket.rs b/gst-plugin-threadshare/src/socket.rs index 54bb80b8..790e977b 100644 --- a/gst-plugin-threadshare/src/socket.rs +++ b/gst-plugin-threadshare/src/socket.rs @@ -246,7 +246,7 @@ impl SocketStream { let (read_fut, clock, base_time) = { let mut inner = self.socket.0.lock().unwrap(); if inner.state != SocketState::Started { - gst_debug!(SOCKET_CAT, obj: &inner.element, "DataQueue is not Started"); + gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket is not Started"); return None; } diff --git a/gst-plugin-threadshare/src/tcpclientsrc.rs b/gst-plugin-threadshare/src/tcpclientsrc.rs index ce68ce37..dfda2438 100644 --- a/gst-plugin-threadshare/src/tcpclientsrc.rs +++ b/gst-plugin-threadshare/src/tcpclientsrc.rs @@ -37,8 +37,8 @@ use rand; use std::io; use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; use std::sync::Mutex as StdMutex; -use std::sync::{self, Arc}; use std::u16; use tokio::io::AsyncReadExt; @@ -177,29 +177,73 @@ impl SocketRead for TcpClientReader { #[derive(Debug)] struct TcpClientSrcPadHandlerState { need_initial_events: bool, + need_segment: bool, caps: Option, - configured_caps: Option, } impl Default for TcpClientSrcPadHandlerState { fn default() -> Self { TcpClientSrcPadHandlerState { need_initial_events: true, + need_segment: true, caps: None, - configured_caps: None, } } } -#[derive(Debug, Default)] +#[derive(Debug)] struct TcpClientSrcPadHandlerInner { - state: sync::RwLock, + state: FutMutex, + configured_caps: StdMutex>, } -#[derive(Clone, Debug, Default)] +impl TcpClientSrcPadHandlerInner { + fn new(caps: Option) -> Self { + TcpClientSrcPadHandlerInner { + state: FutMutex::new(TcpClientSrcPadHandlerState { + caps, + ..Default::default() + }), + configured_caps: StdMutex::new(None), + } + } +} + +#[derive(Clone, Debug)] struct TcpClientSrcPadHandler(Arc); impl TcpClientSrcPadHandler { + fn new(caps: Option) -> Self { + TcpClientSrcPadHandler(Arc::new(TcpClientSrcPadHandlerInner::new(caps))) + } + + fn reset(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(CAT, obj: pad.gst_pad(), "Resetting handler"); + + *self.0.state.try_lock().expect("State locked elsewhere") = Default::default(); + *self.0.configured_caps.lock().unwrap() = None; + + gst_debug!(CAT, obj: pad.gst_pad(), "Handler reset"); + } + + fn flush(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); + + self.0 + .state + .try_lock() + .expect("state is locked elsewhere") + .need_segment = true; + + gst_debug!(CAT, obj: pad.gst_pad(), "Flushed"); + } + fn start_task( &self, pad: PadSrcRef<'_>, @@ -284,37 +328,31 @@ impl TcpClientSrcPadHandler { } async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) { - let mut events = Vec::new(); - - // Only `read` the state in the hot path - if self.0.state.read().unwrap().need_initial_events { - // We will need to `write` and we also want to prevent - // any changes on the state while we are handling initial events - let mut state = self.0.state.write().unwrap(); - assert!(state.need_initial_events); - + let mut state = self.0.state.lock().await; + if state.need_initial_events { gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); - events.push( - gst::Event::new_stream_start(&stream_id) - .group_id(gst::GroupId::next()) - .build(), - ); + let stream_start_evt = gst::Event::new_stream_start(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + pad.push_event(stream_start_evt).await; if let Some(ref caps) = state.caps { - events.push(gst::Event::new_caps(&caps).build()); - state.configured_caps = Some(caps.clone()); + let caps_evt = gst::Event::new_caps(&caps).build(); + pad.push_event(caps_evt).await; + *self.0.configured_caps.lock().unwrap() = Some(caps.clone()); } - events.push( - gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), - ); state.need_initial_events = false; } - for event in events { - pad.push_event(event).await; + if state.need_segment { + let segment_evt = + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(); + pad.push_event(segment_evt).await; + + state.need_segment = false; } } @@ -324,6 +362,8 @@ impl TcpClientSrcPadHandler { element: &gst::Element, buffer: gst::Buffer, ) -> Result { + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); + self.push_prelude(pad, element).await; if buffer.get_size() == 0 { @@ -352,7 +392,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - tcpclientsrc.pause(element).unwrap(); + tcpclientsrc.flush_start(element); true } @@ -396,8 +436,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let state = self.0.state.read().unwrap(); - let caps = if let Some(ref caps) = state.configured_caps { + let caps = if let Some(caps) = self.0.configured_caps.lock().unwrap().as_ref() { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -426,7 +465,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { struct TcpClientSrc { src_pad: PadSrc, - src_pad_handler: TcpClientSrcPadHandler, + src_pad_handler: StdMutex>, socket: StdMutex>>, settings: StdMutex, } @@ -499,16 +538,13 @@ impl TcpClientSrc { ) })?; - { - let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap(); - src_pad_handler_state.caps = settings.caps; - } - *socket_storage = Some(socket); drop(socket_storage); + let src_pad_handler = TcpClientSrcPadHandler::new(settings.caps); + self.src_pad - .prepare(context, &self.src_pad_handler) + .prepare(context, &src_pad_handler) .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -516,6 +552,8 @@ impl TcpClientSrc { ) })?; + *self.src_pad_handler.lock().unwrap() = Some(src_pad_handler); + gst_debug!(CAT, obj: element, "Prepared"); Ok(()) @@ -529,8 +567,7 @@ impl TcpClientSrc { } let _ = self.src_pad.unprepare(); - - *self.src_pad_handler.0.state.write().unwrap() = Default::default(); + *self.src_pad_handler.lock().unwrap() = None; gst_debug!(CAT, obj: element, "Unprepared"); @@ -545,11 +582,11 @@ impl TcpClientSrc { self.src_pad.stop_task(); self.src_pad_handler - .0 - .state - .write() + .lock() .unwrap() - .need_initial_events = true; + .as_ref() + .unwrap() + .reset(&self.src_pad.as_ref()); gst_debug!(CAT, obj: element, "Stopped"); @@ -591,6 +628,14 @@ impl TcpClientSrc { gst_debug!(CAT, obj: element, "Stopping Flush"); self.src_pad.stop_task(); + + self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() + .flush(&self.src_pad.as_ref()); + self.start_unchecked(element, socket); gst_debug!(CAT, obj: element, "Stopped Flush"); @@ -602,9 +647,26 @@ impl TcpClientSrc { .unwrap(); self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() .start_task(self.src_pad.as_ref(), element, socket_stream); } + fn flush_start(&self, element: &gst::Element) { + let socket = self.socket.lock().unwrap(); + gst_debug!(CAT, obj: element, "Starting Flush"); + + if let Some(socket) = socket.as_ref() { + socket.pause(); + } + + self.src_pad.cancel_task(); + + gst_debug!(CAT, obj: element, "Flush Started"); + } + fn pause(&self, element: &gst::Element) -> Result<(), ()> { let socket = self.socket.lock().unwrap(); gst_debug!(CAT, obj: element, "Pausing"); @@ -613,7 +675,7 @@ impl TcpClientSrc { socket.pause(); } - self.src_pad.cancel_task(); + self.src_pad.pause_task(); gst_debug!(CAT, obj: element, "Paused"); @@ -656,7 +718,7 @@ impl ObjectSubclass for TcpClientSrc { Self { src_pad, - src_pad_handler: TcpClientSrcPadHandler::default(), + src_pad_handler: StdMutex::new(None), socket: StdMutex::new(None), settings: StdMutex::new(Settings::default()), } diff --git a/gst-plugin-threadshare/src/udpsink.rs b/gst-plugin-threadshare/src/udpsink.rs index 785ef2cf..bb96ce68 100644 --- a/gst-plugin-threadshare/src/udpsink.rs +++ b/gst-plugin-threadshare/src/udpsink.rs @@ -649,6 +649,12 @@ impl UdpSinkPadHandler { } } + fn unprepare(&self) { + if let Some(task) = &self.0.read().unwrap().task { + task.unprepare().unwrap(); + } + } + fn stop_task(&self) { if let Some(task) = &self.0.read().unwrap().task { task.stop(); @@ -959,6 +965,7 @@ impl UdpSink { fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Unpreparing"); + self.sink_pad_handler.unprepare(); self.sink_pad.unprepare(); gst_debug!(CAT, obj: element, "Unprepared"); diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index a241deae..6fa58357 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -39,8 +39,8 @@ use rand; use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::sync::Arc; use std::sync::Mutex as StdMutex; -use std::sync::{self, Arc}; use std::u16; use crate::runtime::prelude::*; @@ -230,8 +230,8 @@ impl SocketRead for UdpReader { struct UdpSrcPadHandlerState { retrieve_sender_address: bool, need_initial_events: bool, + need_segment: bool, caps: Option, - configured_caps: Option, } impl Default for UdpSrcPadHandlerState { @@ -239,21 +239,69 @@ impl Default for UdpSrcPadHandlerState { UdpSrcPadHandlerState { retrieve_sender_address: true, need_initial_events: true, + need_segment: true, caps: None, - configured_caps: None, } } } -#[derive(Debug, Default)] +#[derive(Debug)] struct UdpSrcPadHandlerInner { - state: sync::RwLock, + state: FutMutex, + configured_caps: StdMutex>, } -#[derive(Clone, Debug, Default)] +impl UdpSrcPadHandlerInner { + fn new(caps: Option, retrieve_sender_address: bool) -> Self { + UdpSrcPadHandlerInner { + state: FutMutex::new(UdpSrcPadHandlerState { + retrieve_sender_address, + caps, + ..Default::default() + }), + configured_caps: StdMutex::new(None), + } + } +} + +#[derive(Clone, Debug)] struct UdpSrcPadHandler(Arc); impl UdpSrcPadHandler { + fn new(caps: Option, retrieve_sender_address: bool) -> UdpSrcPadHandler { + UdpSrcPadHandler(Arc::new(UdpSrcPadHandlerInner::new( + caps, + retrieve_sender_address, + ))) + } + + fn reset(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(CAT, obj: pad.gst_pad(), "Resetting handler"); + + *self.0.state.try_lock().expect("State locked elsewhere") = Default::default(); + *self.0.configured_caps.lock().unwrap() = None; + + gst_debug!(CAT, obj: pad.gst_pad(), "Handler reset"); + } + + fn flush(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); + + self.0 + .state + .try_lock() + .expect("state is locked elsewhere") + .need_segment = true; + + gst_debug!(CAT, obj: pad.gst_pad(), "Flushed"); + } + fn start_task( &self, pad: PadSrcRef<'_>, @@ -306,7 +354,7 @@ impl UdpSrcPadHandler { }; if let Some(saddr) = saddr { - if this.0.state.read().unwrap().retrieve_sender_address { + if this.0.state.lock().await.retrieve_sender_address { let inet_addr = match saddr.ip() { IpAddr::V4(ip) => gio::InetAddress::new_from_bytes( gio::InetAddressBytes::V4(&ip.octets()), @@ -354,37 +402,31 @@ impl UdpSrcPadHandler { } async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) { - let mut events = Vec::new(); - - // Only `read` the state in the hot path - if self.0.state.read().unwrap().need_initial_events { - // We will need to `write` and we also want to prevent - // any changes on the state while we are handling initial events - let mut state = self.0.state.write().unwrap(); - assert!(state.need_initial_events); - + let mut state = self.0.state.lock().await; + if state.need_initial_events { gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); - events.push( - gst::Event::new_stream_start(&stream_id) - .group_id(gst::GroupId::next()) - .build(), - ); + let stream_start_evt = gst::Event::new_stream_start(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + pad.push_event(stream_start_evt).await; if let Some(ref caps) = state.caps { - events.push(gst::Event::new_caps(&caps).build()); - state.configured_caps = Some(caps.clone()); + let caps_evt = gst::Event::new_caps(&caps).build(); + pad.push_event(caps_evt).await; + *self.0.configured_caps.lock().unwrap() = Some(caps.clone()); } - events.push( - gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), - ); state.need_initial_events = false; } - for event in events { - pad.push_event(event).await; + if state.need_segment { + let segment_evt = + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(); + pad.push_event(segment_evt).await; + + state.need_segment = false; } } @@ -394,6 +436,8 @@ impl UdpSrcPadHandler { element: &gst::Element, buffer: gst::Buffer, ) -> Result { + gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); + self.push_prelude(pad, element).await; pad.push(buffer).await @@ -416,7 +460,7 @@ impl PadSrcHandler for UdpSrcPadHandler { let ret = match event.view() { EventView::FlushStart(..) => { - udpsrc.pause(element).unwrap(); + udpsrc.flush_start(element); true } @@ -461,8 +505,7 @@ impl PadSrcHandler for UdpSrcPadHandler { true } QueryView::Caps(ref mut q) => { - let state = self.0.state.read().unwrap(); - let caps = if let Some(ref caps) = state.configured_caps { + let caps = if let Some(caps) = self.0.configured_caps.lock().unwrap().as_ref() { q.get_filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -491,7 +534,7 @@ impl PadSrcHandler for UdpSrcPadHandler { struct UdpSrc { src_pad: PadSrc, - src_pad_handler: UdpSrcPadHandler, + src_pad_handler: StdMutex>, socket: StdMutex>>, settings: StdMutex, } @@ -686,19 +729,16 @@ impl UdpSrc { ) })?; - { - let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap(); - src_pad_handler_state.retrieve_sender_address = settings.retrieve_sender_address; - src_pad_handler_state.caps = settings.caps; - } - *socket_storage = Some(socket); drop(socket_storage); element.notify("used-socket"); + let src_pad_handler = + UdpSrcPadHandler::new(settings.caps, settings.retrieve_sender_address); + self.src_pad - .prepare(context, &self.src_pad_handler) + .prepare(context, &src_pad_handler) .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -706,6 +746,8 @@ impl UdpSrc { ) })?; + *self.src_pad_handler.lock().unwrap() = Some(src_pad_handler); + gst_debug!(CAT, obj: element, "Prepared"); Ok(()) @@ -722,8 +764,7 @@ impl UdpSrc { } let _ = self.src_pad.unprepare(); - - *self.src_pad_handler.0.state.write().unwrap() = Default::default(); + *self.src_pad_handler.lock().unwrap() = None; gst_debug!(CAT, obj: element, "Unprepared"); @@ -738,11 +779,11 @@ impl UdpSrc { self.src_pad.stop_task(); self.src_pad_handler - .0 - .state - .write() + .lock() .unwrap() - .need_initial_events = true; + .as_ref() + .unwrap() + .reset(&self.src_pad.as_ref()); gst_debug!(CAT, obj: element, "Stopped"); @@ -784,6 +825,14 @@ impl UdpSrc { gst_debug!(CAT, obj: element, "Stopping Flush"); self.src_pad.stop_task(); + + self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() + .flush(&self.src_pad.as_ref()); + self.start_unchecked(element, socket); gst_debug!(CAT, obj: element, "Stopped Flush"); @@ -795,9 +844,26 @@ impl UdpSrc { .unwrap(); self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() .start_task(self.src_pad.as_ref(), element, socket_stream); } + fn flush_start(&self, element: &gst::Element) { + let socket = self.socket.lock().unwrap(); + gst_debug!(CAT, obj: element, "Starting Flush"); + + if let Some(socket) = socket.as_ref() { + socket.pause(); + } + + self.src_pad.cancel_task(); + + gst_debug!(CAT, obj: element, "Flush Started"); + } + fn pause(&self, element: &gst::Element) -> Result<(), ()> { let socket = self.socket.lock().unwrap(); gst_debug!(CAT, obj: element, "Pausing"); @@ -806,7 +872,7 @@ impl UdpSrc { socket.pause(); } - self.src_pad.cancel_task(); + self.src_pad.pause_task(); gst_debug!(CAT, obj: element, "Paused"); @@ -865,7 +931,7 @@ impl ObjectSubclass for UdpSrc { Self { src_pad, - src_pad_handler: UdpSrcPadHandler::default(), + src_pad_handler: StdMutex::new(None), socket: StdMutex::new(None), settings: StdMutex::new(Settings::default()), } diff --git a/gst-plugin-threadshare/tests/appsrc.rs b/gst-plugin-threadshare/tests/appsrc.rs index cfe785f0..8610fcb4 100644 --- a/gst-plugin-threadshare/tests/appsrc.rs +++ b/gst-plugin-threadshare/tests/appsrc.rs @@ -18,6 +18,8 @@ use glib::prelude::*; use gst; +use gst::prelude::*; + use gst_check; use gstthreadshare; @@ -33,7 +35,7 @@ fn init() { } #[test] -fn test_push() { +fn push() { init(); let mut h = gst_check::Harness::new("ts-appsrc"); @@ -43,7 +45,7 @@ fn test_push() { let appsrc = h.get_element().unwrap(); appsrc.set_property("caps", &caps).unwrap(); appsrc.set_property("do-timestamp", &true).unwrap(); - appsrc.set_property("context", &"test-push").unwrap(); + appsrc.set_property("context", &"appsrc-push").unwrap(); } h.play(); @@ -97,3 +99,205 @@ fn test_push() { } assert!(n_events >= 2); } + +#[test] +fn pause() { + init(); + + let mut h = gst_check::Harness::new("ts-appsrc"); + + let caps = gst::Caps::new_simple("foo/bar", &[]); + { + let appsrc = h.get_element().unwrap(); + appsrc.set_property("caps", &caps).unwrap(); + appsrc.set_property("do-timestamp", &true).unwrap(); + appsrc.set_property("context", &"appsrc-pause").unwrap(); + } + + h.play(); + + let appsrc = h.get_element().unwrap(); + + // Initial buffer + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::from_slice(vec![1, 2, 3, 4])]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + let _ = h.pull().unwrap(); + + appsrc + .change_state(gst::StateChange::PlayingToPaused) + .unwrap(); + + // Pre-pause buffer + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::from_slice(vec![5, 6, 7])]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + appsrc + .change_state(gst::StateChange::PlayingToPaused) + .unwrap(); + + // Buffer is queued during Paused + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::from_slice(vec![8, 9])]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + appsrc + .change_state(gst::StateChange::PausedToPlaying) + .unwrap(); + + // Pull Pre-pause buffer + let _ = h.pull().unwrap(); + + // Pull buffer queued while Paused + let _ = h.pull().unwrap(); + + // Can push again + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::new()]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + let _ = h.pull().unwrap(); + assert!(h.try_pull().is_none()); +} + +#[test] +fn flush() { + init(); + + let mut h = gst_check::Harness::new("ts-appsrc"); + + let caps = gst::Caps::new_simple("foo/bar", &[]); + { + let appsrc = h.get_element().unwrap(); + appsrc.set_property("caps", &caps).unwrap(); + appsrc.set_property("do-timestamp", &true).unwrap(); + appsrc.set_property("context", &"appsrc-flush").unwrap(); + } + + h.play(); + + let appsrc = h.get_element().unwrap(); + + // Initial buffer + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::from_slice(vec![1, 2, 3, 4])]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + let _ = h.pull().unwrap(); + + // FlushStart + assert!(h.push_upstream_event(gst::Event::new_flush_start().build())); + + // Can't push buffer while flushing + assert!(!appsrc + .emit("push-buffer", &[&gst::Buffer::new()]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + assert!(h.try_pull().is_none()); + + // FlushStop + assert!(h.push_upstream_event(gst::Event::new_flush_stop(true).build())); + + // No buffer available due to flush + assert!(h.try_pull().is_none()); + + // Can push again + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::new()]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + let _ = h.pull().unwrap(); + assert!(h.try_pull().is_none()); +} + +#[test] +fn pause_flush() { + init(); + + let mut h = gst_check::Harness::new("ts-appsrc"); + + let caps = gst::Caps::new_simple("foo/bar", &[]); + { + let appsrc = h.get_element().unwrap(); + appsrc.set_property("caps", &caps).unwrap(); + appsrc.set_property("do-timestamp", &true).unwrap(); + appsrc + .set_property("context", &"appsrc-pause_flush") + .unwrap(); + } + + h.play(); + + let appsrc = h.get_element().unwrap(); + + // Initial buffer + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::from_slice(vec![1, 2, 3, 4])]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + let _ = h.pull().unwrap(); + + appsrc + .change_state(gst::StateChange::PlayingToPaused) + .unwrap(); + + // FlushStart + assert!(h.push_upstream_event(gst::Event::new_flush_start().build())); + + // Can't push buffer while flushing + assert!(!appsrc + .emit("push-buffer", &[&gst::Buffer::new()]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + assert!(h.try_pull().is_none()); + + // FlushStop + assert!(h.push_upstream_event(gst::Event::new_flush_stop(true).build())); + + appsrc + .change_state(gst::StateChange::PausedToPlaying) + .unwrap(); + + // No buffer available due to flush + assert!(h.try_pull().is_none()); + + // Can push again + assert!(appsrc + .emit("push-buffer", &[&gst::Buffer::new()]) + .unwrap() + .unwrap() + .get_some::() + .unwrap()); + + let _ = h.pull().unwrap(); + assert!(h.try_pull().is_none()); +} diff --git a/gst-plugin-threadshare/tests/pad.rs b/gst-plugin-threadshare/tests/pad.rs index 63d6a93c..ae06dddf 100644 --- a/gst-plugin-threadshare/tests/pad.rs +++ b/gst-plugin-threadshare/tests/pad.rs @@ -29,13 +29,14 @@ use gst; use gst::prelude::*; use gst::subclass::prelude::*; use gst::EventView; -use gst::{gst_debug, gst_error_msg, gst_log}; +use gst::{gst_debug, gst_error_msg, gst_info, gst_log}; use lazy_static::lazy_static; use std::boxed::Box; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use std::sync::Mutex as StdMutex; use gstthreadshare::runtime::prelude::*; use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef}; @@ -79,36 +80,93 @@ lazy_static! { ); } -#[derive(Clone, Debug, Default)] -struct PadSrcHandlerTest; +#[derive(Debug)] +struct PadSrcHandlerTestInner { + receiver: FutMutex>, +} + +impl PadSrcHandlerTestInner { + fn new(receiver: mpsc::Receiver) -> PadSrcHandlerTestInner { + PadSrcHandlerTestInner { + receiver: FutMutex::new(receiver), + } + } +} + +#[derive(Clone, Debug)] +struct PadSrcHandlerTest(Arc); impl PadSrcHandlerTest { - fn start_task(&self, pad: PadSrcRef<'_>, receiver: mpsc::Receiver) { + fn new(receiver: mpsc::Receiver) -> PadSrcHandlerTest { + PadSrcHandlerTest(Arc::new(PadSrcHandlerTestInner::new(receiver))) + } + + fn stop(&self, pad: &PadSrcRef<'_>) { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Stopping handler"); + + pad.stop_task(); + // From here on, the task is stopped so it can't hold resources anymore + + self.flush(pad); + + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handler stopped"); + } + + fn flush(&self, pad: &PadSrcRef<'_>) { + // Precondition: task must be stopped + // TODO: assert the task state when Task & PadSrc are separated + + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushing"); + + // Purge the channel + let mut receiver = self + .0 + .receiver + .try_lock() + .expect("Channel receiver is locked elsewhere"); + loop { + match receiver.try_next() { + Ok(Some(_item)) => { + gst_log!(SRC_CAT, obj: pad.gst_pad(), "Dropping pending item"); + } + Err(_) => { + gst_log!(SRC_CAT, obj: pad.gst_pad(), "No more pending item"); + break; + } + Ok(None) => { + panic!("Channel sender dropped"); + } + } + } + + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Flushed"); + } + + fn start_task(&self, pad: PadSrcRef<'_>) { gst_debug!(SRC_CAT, obj: pad.gst_pad(), "SrcPad task starting"); + + let this = self.clone(); let pad_weak = pad.downgrade(); - let receiver = Arc::new(FutMutex::new(receiver)); + pad.start_task(move || { let pad_weak = pad_weak.clone(); - let receiver = Arc::clone(&receiver); + let this = this.clone(); + async move { + let item = this.0.receiver.lock().await.next().await; + let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); - let item = { - let mut receiver = receiver.lock().await; - - match receiver.next().await { - Some(item) => item, - None => { - gst_debug!(SRC_CAT, obj: pad.gst_pad(), "SrcPad channel aborted"); - return glib::Continue(false); - } + let item = match item { + Some(item) => item, + None => { + gst_log!(SRC_CAT, obj: pad.gst_pad(), "SrcPad channel aborted"); + return glib::Continue(false); } }; - // We could also check here first if we're flushing but as we're not doing anything - // complicated below we can just defer that to the pushing function - - match Self::push_item(pad, item).await { + let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); + match this.push_item(pad, item).await { Ok(_) => glib::Continue(true), Err(gst::FlowError::Flushing) => glib::Continue(false), Err(err) => panic!("Got error {:?}", err), @@ -117,7 +175,13 @@ impl PadSrcHandlerTest { }); } - async fn push_item(pad: PadSrcRef<'_>, item: Item) -> Result { + async fn push_item( + self, + pad: PadSrcRef<'_>, + item: Item, + ) -> Result { + gst_debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item); + match item { Item::Event(event) => { pad.push_event(event).await; @@ -144,9 +208,7 @@ impl PadSrcHandler for PadSrcHandlerTest { let ret = match event.view() { EventView::FlushStart(..) => { - // Cancel the task so that it finishes ASAP - // and clear the sender - elem_src_test.pause(element).unwrap(); + elem_src_test.flush_start(element); true } EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true, @@ -167,16 +229,34 @@ impl PadSrcHandler for PadSrcHandlerTest { } } +#[derive(Debug, Eq, PartialEq)] +enum ElementSrcTestState { + Paused, + RejectItems, + Started, +} + #[derive(Debug)] struct ElementSrcTest { src_pad: PadSrc, - src_pad_handler: PadSrcHandlerTest, - sender: Mutex>>, - settings: Mutex, + src_pad_handler: StdMutex>, + state: StdMutex, + sender: StdMutex>>, + settings: StdMutex, } impl ElementSrcTest { fn try_push(&self, item: Item) -> Result<(), Item> { + let state = self.state.lock().unwrap(); + if *state == ElementSrcTestState::RejectItems { + gst_debug!( + SRC_CAT, + "ElementSrcTest rejecting item due to element state" + ); + + return Err(item); + } + match self.sender.lock().unwrap().as_mut() { Some(sender) => sender .try_send(item) @@ -196,8 +276,13 @@ impl ElementSrcTest { ) })?; + let (sender, receiver) = mpsc::channel(1); + *self.sender.lock().unwrap() = Some(sender); + + let src_pad_handler = PadSrcHandlerTest::new(receiver); + self.src_pad - .prepare(context, &self.src_pad_handler) + .prepare(context, &src_pad_handler) .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -205,6 +290,8 @@ impl ElementSrcTest { ) })?; + *self.src_pad_handler.lock().unwrap() = Some(src_pad_handler); + gst_debug!(SRC_CAT, obj: element, "Prepared"); Ok(()) @@ -214,6 +301,7 @@ impl ElementSrcTest { gst_debug!(SRC_CAT, obj: element, "Unpreparing"); self.src_pad.unprepare().unwrap(); + *self.src_pad_handler.lock().unwrap() = None; gst_debug!(SRC_CAT, obj: element, "Unprepared"); @@ -221,15 +309,15 @@ impl ElementSrcTest { } fn start(&self, element: &gst::Element) -> Result<(), ()> { - let mut sender = self.sender.lock().unwrap(); - if sender.is_some() { + let mut state = self.state.lock().unwrap(); + if *state == ElementSrcTestState::Started { gst_debug!(SRC_CAT, obj: element, "Already started"); return Err(()); } gst_debug!(SRC_CAT, obj: element, "Starting"); - self.start_unchecked(&mut sender); + self.start_unchecked(&mut state); gst_debug!(SRC_CAT, obj: element, "Started"); @@ -237,12 +325,8 @@ impl ElementSrcTest { } fn flush_stop(&self, element: &gst::Element) { - // Keep the lock on the `sender` until `flush_stop` is complete - // so as to prevent race conditions due to concurrent state transitions. - // Note that this won't deadlock as `sender` is not used - // within the `src_pad`'s `Task`. - let mut sender = self.sender.lock().unwrap(); - if sender.is_some() { + let mut state = self.state.lock().unwrap(); + if *state == ElementSrcTestState::Started { gst_debug!(SRC_CAT, obj: element, "Already started"); return; } @@ -252,30 +336,52 @@ impl ElementSrcTest { // Stop it so we wait for it to actually finish self.src_pad.stop_task(); + self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() + .flush(&self.src_pad.as_ref()); + // And then start it again - self.start_unchecked(&mut sender); + self.start_unchecked(&mut state); gst_debug!(SRC_CAT, obj: element, "Stopped Flush"); } - fn start_unchecked(&self, sender: &mut Option>) { - // Start the task and set up the sender. We only accept - // data in Playing - let (sender_new, receiver) = mpsc::channel(1); - *sender = Some(sender_new); + fn start_unchecked(&self, state: &mut ElementSrcTestState) { self.src_pad_handler - .start_task(self.src_pad.as_ref(), receiver); + .lock() + .unwrap() + .as_ref() + .unwrap() + .start_task(self.src_pad.as_ref()); + + *state = ElementSrcTestState::Started; + } + + fn flush_start(&self, element: &gst::Element) { + // Keep the lock on the `state` until `flush_start` is complete + // so as to prevent race conditions due to concurrent state transitions. + let mut state = self.state.lock().unwrap(); + + gst_debug!(SRC_CAT, obj: element, "Starting Flush"); + + *state = ElementSrcTestState::RejectItems; + self.src_pad.cancel_task(); + + gst_debug!(SRC_CAT, obj: element, "Flush Started"); } fn pause(&self, element: &gst::Element) -> Result<(), ()> { - let mut sender = self.sender.lock().unwrap(); + // Lock the state to prevent race condition due to concurrent FlushStop + let mut state = self.state.lock().unwrap(); + gst_debug!(SRC_CAT, obj: element, "Pausing"); - // Cancel task, we only accept data in Playing - self.src_pad.cancel_task(); + self.src_pad.pause_task(); - // Prevent subsequent items from being enqueued - *sender = None; + *state = ElementSrcTestState::Paused; gst_debug!(SRC_CAT, obj: element, "Paused"); @@ -285,9 +391,14 @@ impl ElementSrcTest { fn stop(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(SRC_CAT, obj: element, "Stopping"); - // Now stop the task if it was still running, blocking - // until this has actually happened - self.src_pad.stop_task(); + *self.state.lock().unwrap() = ElementSrcTestState::RejectItems; + + self.src_pad_handler + .lock() + .unwrap() + .as_ref() + .unwrap() + .stop(&self.src_pad.as_ref()); gst_debug!(SRC_CAT, obj: element, "Stopped"); @@ -334,9 +445,10 @@ impl ObjectSubclass for ElementSrcTest { ElementSrcTest { src_pad, - src_pad_handler: PadSrcHandlerTest::default(), - sender: Mutex::new(None), - settings: Mutex::new(settings), + src_pad_handler: StdMutex::new(None), + state: StdMutex::new(ElementSrcTestState::RejectItems), + sender: StdMutex::new(None), + settings: StdMutex::new(settings), } } } @@ -595,6 +707,24 @@ impl ElementSinkTest { } } +impl ElementSinkTest { + fn push_flush_start(&self, element: &gst::Element) { + gst_debug!(SINK_CAT, obj: element, "Pushing FlushStart"); + self.sink_pad + .gst_pad() + .push_event(gst::Event::new_flush_start().build()); + gst_debug!(SINK_CAT, obj: element, "FlushStart pushed"); + } + + fn push_flush_stop(&self, element: &gst::Element) { + gst_debug!(SINK_CAT, obj: element, "Pushing FlushStop"); + self.sink_pad + .gst_pad() + .push_event(gst::Event::new_flush_stop(true).build()); + gst_debug!(SINK_CAT, obj: element, "FlushStop pushed"); + } +} + lazy_static! { static ref SINK_CAT: gst::DebugCategory = gst::DebugCategory::new( "ts-element-sink-test", @@ -823,10 +953,10 @@ fn nominal_scenario( // Pause the Pad task pipeline.set_state(gst::State::Paused).unwrap(); - // Items not longer accepted + // Item accepted, but not processed before switching to Playing again elem_src_test - .try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))) - .unwrap_err(); + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![5, 6, 7]))) + .unwrap(); // Nothing forwarded receiver.try_next().unwrap_err(); @@ -834,8 +964,13 @@ fn nominal_scenario( // Switch back the Pad task to Started pipeline.set_state(gst::State::Playing).unwrap(); - // Still nothing forwarded - receiver.try_next().unwrap_err(); + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(buffer) => { + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![5, 6, 7].as_slice()); + } + other => panic!("Unexpected item {:?}", other), + } // Flush src_element.send_event(gst::Event::new_flush_start().build()); @@ -849,6 +984,33 @@ fn nominal_scenario( other => panic!("Unexpected item {:?}", other), } + elem_src_test + .try_push(Item::Event( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + // Buffer + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![8, 9]))) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(buffer) => { + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![8, 9].as_slice()); + } + other => panic!("Unexpected item {:?}", other), + } + // EOS elem_src_test .try_push(Item::Event(gst::Event::new_eos().build())) @@ -885,24 +1047,24 @@ fn src_sink_nominal() { nominal_scenario(&name, pipeline, src_element, receiver); } -// #[test] -// fn src_tsqueue_sink_nominal() { -// init(); -// -// let name = "src_tsqueue_sink"; -// -// let ts_queue = gst::ElementFactory::make("ts-queue", Some("ts-queue")).unwrap(); -// ts_queue -// .set_property("context", &format!("{}_queue", name)) -// .unwrap(); -// ts_queue -// .set_property("context-wait", &THROTTLING_DURATION) -// .unwrap(); -// -// let (pipeline, src_element, _sink_element, receiver) = setup(name, Some(ts_queue), None); -// -// nominal_scenario(&name, pipeline, src_element, receiver); -// } +#[test] +fn src_tsqueue_sink_nominal() { + init(); + + let name = "src_tsqueue_sink"; + + let ts_queue = gst::ElementFactory::make("ts-queue", Some("ts-queue")).unwrap(); + ts_queue + .set_property("context", &format!("{}_queue", name)) + .unwrap(); + ts_queue + .set_property("context-wait", &THROTTLING_DURATION) + .unwrap(); + + let (pipeline, src_element, _sink_element, receiver) = setup(name, Some(ts_queue), None); + + nominal_scenario(&name, pipeline, src_element, receiver); +} #[test] fn src_queue_sink_nominal() { @@ -916,30 +1078,309 @@ fn src_queue_sink_nominal() { nominal_scenario(&name, pipeline, src_element, receiver); } -// #[test] -// fn src_tsproxy_sink_nominal() { -// init(); -// -// let name = "src_tsproxy_sink"; -// -// let ts_proxy_sink = gst::ElementFactory::make("ts-proxysink", Some("ts-proxysink")).unwrap(); -// ts_proxy_sink -// .set_property("proxy-context", &format!("{}_proxy_context", name)) -// .unwrap(); -// -// let ts_proxy_src = gst::ElementFactory::make("ts-proxysrc", Some("ts-proxysrc")).unwrap(); -// ts_proxy_src -// .set_property("proxy-context", &format!("{}_proxy_context", name)) -// .unwrap(); -// ts_proxy_src -// .set_property("context", &format!("{}_context", name)) -// .unwrap(); -// ts_proxy_src -// .set_property("context-wait", &THROTTLING_DURATION) -// .unwrap(); -// -// let (pipeline, src_element, _sink_element, receiver) = -// setup(name, Some(ts_proxy_sink), Some(ts_proxy_src)); -// -// nominal_scenario(&name, pipeline, src_element, receiver); -// } +#[test] +fn src_tsproxy_sink_nominal() { + init(); + + let name = "src_tsproxy_sink"; + + let ts_proxy_sink = gst::ElementFactory::make("ts-proxysink", Some("ts-proxysink")).unwrap(); + ts_proxy_sink + .set_property("proxy-context", &format!("{}_proxy_context", name)) + .unwrap(); + + let ts_proxy_src = gst::ElementFactory::make("ts-proxysrc", Some("ts-proxysrc")).unwrap(); + ts_proxy_src + .set_property("proxy-context", &format!("{}_proxy_context", name)) + .unwrap(); + ts_proxy_src + .set_property("context", &format!("{}_context", name)) + .unwrap(); + ts_proxy_src + .set_property("context-wait", &THROTTLING_DURATION) + .unwrap(); + + let (pipeline, src_element, _sink_element, receiver) = + setup(name, Some(ts_proxy_sink), Some(ts_proxy_src)); + + nominal_scenario(&name, pipeline, src_element, receiver); +} + +#[test] +fn start_pause_start() { + init(); + + let scenario_name = "start_pause_start"; + + let (pipeline, src_element, _sink_element, mut receiver) = setup(&scenario_name, None, None); + + let elem_src_test = ElementSrcTest::from_instance(&src_element); + + pipeline.set_state(gst::State::Playing).unwrap(); + + // Initial events + elem_src_test + .try_push(Item::Event( + gst::Event::new_stream_start(scenario_name) + .group_id(gst::GroupId::next()) + .build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::StreamStart(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + elem_src_test + .try_push(Item::Event( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + // Buffer + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))) + .unwrap(); + + pipeline.set_state(gst::State::Paused).unwrap(); + + pipeline.set_state(gst::State::Playing).unwrap(); + + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![5, 6, 7]))) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(buffer) => { + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![1, 2, 3, 4].as_slice()); + } + other => panic!("Unexpected item {:?}", other), + } + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(buffer) => { + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![5, 6, 7].as_slice()); + } + other => panic!("Unexpected item {:?}", other), + } + + // Nothing else forwarded + receiver.try_next().unwrap_err(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn start_stop_start() { + init(); + + let scenario_name = "start_stop_start"; + + let (pipeline, src_element, _sink_element, mut receiver) = setup(&scenario_name, None, None); + + let elem_src_test = ElementSrcTest::from_instance(&src_element); + + pipeline.set_state(gst::State::Playing).unwrap(); + + // Initial events + elem_src_test + .try_push(Item::Event( + gst::Event::new_stream_start(&format!("{}-after_stop", scenario_name)) + .group_id(gst::GroupId::next()) + .build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::StreamStart(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + elem_src_test + .try_push(Item::Event( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + // Buffer + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))) + .unwrap(); + + pipeline.set_state(gst::State::Ready).unwrap(); + + pipeline.set_state(gst::State::Playing).unwrap(); + + // Initial events again + elem_src_test + .try_push(Item::Event( + gst::Event::new_stream_start(scenario_name) + .group_id(gst::GroupId::next()) + .build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(_buffer) => { + gst_info!( + SRC_CAT, + "{}: initial buffer went through, don't expect any pending item to be dropped", + scenario_name + ); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::StreamStart(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + } + Item::Event(event) => match event.view() { + EventView::StreamStart(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + elem_src_test + .try_push(Item::Event( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![5, 6, 7]))) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(buffer) => { + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![5, 6, 7].as_slice()); + } + other => panic!("Unexpected item {:?}", other), + } + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn start_flush() { + init(); + + let scenario_name = "start_flush"; + + let (pipeline, src_element, sink_element, mut receiver) = setup(&scenario_name, None, None); + + let elem_src_test = ElementSrcTest::from_instance(&src_element); + + pipeline.set_state(gst::State::Playing).unwrap(); + + // Initial events + elem_src_test + .try_push(Item::Event( + gst::Event::new_stream_start(&format!("{}-after_stop", scenario_name)) + .group_id(gst::GroupId::next()) + .build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::StreamStart(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + elem_src_test + .try_push(Item::Event( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + // Buffer + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![1, 2, 3, 4]))) + .unwrap(); + + let elem_sink_test = ElementSinkTest::from_instance(&sink_element); + + elem_sink_test.push_flush_start(&sink_element); + + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![5, 6, 7]))) + .unwrap_err(); + + elem_sink_test.push_flush_stop(&sink_element); + + elem_src_test + .try_push(Item::Event( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + )) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Event(event) => match event.view() { + EventView::Segment(_) => (), + other => panic!("Unexpected event {:?}", other), + }, + other => panic!("Unexpected item {:?}", other), + } + + // Post flush buffer + elem_src_test + .try_push(Item::Buffer(gst::Buffer::from_slice(vec![8, 9]))) + .unwrap(); + + match futures::executor::block_on(receiver.next()).unwrap() { + Item::Buffer(buffer) => { + let data = buffer.map_readable().unwrap(); + assert_eq!(data.as_slice(), vec![8, 9].as_slice()); + } + other => panic!("Unexpected item {:?}", other), + } + + pipeline.set_state(gst::State::Null).unwrap(); +} diff --git a/gst-plugin-threadshare/tests/pipeline.rs b/gst-plugin-threadshare/tests/pipeline.rs index 1105d55a..61a7851a 100644 --- a/gst-plugin-threadshare/tests/pipeline.rs +++ b/gst-plugin-threadshare/tests/pipeline.rs @@ -504,7 +504,7 @@ fn premature_shutdown() { appsink.set_property("sync", &false).unwrap(); appsink.set_property("async", &false).unwrap(); - let (sender, receiver) = mpsc::channel(); + let (appsink_sender, appsink_receiver) = mpsc::channel(); let appsink = appsink.dynamic_cast::().unwrap(); appsink.connect_new_sample(move |appsink| { @@ -517,13 +517,18 @@ fn premature_shutdown() { .unwrap() .unwrap(); - sender.send(()).unwrap(); + appsink_sender.send(()).unwrap(); Ok(gst::FlowSuccess::Ok) }); - fn push_buffer(src: &gst::Element) -> bool { - gst_debug!(CAT, obj: src, "premature_shutdown: pushing buffer"); + fn push_buffer(src: &gst::Element, intent: &str) -> bool { + gst_debug!( + CAT, + obj: src, + "premature_shutdown: pushing buffer {}", + intent + ); src.emit("push-buffer", &[&gst::Buffer::from_slice(vec![0; 1024])]) .unwrap() .unwrap() @@ -536,38 +541,41 @@ fn premature_shutdown() { let mut scenario = Some(move || { gst_debug!(CAT, "premature_shutdown: STEP 1: Playing"); // Initialize the dataflow - assert!(push_buffer(&src)); + assert!(push_buffer(&src, "(initial)")); // Wait for the buffer to reach AppSink - receiver.recv().unwrap(); - assert_eq!(receiver.try_recv().unwrap_err(), mpsc::TryRecvError::Empty); + appsink_receiver.recv().unwrap(); + assert_eq!( + appsink_receiver.try_recv().unwrap_err(), + mpsc::TryRecvError::Empty + ); - assert!(push_buffer(&src)); + assert!(push_buffer(&src, "before Playing -> Paused")); + gst_debug!(CAT, "premature_shutdown: STEP 2: Playing -> Paused"); pipeline_clone.set_state(gst::State::Paused).unwrap(); - // Paused -> can't push_buffer - assert!(!push_buffer(&src)); - - gst_debug!(CAT, "premature_shutdown: STEP 2: Paused -> Playing"); + gst_debug!(CAT, "premature_shutdown: STEP 3: Paused -> Playing"); pipeline_clone.set_state(gst::State::Playing).unwrap(); - gst_debug!(CAT, "premature_shutdown: STEP 3: Playing"); + gst_debug!(CAT, "premature_shutdown: Playing again"); - receiver.recv().unwrap(); + gst_debug!(CAT, "Waiting for buffer sent before Playing -> Paused"); + appsink_receiver.recv().unwrap(); - assert!(push_buffer(&src)); - receiver.recv().unwrap(); + assert!(push_buffer(&src, "after Paused -> Playing")); + gst_debug!(CAT, "Waiting for buffer sent after Paused -> Playing"); + appsink_receiver.recv().unwrap(); // Fill up the (dataqueue) and abruptly shutdown - assert!(push_buffer(&src)); - assert!(push_buffer(&src)); + assert!(push_buffer(&src, "filling 1")); + assert!(push_buffer(&src, "filling 2")); - gst_debug!(CAT, "premature_shutdown: STEP 4: Shutdown"); + gst_debug!(CAT, "premature_shutdown: STEP 4: Playing -> Null"); pipeline_clone.set_state(gst::State::Null).unwrap(); - assert!(!push_buffer(&src)); + assert!(!push_buffer(&src, "after Null")); l_clone.quit(); });