From 2b7a63fc8e908839beacece50eeb44cfcb02882c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Sat, 30 Apr 2022 21:10:59 +0300 Subject: [PATCH] gstreamer: Implement `FusedStream` for the `Bus` stream --- gstreamer/src/bus.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index 9e24e58bf..c4b1033f0 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -2,7 +2,7 @@ use futures_channel::mpsc::{self, UnboundedReceiver}; use futures_core::Stream; -use futures_util::StreamExt; +use futures_util::{stream::FusedStream, StreamExt}; use glib::ffi::{gboolean, gpointer}; use glib::prelude::*; use glib::source::{Continue, Priority, SourceId}; @@ -295,7 +295,7 @@ impl Bus { pub fn stream_filtered<'a>( &self, message_types: &'a [MessageType], - ) -> impl Stream + Unpin + Send + 'a { + ) -> impl Stream + Unpin + FusedStream + Send + 'a { self.stream().filter(move |message| { let message_type = message.type_(); @@ -359,6 +359,12 @@ impl Stream for BusStream { } } +impl FusedStream for BusStream { + fn is_terminated(&self) -> bool { + self.receiver.is_terminated() + } +} + #[cfg(test)] mod tests { use super::*;