gstreamer: Ensure to provide existing messages to bus stream immediately

Previously the sync handler would only be called once a new message
arrives on the bus and only then any existing messages would be popped
from the bus and passed into the mpsc channel. This unnecessarily
delayed the messages.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1737>
This commit is contained in:
Sebastian Dröge 2025-05-27 14:28:35 +03:00 committed by GStreamer Marge Bot
parent 2c6da0be89
commit 29ab13fdf9

View file

@ -4,6 +4,7 @@ use std::{
future, future,
mem::transmute, mem::transmute,
pin::Pin, pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll}, task::{Context, Poll},
}; };
@ -345,20 +346,33 @@ impl BusStream {
fn new(bus: &Bus) -> Self { fn new(bus: &Bus) -> Self {
skip_assert_initialized!(); skip_assert_initialized!();
let mutex = Arc::new(Mutex::new(()));
let (sender, receiver) = mpsc::unbounded(); let (sender, receiver) = mpsc::unbounded();
bus.set_sync_handler(move |bus, message| { // Use a mutex to ensure that the sync handler is not putting any messages into the sender
// First pop all messages that might've been previously queued before creating // until we have removed all previously queued messages from the bus.
// the bus stream. // This makes sure that the messages are staying in order.
while let Some(message) = bus.pop() { //
let _ = sender.unbounded_send(message); // We could use the bus' object lock here but a separate mutex seems safer.
} let _mutex_guard = mutex.lock().unwrap();
bus.set_sync_handler({
let sender = sender.clone();
let mutex = mutex.clone();
move |_bus, message| {
let _mutex_guard = mutex.lock().unwrap();
let _ = sender.unbounded_send(message.to_owned()); let _ = sender.unbounded_send(message.to_owned());
BusSyncReply::Drop BusSyncReply::Drop
}
}); });
// First pop all messages that might've been previously queued before creating the bus stream.
while let Some(message) = bus.pop() {
let _ = sender.unbounded_send(message);
}
Self { Self {
bus: bus.downgrade(), bus: bus.downgrade(),
receiver, receiver,