threadshare: Add code for canceling pending futures

This commit is contained in:
Sebastian Dröge 2018-04-04 19:24:44 +03:00
parent 88933790e7
commit 96bc778f72

View file

@ -28,7 +28,7 @@ use std::sync::Mutex;
use std::{u16, u32};
use futures::future;
use futures::sync::mpsc;
use futures::sync::{mpsc, oneshot};
use futures::{Future, IntoFuture, Stream};
use either::Either;
@ -119,6 +119,7 @@ struct State {
io_context: Option<IOContext>,
pending_future_id: Option<PendingFutureId>,
channel: Option<mpsc::Sender<Either<gst::Buffer, gst::Event>>>,
pending_future_cancel: Option<oneshot::Sender<()>>,
need_initial_events: bool,
configured_caps: Option<gst::Caps>,
}
@ -129,6 +130,7 @@ impl Default for State {
io_context: None,
pending_future_id: None,
channel: None,
pending_future_cancel: None,
need_initial_events: true,
configured_caps: None,
}
@ -454,18 +456,18 @@ impl AppSrc {
match res {
Ok(()) => {
let state = self.state.lock().unwrap();
let mut state = self.state.lock().unwrap();
let State {
ref pending_future_id,
ref io_context,
ref mut pending_future_cancel,
..
} = *state;
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(pending_future_id, io_context)
{
// FIXME: This should all go into a helper function
let pending_futures = io_context.drain_pending_futures(*pending_future_id);
if !pending_futures.is_empty() {
@ -476,13 +478,21 @@ impl AppSrc {
pending_futures.len()
);
let future = pending_futures.for_each(|_| Ok(()));
let (sender, receiver) = oneshot::channel();
*pending_future_cancel = Some(sender);
let future = pending_futures
.for_each(|_| Ok(()))
.select(receiver.then(|_| Ok(())))
.then(|_| Ok(()));
future::Either::A(Box::new(future))
} else {
*pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
}
} else {
*pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
}
}
@ -575,6 +585,7 @@ impl AppSrc {
let mut state = self.state.lock().unwrap();
let _ = state.channel.take();
let _ = state.pending_future_cancel.take();
gst_debug!(self.cat, obj: element, "Stopped");