threadshare: Refactor pending future draining to get rid of some duplicated code

This commit is contained in:
Sebastian Dröge 2018-04-05 12:49:12 +03:00
parent a9d979a988
commit 099093e9be
5 changed files with 70 additions and 141 deletions

View file

@ -29,7 +29,7 @@ use std::{u16, u32};
use futures::future;
use futures::sync::{mpsc, oneshot};
use futures::{Future, IntoFuture, Stream};
use futures::{Future, Stream};
use either::Either;
@ -443,45 +443,22 @@ impl AppSrc {
Ok(()) => {
let mut state = self.state.lock().unwrap();
let State {
ref pending_future_id,
ref io_context,
if let State {
io_context: Some(ref io_context),
pending_future_id: Some(ref pending_future_id),
ref mut pending_future_cancel,
..
} = *state;
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(pending_future_id, io_context)
} = *state
{
let pending_futures = io_context.drain_pending_futures(*pending_future_id);
let (cancel, future) = io_context.drain_pending_futures(*pending_future_id);
*pending_future_cancel = cancel;
if !pending_futures.is_empty() {
gst_log!(
self.cat,
obj: element,
"Scheduling {} pending futures",
pending_futures.len()
);
let (sender, receiver) = oneshot::channel();
*pending_future_cancel = Some(sender);
let future = pending_futures
.for_each(|_| Ok(()))
.select(receiver.then(|_| Ok(())))
.then(|_| Ok(()));
future::Either::A(Box::new(future))
} else {
*pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
}
future
} else {
*pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
future::Either::B(future::ok(()))
}
}
Err(_) => future::Either::B(Err(()).into_future()),
Err(_) => future::Either::B(future::err(())),
}
}

View file

@ -22,7 +22,9 @@ use std::sync::atomic;
use std::sync::{Arc, Mutex, Weak};
use std::thread;
use futures::future;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::sync::oneshot;
use futures::{Future, Stream};
use tokio::executor::thread_pool;
use tokio::reactor;
@ -355,16 +357,40 @@ impl IOContext {
fs.push(Box::new(future))
}
pub fn drain_pending_futures(
pub fn drain_pending_futures<E: Send + 'static>(
&self,
id: PendingFutureId,
) -> FuturesUnordered<Box<Future<Item = (), Error = ()> + Send + 'static>> {
) -> (Option<oneshot::Sender<()>>, PendingFuturesFuture<E>) {
let mut pending_futures = self.0.pending_futures.lock().unwrap();
let fs = pending_futures.1.get_mut(&id.0).unwrap();
mem::replace(fs, FuturesUnordered::new())
let pending_futures = mem::replace(fs, FuturesUnordered::new());
if !pending_futures.is_empty() {
gst_log!(
CONTEXT_CAT,
"Scheduling {} pending futures for context '{}' with pending future id {:?}",
pending_futures.len(),
self.0.name,
id,
);
let (sender, receiver) = oneshot::channel();
let future = pending_futures
.for_each(|_| Ok(()))
.select(receiver.then(|_| Ok(())))
.then(|_| Ok(()));
(Some(sender), future::Either::A(Box::new(future)))
} else {
(None, future::Either::B(future::ok(())))
}
}
}
pub type PendingFuturesFuture<E> =
future::Either<Box<Future<Item = (), Error = E> + Send + 'static>, future::FutureResult<(), E>>;
#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
pub struct PendingFutureId(u64);

View file

