threadshare/queue: Port queue to the new API

This commit is contained in:
François Laignel 2020-03-13 21:21:11 +01:00 committed by Sebastian Dröge
parent ded3af31c1
commit 95b2641056
3 changed files with 239 additions and 315 deletions

View file

@ -16,7 +16,6 @@
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use futures::future::{self, abortable, AbortHandle}; use futures::future::{self, abortable, AbortHandle};
use futures::lock::Mutex;
use gst; use gst;
use gst::gst_debug; use gst::gst_debug;
@ -26,6 +25,7 @@ use lazy_static::lazy_static;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::{u32, u64}; use std::{u32, u64};
lazy_static! { lazy_static! {
@ -66,15 +66,15 @@ impl DataQueueItem {
} }
} }
#[derive(PartialEq, Eq, Debug)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DataQueueState { pub enum DataQueueState {
Paused, Paused,
Started, Started,
Stopped, Stopped,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DataQueue(Arc<Mutex<DataQueueInner>>); pub struct DataQueue(Arc<StdMutex<DataQueueInner>>);
#[derive(Debug)] #[derive(Debug)]
struct DataQueueInner { struct DataQueueInner {
@ -109,7 +109,7 @@ impl DataQueue {
max_size_bytes: Option<u32>, max_size_bytes: Option<u32>,
max_size_time: Option<u64>, max_size_time: Option<u64>,
) -> DataQueue { ) -> DataQueue {
DataQueue(Arc::new(Mutex::new(DataQueueInner { DataQueue(Arc::new(StdMutex::new(DataQueueInner {
element: element.clone(), element: element.clone(),
src_pad: src_pad.clone(), src_pad: src_pad.clone(),
state: DataQueueState::Stopped, state: DataQueueState::Stopped,
@ -123,8 +123,12 @@ impl DataQueue {
}))) })))
} }
pub async fn start(&self) { pub fn state(&self) -> DataQueueState {
let mut inner = self.0.lock().await; self.0.lock().unwrap().state
}
pub fn start(&self) {
let mut inner = self.0.lock().unwrap();
if inner.state == DataQueueState::Started { if inner.state == DataQueueState::Started {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue already Started"); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue already Started");
return; return;
@ -134,8 +138,8 @@ impl DataQueue {
inner.wake(); inner.wake();
} }
pub async fn pause(&self) { pub fn pause(&self) {
let mut inner = self.0.lock().await; let mut inner = self.0.lock().unwrap();
if inner.state == DataQueueState::Paused { if inner.state == DataQueueState::Paused {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue already Paused"); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue already Paused");
return; return;
@ -146,8 +150,8 @@ impl DataQueue {
inner.wake(); inner.wake();
} }
pub async fn stop(&self) { pub fn stop(&self) {
let mut inner = self.0.lock().await; let mut inner = self.0.lock().unwrap();
if inner.state == DataQueueState::Stopped { if inner.state == DataQueueState::Stopped {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue already Stopped"); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue already Stopped");
return; return;
@ -157,8 +161,8 @@ impl DataQueue {
inner.wake(); inner.wake();
} }
pub async fn clear(&self) { pub fn clear(&self) {
let mut inner = self.0.lock().await; let mut inner = self.0.lock().unwrap();
assert_eq!(inner.state, DataQueueState::Paused); assert_eq!(inner.state, DataQueueState::Paused);
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Clearing data queue"); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Clearing data queue");
@ -178,8 +182,8 @@ impl DataQueue {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue cleared"); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue cleared");
} }
pub async fn push(&self, item: DataQueueItem) -> Result<(), DataQueueItem> { pub fn push(&self, item: DataQueueItem) -> Result<(), DataQueueItem> {
let mut inner = self.0.lock().await; let mut inner = self.0.lock().unwrap();
if inner.state != DataQueueState::Started { if inner.state != DataQueueState::Started {
gst_debug!( gst_debug!(
@ -235,13 +239,12 @@ impl DataQueue {
Ok(()) Ok(())
} }
// Implementing `next` as an `async fn` instead of a `Stream` because of the `async` `Mutex` // TODO: implement as a Stream now that we use a StdMutex
// See https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/merge_requests/204#note_322774
#[allow(clippy::should_implement_trait)] #[allow(clippy::should_implement_trait)]
pub async fn next(&mut self) -> Option<DataQueueItem> { pub async fn next(&mut self) -> Option<DataQueueItem> {
loop { loop {
let pending_fut = { let pending_fut = {
let mut inner = self.0.lock().await; let mut inner = self.0.lock().unwrap();
match inner.state { match inner.state {
DataQueueState::Started => match inner.queue.pop_front() { DataQueueState::Started => match inner.queue.pop_front() {
None => { None => {

View file

@ -35,10 +35,10 @@ mod udpsink;
mod udpsrc; mod udpsrc;
mod appsrc; mod appsrc;
//pub mod dataqueue; pub mod dataqueue;
//mod jitterbuffer; //mod jitterbuffer;
//mod proxy; //mod proxy;
//mod queue; mod queue;
use glib::translate::*; use glib::translate::*;
use glib_sys as glib_ffi; use glib_sys as glib_ffi;
@ -52,7 +52,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
udpsrc::register(plugin)?; udpsrc::register(plugin)?;
udpsink::register(plugin)?; udpsink::register(plugin)?;
tcpclientsrc::register(plugin)?; tcpclientsrc::register(plugin)?;
//queue::register(plugin)?; queue::register(plugin)?;
//proxy::register(plugin)?; //proxy::register(plugin)?;
appsrc::register(plugin)?; appsrc::register(plugin)?;
//jitterbuffer::jitterbuffer::register(plugin)?; //jitterbuffer::jitterbuffer::register(plugin)?;

View file

@ -15,11 +15,8 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use either::Either;
use futures::channel::oneshot; use futures::channel::oneshot;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::lock::Mutex;
use futures::prelude::*; use futures::prelude::*;
use glib; use glib;
@ -36,13 +33,14 @@ use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg, gst_log, gst_t
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::sync::{self, Arc}; use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::{u32, u64}; use std::{u32, u64};
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{self, Context, JoinHandle, PadSink, PadSinkRef, PadSrc, PadSrcRef}; use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef};
use super::dataqueue::{DataQueue, DataQueueItem}; use super::dataqueue::{DataQueue, DataQueueItem, DataQueueState};
const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200; const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200;
const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024; const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024;
@ -140,13 +138,8 @@ impl PendingQueue {
} }
} }
#[derive(Debug, Default)] #[derive(Clone)]
struct QueuePadSinkHandlerInner { struct QueuePadSinkHandler;
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>,
}
#[derive(Clone, Default)]
struct QueuePadSinkHandler(Arc<QueuePadSinkHandlerInner>);
impl PadSinkHandler for QueuePadSinkHandler { impl PadSinkHandler for QueuePadSinkHandler {
type ElementImpl = Queue; type ElementImpl = Queue;
@ -197,64 +190,47 @@ impl PadSinkHandler for QueuePadSinkHandler {
queue: &Queue, queue: &Queue,
element: &gst::Element, element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> { ) -> bool {
use gst::EventView; use gst::EventView;
if event.is_serialized() { gst_debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
let pad_weak = pad.downgrade();
let element = element.clone();
let inner_weak = Arc::downgrade(&self.0);
Either::Right( if let EventView::FlushStart(..) = event.view() {
async move { queue.stop(&element).unwrap();
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); }
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let queue = Queue::from_instance(&element); gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
queue.src_pad.gst_pad().push_event(event)
}
if let EventView::FlushStop(..) = event.view() { fn sink_event_serialized(
let inner = inner_weak.upgrade().unwrap(); &self,
pad: &PadSinkRef,
_queue: &Queue,
element: &gst::Element,
event: gst::Event,
) -> BoxFuture<'static, bool> {
use gst::EventView;
let flush_join_handle = inner.flush_join_handle.lock().unwrap().take(); gst_log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
if let Some(flush_join_handle) = flush_join_handle {
gst_debug!(CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete");
if let Ok(Ok(())) = flush_join_handle.await {
let _ = queue.start(&element).await;
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: FlushStart failed to complete");
}
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress");
}
}
gst_log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event); let pad_weak = pad.downgrade();
queue let element = element.clone();
.enqueue_item(&element, DataQueueItem::Event(event)) async move {
.await let pad = pad_weak.upgrade().expect("PadSink no longer exists");
.is_ok() let queue = Queue::from_instance(&element);
}
.boxed(), if let EventView::FlushStop(..) = event.view() {
) queue.flush_stop(&element);
} else {
if let EventView::FlushStart(..) = event.view() {
let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap();
if flush_join_handle.is_none() {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let element = element.clone();
*flush_join_handle =
Some(queue.src_pad.spawn(async move {
Queue::from_instance(&element).stop(&element).await
}));
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress");
}
} }
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); gst_log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
queue
Either::Left(queue.src_pad.gst_pad().push_event(event)) .enqueue_item(&element, DataQueueItem::Event(event))
.await
.is_ok()
} }
.boxed()
} }
fn sink_query( fn sink_query(
@ -277,23 +253,33 @@ impl PadSinkHandler for QueuePadSinkHandler {
} }
} }
#[derive(Debug, Default)] #[derive(Debug)]
struct QueuePadSrcHandlerInner { struct QueuePadSrcHandlerInner {
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>, last_res: Result<gst::FlowSuccess, gst::FlowError>,
}
impl Default for QueuePadSrcHandlerInner {
fn default() -> QueuePadSrcHandlerInner {
QueuePadSrcHandlerInner {
last_res: Ok(gst::FlowSuccess::Ok),
}
}
} }
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
struct QueuePadSrcHandler(Arc<QueuePadSrcHandlerInner>); struct QueuePadSrcHandler(Arc<StdMutex<QueuePadSrcHandlerInner>>);
impl QueuePadSrcHandler { impl QueuePadSrcHandler {
async fn start_task(pad: PadSrcRef<'_>, element: &gst::Element, dataqueue: DataQueue) { fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element, dataqueue: DataQueue) {
let this = self.clone();
let pad_weak = pad.downgrade(); let pad_weak = pad.downgrade();
let element = element.clone(); let element = element.clone();
let dataqueue = dataqueue.clone();
pad.start_task(move || { pad.start_task(move || {
let this = this.clone();
let pad_weak = pad_weak.clone(); let pad_weak = pad_weak.clone();
let element = element.clone(); let element = element.clone();
let mut dataqueue = dataqueue.clone(); let mut dataqueue = dataqueue.clone();
async move { async move {
let item = dataqueue.next().await; let item = dataqueue.next().await;
@ -302,25 +288,56 @@ impl QueuePadSrcHandler {
Some(item) => item, Some(item) => item,
None => { None => {
gst_log!(CAT, obj: pad.gst_pad(), "DataQueue Stopped"); gst_log!(CAT, obj: pad.gst_pad(), "DataQueue Stopped");
pad.pause_task().await; return glib::Continue(false);
return;
} }
}; };
Self::push_item(pad, &element, item).await; match Self::push_item(&pad, &element, item).await {
Ok(()) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed item");
this.0.lock().unwrap().last_res = Ok(gst::FlowSuccess::Ok);
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
this.0.lock().unwrap().last_res = Err(gst::FlowError::Flushing);
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
this.0.lock().unwrap().last_res = Err(gst::FlowError::Eos);
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
this.0.lock().unwrap().last_res = Err(err);
glib::Continue(false)
}
}
} }
}) });
.await;
} }
async fn push_item(pad: PadSrcRef<'_>, element: &gst::Element, item: DataQueueItem) { async fn push_item(
pad: &PadSrcRef<'_>,
element: &gst::Element,
item: DataQueueItem,
) -> Result<(), gst::FlowError> {
let queue = Queue::from_instance(element); let queue = Queue::from_instance(element);
if let Some(ref mut pending_queue) = queue.state.lock().await.pending_queue { if let Some(pending_queue) = queue.pending_queue.lock().unwrap().as_mut() {
pending_queue.notify_more_queue_space(); pending_queue.notify_more_queue_space();
} }
let res = match item { match item {
DataQueueItem::Buffer(buffer) => { DataQueueItem::Buffer(buffer) => {
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer); gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer);
pad.push(buffer).await.map(drop) pad.push(buffer).await.map(drop)
@ -334,39 +351,12 @@ impl QueuePadSrcHandler {
pad.push_event(event).await; pad.push_event(event).await;
Ok(()) Ok(())
} }
};
match res {
Ok(()) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed item");
let mut state = queue.state.lock().await;
state.last_res = Ok(gst::FlowSuccess::Ok);
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
let mut state = queue.state.lock().await;
pad.pause_task().await;
state.last_res = Err(gst::FlowError::Flushing);
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
let mut state = queue.state.lock().await;
pad.pause_task().await;
state.last_res = Err(gst::FlowError::Eos);
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
let mut state = queue.state.lock().await;
state.last_res = Err(err);
}
} }
} }
fn unprepare(&self) {
*self.0.lock().unwrap() = QueuePadSrcHandlerInner::default();
}
} }
impl PadSrcHandler for QueuePadSrcHandler { impl PadSrcHandler for QueuePadSrcHandler {
@ -378,82 +368,19 @@ impl PadSrcHandler for QueuePadSrcHandler {
queue: &Queue, queue: &Queue,
element: &gst::Element, element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> { ) -> bool {
use gst::EventView; use gst::EventView;
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
if event.is_serialized() { match event.view() {
let element = element.clone(); EventView::FlushStart(..) => queue.stop(element).unwrap(),
let inner_weak = Arc::downgrade(&self.0); EventView::FlushStop(..) => queue.flush_stop(element),
let pad_weak = pad.downgrade(); _ => (),
Either::Right(
async move {
let ret = if let EventView::FlushStop(..) = event.view() {
let mut ret = false;
let pad = pad_weak.upgrade().unwrap();
let inner_weak = inner_weak.upgrade().unwrap();
let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take();
if let Some(flush_join_handle) = flush_join_handle {
gst_debug!(CAT, obj: pad.gst_pad(), "Waiting for FlushStart to complete");
if let Ok(Ok(())) = flush_join_handle.await {
ret = Queue::from_instance(&element)
.start(&element)
.await
.is_ok();
gst_log!(CAT, obj: pad.gst_pad(), "FlushStop complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed");
}
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress");
}
ret
} else {
true
};
if ret {
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event);
let queue = Queue::from_instance(&element);
queue.sink_pad.gst_pad().push_event(event)
} else {
false
}
}
.boxed(),
)
} else {
if let EventView::FlushStart(..) = event.view() {
let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap();
if flush_join_handle.is_none() {
let element = element.clone();
let pad_weak = pad.downgrade();
*flush_join_handle = Some(pad.spawn(async move {
let res = Queue::from_instance(&element).stop(&element).await;
let pad = pad_weak.upgrade().unwrap();
if res.is_ok() {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart failed");
}
res
}));
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress");
}
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
Either::Left(queue.sink_pad.gst_pad().push_event(event))
} }
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
queue.sink_pad.gst_pad().push_event(event)
} }
fn src_query( fn src_query(
@ -495,29 +422,14 @@ impl PadSrcHandler for QueuePadSrcHandler {
} }
} }
#[derive(Debug)]
struct State {
queue: Option<DataQueue>,
pending_queue: Option<PendingQueue>,
last_res: Result<gst::FlowSuccess, gst::FlowError>,
}
impl Default for State {
fn default() -> State {
State {
queue: None,
pending_queue: None,
last_res: Ok(gst::FlowSuccess::Ok),
}
}
}
#[derive(Debug)] #[derive(Debug)]
struct Queue { struct Queue {
sink_pad: PadSink, sink_pad: PadSink,
src_pad: PadSrc, src_pad: PadSrc,
state: Mutex<State>, src_pad_handler: QueuePadSrcHandler,
settings: Mutex<Settings>, dataqueue: StdMutex<Option<DataQueue>>,
pending_queue: StdMutex<Option<PendingQueue>>,
settings: StdMutex<Settings>,
} }
lazy_static! { lazy_static! {
@ -533,14 +445,14 @@ impl Queue {
* the current item. Errors out if the DataQueue was full, or the pending queue * the current item. Errors out if the DataQueue was full, or the pending queue
* is already scheduled, in which case the current item should be added to the * is already scheduled, in which case the current item should be added to the
* pending queue */ * pending queue */
async fn queue_until_full( fn queue_until_full(
&self, &self,
queue: &DataQueue, dataqueue: &DataQueue,
pending_queue: &mut Option<PendingQueue>, pending_queue: &mut Option<PendingQueue>,
item: DataQueueItem, item: DataQueueItem,
) -> Result<(), DataQueueItem> { ) -> Result<(), DataQueueItem> {
match pending_queue { match pending_queue {
None => queue.push(item).await, None => dataqueue.push(item),
Some(PendingQueue { Some(PendingQueue {
scheduled: false, scheduled: false,
ref mut items, ref mut items,
@ -548,7 +460,7 @@ impl Queue {
}) => { }) => {
let mut failed_item = None; let mut failed_item = None;
while let Some(item) = items.pop_front() { while let Some(item) = items.pop_front() {
if let Err(item) = queue.push(item).await { if let Err(item) = dataqueue.push(item) {
failed_item = Some(item); failed_item = Some(item);
} }
} }
@ -558,7 +470,7 @@ impl Queue {
Err(item) Err(item)
} else { } else {
queue.push(item).await dataqueue.push(item)
} }
} }
_ => Err(item), _ => Err(item),
@ -583,23 +495,18 @@ impl Queue {
loop { loop {
let more_queue_space_receiver = { let more_queue_space_receiver = {
let mut state = queue.state.lock().await; let dataqueue = queue.dataqueue.lock().unwrap();
let State { if dataqueue.is_none() {
queue: ref dq,
pending_queue: ref mut pq,
..
} = *state;
if dq.is_none() {
return; return;
} }
let mut pending_queue_grd = queue.pending_queue.lock().unwrap();
gst_log!(CAT, obj: &element, "Trying to empty pending queue"); gst_log!(CAT, obj: &element, "Trying to empty pending queue");
if let Some(ref mut pending_queue) = *pq { if let Some(pending_queue) = pending_queue_grd.as_mut() {
let mut failed_item = None; let mut failed_item = None;
while let Some(item) = pending_queue.items.pop_front() { while let Some(item) = pending_queue.items.pop_front() {
if let Err(item) = dq.as_ref().unwrap().push(item).await { if let Err(item) = dataqueue.as_ref().unwrap().push(item) {
failed_item = Some(item); failed_item = Some(item);
} }
} }
@ -612,12 +519,11 @@ impl Queue {
receiver receiver
} else { } else {
gst_log!(CAT, obj: &element, "Pending queue is empty now"); gst_log!(CAT, obj: &element, "Pending queue is empty now");
*pq = None; *pending_queue_grd = None;
return; return;
} }
} else { } else {
gst_log!(CAT, obj: &element, "Flushing, dropping pending queue"); gst_log!(CAT, obj: &element, "Flushing, dropping pending queue");
*pq = None;
return; return;
} }
}; };
@ -634,19 +540,15 @@ impl Queue {
item: DataQueueItem, item: DataQueueItem,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let wait_fut = { let wait_fut = {
let mut state = self.state.lock().await; let dataqueue = self.dataqueue.lock().unwrap();
let State { let dataqueue = dataqueue.as_ref().ok_or_else(|| {
ref queue, gst_error!(CAT, obj: element, "No DataQueue");
ref mut pending_queue,
..
} = *state;
let queue = queue.as_ref().ok_or_else(|| {
gst_error!(CAT, obj: element, "No Queue");
gst::FlowError::Error gst::FlowError::Error
})?; })?;
if let Err(item) = self.queue_until_full(queue, pending_queue, item).await { let mut pending_queue = self.pending_queue.lock().unwrap();
if let Err(item) = self.queue_until_full(&dataqueue, &mut pending_queue, item) {
if pending_queue if pending_queue
.as_ref() .as_ref()
.map(|pq| !pq.scheduled) .map(|pq| !pq.scheduled)
@ -676,7 +578,7 @@ impl Queue {
); );
if schedule_now { if schedule_now {
let wait_fut = self.schedule_pending_queue(element, pending_queue); let wait_fut = self.schedule_pending_queue(element, &mut pending_queue);
Some(wait_fut) Some(wait_fut)
} else { } else {
gst_log!(CAT, obj: element, "Scheduling pending queue later"); gst_log!(CAT, obj: element, "Scheduling pending queue later");
@ -696,38 +598,35 @@ impl Queue {
wait_fut.await; wait_fut.await;
} }
self.state.lock().await.last_res self.src_pad_handler.0.lock().unwrap().last_res
} }
async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().await;
gst_debug!(CAT, obj: element, "Preparing"); gst_debug!(CAT, obj: element, "Preparing");
let settings = self.settings.lock().await; let settings = self.settings.lock().unwrap().clone();
{ let dataqueue = DataQueue::new(
let dataqueue = DataQueue::new( &element.clone().upcast(),
&element.clone().upcast(), self.src_pad.gst_pad(),
self.src_pad.gst_pad(), if settings.max_size_buffers == 0 {
if settings.max_size_buffers == 0 { None
None } else {
} else { Some(settings.max_size_buffers)
Some(settings.max_size_buffers) },
}, if settings.max_size_bytes == 0 {
if settings.max_size_bytes == 0 { None
None } else {
} else { Some(settings.max_size_bytes)
Some(settings.max_size_bytes) },
}, if settings.max_size_time == 0 {
if settings.max_size_time == 0 { None
None } else {
} else { Some(settings.max_size_time)
Some(settings.max_size_time) },
}, );
);
state.queue = Some(dataqueue); *self.dataqueue.lock().unwrap() = Some(dataqueue);
}
let context = let context =
Context::acquire(&settings.context, settings.context_wait).map_err(|err| { Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
@ -738,77 +637,99 @@ impl Queue {
})?; })?;
self.src_pad self.src_pad
.prepare(context, &QueuePadSrcHandler::default()) .prepare(context, &self.src_pad_handler)
.await
.map_err(|err| { .map_err(|err| {
gst_error_msg!( gst_error_msg!(
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
["Error joining Context: {:?}", err] ["Error joining Context: {:?}", err]
) )
})?; })?;
self.sink_pad.prepare(&QueuePadSinkHandler::default()).await; self.sink_pad.prepare(&QueuePadSinkHandler);
gst_debug!(CAT, obj: element, "Prepared"); gst_debug!(CAT, obj: element, "Prepared");
Ok(()) Ok(())
} }
async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().await;
gst_debug!(CAT, obj: element, "Unpreparing"); gst_debug!(CAT, obj: element, "Unpreparing");
self.src_pad.stop_task().await; self.src_pad.stop_task();
self.sink_pad.unprepare().await; self.sink_pad.unprepare();
let _ = self.src_pad.unprepare().await; let _ = self.src_pad.unprepare();
*state = State::default(); self.src_pad_handler.unprepare();
*self.dataqueue.lock().unwrap() = None;
*self.pending_queue.lock().unwrap() = None;
gst_debug!(CAT, obj: element, "Unprepared"); gst_debug!(CAT, obj: element, "Unprepared");
Ok(()) Ok(())
} }
async fn start(&self, element: &gst::Element) -> Result<(), ()> { fn start(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().await; let dataqueue = self.dataqueue.lock().unwrap();
let dataqueue = dataqueue.as_ref().unwrap();
if dataqueue.state() == DataQueueState::Started {
gst_debug!(CAT, obj: element, "Already started");
return Ok(());
}
gst_debug!(CAT, obj: element, "Starting"); gst_debug!(CAT, obj: element, "Starting");
let dataqueue = state.queue.as_ref().unwrap().clone(); self.start_unchecked(element, dataqueue);
dataqueue.start().await;
QueuePadSrcHandler::start_task(self.src_pad.as_ref(), element, dataqueue).await;
state.last_res = Ok(gst::FlowSuccess::Ok);
gst_debug!(CAT, obj: element, "Started"); gst_debug!(CAT, obj: element, "Started");
Ok(()) Ok(())
} }
async fn stop(&self, element: &gst::Element) -> Result<(), ()> { fn flush_stop(&self, element: &gst::Element) {
let pause_completion = { // Keep the lock on the `dataqueue` until `flush_stop` is complete
let mut state = self.state.lock().await; // so as to prevent race conditions due to concurrent state transitions.
gst_debug!(CAT, obj: element, "Stopping"); // Note that this won't deadlock as `Queue::dataqueue` guards a pointer to
// the `dataqueue` used within the `src_pad`'s `Task`.
let dataqueue = self.dataqueue.lock().unwrap();
let dataqueue = dataqueue.as_ref().unwrap();
if dataqueue.state() == DataQueueState::Started {
gst_debug!(CAT, obj: element, "Already started");
return;
}
let pause_completion = self.src_pad.pause_task().await; gst_debug!(CAT, obj: element, "Stopping Flush");
if let Some(ref dataqueue) = state.queue { self.src_pad.stop_task();
dataqueue.pause().await; self.start_unchecked(element, dataqueue);
dataqueue.clear().await;
dataqueue.stop().await;
}
if let Some(ref mut pending_queue) = state.pending_queue { gst_debug!(CAT, obj: element, "Stopped Flush");
pending_queue.notify_more_queue_space(); }
}
pause_completion fn start_unchecked(&self, element: &gst::Element, dataqueue: &DataQueue) {
}; dataqueue.start();
gst_debug!(CAT, obj: element, "Waiting for Task Pause to complete"); self.src_pad_handler.0.lock().unwrap().last_res = Ok(gst::FlowSuccess::Ok);
pause_completion.await; self.src_pad_handler
.start_task(self.src_pad.as_ref(), element, dataqueue.clone());
}
self.state.lock().await.last_res = Err(gst::FlowError::Flushing); fn stop(&self, element: &gst::Element) -> Result<(), ()> {
let dataqueue = self.dataqueue.lock().unwrap();
gst_debug!(CAT, obj: element, "Stopping");
self.src_pad.stop_task();
if let Some(dataqueue) = dataqueue.as_ref() {
dataqueue.pause();
dataqueue.clear();
dataqueue.stop();
}
if let Some(pending_queue) = self.pending_queue.lock().unwrap().as_mut() {
pending_queue.notify_more_queue_space();
}
self.src_pad_handler.0.lock().unwrap().last_res = Err(gst::FlowError::Flushing);
gst_debug!(CAT, obj: element, "Stopped"); gst_debug!(CAT, obj: element, "Stopped");
@ -865,8 +786,10 @@ impl ObjectSubclass for Queue {
Self { Self {
sink_pad, sink_pad,
src_pad, src_pad,
state: Mutex::new(State::default()), src_pad_handler: QueuePadSrcHandler::default(),
settings: Mutex::new(Settings::default()), dataqueue: StdMutex::new(None),
pending_queue: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),
} }
} }
} }
@ -877,7 +800,7 @@ impl ObjectImpl for Queue {
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id]; let prop = &PROPERTIES[id];
let mut settings = runtime::executor::block_on(self.settings.lock()); let mut settings = self.settings.lock().unwrap();
match *prop { match *prop {
subclass::Property("max-size-buffers", ..) => { subclass::Property("max-size-buffers", ..) => {
settings.max_size_buffers = value.get_some().expect("type checked upstream"); settings.max_size_buffers = value.get_some().expect("type checked upstream");
@ -904,7 +827,7 @@ impl ObjectImpl for Queue {
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> { fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id]; let prop = &PROPERTIES[id];
let settings = runtime::executor::block_on(self.settings.lock()); let settings = self.settings.lock().unwrap();
match *prop { match *prop {
subclass::Property("max-size-buffers", ..) => Ok(settings.max_size_buffers.to_value()), subclass::Property("max-size-buffers", ..) => Ok(settings.max_size_buffers.to_value()),
subclass::Property("max-size-bytes", ..) => Ok(settings.max_size_bytes.to_value()), subclass::Property("max-size-bytes", ..) => Ok(settings.max_size_bytes.to_value()),
@ -934,18 +857,16 @@ impl ElementImpl for Queue {
match transition { match transition {
gst::StateChange::NullToReady => { gst::StateChange::NullToReady => {
runtime::executor::block_on(self.prepare(element)).map_err(|err| { self.prepare(element).map_err(|err| {
element.post_error_message(&err); element.post_error_message(&err);
gst::StateChangeError gst::StateChangeError
})?; })?;
} }
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
runtime::executor::block_on(self.stop(element)) self.stop(element).map_err(|_| gst::StateChangeError)?;
.map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::ReadyToNull => { gst::StateChange::ReadyToNull => {
runtime::executor::block_on(self.unprepare(element)) self.unprepare(element).map_err(|_| gst::StateChangeError)?;
.map_err(|_| gst::StateChangeError)?;
} }
_ => (), _ => (),
} }
@ -953,7 +874,7 @@ impl ElementImpl for Queue {
let success = self.parent_change_state(element, transition)?; let success = self.parent_change_state(element, transition)?;
if transition == gst::StateChange::ReadyToPaused { if transition == gst::StateChange::ReadyToPaused {
runtime::executor::block_on(self.start(element)).map_err(|_| gst::StateChangeError)?; self.start(element).map_err(|_| gst::StateChangeError)?;
} }
Ok(success) Ok(success)