gstreamer: Implement FusedStream for the Bus stream

This commit is contained in:
Sebastian Dröge 2022-04-30 21:10:59 +03:00
parent 9ee22baa07
commit 08953dfe66

View file

@ -2,7 +2,7 @@
use futures_channel::mpsc::{self, UnboundedReceiver}; use futures_channel::mpsc::{self, UnboundedReceiver};
use futures_core::Stream; use futures_core::Stream;
use futures_util::StreamExt; use futures_util::{stream::FusedStream, StreamExt};
use glib::ffi::{gboolean, gpointer}; use glib::ffi::{gboolean, gpointer};
use glib::prelude::*; use glib::prelude::*;
use glib::source::{Continue, Priority, SourceId}; use glib::source::{Continue, Priority, SourceId};
@ -295,7 +295,7 @@ impl Bus {
pub fn stream_filtered<'a>( pub fn stream_filtered<'a>(
&self, &self,
message_types: &'a [MessageType], message_types: &'a [MessageType],
) -> impl Stream<Item = Message> + Unpin + Send + 'a { ) -> impl Stream<Item = Message> + Unpin + FusedStream + Send + 'a {
self.stream().filter(move |message| { self.stream().filter(move |message| {
let message_type = message.type_(); let message_type = message.type_();
@ -359,6 +359,12 @@ impl Stream for BusStream {
} }
} }
impl FusedStream for BusStream {
fn is_terminated(&self) -> bool {
self.receiver.is_terminated()
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;