forked from mirrors/gstreamer-rs
bus: Don't use the bus sync handler in the bus Stream to notify about messages being available
This is racy and can cause the consumer of the messages to never be woken up anymore: 1. Waker is stored because no message on the bus 2. Sync handler is called, waker is woken up 3. Bus is polled again and no message is on it (yet), new waker is registered 4. Bus stores the message from 2. in its queue (after the sync handler has returned BusSyncReply::Pass) 5. No new message ever appears on the bus because all this happened for the very last message Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/issues/235
This commit is contained in:
parent
63a8afafa5
commit
fe4f074e90
1 changed files with 10 additions and 23 deletions
|
@ -6,8 +6,9 @@
|
||||||
// option. This file may not be copied, modified, or distributed
|
// option. This file may not be copied, modified, or distributed
|
||||||
// except according to those terms.
|
// except according to those terms.
|
||||||
|
|
||||||
|
use futures_channel::mpsc;
|
||||||
use futures_core::stream::Stream;
|
use futures_core::stream::Stream;
|
||||||
use futures_core::task::{Context, Poll, Waker};
|
use futures_core::task::{Context, Poll};
|
||||||
use glib;
|
use glib;
|
||||||
use glib::source::{Continue, Priority, SourceId};
|
use glib::source::{Continue, Priority, SourceId};
|
||||||
use glib::translate::*;
|
use glib::translate::*;
|
||||||
|
@ -18,7 +19,6 @@ use std::cell::RefCell;
|
||||||
use std::mem::transmute;
|
use std::mem::transmute;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::ptr;
|
use std::ptr;
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
use Bus;
|
use Bus;
|
||||||
use BusSyncReply;
|
use BusSyncReply;
|
||||||
|
@ -251,24 +251,20 @@ impl<'a> Iterator for Iter<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct BusStream(Bus, Arc<Mutex<Option<Waker>>>);
|
struct BusStream(Bus, mpsc::UnboundedReceiver<Message>);
|
||||||
|
|
||||||
impl BusStream {
|
impl BusStream {
|
||||||
fn new(bus: &Bus) -> Self {
|
fn new(bus: &Bus) -> Self {
|
||||||
skip_assert_initialized!();
|
skip_assert_initialized!();
|
||||||
let waker = Arc::new(Mutex::new(None::<Waker>));
|
|
||||||
let waker_clone = Arc::clone(&waker);
|
|
||||||
|
|
||||||
bus.set_sync_handler(move |_, _| {
|
let (sender, receiver) = mpsc::unbounded();
|
||||||
let mut waker = waker_clone.lock().unwrap();
|
|
||||||
if let Some(waker) = waker.take() {
|
|
||||||
waker.wake();
|
|
||||||
}
|
|
||||||
|
|
||||||
BusSyncReply::Pass
|
bus.set_sync_handler(move |_, msg| {
|
||||||
|
let _ = sender.unbounded_send(msg.to_owned());
|
||||||
|
BusSyncReply::Drop
|
||||||
});
|
});
|
||||||
|
|
||||||
BusStream(bus.clone(), waker)
|
BusStream(bus.clone(), receiver)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -281,17 +277,8 @@ impl Drop for BusStream {
|
||||||
impl Stream for BusStream {
|
impl Stream for BusStream {
|
||||||
type Item = Message;
|
type Item = Message;
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
let BusStream(ref bus, ref waker) = *self;
|
Pin::new(&mut self.1).poll_next(ctx)
|
||||||
|
|
||||||
let mut waker = waker.lock().unwrap();
|
|
||||||
let msg = bus.pop();
|
|
||||||
if let Some(msg) = msg {
|
|
||||||
Poll::Ready(Some(msg))
|
|
||||||
} else {
|
|
||||||
*waker = Some(ctx.waker().clone());
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue