bus: Switch from GstAtomicQueue to GstVecDeque

All accesses to it were protected either by a mutex already, or at least
used yet another mutex for gst_poll_read_control() / gst_poll_write_control().

The usage of GstPoll has to stay for backwards compatibility as it is
used to manage the (public) fd that can be used to wait for the bus to
be ready, but this switch at least simplifies the implementation a bit
and results in fewer atomic operations.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6684>
This commit is contained in:
Sebastian Dröge 2024-04-18 17:45:37 +03:00 committed by GStreamer Marge Bot
parent 8e589eec08
commit 55994477a7

View file

@ -73,7 +73,7 @@
#endif #endif
#include <sys/types.h> #include <sys/types.h>
#include "gstatomicqueue.h" #include "gstvecdeque.h"
#include "gstinfo.h" #include "gstinfo.h"
#include "gstpoll.h" #include "gstpoll.h"
@ -140,8 +140,8 @@ sync_handler_unref (SyncHandler * handler)
struct _GstBusPrivate struct _GstBusPrivate
{ {
GstAtomicQueue *queue;
GMutex queue_lock; GMutex queue_lock;
GstVecDeque *queue;
SyncHandler *sync_handler; SyncHandler *sync_handler;
@ -252,7 +252,7 @@ gst_bus_init (GstBus * bus)
bus->priv = gst_bus_get_instance_private (bus); bus->priv = gst_bus_get_instance_private (bus);
bus->priv->enable_async = DEFAULT_ENABLE_ASYNC; bus->priv->enable_async = DEFAULT_ENABLE_ASYNC;
g_mutex_init (&bus->priv->queue_lock); g_mutex_init (&bus->priv->queue_lock);
bus->priv->queue = gst_atomic_queue_new (32); bus->priv->queue = gst_vec_deque_new (32);
GST_DEBUG_OBJECT (bus, "created"); GST_DEBUG_OBJECT (bus, "created");
} }
@ -267,11 +267,11 @@ gst_bus_dispose (GObject * object)
g_mutex_lock (&bus->priv->queue_lock); g_mutex_lock (&bus->priv->queue_lock);
do { do {
message = gst_atomic_queue_pop (bus->priv->queue); message = gst_vec_deque_pop_head (bus->priv->queue);
if (message) if (message)
gst_message_unref (message); gst_message_unref (message);
} while (message != NULL); } while (message != NULL);
gst_atomic_queue_unref (bus->priv->queue); gst_vec_deque_free (bus->priv->queue);
bus->priv->queue = NULL; bus->priv->queue = NULL;
g_mutex_unlock (&bus->priv->queue_lock); g_mutex_unlock (&bus->priv->queue_lock);
g_mutex_clear (&bus->priv->queue_lock); g_mutex_clear (&bus->priv->queue_lock);
@ -381,18 +381,21 @@ gst_bus_post (GstBus * bus, GstMessage * message)
GST_DEBUG_OBJECT (bus, "[msg %p] dropped", message); GST_DEBUG_OBJECT (bus, "[msg %p] dropped", message);
break; break;
case GST_BUS_PASS:{ case GST_BUS_PASS:{
guint length = gst_atomic_queue_length (bus->priv->queue); g_mutex_lock (&bus->priv->queue_lock);
gsize length = gst_vec_deque_get_length (bus->priv->queue);
if (G_UNLIKELY (length > 0 && length % WARN_QUEUE_SIZE == 0)) { if (G_UNLIKELY (length > 0 && length % WARN_QUEUE_SIZE == 0)) {
GST_WARNING_OBJECT (bus, "queue overflows with %d messages. " GST_WARNING_OBJECT (bus,
"queue overflows with %" G_GSIZE_FORMAT " messages. "
"Application is too slow or is not handling messages. " "Application is too slow or is not handling messages. "
"Please add a message handler, otherwise the queue will grow " "Please add a message handler, otherwise the queue will grow "
"infinitely.", length); "infinitely.", length);
} }
/* pass the message to the async queue, refcount passed in the queue */ /* pass the message to the async queue, refcount passed in the queue */
GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message); GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
gst_atomic_queue_push (bus->priv->queue, message); gst_vec_deque_push_tail (bus->priv->queue, message);
gst_poll_write_control (bus->priv->poll); gst_poll_write_control (bus->priv->poll);
GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message); GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
g_mutex_unlock (&bus->priv->queue_lock);
break; break;
} }
@ -415,8 +418,10 @@ gst_bus_post (GstBus * bus, GstMessage * message)
* the cond will be signalled and we can continue */ * the cond will be signalled and we can continue */
g_mutex_lock (lock); g_mutex_lock (lock);
gst_atomic_queue_push (bus->priv->queue, message); g_mutex_lock (&bus->priv->queue_lock);
gst_vec_deque_push_tail (bus->priv->queue, message);
gst_poll_write_control (bus->priv->poll); gst_poll_write_control (bus->priv->poll);
g_mutex_unlock (&bus->priv->queue_lock);
/* now block till the message is freed */ /* now block till the message is freed */
g_cond_wait (cond, lock); g_cond_wait (cond, lock);
@ -470,7 +475,9 @@ gst_bus_have_pending (GstBus * bus)
g_return_val_if_fail (GST_IS_BUS (bus), FALSE); g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
/* see if there is a message on the bus */ /* see if there is a message on the bus */
result = gst_atomic_queue_length (bus->priv->queue) != 0; g_mutex_lock (&bus->priv->queue_lock);
result = gst_vec_deque_get_length (bus->priv->queue) != 0;
g_mutex_unlock (&bus->priv->queue_lock);
return result; return result;
} }
@ -547,10 +554,10 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
while (TRUE) { while (TRUE) {
gint ret; gint ret;
GST_LOG_OBJECT (bus, "have %d messages", GST_LOG_OBJECT (bus, "have %" G_GSIZE_FORMAT " messages",
gst_atomic_queue_length (bus->priv->queue)); gst_vec_deque_get_length (bus->priv->queue));
while ((message = gst_atomic_queue_pop (bus->priv->queue))) { while ((message = gst_vec_deque_pop_head (bus->priv->queue))) {
if (bus->priv->poll) { if (bus->priv->poll) {
while (!gst_poll_read_control (bus->priv->poll)) { while (!gst_poll_read_control (bus->priv->poll)) {
if (errno == EWOULDBLOCK) { if (errno == EWOULDBLOCK) {
@ -710,7 +717,7 @@ gst_bus_peek (GstBus * bus)
g_return_val_if_fail (GST_IS_BUS (bus), NULL); g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_mutex_lock (&bus->priv->queue_lock); g_mutex_lock (&bus->priv->queue_lock);
message = gst_atomic_queue_peek (bus->priv->queue); message = gst_vec_deque_peek_head (bus->priv->queue);
if (message) if (message)
gst_message_ref (message); gst_message_ref (message);
g_mutex_unlock (&bus->priv->queue_lock); g_mutex_unlock (&bus->priv->queue_lock);