From b17f04e86604ccbb6a33fb4dedab1b0832d4678a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 22 Jan 2020 09:23:10 +0200 Subject: [PATCH] bus: Make bus Stream private and add functions on the bus directly for it Also add a helper function that allows filtering the stream directly. --- examples/src/bin/futures.rs | 3 +-- examples/src/bin/glib-futures.rs | 3 +-- gstreamer/src/bus.rs | 25 ++++++++++++++++++++----- gstreamer/src/lib.rs | 1 - 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/examples/src/bin/futures.rs b/examples/src/bin/futures.rs index 62dadac65..abeee96ad 100644 --- a/examples/src/bin/futures.rs +++ b/examples/src/bin/futures.rs @@ -16,8 +16,7 @@ use std::env; mod examples_common; async fn message_loop(bus: gst::Bus) { - // BusStream implements the Stream trait - let mut messages = gst::BusStream::new(&bus); + let mut messages = bus.stream(); while let Some(msg) = messages.next().await { use gst::MessageView; diff --git a/examples/src/bin/glib-futures.rs b/examples/src/bin/glib-futures.rs index 493d691b0..976e71884 100644 --- a/examples/src/bin/glib-futures.rs +++ b/examples/src/bin/glib-futures.rs @@ -12,8 +12,7 @@ use std::env; mod examples_common; async fn message_handler(loop_: glib::MainLoop, bus: gst::Bus) { - // BusStream implements the Stream trait - let mut messages = gst::BusStream::new(&bus); + let mut messages = bus.stream(); while let Some(msg) = messages.next().await { use gst::MessageView; diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index 14bd54db7..677667789 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -217,6 +217,23 @@ impl Bus { } } } + + pub fn stream(&self) -> impl Stream + Unpin + Send + 'static { + BusStream::new(self) + } + + pub fn stream_filtered<'a>( + &self, + msg_types: &'a [MessageType], + ) -> impl Stream + Unpin + Send + 'a { + use futures_util::future; + use futures_util::StreamExt; + + BusStream::new(self).filter(move |msg| { + let type_ = msg.get_type(); + future::ready(msg_types.contains(&type_)) + }) + } } #[derive(Debug)] @@ -234,19 +251,17 @@ impl<'a> Iterator for Iter<'a> { } #[derive(Debug)] -pub struct BusStream(Bus, Arc>>); +struct BusStream(Bus, Arc>>); impl BusStream { - pub fn new(bus: &Bus) -> Self { + fn new(bus: &Bus) -> Self { skip_assert_initialized!(); - let waker = Arc::new(Mutex::new(None)); + let waker = Arc::new(Mutex::new(None::)); let waker_clone = Arc::clone(&waker); bus.set_sync_handler(move |_, _| { let mut waker = waker_clone.lock().unwrap(); if let Some(waker) = waker.take() { - // FIXME: Force type... - let waker: Waker = waker; waker.wake(); } diff --git a/gstreamer/src/lib.rs b/gstreamer/src/lib.rs index 605eb20a3..74cb3d7b7 100644 --- a/gstreamer/src/lib.rs +++ b/gstreamer/src/lib.rs @@ -225,7 +225,6 @@ cfg_if! { } pub use self::iterator::{Iterator, IteratorError, IteratorImpl, StdIterator}; -pub use bus::BusStream; pub use child_proxy::ChildProxyExtManual; pub use clock_time::ClockTime; pub use device_monitor::{DeviceMonitorExtManual, DeviceMonitorFilterId};