diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index 9e24e58bf..c4b1033f0 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -2,7 +2,7 @@ use futures_channel::mpsc::{self, UnboundedReceiver}; use futures_core::Stream; -use futures_util::StreamExt; +use futures_util::{stream::FusedStream, StreamExt}; use glib::ffi::{gboolean, gpointer}; use glib::prelude::*; use glib::source::{Continue, Priority, SourceId}; @@ -295,7 +295,7 @@ impl Bus { pub fn stream_filtered<'a>( &self, message_types: &'a [MessageType], - ) -> impl Stream + Unpin + Send + 'a { + ) -> impl Stream + Unpin + FusedStream + Send + 'a { self.stream().filter(move |message| { let message_type = message.type_(); @@ -359,6 +359,12 @@ impl Stream for BusStream { } } +impl FusedStream for BusStream { + fn is_terminated(&self) -> bool { + self.receiver.is_terminated() + } +} + #[cfg(test)] mod tests { use super::*;