diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index fefb6d154..42e5a1388 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -4,6 +4,7 @@ use std::{ future, mem::transmute, pin::Pin, + sync::{Arc, Mutex}, task::{Context, Poll}, }; @@ -345,20 +346,33 @@ impl BusStream { fn new(bus: &Bus) -> Self { skip_assert_initialized!(); + let mutex = Arc::new(Mutex::new(())); let (sender, receiver) = mpsc::unbounded(); - bus.set_sync_handler(move |bus, message| { - // First pop all messages that might've been previously queued before creating - // the bus stream. - while let Some(message) = bus.pop() { - let _ = sender.unbounded_send(message); + // Use a mutex to ensure that the sync handler is not putting any messages into the sender + // until we have removed all previously queued messages from the bus. + // This makes sure that the messages are staying in order. + // + // We could use the bus' object lock here but a separate mutex seems safer. + let _mutex_guard = mutex.lock().unwrap(); + bus.set_sync_handler({ + let sender = sender.clone(); + let mutex = mutex.clone(); + + move |_bus, message| { + let _mutex_guard = mutex.lock().unwrap(); + + let _ = sender.unbounded_send(message.to_owned()); + + BusSyncReply::Drop } - - let _ = sender.unbounded_send(message.to_owned()); - - BusSyncReply::Drop }); + // First pop all messages that might've been previously queued before creating the bus stream. + while let Some(message) = bus.pop() { + let _ = sender.unbounded_send(message); + } + Self { bus: bus.downgrade(), receiver,