threadshare: Implement pending futures that could be scheduled downstream as result of a push

This is used by the queue to schedule putting data into the queue once
it has space again.

Also implement blocking-wait in the queue on the sinkpad if there is no
IOContext upstream and generally clean up various things.
This commit is contained in:
Sebastian Dröge 2018-03-26 17:49:42 +03:00
parent 1e26ca6365
commit e03c27814b
4 changed files with 468 additions and 63 deletions

View file

@ -20,8 +20,10 @@ use std::sync::{Arc, Mutex, Weak};
use std::sync::atomic;
use std::thread;
use std::io;
use std::mem;
use futures::Future;
use futures::{Future, Stream};
use futures::stream::futures_unordered::FuturesUnordered;
use tokio::executor::thread_pool;
use tokio::reactor;
@ -235,6 +237,10 @@ struct IOContextInner {
pool: Either<thread_pool::ThreadPool, IOContextExecutor>,
// Only used for dropping
_shutdown: IOContextShutdown,
pending_futures: Mutex<(
u64,
HashMap<u64, FuturesUnordered<Box<Future<Item = (), Error = ()> + Send + 'static>>>,
)>,
}
impl Drop for IOContextInner {
@ -283,6 +289,7 @@ impl IOContext {
name: name.into(),
pool,
_shutdown: shutdown,
pending_futures: Mutex::new((0, HashMap::new())),
});
contexts.insert(name.into(), Arc::downgrade(&context));
@ -299,4 +306,42 @@ impl IOContext {
Either::Right(ref pool) => pool.spawn(future),
}
}
pub fn acquire_pending_future_id(&self) -> PendingFutureId {
let mut pending_futures = self.0.pending_futures.lock().unwrap();
let id = pending_futures.0;
pending_futures.0 += 1;
pending_futures.1.insert(id, FuturesUnordered::new());
PendingFutureId(id)
}
pub fn release_pending_future_id(&self, id: PendingFutureId) {
let mut pending_futures = self.0.pending_futures.lock().unwrap();
if let Some(fs) = pending_futures.1.remove(&id.0) {
self.spawn(fs.for_each(|_| Ok(())));
}
}
pub fn add_pending_future<F>(&self, id: PendingFutureId, future: F)
where
F: Future<Item = (), Error = ()> + Send + 'static,
{
let mut pending_futures = self.0.pending_futures.lock().unwrap();
let fs = pending_futures.1.get_mut(&id.0).unwrap();
fs.push(Box::new(future))
}
pub fn drain_pending_futures(
&self,
id: PendingFutureId,
) -> FuturesUnordered<Box<Future<Item = (), Error = ()> + Send + 'static>> {
let mut pending_futures = self.0.pending_futures.lock().unwrap();
let fs = pending_futures.1.get_mut(&id.0).unwrap();
mem::replace(fs, FuturesUnordered::new())
}
}
#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
pub struct PendingFutureId(u64);

View file

