From 06ca2759167f8ea0cdaf34db41d5a4c2cf98b75a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim-Philipp=20M=C3=BCller?= Date: Tue, 26 Apr 2011 15:42:46 +0100 Subject: [PATCH] Revert lockfree GstBus for the release Drop in old GstBus code for the release to play it safe, since regressions that are apparently hard to track down and reproduce have been reported (on windows/OSX mostly) against the lockfree version, and more time is needed to fix them. This reverts commit 03391a897001d35d1d290f27dd12e98a8b729fb4. This reverts commit 43cdbc17e6f944cdf02aeed78d1d5f6bde5190c9. This reverts commit 80eb160e0f62350271f061daa5f289d9d4277cf4. This reverts commit c41b0ade28790ffdb0e484b41cd7929c4e145dec. This reverts commit 874d60e5899dd5b89854679d1a4ad016a58ba4e0. This reverts commit 79370d4b1781af9c9a65f2d1e3498124d8c4c413. This reverts commit 2cb3e5235196eb71fb25e0a4a4b8749d6d0a8453. This reverts commit bd1c40011434c1efaa696dc98ef855ef9cce9b28. This reverts commit 4bf8f1524f6e3374b3f3bc57322337723d06b928. This reverts commit 14d7db1b527b05f029819057aef5c123ac7e013d. https://bugzilla.gnome.org/show_bug.cgi?id=647493 --- gst/gst_private.h | 1 + gst/gstbin.c | 2 +- gst/gstbus.c | 213 ++++++++++++++++++++++------------------------ gst/gstbus.h | 3 +- 4 files changed, 105 insertions(+), 114 deletions(-) diff --git a/gst/gst_private.h b/gst/gst_private.h index da8a50a80c..2cafbee611 100644 --- a/gst/gst_private.h +++ b/gst/gst_private.h @@ -123,6 +123,7 @@ gboolean priv_gst_structure_append_to_gstring (const GstStructure * structure, gboolean gst_registry_binary_read_cache (GstRegistry * registry, const char *location); gboolean gst_registry_binary_write_cache (GstRegistry * registry, const char *location); + /* used in gstvalue.c and gststructure.c */ #define GST_ASCII_IS_STRING(c) (g_ascii_isalnum((c)) || ((c) == '_') || \ ((c) == '-') || ((c) == '+') || ((c) == '/') || ((c) == ':') || \ diff --git a/gst/gstbin.c b/gst/gstbin.c index d4375f843c..73758213e9 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -540,7 +540,7 @@ gst_bin_init (GstBin * bin, GstBinClass * klass) bin->clock_dirty = FALSE; /* Set up a bus for listening to child elements */ - bus = g_object_new (GST_TYPE_BUS, "enable-async", FALSE, NULL); + bus = gst_bus_new (); bin->child_bus = bus; GST_DEBUG_OBJECT (bin, "using bus %" GST_PTR_FORMAT " to listen to children", bus); diff --git a/gst/gstbus.c b/gst/gstbus.c index caff420ed1..bca4558016 100644 --- a/gst/gstbus.c +++ b/gst/gstbus.c @@ -75,7 +75,6 @@ #include #include "gstinfo.h" -#include "gstpoll.h" #include "gstbus.h" @@ -89,27 +88,19 @@ enum LAST_SIGNAL }; -#define DEFAULT_ENABLE_ASYNC (TRUE) - -enum -{ - PROP_0, - PROP_ENABLE_ASYNC -}; - static void gst_bus_dispose (GObject * object); +static void gst_bus_set_main_context (GstBus * bus, GMainContext * ctx); + static GstObjectClass *parent_class = NULL; static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; struct _GstBusPrivate { guint num_sync_message_emitters; + GCond *queue_cond; GSource *watch_id; - - gboolean enable_async; - GstPoll *poll; - GPollFD pollfd; + GMainContext *main_context; }; G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT); @@ -142,33 +133,6 @@ marshal_VOID__MINIOBJECT (GClosure * closure, GValue * return_value, callback (data1, gst_value_get_mini_object (param_values + 1), data2); } -static void -gst_bus_set_property (GObject * object, - guint prop_id, const GValue * value, GParamSpec * pspec) -{ - GstBus *bus = GST_BUS_CAST (object); - - switch (prop_id) { - case PROP_ENABLE_ASYNC: - bus->priv->enable_async = g_value_get_boolean (value); - break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static void -gst_bus_constructed (GObject * object) -{ - GstBus *bus = GST_BUS_CAST (object); - - if (bus->priv->enable_async) { - bus->priv->poll = gst_poll_new_timer (); - gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd); - } -} - static void gst_bus_class_init (GstBusClass * klass) { @@ -177,25 +141,6 @@ gst_bus_class_init (GstBusClass * klass) parent_class = g_type_class_peek_parent (klass); gobject_class->dispose = gst_bus_dispose; - gobject_class->set_property = gst_bus_set_property; - gobject_class->constructed = gst_bus_constructed; - - /* GstBus:enable-async: - * - * Enable async message delivery support for bus watches, - * gst_bus_pop() and similar API. Without this only the - * synchronous message handlers are called. - * - * This property is used to create the child element buses - * in #GstBin. - * - * Since: 0.10.33 - */ - g_object_class_install_property (gobject_class, PROP_ENABLE_ASYNC, - g_param_spec_boolean ("enable-async", "Enable Async", - "Enable async message delivery for bus watches and gst_bus_pop()", - DEFAULT_ENABLE_ASYNC, - G_PARAM_CONSTRUCT_ONLY | G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS)); /** * GstBus::sync-message: @@ -239,11 +184,11 @@ gst_bus_class_init (GstBusClass * klass) static void gst_bus_init (GstBus * bus) { - bus->queue = gst_atomic_queue_new (32); + bus->queue = g_queue_new (); bus->queue_lock = g_mutex_new (); bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate); - bus->priv->enable_async = DEFAULT_ENABLE_ASYNC; + bus->priv->queue_cond = g_cond_new (); GST_DEBUG_OBJECT (bus, "created"); } @@ -258,24 +203,63 @@ gst_bus_dispose (GObject * object) g_mutex_lock (bus->queue_lock); do { - message = gst_atomic_queue_pop (bus->queue); + message = g_queue_pop_head (bus->queue); if (message) gst_message_unref (message); } while (message != NULL); - gst_atomic_queue_unref (bus->queue); + g_queue_free (bus->queue); bus->queue = NULL; g_mutex_unlock (bus->queue_lock); g_mutex_free (bus->queue_lock); bus->queue_lock = NULL; + g_cond_free (bus->priv->queue_cond); + bus->priv->queue_cond = NULL; + } - if (bus->priv->poll) - gst_poll_free (bus->priv->poll); - bus->priv->poll = NULL; + if (bus->priv->main_context) { + g_main_context_unref (bus->priv->main_context); + bus->priv->main_context = NULL; } G_OBJECT_CLASS (parent_class)->dispose (object); } +static void +gst_bus_wakeup_main_context (GstBus * bus) +{ + GMainContext *ctx; + + GST_OBJECT_LOCK (bus); + if ((ctx = bus->priv->main_context)) + g_main_context_ref (ctx); + GST_OBJECT_UNLOCK (bus); + + g_main_context_wakeup (ctx); + + if (ctx) + g_main_context_unref (ctx); +} + +static void +gst_bus_set_main_context (GstBus * bus, GMainContext * ctx) +{ + GST_OBJECT_LOCK (bus); + + if (bus->priv->main_context != NULL) { + g_main_context_unref (bus->priv->main_context); + bus->priv->main_context = NULL; + } + + if (ctx != NULL) { + bus->priv->main_context = g_main_context_ref (ctx); + } + + GST_DEBUG_OBJECT (bus, "setting main context to %p, GLib default context: %p", + ctx, g_main_context_default ()); + + GST_OBJECT_UNLOCK (bus); +} + /** * gst_bus_new: * @@ -342,11 +326,6 @@ gst_bus_post (GstBus * bus, GstMessage * message) && handler != gst_bus_sync_signal_handler) gst_bus_sync_signal_handler (bus, message, NULL); - /* If this is a bus without async message delivery - * always drop the message */ - if (!bus->priv->poll) - reply = GST_BUS_DROP; - /* now see what we should do with the message */ switch (reply) { case GST_BUS_DROP: @@ -356,10 +335,14 @@ gst_bus_post (GstBus * bus, GstMessage * message) case GST_BUS_PASS: /* pass the message to the async queue, refcount passed in the queue */ GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message); - gst_atomic_queue_push (bus->queue, message); - gst_poll_write_control (bus->priv->poll); + g_mutex_lock (bus->queue_lock); + g_queue_push_tail (bus->queue, message); + g_cond_broadcast (bus->priv->queue_cond); + g_mutex_unlock (bus->queue_lock); GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message); + gst_bus_wakeup_main_context (bus); + break; case GST_BUS_ASYNC: { @@ -377,9 +360,12 @@ gst_bus_post (GstBus * bus, GstMessage * message) * queue. When the message is handled by the app and destroyed, * the cond will be signalled and we can continue */ g_mutex_lock (lock); + g_mutex_lock (bus->queue_lock); + g_queue_push_tail (bus->queue, message); + g_cond_broadcast (bus->priv->queue_cond); + g_mutex_unlock (bus->queue_lock); - gst_atomic_queue_push (bus->queue, message); - gst_poll_write_control (bus->priv->poll); + gst_bus_wakeup_main_context (bus); /* now block till the message is freed */ g_cond_wait (cond, lock); @@ -427,8 +413,10 @@ gst_bus_have_pending (GstBus * bus) g_return_val_if_fail (GST_IS_BUS (bus), FALSE); + g_mutex_lock (bus->queue_lock); /* see if there is a message on the bus */ - result = gst_atomic_queue_length (bus->queue) != 0; + result = !g_queue_is_empty (bus->queue); + g_mutex_unlock (bus->queue_lock); return result; } @@ -494,24 +482,18 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, GstMessageType types) { GstMessage *message; - GTimeVal now, then; + GTimeVal *timeval, abstimeout; gboolean first_round = TRUE; g_return_val_if_fail (GST_IS_BUS (bus), NULL); g_return_val_if_fail (types != 0, NULL); - g_return_val_if_fail (timeout == 0 || bus->priv->poll != NULL, NULL); g_mutex_lock (bus->queue_lock); while (TRUE) { - gint ret; + GST_LOG_OBJECT (bus, "have %d messages", g_queue_get_length (bus->queue)); - GST_LOG_OBJECT (bus, "have %d messages", - gst_atomic_queue_length (bus->queue)); - - while ((message = gst_atomic_queue_pop (bus->queue))) { - if (bus->priv->poll) - gst_poll_read_control (bus->priv->poll); + while ((message = g_queue_pop_head (bus->queue))) { GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u", message, GST_MESSAGE_TYPE_NAME (message), (guint) types); if ((GST_MESSAGE_TYPE (message) & types) != 0) { @@ -528,30 +510,28 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, if (timeout == 0) break; - else if (timeout != GST_CLOCK_TIME_NONE) { - if (first_round) { - g_get_current_time (&then); - first_round = FALSE; - } else { - GstClockTime elapsed; + if (timeout == GST_CLOCK_TIME_NONE) { + /* wait forever */ + timeval = NULL; + } else if (first_round) { + glong add = timeout / 1000; - g_get_current_time (&now); + if (add == 0) + /* no need to wait */ + break; - elapsed = GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (then); - if (timeout > elapsed) - timeout -= elapsed; - else - timeout = 0; - } + /* make timeout absolute */ + g_get_current_time (&abstimeout); + g_time_val_add (&abstimeout, add); + timeval = &abstimeout; + first_round = FALSE; + GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add); + } else { + /* calculated the absolute end time already, no need to do it again */ + GST_DEBUG_OBJECT (bus, "blocking for message, again"); + timeval = &abstimeout; /* fool compiler */ } - - /* only here in timeout case */ - g_assert (bus->priv->poll); - g_mutex_unlock (bus->queue_lock); - ret = gst_poll_wait (bus->priv->poll, timeout); - g_mutex_lock (bus->queue_lock); - - if (ret == 0) { + if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) { GST_INFO_OBJECT (bus, "timed out, breaking loop"); break; } else { @@ -664,7 +644,7 @@ gst_bus_peek (GstBus * bus) g_return_val_if_fail (GST_IS_BUS (bus), NULL); g_mutex_lock (bus->queue_lock); - message = gst_atomic_queue_peek (bus->queue); + message = g_queue_peek_head (bus->queue); if (message) gst_message_ref (message); g_mutex_unlock (bus->queue_lock); @@ -722,13 +702,24 @@ typedef struct { GSource source; GstBus *bus; + gboolean inited; } GstBusSource; static gboolean gst_bus_source_prepare (GSource * source, gint * timeout) { + GstBusSource *bsrc = (GstBusSource *) source; + + /* we do this here now that we know that we're attached to a main context + * (we don't support detaching a source from a main context and then + * re-attaching it to a different main context) */ + if (G_UNLIKELY (!bsrc->inited)) { + gst_bus_set_main_context (bsrc->bus, g_source_get_context (source)); + bsrc->inited = TRUE; + } + *timeout = -1; - return FALSE; + return gst_bus_have_pending (bsrc->bus); } static gboolean @@ -736,7 +727,7 @@ gst_bus_source_check (GSource * source) { GstBusSource *bsrc = (GstBusSource *) source; - return bsrc->bus->priv->pollfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR); + return gst_bus_have_pending (bsrc->bus); } static gboolean @@ -798,6 +789,7 @@ gst_bus_source_finalize (GSource * source) bus->priv->watch_id = NULL; GST_OBJECT_UNLOCK (bus); + gst_bus_set_main_context (bsource->bus, NULL); gst_object_unref (bsource->bus); bsource->bus = NULL; } @@ -825,12 +817,11 @@ gst_bus_create_watch (GstBus * bus) GstBusSource *source; g_return_val_if_fail (GST_IS_BUS (bus), NULL); - g_return_val_if_fail (bus->priv->poll != NULL, NULL); source = (GstBusSource *) g_source_new (&gst_bus_source_funcs, sizeof (GstBusSource)); source->bus = gst_object_ref (bus); - g_source_add_poll ((GSource *) source, &bus->priv->pollfd); + source->inited = FALSE; return (GSource *) source; } diff --git a/gst/gstbus.h b/gst/gstbus.h index 732591f27a..30afe6006a 100644 --- a/gst/gstbus.h +++ b/gst/gstbus.h @@ -28,7 +28,6 @@ typedef struct _GstBusClass GstBusClass; #include #include -#include G_BEGIN_DECLS @@ -116,7 +115,7 @@ struct _GstBus GstObject object; /*< private >*/ - GstAtomicQueue *queue; + GQueue *queue; GMutex *queue_lock; GstBusSyncHandler sync_handler;