From a276c226fd34e43b0c7da71745421045477d7cb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 23 Apr 2018 17:55:31 +0300 Subject: [PATCH] Port BusStream to futures 0.2 --- gstreamer/Cargo.toml | 3 ++- gstreamer/src/bus.rs | 34 +++++++++++++++++----------------- gstreamer/src/lib.rs | 2 +- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/gstreamer/Cargo.toml b/gstreamer/Cargo.toml index 129e78970..118e4005a 100644 --- a/gstreamer/Cargo.toml +++ b/gstreamer/Cargo.toml @@ -22,7 +22,7 @@ gstreamer-sys = { git = "https://github.com/sdroege/gstreamer-sys", features = [ glib = { git = "https://github.com/gtk-rs/glib" } num-rational = { version = "0.1.38", default-features = false, features = [] } lazy_static = "1.0" -futures = { version = "0.1", optional = true } +futures-core = { version = "0.2", optional = true } muldiv = "0.1.1" [build-dependencies.rustdoc-stripper] @@ -36,6 +36,7 @@ v1_14 = ["gstreamer-sys/v1_14", "v1_12"] embed-lgpl-docs = ["rustdoc-stripper"] purge-lgpl-docs = ["rustdoc-stripper"] dox = ["gstreamer-sys/dox", "glib/dox", "futures"] +futures = ["futures-core"] default-features = [] [badges] diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index 66e2c5bc7..07cab600e 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -143,32 +143,31 @@ impl Bus { #[cfg(any(feature = "futures", feature = "dox"))] mod futures { use super::*; - use futures; - use futures::stream::Stream; - use futures::task::Task; - use futures::{Async, Poll}; + use futures_core::stream::Stream; + use futures_core::task::{Context, Waker}; + use futures_core::{Async, Poll}; use std::sync::{Arc, Mutex}; - pub struct BusStream(Bus, Arc>>); + pub struct BusStream(Bus, Arc>>); impl BusStream { pub fn new(bus: &Bus) -> Self { skip_assert_initialized!(); - let task = Arc::new(Mutex::new(None)); - let task_clone = Arc::clone(&task); + let waker = Arc::new(Mutex::new(None)); + let waker_clone = Arc::clone(&waker); bus.set_sync_handler(move |_, _| { - let mut task = task_clone.lock().unwrap(); - if let Some(task) = task.take() { + let mut waker = waker_clone.lock().unwrap(); + if let Some(waker) = waker.take() { // FIXME: Force type... - let task: Task = task; - task.notify(); + let waker: Waker = waker; + waker.wake(); } BusSyncReply::Pass }); - BusStream(bus.clone(), task) + BusStream(bus.clone(), waker) } } @@ -182,15 +181,16 @@ mod futures { type Item = Message; type Error = (); - fn poll(&mut self) -> Poll, Self::Error> { - let mut task = self.1.lock().unwrap(); + fn poll_next(&mut self, ctx: &mut Context) -> Poll, Self::Error> { + let BusStream(ref bus, ref waker) = *self; - let msg = self.0.pop(); + let msg = bus.pop(); if let Some(msg) = msg { Ok(Async::Ready(Some(msg))) } else { - *task = Some(futures::task::current()); - Ok(Async::NotReady) + let mut waker = waker.lock().unwrap(); + *waker = Some(ctx.waker().clone()); + Ok(Async::Pending) } } } diff --git a/gstreamer/src/lib.rs b/gstreamer/src/lib.rs index e09ed5afe..7b18c5570 100644 --- a/gstreamer/src/lib.rs +++ b/gstreamer/src/lib.rs @@ -26,7 +26,7 @@ extern crate glib; extern crate num_rational; #[cfg(any(feature = "futures", feature = "dox"))] -extern crate futures; +extern crate futures_core; extern crate muldiv;