bus: make the bus almost lockfree

Use new GstPoll functionality to wakeup the mainloop.
Use an atomic queue on the writer side to post the messages.
The reader side it protected with the lock still because we don't want multiple
concurrent readers.
This commit is contained in:
Wim Taymans 2010-10-28 13:27:43 +01:00
parent 401d73df7f
commit 14d7db1b52
2 changed files with 48 additions and 99 deletions

View file

@ -75,6 +75,7 @@
#include <sys/types.h>
#include "gstinfo.h"
#include "gstpoll.h"
#include "gstbus.h"
@ -90,8 +91,6 @@ enum
static void gst_bus_dispose (GObject * object);
static void gst_bus_set_main_context (GstBus * bus, GMainContext * ctx);
static GstObjectClass *parent_class = NULL;
static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
@ -100,7 +99,9 @@ struct _GstBusPrivate
guint num_sync_message_emitters;
GCond *queue_cond;
GSource *watch_id;
GMainContext *main_context;
GstPoll *poll;
GPollFD pollfd;
};
G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT);
@ -184,12 +185,15 @@ gst_bus_class_init (GstBusClass * klass)
static void
gst_bus_init (GstBus * bus)
{
bus->queue = g_queue_new ();
bus->queue = gst_atomic_queue_new (32);
bus->queue_lock = g_mutex_new ();
bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate);
bus->priv->queue_cond = g_cond_new ();
bus->priv->poll = gst_poll_new_timer ();
gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd);
GST_DEBUG_OBJECT (bus, "created");
}
@ -203,63 +207,24 @@ gst_bus_dispose (GObject * object)
g_mutex_lock (bus->queue_lock);
do {
message = g_queue_pop_head (bus->queue);
message = gst_atomic_queue_pop (bus->queue);
if (message)
gst_message_unref (message);
} while (message != NULL);
g_queue_free (bus->queue);
gst_atomic_queue_unref (bus->queue);
bus->queue = NULL;
g_mutex_unlock (bus->queue_lock);
g_mutex_free (bus->queue_lock);
bus->queue_lock = NULL;
g_cond_free (bus->priv->queue_cond);
bus->priv->queue_cond = NULL;
}
if (bus->priv->main_context) {
g_main_context_unref (bus->priv->main_context);
bus->priv->main_context = NULL;
gst_poll_free (bus->priv->poll);
}
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_bus_wakeup_main_context (GstBus * bus)
{
GMainContext *ctx;
GST_OBJECT_LOCK (bus);
if ((ctx = bus->priv->main_context))
g_main_context_ref (ctx);
GST_OBJECT_UNLOCK (bus);
g_main_context_wakeup (ctx);
if (ctx)
g_main_context_unref (ctx);
}
static void
gst_bus_set_main_context (GstBus * bus, GMainContext * ctx)
{
GST_OBJECT_LOCK (bus);
if (bus->priv->main_context != NULL) {
g_main_context_unref (bus->priv->main_context);
bus->priv->main_context = NULL;
}
if (ctx != NULL) {
bus->priv->main_context = g_main_context_ref (ctx);
}
GST_DEBUG_OBJECT (bus, "setting main context to %p, GLib default context: %p",
ctx, g_main_context_default ());
GST_OBJECT_UNLOCK (bus);
}
/**
* gst_bus_new:
*
@ -335,14 +300,10 @@ gst_bus_post (GstBus * bus, GstMessage * message)
case GST_BUS_PASS:
/* pass the message to the async queue, refcount passed in the queue */
GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
g_mutex_lock (bus->queue_lock);
g_queue_push_tail (bus->queue, message);
g_cond_broadcast (bus->priv->queue_cond);
g_mutex_unlock (bus->queue_lock);
gst_atomic_queue_push (bus->queue, message);
gst_poll_write_control (bus->priv->poll);
GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
gst_bus_wakeup_main_context (bus);
break;
case GST_BUS_ASYNC:
{
@ -360,12 +321,9 @@ gst_bus_post (GstBus * bus, GstMessage * message)
* queue. When the message is handled by the app and destroyed,
* the cond will be signalled and we can continue */
g_mutex_lock (lock);
g_mutex_lock (bus->queue_lock);
g_queue_push_tail (bus->queue, message);
g_cond_broadcast (bus->priv->queue_cond);
g_mutex_unlock (bus->queue_lock);
gst_bus_wakeup_main_context (bus);
gst_atomic_queue_push (bus->queue, message);
gst_poll_write_control (bus->priv->poll);
/* now block till the message is freed */
g_cond_wait (cond, lock);
@ -413,10 +371,8 @@ gst_bus_have_pending (GstBus * bus)
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
g_mutex_lock (bus->queue_lock);
/* see if there is a message on the bus */
result = !g_queue_is_empty (bus->queue);
g_mutex_unlock (bus->queue_lock);
result = gst_atomic_queue_length (bus->queue) != 0;
return result;
}
@ -482,7 +438,7 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
GstMessageType types)
{
GstMessage *message;
GTimeVal *timeval, abstimeout;
GTimeVal now, then;
gboolean first_round = TRUE;
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
@ -491,9 +447,13 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
g_mutex_lock (bus->queue_lock);
while (TRUE) {
GST_LOG_OBJECT (bus, "have %d messages", g_queue_get_length (bus->queue));
gint ret;
while ((message = g_queue_pop_head (bus->queue))) {
GST_LOG_OBJECT (bus, "have %d messages",
gst_atomic_queue_length (bus->queue));
while ((message = gst_atomic_queue_pop (bus->queue))) {
gst_poll_read_control (bus->priv->poll);
GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u",
message, GST_MESSAGE_TYPE_NAME (message), (guint) types);
if ((GST_MESSAGE_TYPE (message) & types) != 0) {
@ -510,28 +470,28 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
if (timeout == 0)
break;
if (timeout == GST_CLOCK_TIME_NONE) {
/* wait forever */
timeval = NULL;
} else if (first_round) {
glong add = timeout / 1000;
else if (timeout != GST_CLOCK_TIME_NONE) {
if (first_round) {
g_get_current_time (&then);
first_round = FALSE;
} else {
GstClockTime elapsed;
if (add == 0)
/* no need to wait */
break;
g_get_current_time (&now);
/* make timeout absolute */
g_get_current_time (&abstimeout);
g_time_val_add (&abstimeout, add);
timeval = &abstimeout;
first_round = FALSE;
GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add);
} else {
/* calculated the absolute end time already, no need to do it again */
GST_DEBUG_OBJECT (bus, "blocking for message, again");
timeval = &abstimeout; /* fool compiler */
elapsed = GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (then);
if (timeout > elapsed)
timeout -= elapsed;
else
timeout = 0;
}
}
if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) {
g_mutex_unlock (bus->queue_lock);
ret = gst_poll_wait (bus->priv->poll, timeout);
g_mutex_lock (bus->queue_lock);
if (ret == 0) {
GST_INFO_OBJECT (bus, "timed out, breaking loop");
break;
} else {
@ -644,7 +604,7 @@ gst_bus_peek (GstBus * bus)
g_return_val_if_fail (GST_IS_BUS (bus), NULL);
g_mutex_lock (bus->queue_lock);
message = g_queue_peek_head (bus->queue);
message = gst_atomic_queue_peek (bus->queue);
if (message)
gst_message_ref (message);
g_mutex_unlock (bus->queue_lock);
@ -702,24 +662,13 @@ typedef struct
{
GSource source;
GstBus *bus;
gboolean inited;
} GstBusSource;
static gboolean
gst_bus_source_prepare (GSource * source, gint * timeout)
{
GstBusSource *bsrc = (GstBusSource *) source;
/* we do this here now that we know that we're attached to a main context
* (we don't support detaching a source from a main context and then
* re-attaching it to a different main context) */
if (G_UNLIKELY (!bsrc->inited)) {
gst_bus_set_main_context (bsrc->bus, g_source_get_context (source));
bsrc->inited = TRUE;
}
*timeout = -1;
return gst_bus_have_pending (bsrc->bus);
return FALSE;
}
static gboolean
@ -727,7 +676,7 @@ gst_bus_source_check (GSource * source)
{
GstBusSource *bsrc = (GstBusSource *) source;
return gst_bus_have_pending (bsrc->bus);
return bsrc->bus->priv->pollfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR);
}
static gboolean
@ -789,7 +738,6 @@ gst_bus_source_finalize (GSource * source)
bus->priv->watch_id = NULL;
GST_OBJECT_UNLOCK (bus);
gst_bus_set_main_context (bsource->bus, NULL);
gst_object_unref (bsource->bus);
bsource->bus = NULL;
}
@ -821,7 +769,7 @@ gst_bus_create_watch (GstBus * bus)
source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
sizeof (GstBusSource));
source->bus = gst_object_ref (bus);
source->inited = FALSE;
g_source_add_poll ((GSource *) source, &bus->priv->pollfd);
return (GSource *) source;
}

View file

@ -28,6 +28,7 @@ typedef struct _GstBusClass GstBusClass;
#include <gst/gstmessage.h>
#include <gst/gstclock.h>
#include <gst/gstatomicqueue.h>
G_BEGIN_DECLS
@ -115,7 +116,7 @@ struct _GstBus
GstObject object;
/*< private >*/
GQueue *queue;
GstAtomicQueue *queue;
GMutex *queue_lock;
GstBusSyncHandler sync_handler;