From 55994477a7fdd8324f58a77b7f6fe30f2333799f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 18 Apr 2024 17:45:37 +0300 Subject: [PATCH] bus: Switch from GstAtomicQueue to GstVecDeque All accesses to it were protected either by a mutex already, or at least used yet another mutex for gst_poll_read_control() / gst_poll_write_control(). The usage of GstPoll has to stay for backwards compatibility as it is used to manage the (public) fd that can be used to wait for the bus to be ready, but this switch at least simplifies the implementation a bit and results in fewer atomic operations. Part-of: --- subprojects/gstreamer/gst/gstbus.c | 35 ++++++++++++++++++------------ 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/subprojects/gstreamer/gst/gstbus.c b/subprojects/gstreamer/gst/gstbus.c index 3f6eb23408..aaa32e8588 100644 --- a/subprojects/gstreamer/gst/gstbus.c +++ b/subprojects/gstreamer/gst/gstbus.c @@ -73,7 +73,7 @@ #endif #include -#include "gstatomicqueue.h" +#include "gstvecdeque.h" #include "gstinfo.h" #include "gstpoll.h" @@ -140,8 +140,8 @@ sync_handler_unref (SyncHandler * handler) struct _GstBusPrivate { - GstAtomicQueue *queue; GMutex queue_lock; + GstVecDeque *queue; SyncHandler *sync_handler; @@ -252,7 +252,7 @@ gst_bus_init (GstBus * bus) bus->priv = gst_bus_get_instance_private (bus); bus->priv->enable_async = DEFAULT_ENABLE_ASYNC; g_mutex_init (&bus->priv->queue_lock); - bus->priv->queue = gst_atomic_queue_new (32); + bus->priv->queue = gst_vec_deque_new (32); GST_DEBUG_OBJECT (bus, "created"); } @@ -267,11 +267,11 @@ gst_bus_dispose (GObject * object) g_mutex_lock (&bus->priv->queue_lock); do { - message = gst_atomic_queue_pop (bus->priv->queue); + message = gst_vec_deque_pop_head (bus->priv->queue); if (message) gst_message_unref (message); } while (message != NULL); - gst_atomic_queue_unref (bus->priv->queue); + gst_vec_deque_free (bus->priv->queue); bus->priv->queue = NULL; g_mutex_unlock (&bus->priv->queue_lock); g_mutex_clear (&bus->priv->queue_lock); @@ -381,18 +381,21 @@ gst_bus_post (GstBus * bus, GstMessage * message) GST_DEBUG_OBJECT (bus, "[msg %p] dropped", message); break; case GST_BUS_PASS:{ - guint length = gst_atomic_queue_length (bus->priv->queue); + g_mutex_lock (&bus->priv->queue_lock); + gsize length = gst_vec_deque_get_length (bus->priv->queue); if (G_UNLIKELY (length > 0 && length % WARN_QUEUE_SIZE == 0)) { - GST_WARNING_OBJECT (bus, "queue overflows with %d messages. " + GST_WARNING_OBJECT (bus, + "queue overflows with %" G_GSIZE_FORMAT " messages. " "Application is too slow or is not handling messages. " "Please add a message handler, otherwise the queue will grow " "infinitely.", length); } /* pass the message to the async queue, refcount passed in the queue */ GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message); - gst_atomic_queue_push (bus->priv->queue, message); + gst_vec_deque_push_tail (bus->priv->queue, message); gst_poll_write_control (bus->priv->poll); GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message); + g_mutex_unlock (&bus->priv->queue_lock); break; } @@ -415,8 +418,10 @@ gst_bus_post (GstBus * bus, GstMessage * message) * the cond will be signalled and we can continue */ g_mutex_lock (lock); - gst_atomic_queue_push (bus->priv->queue, message); + g_mutex_lock (&bus->priv->queue_lock); + gst_vec_deque_push_tail (bus->priv->queue, message); gst_poll_write_control (bus->priv->poll); + g_mutex_unlock (&bus->priv->queue_lock); /* now block till the message is freed */ g_cond_wait (cond, lock); @@ -470,7 +475,9 @@ gst_bus_have_pending (GstBus * bus) g_return_val_if_fail (GST_IS_BUS (bus), FALSE); /* see if there is a message on the bus */ - result = gst_atomic_queue_length (bus->priv->queue) != 0; + g_mutex_lock (&bus->priv->queue_lock); + result = gst_vec_deque_get_length (bus->priv->queue) != 0; + g_mutex_unlock (&bus->priv->queue_lock); return result; } @@ -547,10 +554,10 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, while (TRUE) { gint ret; - GST_LOG_OBJECT (bus, "have %d messages", - gst_atomic_queue_length (bus->priv->queue)); + GST_LOG_OBJECT (bus, "have %" G_GSIZE_FORMAT " messages", + gst_vec_deque_get_length (bus->priv->queue)); - while ((message = gst_atomic_queue_pop (bus->priv->queue))) { + while ((message = gst_vec_deque_pop_head (bus->priv->queue))) { if (bus->priv->poll) { while (!gst_poll_read_control (bus->priv->poll)) { if (errno == EWOULDBLOCK) { @@ -710,7 +717,7 @@ gst_bus_peek (GstBus * bus) g_return_val_if_fail (GST_IS_BUS (bus), NULL); g_mutex_lock (&bus->priv->queue_lock); - message = gst_atomic_queue_peek (bus->priv->queue); + message = gst_vec_deque_peek_head (bus->priv->queue); if (message) gst_message_ref (message); g_mutex_unlock (&bus->priv->queue_lock);