diff --git a/subprojects/gstreamer/gst/gstbus.c b/subprojects/gstreamer/gst/gstbus.c index 3f6eb23408..aaa32e8588 100644 --- a/subprojects/gstreamer/gst/gstbus.c +++ b/subprojects/gstreamer/gst/gstbus.c @@ -73,7 +73,7 @@ #endif #include -#include "gstatomicqueue.h" +#include "gstvecdeque.h" #include "gstinfo.h" #include "gstpoll.h" @@ -140,8 +140,8 @@ sync_handler_unref (SyncHandler * handler) struct _GstBusPrivate { - GstAtomicQueue *queue; GMutex queue_lock; + GstVecDeque *queue; SyncHandler *sync_handler; @@ -252,7 +252,7 @@ gst_bus_init (GstBus * bus) bus->priv = gst_bus_get_instance_private (bus); bus->priv->enable_async = DEFAULT_ENABLE_ASYNC; g_mutex_init (&bus->priv->queue_lock); - bus->priv->queue = gst_atomic_queue_new (32); + bus->priv->queue = gst_vec_deque_new (32); GST_DEBUG_OBJECT (bus, "created"); } @@ -267,11 +267,11 @@ gst_bus_dispose (GObject * object) g_mutex_lock (&bus->priv->queue_lock); do { - message = gst_atomic_queue_pop (bus->priv->queue); + message = gst_vec_deque_pop_head (bus->priv->queue); if (message) gst_message_unref (message); } while (message != NULL); - gst_atomic_queue_unref (bus->priv->queue); + gst_vec_deque_free (bus->priv->queue); bus->priv->queue = NULL; g_mutex_unlock (&bus->priv->queue_lock); g_mutex_clear (&bus->priv->queue_lock); @@ -381,18 +381,21 @@ gst_bus_post (GstBus * bus, GstMessage * message) GST_DEBUG_OBJECT (bus, "[msg %p] dropped", message); break; case GST_BUS_PASS:{ - guint length = gst_atomic_queue_length (bus->priv->queue); + g_mutex_lock (&bus->priv->queue_lock); + gsize length = gst_vec_deque_get_length (bus->priv->queue); if (G_UNLIKELY (length > 0 && length % WARN_QUEUE_SIZE == 0)) { - GST_WARNING_OBJECT (bus, "queue overflows with %d messages. " + GST_WARNING_OBJECT (bus, + "queue overflows with %" G_GSIZE_FORMAT " messages. " "Application is too slow or is not handling messages. " "Please add a message handler, otherwise the queue will grow " "infinitely.", length); } /* pass the message to the async queue, refcount passed in the queue */ GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message); - gst_atomic_queue_push (bus->priv->queue, message); + gst_vec_deque_push_tail (bus->priv->queue, message); gst_poll_write_control (bus->priv->poll); GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message); + g_mutex_unlock (&bus->priv->queue_lock); break; } @@ -415,8 +418,10 @@ gst_bus_post (GstBus * bus, GstMessage * message) * the cond will be signalled and we can continue */ g_mutex_lock (lock); - gst_atomic_queue_push (bus->priv->queue, message); + g_mutex_lock (&bus->priv->queue_lock); + gst_vec_deque_push_tail (bus->priv->queue, message); gst_poll_write_control (bus->priv->poll); + g_mutex_unlock (&bus->priv->queue_lock); /* now block till the message is freed */ g_cond_wait (cond, lock); @@ -470,7 +475,9 @@ gst_bus_have_pending (GstBus * bus) g_return_val_if_fail (GST_IS_BUS (bus), FALSE); /* see if there is a message on the bus */ - result = gst_atomic_queue_length (bus->priv->queue) != 0; + g_mutex_lock (&bus->priv->queue_lock); + result = gst_vec_deque_get_length (bus->priv->queue) != 0; + g_mutex_unlock (&bus->priv->queue_lock); return result; } @@ -547,10 +554,10 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, while (TRUE) { gint ret; - GST_LOG_OBJECT (bus, "have %d messages", - gst_atomic_queue_length (bus->priv->queue)); + GST_LOG_OBJECT (bus, "have %" G_GSIZE_FORMAT " messages", + gst_vec_deque_get_length (bus->priv->queue)); - while ((message = gst_atomic_queue_pop (bus->priv->queue))) { + while ((message = gst_vec_deque_pop_head (bus->priv->queue))) { if (bus->priv->poll) { while (!gst_poll_read_control (bus->priv->poll)) { if (errno == EWOULDBLOCK) { @@ -710,7 +717,7 @@ gst_bus_peek (GstBus * bus) g_return_val_if_fail (GST_IS_BUS (bus), NULL); g_mutex_lock (&bus->priv->queue_lock); - message = gst_atomic_queue_peek (bus->priv->queue); + message = gst_vec_deque_peek_head (bus->priv->queue); if (message) gst_message_ref (message); g_mutex_unlock (&bus->priv->queue_lock);