forked from mirrors/gstreamer-rs
gstreamer: Implement FusedStream
for the Bus
stream
This commit is contained in:
parent
e04bb8b524
commit
2b7a63fc8e
1 changed files with 8 additions and 2 deletions
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
use futures_channel::mpsc::{self, UnboundedReceiver};
|
use futures_channel::mpsc::{self, UnboundedReceiver};
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use futures_util::StreamExt;
|
use futures_util::{stream::FusedStream, StreamExt};
|
||||||
use glib::ffi::{gboolean, gpointer};
|
use glib::ffi::{gboolean, gpointer};
|
||||||
use glib::prelude::*;
|
use glib::prelude::*;
|
||||||
use glib::source::{Continue, Priority, SourceId};
|
use glib::source::{Continue, Priority, SourceId};
|
||||||
|
@ -295,7 +295,7 @@ impl Bus {
|
||||||
pub fn stream_filtered<'a>(
|
pub fn stream_filtered<'a>(
|
||||||
&self,
|
&self,
|
||||||
message_types: &'a [MessageType],
|
message_types: &'a [MessageType],
|
||||||
) -> impl Stream<Item = Message> + Unpin + Send + 'a {
|
) -> impl Stream<Item = Message> + Unpin + FusedStream + Send + 'a {
|
||||||
self.stream().filter(move |message| {
|
self.stream().filter(move |message| {
|
||||||
let message_type = message.type_();
|
let message_type = message.type_();
|
||||||
|
|
||||||
|
@ -359,6 +359,12 @@ impl Stream for BusStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl FusedStream for BusStream {
|
||||||
|
fn is_terminated(&self) -> bool {
|
||||||
|
self.receiver.is_terminated()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
Loading…
Reference in a new issue