From 29ab13fdf98f9b58a85fd00fec44948173accc15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 27 May 2025 14:28:35 +0300 Subject: [PATCH] gstreamer: Ensure to provide existing messages to bus stream immediately Previously the sync handler would only be called once a new message arrives on the bus and only then any existing messages would be popped from the bus and passed into the mpsc channel. This unnecessarily delayed the messages. Part-of: --- gstreamer/src/bus.rs | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index fefb6d154..42e5a1388 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -4,6 +4,7 @@ use std::{ future, mem::transmute, pin::Pin, + sync::{Arc, Mutex}, task::{Context, Poll}, }; @@ -345,20 +346,33 @@ impl BusStream { fn new(bus: &Bus) -> Self { skip_assert_initialized!(); + let mutex = Arc::new(Mutex::new(())); let (sender, receiver) = mpsc::unbounded(); - bus.set_sync_handler(move |bus, message| { - // First pop all messages that might've been previously queued before creating - // the bus stream. - while let Some(message) = bus.pop() { - let _ = sender.unbounded_send(message); + // Use a mutex to ensure that the sync handler is not putting any messages into the sender + // until we have removed all previously queued messages from the bus. + // This makes sure that the messages are staying in order. + // + // We could use the bus' object lock here but a separate mutex seems safer. + let _mutex_guard = mutex.lock().unwrap(); + bus.set_sync_handler({ + let sender = sender.clone(); + let mutex = mutex.clone(); + + move |_bus, message| { + let _mutex_guard = mutex.lock().unwrap(); + + let _ = sender.unbounded_send(message.to_owned()); + + BusSyncReply::Drop } - - let _ = sender.unbounded_send(message.to_owned()); - - BusSyncReply::Drop }); + // First pop all messages that might've been previously queued before creating the bus stream. + while let Some(message) = bus.pop() { + let _ = sender.unbounded_send(message); + } + Self { bus: bus.downgrade(), receiver,