@ -32,7 +32,7 @@ use std::{u16, u32, u64};
use futures;
use futures::future;
use futures::task;
use futures::{Async, Future, IntoFuture, Stream};
use futures::{Async, Future};
use tokio::executor;
@ -1004,48 +1004,22 @@ impl ProxySrc {
Ok(()) => {
let state = self.state.lock().unwrap();
let StateSrc {
ref pending_future_id,
ref io_context,
ref queue,
if let StateSrc {
io_context: Some(ref io_context),
pending_future_id: Some(ref pending_future_id),
queue: Some(ref queue),
..
} = *state;
let mut queue = queue.as_ref().unwrap().0.lock().unwrap();
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(pending_future_id, io_context)
} = *state
{
// FIXME: This should all go into a helper function
let pending_futures = io_context.drain_pending_futures(*pending_future_id);
let (cancel, future) = io_context.drain_pending_futures(*pending_future_id);
queue.0.lock().unwrap().pending_future_cancel = cancel;
if !pending_futures.is_empty() {
gst_log!(
self.cat,
obj: element,
"Scheduling {} pending futures",
pending_futures.len()
);
let (sender, receiver) = futures::sync::oneshot::channel();
queue.pending_future_cancel = Some(sender);
let future = pending_futures
.for_each(|_| Ok(()))
.select(receiver.then(|_| Ok(())))
.then(|_| Ok(()));
future::Either::A(Box::new(future))
} else {
queue.pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
}
future
} else {
queue.pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
future::Either::B(future::ok(()))
}
}
Err(err) => future::Either::B(Err(err).into_future()),
Err(err) => future::Either::B(future::err(err)),
}
}

View file

@ -31,7 +31,7 @@ use std::{u16, u32, u64};
use futures;
use futures::future;
use futures::task;
use futures::{Async, Future, IntoFuture, Stream};
use futures::{Async, Future};
use tokio::executor;
@ -637,46 +637,22 @@ impl Queue {
Ok(()) => {
let mut state = self.state.lock().unwrap();
let State {
ref pending_future_id,
ref io_context,
if let State {
io_context: Some(ref io_context),
pending_future_id: Some(ref pending_future_id),
ref mut pending_future_cancel,
..
} = *state;
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(pending_future_id, io_context)
} = *state
{
// FIXME: This should all go into a helper function
let pending_futures = io_context.drain_pending_futures(*pending_future_id);
let (cancel, future) = io_context.drain_pending_futures(*pending_future_id);
*pending_future_cancel = cancel;
if !pending_futures.is_empty() {
gst_log!(
self.cat,
obj: element,
"Scheduling {} pending futures",
pending_futures.len()
);
let (sender, receiver) = futures::sync::oneshot::channel();
*pending_future_cancel = Some(sender);
let future = pending_futures
.for_each(|_| Ok(()))
.select(receiver.then(|_| Ok(())))
.then(|_| Ok(()));
future::Either::A(Box::new(future))
} else {
*pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
}
future
} else {
*pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
future::Either::B(future::ok(()))
}
}
Err(err) => future::Either::B(Err(err).into_future()),
Err(err) => future::Either::B(future::err(err)),
}
}

View file

@ -28,8 +28,8 @@ use std::sync::Mutex;
use std::u16;
use futures;
use futures::Future;
use futures::future;
use futures::{Future, IntoFuture, Stream};
use tokio::net;
use either::Either;
@ -381,46 +381,22 @@ impl UdpSrc {
Ok(()) => {
let mut state = self.state.lock().unwrap();
let State {
ref pending_future_id,
ref io_context,
if let State {
io_context: Some(ref io_context),
pending_future_id: Some(ref pending_future_id),
ref mut pending_future_cancel,
..
} = *state;
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(pending_future_id, io_context)
} = *state
{
// FIXME: This should all go into a helper function
let pending_futures = io_context.drain_pending_futures(*pending_future_id);
let (cancel, future) = io_context.drain_pending_futures(*pending_future_id);
*pending_future_cancel = cancel;
if !pending_futures.is_empty() {
gst_log!(
self.cat,
obj: element,
"Scheduling {} pending futures",
pending_futures.len()
);
let (sender, receiver) = futures::sync::oneshot::channel();
*pending_future_cancel = Some(sender);
let future = pending_futures
.for_each(|_| Ok(()))
.select(receiver.then(|_| Ok(())))
.then(|_| Ok(()));
future::Either::A(Box::new(future))
} else {
*pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
}
future
} else {
*pending_future_cancel = None;
future::Either::B(Ok(()).into_future())
future::Either::B(future::ok(()))
}
}
Err(err) => future::Either::B(Err(err).into_future()),
Err(err) => future::Either::B(future::err(err)),
}
}