@ -28,10 +28,14 @@ use std::sync::{Arc, Mutex};
use std::{u16, u32, u64};
use std::collections::VecDeque;
use futures;
use futures::future;
use futures::{Async, Future, IntoFuture, Poll, Stream};
use futures::task;
use futures::sync::oneshot;
use tokio::executor;
use iocontext::*;
const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200;
@ -191,7 +195,6 @@ struct DataQueueInner {
max_size_time: Option<u64>,
current_task: Option<task::Task>,
current_task_in: Option<task::Task>,
shutdown_receiver: Option<oneshot::Receiver<()>>,
}
@ -219,7 +222,6 @@ impl DataQueue {
Some(settings.max_size_time)
},
current_task: None,
current_task_in: None,
shutdown_receiver: None,
})))
}
@ -355,7 +357,7 @@ impl DataQueue {
}
}
fn push(&self, item: DataQueueItem) -> bool {
fn push(&self, item: DataQueueItem) -> Result<(), DataQueueItem> {
let mut inner = self.0.lock().unwrap();
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Pushing item {:?}", item);
@ -367,16 +369,14 @@ impl DataQueue {
if let Some(max) = inner.max_size_buffers {
if max <= inner.cur_size_buffers + count {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Queue is full (buffers): {} <= {}", max, inner.cur_size_buffers + count);
inner.current_task_in = Some(task::current());
return false;
return Err(item);
}
}
if let Some(max) = inner.max_size_bytes {
if max <= inner.cur_size_bytes + bytes {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Queue is full (bytes): {} <= {}", max, inner.cur_size_bytes + bytes);
inner.current_task_in = Some(task::current());
return false;
return Err(item);
}
}
@ -389,8 +389,7 @@ impl DataQueue {
};
if max <= level {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Queue is full (time): {} <= {}", max, level);
inner.current_task_in = Some(task::current());
return false;
return Err(item);
}
}
@ -402,7 +401,7 @@ impl DataQueue {
task.notify();
}
true
Ok(())
}
}
@ -443,10 +442,6 @@ impl Stream for DataQueue {
inner.cur_size_buffers -= count;
inner.cur_size_bytes -= bytes;
if let Some(task) = inner.current_task_in.take() {
task.notify();
}
Ok(Async::Ready(Some(item)))
}
}
@ -455,16 +450,26 @@ impl Stream for DataQueue {
struct State {
io_context: Option<IOContext>,
pending_future_id: Option<PendingFutureId>,
io_context_in: Option<IOContext>,
pending_future_id_in: Option<PendingFutureId>,
queue: Option<DataQueue>,
pending_queue: Option<(Option<task::Task>, VecDeque<DataQueueItem>)>,
last_ret: gst::FlowReturn,
pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>,
}
impl Default for State {
fn default() -> State {
State {
io_context: None,
pending_future_id: None,
io_context_in: None,
pending_future_id_in: None,
queue: None,
pending_queue: None,
last_ret: gst::FlowReturn::Ok,
pending_future_cancel: None,
}
}
}
@ -587,49 +592,183 @@ impl Queue {
element.catch_panic(fallback, |element| f(queue, element))
}
fn sink_chain(
&self,
pad: &gst::Pad,
_element: &Element,
buffer: gst::Buffer,
) -> gst::FlowReturn {
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
fn create_io_context_event(state: &State) -> Option<gst::Event> {
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
{
let s = gst::Structure::new(
"ts-io-context",
&[
("io-context", &glib::AnySendValue::new(io_context.clone())),
(
"pending-future-id",
&glib::AnySendValue::new(*pending_future_id),
),
],
);
Some(gst::Event::new_custom_downstream_sticky(s).build())
} else {
None
}
}
let state = self.state.lock().unwrap();
let queue = match state.queue {
fn enqueue_item(
&self,
_pad: &gst::Pad,
element: &Element,
item: DataQueueItem,
) -> gst::FlowReturn {
let wait_future = {
let mut state = self.state.lock().unwrap();
let State {
ref queue,
ref mut pending_queue,
ref io_context_in,
pending_future_id_in,
..
} = *state;
let queue = match *queue {
None => return gst::FlowReturn::Error,
Some(ref queue) => queue,
};
queue.push(DataQueueItem::Buffer(buffer));
let item = if pending_queue.is_none() {
queue.push(item)
} else {
Err(item)
};
if let Err(item) = item {
if pending_queue.is_none() {
*pending_queue = Some((None, VecDeque::new()));
pending_queue.as_mut().unwrap().1.push_back(item);
state.last_ret
gst_log!(
self.cat,
obj: element,
"Queue is full - Pushing first item on pending queue"
);
let element_clone = element.clone();
let future = future::poll_fn(move || {
let queue = element_clone.get_impl().downcast_ref::<Queue>().unwrap();
let mut state = queue.state.lock().unwrap();
let State {
queue: ref dq,
ref mut pending_queue,
..
} = *state;
gst_log!(
queue.cat,
obj: &element_clone,
"Trying to empty pending queue"
);
let res = if let Some((ref mut task, ref mut items)) = *pending_queue {
let mut failed_item = None;
for item in items.drain(..) {
if let Err(item) = dq.as_ref().unwrap().push(item) {
failed_item = Some(item);
break;
}
}
if let Some(item) = failed_item {
items.push_front(item);
*task = Some(task::current());
gst_log!(
queue.cat,
obj: &element_clone,
"Waiting for more queue space"
);
Ok(Async::NotReady)
} else {
gst_log!(
queue.cat,
obj: &element_clone,
"Pending queue is empty now"
);
Ok(Async::Ready(()))
}
} else {
gst_log!(
queue.cat,
obj: &element_clone,
"Flushing, dropping pending queue"
);
Ok(Async::Ready(()))
};
if res == Ok(Async::Ready(())) {
*pending_queue = None;
}
res
});
if let (Some(io_context_in), Some(pending_future_id_in)) =
(io_context_in.as_ref(), pending_future_id_in.as_ref())
{
io_context_in.add_pending_future(*pending_future_id_in, future);
None
} else {
Some(future)
}
} else {
assert!(io_context_in.is_some());
pending_queue.as_mut().unwrap().1.push_back(item);
None
}
} else {
None
}
};
if let Some(wait_future) = wait_future {
gst_log!(self.cat, obj: element, "Blocking until queue becomes empty");
match executor::current_thread::block_on_all(wait_future) {
Err(_) => {
gst_element_error!(
element,
gst::StreamError::Failed,
["failed to wait for queue to become empty again"]
);
return gst::FlowReturn::Error;
}
Ok(_) => (),
}
}
self.state.lock().unwrap().last_ret
}
fn sink_chain(
&self,
pad: &gst::Pad,
element: &Element,
buffer: gst::Buffer,
) -> gst::FlowReturn {
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
self.enqueue_item(pad, element, DataQueueItem::Buffer(buffer))
}
fn sink_chain_list(
&self,
pad: &gst::Pad,
_element: &Element,
element: &Element,
list: gst::BufferList,
) -> gst::FlowReturn {
gst_log!(self.cat, obj: pad, "Handling buffer list {:?}", list);
let state = self.state.lock().unwrap();
let queue = match state.queue {
None => return gst::FlowReturn::Error,
Some(ref queue) => queue,
};
queue.push(DataQueueItem::BufferList(list));
state.last_ret
self.enqueue_item(pad, element, DataQueueItem::BufferList(list))
}
fn sink_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
fn sink_event(&self, pad: &gst::Pad, element: &Element, mut event: gst::Event) -> bool {
use gst::EventView;
gst_log!(self.cat, obj: pad, "Handling event {:?}", event);
let mut new_event = None;
match event.view() {
EventView::FlushStart(..) => {
let _ = self.stop(element);
@ -642,16 +781,43 @@ impl Queue {
let _ = self.start(element);
}
}
EventView::CustomDownstreamSticky(e) => {
let s = e.get_structure().unwrap();
if s.get_name() == "ts-io-context" {
let mut state = self.state.lock().unwrap();
let io_context = s.get::<&glib::AnySendValue>("io-context").unwrap();
let io_context = io_context.downcast_ref::<IOContext>().unwrap();
let pending_future_id =
s.get::<&glib::AnySendValue>("pending-future-id").unwrap();
let pending_future_id =
pending_future_id.downcast_ref::<PendingFutureId>().unwrap();
gst_debug!(
self.cat,
obj: element,
"Got upstream pending future id {:?}",
pending_future_id
);
state.io_context_in = Some(io_context.clone());
state.pending_future_id_in = Some(*pending_future_id);
new_event = Self::create_io_context_event(&state);
// Get rid of reconfigure flag
self.src_pad.check_reconfigure();
}
}
_ => (),
};
if let Some(new_event) = new_event {
event = new_event;
}
if event.is_serialized() {
gst_log!(self.cat, obj: pad, "Queuing event {:?}", event);
let state = self.state.lock().unwrap();
state
.queue
.as_ref()
.map(|q| q.push(DataQueueItem::Event(event)));
let _ = self.enqueue_item(pad, element, DataQueueItem::Event(event));
true
} else {
gst_log!(self.cat, obj: pad, "Forwarding event {:?}", event);
@ -728,7 +894,31 @@ impl Queue {
self.sink_pad.peer_query(query)
}
fn push_item(&self, element: &Element, item: DataQueueItem) -> Result<(), gst::FlowError> {
fn push_item(
&self,
element: &Element,
item: DataQueueItem,
) -> future::Either<
Box<Future<Item = (), Error = gst::FlowError> + Send + 'static>,
future::FutureResult<(), gst::FlowError>,
> {
let event = {
let state = self.state.lock().unwrap();
if let Some((Some(ref task), _)) = state.pending_queue {
task.notify();
}
if self.src_pad.check_reconfigure() {
Self::create_io_context_event(&state)
} else {
None
}
};
if let Some(event) = event {
self.src_pad.push_event(event);
}
let res = match item {
DataQueueItem::Buffer(buffer) => {
gst_log!(self.cat, obj: element, "Forwarding buffer {:?}", buffer);
@ -745,7 +935,7 @@ impl Queue {
}
};
match res {
let res = match res {
Ok(_) => {
gst_log!(self.cat, obj: element, "Successfully pushed item");
let mut state = self.state.lock().unwrap();
@ -782,6 +972,51 @@ impl Queue {
state.last_ret = gst::FlowReturn::from_error(err);
Err(gst::FlowError::CustomError)
}
};
match res {
Ok(()) => {
let mut state = self.state.lock().unwrap();
let State {
ref pending_future_id,
ref io_context,
ref mut pending_future_cancel,
..
} = *state;
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(pending_future_id, io_context)
{
let pending_futures = io_context.drain_pending_futures(*pending_future_id);
if !pending_futures.is_empty() {
gst_log!(
self.cat,
obj: element,
"Scheduling {} pending futures",
pending_futures.len()
);
let (sender, receiver) = futures::sync::oneshot::channel();
*pending_future_cancel = Some(sender);
let future = pending_futures
.for_each(|_| Ok(()))
.select(receiver.then(|_| Ok(())))
.then(|_| Ok(()));
future::Either::A(Box::new(future))
} else {
*pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
}
} else {
*pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
}
}
Err(err) => future::Either::B(Err(err).into_future()),
}
}
@ -837,8 +1072,17 @@ impl Queue {
)
})?;;
let pending_future_id = io_context.acquire_pending_future_id();
gst_debug!(
self.cat,
obj: element,
"Got pending future id {:?}",
pending_future_id
);
state.io_context = Some(io_context);
state.queue = Some(dataqueue);
state.pending_future_id = Some(pending_future_id);
gst_debug!(self.cat, obj: element, "Prepared");
@ -854,6 +1098,12 @@ impl Queue {
queue.shutdown();
}
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
{
io_context.release_pending_future_id(*pending_future_id);
}
*state = State::default();
gst_debug!(self.cat, obj: element, "Unprepared");
@ -882,6 +1132,10 @@ impl Queue {
queue.pause();
queue.clear(&self.src_pad);
}
if let Some((Some(task), _)) = state.pending_queue.take() {
task.notify();
}
let _ = state.pending_future_cancel.take();
state.last_ret = gst::FlowReturn::Flushing;
gst_debug!(self.cat, obj: element, "Stopped");

View file

@ -27,6 +27,9 @@ use gst_plugin::element::*;
use std::sync::Mutex;
use std::u16;
use futures;
use futures::{Future, IntoFuture, Stream};
use futures::future;
use tokio::net;
use either::Either;
@ -127,18 +130,22 @@ static PROPERTIES: [Property; 7] = [
struct State {
io_context: Option<IOContext>,
pending_future_id: Option<PendingFutureId>,
socket: Option<Socket>,
need_initial_events: bool,
configured_caps: Option<gst::Caps>,
pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>,
}
impl Default for State {
fn default() -> State {
State {
io_context: None,
pending_future_id: None,
socket: None,
need_initial_events: true,
configured_caps: None,
pending_future_cancel: None,
}
}
}
@ -291,7 +298,34 @@ impl UdpSrc {
ret
}
fn push_buffer(&self, element: &Element, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
fn create_io_context_event(state: &State) -> Option<gst::Event> {
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
{
let s = gst::Structure::new(
"ts-io-context",
&[
("io-context", &glib::AnySendValue::new(io_context.clone())),
(
"pending-future-id",
&glib::AnySendValue::new(*pending_future_id),
),
],
);
Some(gst::Event::new_custom_downstream_sticky(s).build())
} else {
None
}
}
fn push_buffer(
&self,
element: &Element,
buffer: gst::Buffer,
) -> future::Either<
Box<Future<Item = (), Error = gst::FlowError> + Send + 'static>,
future::FutureResult<(), gst::FlowError>,
> {
let mut events = Vec::new();
let mut state = self.state.lock().unwrap();
if state.need_initial_events {
@ -306,7 +340,18 @@ impl UdpSrc {
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new()).build(),
);
if let Some(event) = Self::create_io_context_event(&state) {
events.push(event);
// Get rid of reconfigure flag
self.src_pad.check_reconfigure();
}
state.need_initial_events = false;
} else if self.src_pad.check_reconfigure() {
if let Some(event) = Self::create_io_context_event(&state) {
events.push(event);
}
}
drop(state);
@ -314,7 +359,7 @@ impl UdpSrc {
self.src_pad.push_event(event);
}
match self.src_pad.push(buffer).into_result() {
let res = match self.src_pad.push(buffer).into_result() {
Ok(_) => {
gst_log!(self.cat, obj: element, "Successfully pushed buffer");
Ok(())
@ -345,6 +390,51 @@ impl UdpSrc {
);
Err(gst::FlowError::CustomError)
}
};
match res {
Ok(()) => {
let mut state = self.state.lock().unwrap();
let State {
ref pending_future_id,
ref io_context,
ref mut pending_future_cancel,
..
} = *state;
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(pending_future_id, io_context)
{
let pending_futures = io_context.drain_pending_futures(*pending_future_id);
if !pending_futures.is_empty() {
gst_log!(
self.cat,
obj: element,
"Scheduling {} pending futures",
pending_futures.len()
);
let (sender, receiver) = futures::sync::oneshot::channel();
*pending_future_cancel = Some(sender);
let future = pending_futures
.for_each(|_| Ok(()))
.select(receiver.then(|_| Ok(())))
.then(|_| Ok(()));
future::Either::A(Box::new(future))
} else {
*pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
}
} else {
*pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
}
}
Err(err) => future::Either::B(Err(err).into_future()),
}
}
@ -497,8 +587,17 @@ impl UdpSrc {
gst_error_msg!(gst::ResourceError::OpenRead, ["Failed to schedule socket"])
})?;
let pending_future_id = io_context.acquire_pending_future_id();
gst_debug!(
self.cat,
obj: element,
"Got pending future id {:?}",
pending_future_id
);
state.socket = Some(socket);
state.io_context = Some(io_context);
state.pending_future_id = Some(pending_future_id);
gst_debug!(self.cat, obj: element, "Prepared");
@ -514,6 +613,12 @@ impl UdpSrc {
socket.shutdown();
}
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
{
io_context.release_pending_future_id(*pending_future_id);
}
*state = State::default();
gst_debug!(self.cat, obj: element, "Unprepared");
@ -535,11 +640,12 @@ impl UdpSrc {
fn stop(&self, element: &Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Stopping");
let state = self.state.lock().unwrap();
let mut state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket {
socket.pause();
}
let _ = state.pending_future_cancel.take();
gst_debug!(self.cat, obj: element, "Stopped");