Port BusStream to futures 0.2

This commit is contained in:
Sebastian Dröge 2018-04-23 17:55:31 +03:00
parent 5dd17d6248
commit a276c226fd
3 changed files with 20 additions and 19 deletions

View file

@ -22,7 +22,7 @@ gstreamer-sys = { git = "https://github.com/sdroege/gstreamer-sys", features = [
glib = { git = "https://github.com/gtk-rs/glib" } glib = { git = "https://github.com/gtk-rs/glib" }
num-rational = { version = "0.1.38", default-features = false, features = [] } num-rational = { version = "0.1.38", default-features = false, features = [] }
lazy_static = "1.0" lazy_static = "1.0"
futures = { version = "0.1", optional = true } futures-core = { version = "0.2", optional = true }
muldiv = "0.1.1" muldiv = "0.1.1"
[build-dependencies.rustdoc-stripper] [build-dependencies.rustdoc-stripper]
@ -36,6 +36,7 @@ v1_14 = ["gstreamer-sys/v1_14", "v1_12"]
embed-lgpl-docs = ["rustdoc-stripper"] embed-lgpl-docs = ["rustdoc-stripper"]
purge-lgpl-docs = ["rustdoc-stripper"] purge-lgpl-docs = ["rustdoc-stripper"]
dox = ["gstreamer-sys/dox", "glib/dox", "futures"] dox = ["gstreamer-sys/dox", "glib/dox", "futures"]
futures = ["futures-core"]
default-features = [] default-features = []
[badges] [badges]

View file

@ -143,32 +143,31 @@ impl Bus {
#[cfg(any(feature = "futures", feature = "dox"))] #[cfg(any(feature = "futures", feature = "dox"))]
mod futures { mod futures {
use super::*; use super::*;
use futures; use futures_core::stream::Stream;
use futures::stream::Stream; use futures_core::task::{Context, Waker};
use futures::task::Task; use futures_core::{Async, Poll};
use futures::{Async, Poll};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
pub struct BusStream(Bus, Arc<Mutex<Option<Task>>>); pub struct BusStream(Bus, Arc<Mutex<Option<Waker>>>);
impl BusStream { impl BusStream {
pub fn new(bus: &Bus) -> Self { pub fn new(bus: &Bus) -> Self {
skip_assert_initialized!(); skip_assert_initialized!();
let task = Arc::new(Mutex::new(None)); let waker = Arc::new(Mutex::new(None));
let task_clone = Arc::clone(&task); let waker_clone = Arc::clone(&waker);
bus.set_sync_handler(move |_, _| { bus.set_sync_handler(move |_, _| {
let mut task = task_clone.lock().unwrap(); let mut waker = waker_clone.lock().unwrap();
if let Some(task) = task.take() { if let Some(waker) = waker.take() {
// FIXME: Force type... // FIXME: Force type...
let task: Task = task; let waker: Waker = waker;
task.notify(); waker.wake();
} }
BusSyncReply::Pass BusSyncReply::Pass
}); });
BusStream(bus.clone(), task) BusStream(bus.clone(), waker)
} }
} }
@ -182,15 +181,16 @@ mod futures {
type Item = Message; type Item = Message;
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(&mut self, ctx: &mut Context) -> Poll<Option<Self::Item>, Self::Error> {
let mut task = self.1.lock().unwrap(); let BusStream(ref bus, ref waker) = *self;
let msg = self.0.pop(); let msg = bus.pop();
if let Some(msg) = msg { if let Some(msg) = msg {
Ok(Async::Ready(Some(msg))) Ok(Async::Ready(Some(msg)))
} else { } else {
*task = Some(futures::task::current()); let mut waker = waker.lock().unwrap();
Ok(Async::NotReady) *waker = Some(ctx.waker().clone());
Ok(Async::Pending)
} }
} }
} }

View file

@ -26,7 +26,7 @@ extern crate glib;
extern crate num_rational; extern crate num_rational;
#[cfg(any(feature = "futures", feature = "dox"))] #[cfg(any(feature = "futures", feature = "dox"))]
extern crate futures; extern crate futures_core;
extern crate muldiv; extern crate muldiv;