bus: Make bus Stream private and add functions on the bus directly for it

Also add a helper function that allows filtering the stream directly.
This commit is contained in:
Sebastian Dröge 2020-01-22 09:23:10 +02:00
parent 217bbc3e94
commit b17f04e866
4 changed files with 22 additions and 10 deletions

View file

@ -16,8 +16,7 @@ use std::env;
mod examples_common; mod examples_common;
async fn message_loop(bus: gst::Bus) { async fn message_loop(bus: gst::Bus) {
// BusStream implements the Stream trait let mut messages = bus.stream();
let mut messages = gst::BusStream::new(&bus);
while let Some(msg) = messages.next().await { while let Some(msg) = messages.next().await {
use gst::MessageView; use gst::MessageView;

View file

@ -12,8 +12,7 @@ use std::env;
mod examples_common; mod examples_common;
async fn message_handler(loop_: glib::MainLoop, bus: gst::Bus) { async fn message_handler(loop_: glib::MainLoop, bus: gst::Bus) {
// BusStream implements the Stream trait let mut messages = bus.stream();
let mut messages = gst::BusStream::new(&bus);
while let Some(msg) = messages.next().await { while let Some(msg) = messages.next().await {
use gst::MessageView; use gst::MessageView;

View file

@ -217,6 +217,23 @@ impl Bus {
} }
} }
} }
pub fn stream(&self) -> impl Stream<Item = Message> + Unpin + Send + 'static {
BusStream::new(self)
}
pub fn stream_filtered<'a>(
&self,
msg_types: &'a [MessageType],
) -> impl Stream<Item = Message> + Unpin + Send + 'a {
use futures_util::future;
use futures_util::StreamExt;
BusStream::new(self).filter(move |msg| {
let type_ = msg.get_type();
future::ready(msg_types.contains(&type_))
})
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -234,19 +251,17 @@ impl<'a> Iterator for Iter<'a> {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct BusStream(Bus, Arc<Mutex<Option<Waker>>>); struct BusStream(Bus, Arc<Mutex<Option<Waker>>>);
impl BusStream { impl BusStream {
pub fn new(bus: &Bus) -> Self { fn new(bus: &Bus) -> Self {
skip_assert_initialized!(); skip_assert_initialized!();
let waker = Arc::new(Mutex::new(None)); let waker = Arc::new(Mutex::new(None::<Waker>));
let waker_clone = Arc::clone(&waker); let waker_clone = Arc::clone(&waker);
bus.set_sync_handler(move |_, _| { bus.set_sync_handler(move |_, _| {
let mut waker = waker_clone.lock().unwrap(); let mut waker = waker_clone.lock().unwrap();
if let Some(waker) = waker.take() { if let Some(waker) = waker.take() {
// FIXME: Force type...
let waker: Waker = waker;
waker.wake(); waker.wake();
} }

View file

@ -225,7 +225,6 @@ cfg_if! {
} }
pub use self::iterator::{Iterator, IteratorError, IteratorImpl, StdIterator}; pub use self::iterator::{Iterator, IteratorError, IteratorImpl, StdIterator};
pub use bus::BusStream;
pub use child_proxy::ChildProxyExtManual; pub use child_proxy::ChildProxyExtManual;
pub use clock_time::ClockTime; pub use clock_time::ClockTime;
pub use device_monitor::{DeviceMonitorExtManual, DeviceMonitorFilterId}; pub use device_monitor::{DeviceMonitorExtManual, DeviceMonitorFilterId};