diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index 677667789..31d014421 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -6,8 +6,9 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use futures_channel::mpsc; use futures_core::stream::Stream; -use futures_core::task::{Context, Poll, Waker}; +use futures_core::task::{Context, Poll}; use glib; use glib::source::{Continue, Priority, SourceId}; use glib::translate::*; @@ -18,7 +19,6 @@ use std::cell::RefCell; use std::mem::transmute; use std::pin::Pin; use std::ptr; -use std::sync::{Arc, Mutex}; use Bus; use BusSyncReply; @@ -251,24 +251,20 @@ impl<'a> Iterator for Iter<'a> { } #[derive(Debug)] -struct BusStream(Bus, Arc>>); +struct BusStream(Bus, mpsc::UnboundedReceiver); impl BusStream { fn new(bus: &Bus) -> Self { skip_assert_initialized!(); - let waker = Arc::new(Mutex::new(None::)); - let waker_clone = Arc::clone(&waker); - bus.set_sync_handler(move |_, _| { - let mut waker = waker_clone.lock().unwrap(); - if let Some(waker) = waker.take() { - waker.wake(); - } + let (sender, receiver) = mpsc::unbounded(); - BusSyncReply::Pass + bus.set_sync_handler(move |_, msg| { + let _ = sender.unbounded_send(msg.to_owned()); + BusSyncReply::Drop }); - BusStream(bus.clone(), waker) + BusStream(bus.clone(), receiver) } } @@ -281,17 +277,8 @@ impl Drop for BusStream { impl Stream for BusStream { type Item = Message; - fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { - let BusStream(ref bus, ref waker) = *self; - - let mut waker = waker.lock().unwrap(); - let msg = bus.pop(); - if let Some(msg) = msg { - Poll::Ready(Some(msg)) - } else { - *waker = Some(ctx.waker().clone()); - Poll::Pending - } + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { + Pin::new(&mut self.1).poll_next(ctx) } }