diff --git a/gst/gst_private.h b/gst/gst_private.h index da8a50a80c..2cafbee611 100644 --- a/gst/gst_private.h +++ b/gst/gst_private.h @@ -123,6 +123,7 @@ gboolean priv_gst_structure_append_to_gstring (const GstStructure * structure, gboolean gst_registry_binary_read_cache (GstRegistry * registry, const char *location); gboolean gst_registry_binary_write_cache (GstRegistry * registry, const char *location); + /* used in gstvalue.c and gststructure.c */ #define GST_ASCII_IS_STRING(c) (g_ascii_isalnum((c)) || ((c) == '_') || \ ((c) == '-') || ((c) == '+') || ((c) == '/') || ((c) == ':') || \ diff --git a/gst/gstbin.c b/gst/gstbin.c index d4375f843c..73758213e9 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -540,7 +540,7 @@ gst_bin_init (GstBin * bin, GstBinClass * klass) bin->clock_dirty = FALSE; /* Set up a bus for listening to child elements */ - bus = g_object_new (GST_TYPE_BUS, "enable-async", FALSE, NULL); + bus = gst_bus_new (); bin->child_bus = bus; GST_DEBUG_OBJECT (bin, "using bus %" GST_PTR_FORMAT " to listen to children", bus); diff --git a/gst/gstbus.c b/gst/gstbus.c index caff420ed1..bca4558016 100644 --- a/gst/gstbus.c +++ b/gst/gstbus.c @@ -75,7 +75,6 @@ #include #include "gstinfo.h" -#include "gstpoll.h" #include "gstbus.h" @@ -89,27 +88,19 @@ enum LAST_SIGNAL }; -#define DEFAULT_ENABLE_ASYNC (TRUE) - -enum -{ - PROP_0, - PROP_ENABLE_ASYNC -}; - static void gst_bus_dispose (GObject * object); +static void gst_bus_set_main_context (GstBus * bus, GMainContext * ctx); + static GstObjectClass *parent_class = NULL; static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; struct _GstBusPrivate { guint num_sync_message_emitters; + GCond *queue_cond; GSource *watch_id; - - gboolean enable_async; - GstPoll *poll; - GPollFD pollfd; + GMainContext *main_context; }; G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT); @@ -142,33 +133,6 @@ marshal_VOID__MINIOBJECT (GClosure * closure, GValue * return_value, callback (data1, gst_value_get_mini_object (param_values + 1), data2); } -static void -gst_bus_set_property (GObject * object, - guint prop_id, const GValue * value, GParamSpec * pspec) -{ - GstBus *bus = GST_BUS_CAST (object); - - switch (prop_id) { - case PROP_ENABLE_ASYNC: - bus->priv->enable_async = g_value_get_boolean (value); - break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static void -gst_bus_constructed (GObject * object) -{ - GstBus *bus = GST_BUS_CAST (object); - - if (bus->priv->enable_async) { - bus->priv->poll = gst_poll_new_timer (); - gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd); - } -} - static void gst_bus_class_init (GstBusClass * klass) { @@ -177,25 +141,6 @@ gst_bus_class_init (GstBusClass * klass) parent_class = g_type_class_peek_parent (klass); gobject_class->dispose = gst_bus_dispose; - gobject_class->set_property = gst_bus_set_property; - gobject_class->constructed = gst_bus_constructed; - - /* GstBus:enable-async: - * - * Enable async message delivery support for bus watches, - * gst_bus_pop() and similar API. Without this only the - * synchronous message handlers are called. - * - * This property is used to create the child element buses - * in #GstBin. - * - * Since: 0.10.33 - */ - g_object_class_install_property (gobject_class, PROP_ENABLE_ASYNC, - g_param_spec_boolean ("enable-async", "Enable Async", - "Enable async message delivery for bus watches and gst_bus_pop()", - DEFAULT_ENABLE_ASYNC, - G_PARAM_CONSTRUCT_ONLY | G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS)); /** * GstBus::sync-message: @@ -239,11 +184,11 @@ gst_bus_class_init (GstBusClass * klass) static void gst_bus_init (GstBus * bus) { - bus->queue = gst_atomic_queue_new (32); + bus->queue = g_queue_new (); bus->queue_lock = g_mutex_new (); bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate); - bus->priv->enable_async = DEFAULT_ENABLE_ASYNC; + bus->priv->queue_cond = g_cond_new (); GST_DEBUG_OBJECT (bus, "created"); } @@ -258,24 +203,63 @@ gst_bus_dispose (GObject * object) g_mutex_lock (bus->queue_lock); do { - message = gst_atomic_queue_pop (bus->queue); + message = g_queue_pop_head (bus->queue); if (message) gst_message_unref (message); } while (message != NULL); - gst_atomic_queue_unref (bus->queue); + g_queue_free (bus->queue); bus->queue = NULL; g_mutex_unlock (bus->queue_lock); g_mutex_free (bus->queue_lock); bus->queue_lock = NULL; + g_cond_free (bus->priv->queue_cond); + bus->priv->queue_cond = NULL; + } - if (bus->priv->poll) - gst_poll_free (bus->priv->poll); - bus->priv->poll = NULL; + if (bus->priv->main_context) { + g_main_context_unref (bus->priv->main_context); + bus->priv->main_context = NULL; } G_OBJECT_CLASS (parent_class)->dispose (object); } +static void +gst_bus_wakeup_main_context (GstBus * bus) +{ + GMainContext *ctx; + + GST_OBJECT_LOCK (bus); + if ((ctx = bus->priv->main_context)) + g_main_context_ref (ctx); + GST_OBJECT_UNLOCK (bus); + + g_main_context_wakeup (ctx); + + if (ctx) + g_main_context_unref (ctx); +} + +static void +gst_bus_set_main_context (GstBus * bus, GMainContext * ctx) +{ + GST_OBJECT_LOCK (bus); + + if (bus->priv->main_context != NULL) { + g_main_context_unref (bus->priv->main_context); + bus->priv->main_context = NULL; + } + + if (ctx != NULL) { + bus->priv->main_context = g_main_context_ref (ctx); + } + + GST_DEBUG_OBJECT (bus, "setting main context to %p, GLib default context: %p", + ctx, g_main_context_default ()); + + GST_OBJECT_UNLOCK (bus); +} + /** * gst_bus_new: * @@ -342,11 +326,6 @@ gst_bus_post (GstBus * bus, GstMessage * message) && handler != gst_bus_sync_signal_handler) gst_bus_sync_signal_handler (bus, message, NULL); - /* If this is a bus without async message delivery - * always drop the message */ - if (!bus->priv->poll) - reply = GST_BUS_DROP; - /* now see what we should do with the message */ switch (reply) { case GST_BUS_DROP: @@ -356,10 +335,14 @@ gst_bus_post (GstBus * bus, GstMessage * message) case GST_BUS_PASS: /* pass the message to the async queue, refcount passed in the queue */ GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message); - gst_atomic_queue_push (bus->queue, message); - gst_poll_write_control (bus->priv->poll); + g_mutex_lock (bus->queue_lock); + g_queue_push_tail (bus->queue, message); + g_cond_broadcast (bus->priv->queue_cond); + g_mutex_unlock (bus->queue_lock); GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message); + gst_bus_wakeup_main_context (bus); + break; case GST_BUS_ASYNC: { @@ -377,9 +360,12 @@ gst_bus_post (GstBus * bus, GstMessage * message) * queue. When the message is handled by the app and destroyed, * the cond will be signalled and we can continue */ g_mutex_lock (lock); + g_mutex_lock (bus->queue_lock); + g_queue_push_tail (bus->queue, message); + g_cond_broadcast (bus->priv->queue_cond); + g_mutex_unlock (bus->queue_lock); - gst_atomic_queue_push (bus->queue, message); - gst_poll_write_control (bus->priv->poll); + gst_bus_wakeup_main_context (bus); /* now block till the message is freed */ g_cond_wait (cond, lock); @@ -427,8 +413,10 @@ gst_bus_have_pending (GstBus * bus) g_return_val_if_fail (GST_IS_BUS (bus), FALSE); + g_mutex_lock (bus->queue_lock); /* see if there is a message on the bus */ - result = gst_atomic_queue_length (bus->queue) != 0; + result = !g_queue_is_empty (bus->queue); + g_mutex_unlock (bus->queue_lock); return result; } @@ -494,24 +482,18 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, GstMessageType types) { GstMessage *message; - GTimeVal now, then; + GTimeVal *timeval, abstimeout; gboolean first_round = TRUE; g_return_val_if_fail (GST_IS_BUS (bus), NULL); g_return_val_if_fail (types != 0, NULL); - g_return_val_if_fail (timeout == 0 || bus->priv->poll != NULL, NULL); g_mutex_lock (bus->queue_lock); while (TRUE) { - gint ret; + GST_LOG_OBJECT (bus, "have %d messages", g_queue_get_length (bus->queue)); - GST_LOG_OBJECT (bus, "have %d messages", - gst_atomic_queue_length (bus->queue)); - - while ((message = gst_atomic_queue_pop (bus->queue))) { - if (bus->priv->poll) - gst_poll_read_control (bus->priv->poll); + while ((message = g_queue_pop_head (bus->queue))) { GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u", message, GST_MESSAGE_TYPE_NAME (message), (guint) types); if ((GST_MESSAGE_TYPE (message) & types) != 0) { @@ -528,30 +510,28 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, if (timeout == 0) break; - else if (timeout != GST_CLOCK_TIME_NONE) { - if (first_round) { - g_get_current_time (&then); - first_round = FALSE; - } else { - GstClockTime elapsed; + if (timeout == GST_CLOCK_TIME_NONE) { + /* wait forever */ + timeval = NULL; + } else if (first_round) { + glong add = timeout / 1000; - g_get_current_time (&now); + if (add == 0) + /* no need to wait */ + break; - elapsed = GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (then); - if (timeout > elapsed) - timeout -= elapsed; - else - timeout = 0; - } + /* make timeout absolute */ + g_get_current_time (&abstimeout); + g_time_val_add (&abstimeout, add); + timeval = &abstimeout; + first_round = FALSE; + GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add); + } else { + /* calculated the absolute end time already, no need to do it again */ + GST_DEBUG_OBJECT (bus, "blocking for message, again"); + timeval = &abstimeout; /* fool compiler */ } - - /* only here in timeout case */ - g_assert (bus->priv->poll); - g_mutex_unlock (bus->queue_lock); - ret = gst_poll_wait (bus->priv->poll, timeout); - g_mutex_lock (bus->queue_lock); - - if (ret == 0) { + if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) { GST_INFO_OBJECT (bus, "timed out, breaking loop"); break; } else { @@ -664,7 +644,7 @@ gst_bus_peek (GstBus * bus) g_return_val_if_fail (GST_IS_BUS (bus), NULL); g_mutex_lock (bus->queue_lock); - message = gst_atomic_queue_peek (bus->queue); + message = g_queue_peek_head (bus->queue); if (message) gst_message_ref (message); g_mutex_unlock (bus->queue_lock); @@ -722,13 +702,24 @@ typedef struct { GSource source; GstBus *bus; + gboolean inited; } GstBusSource; static gboolean gst_bus_source_prepare (GSource * source, gint * timeout) { + GstBusSource *bsrc = (GstBusSource *) source; + + /* we do this here now that we know that we're attached to a main context + * (we don't support detaching a source from a main context and then + * re-attaching it to a different main context) */ + if (G_UNLIKELY (!bsrc->inited)) { + gst_bus_set_main_context (bsrc->bus, g_source_get_context (source)); + bsrc->inited = TRUE; + } + *timeout = -1; - return FALSE; + return gst_bus_have_pending (bsrc->bus); } static gboolean @@ -736,7 +727,7 @@ gst_bus_source_check (GSource * source) { GstBusSource *bsrc = (GstBusSource *) source; - return bsrc->bus->priv->pollfd.revents & (G_IO_IN | G_IO_HUP | G_IO_ERR); + return gst_bus_have_pending (bsrc->bus); } static gboolean @@ -798,6 +789,7 @@ gst_bus_source_finalize (GSource * source) bus->priv->watch_id = NULL; GST_OBJECT_UNLOCK (bus); + gst_bus_set_main_context (bsource->bus, NULL); gst_object_unref (bsource->bus); bsource->bus = NULL; } @@ -825,12 +817,11 @@ gst_bus_create_watch (GstBus * bus) GstBusSource *source; g_return_val_if_fail (GST_IS_BUS (bus), NULL); - g_return_val_if_fail (bus->priv->poll != NULL, NULL); source = (GstBusSource *) g_source_new (&gst_bus_source_funcs, sizeof (GstBusSource)); source->bus = gst_object_ref (bus); - g_source_add_poll ((GSource *) source, &bus->priv->pollfd); + source->inited = FALSE; return (GSource *) source; } diff --git a/gst/gstbus.h b/gst/gstbus.h index 732591f27a..30afe6006a 100644 --- a/gst/gstbus.h +++ b/gst/gstbus.h @@ -28,7 +28,6 @@ typedef struct _GstBusClass GstBusClass; #include #include -#include G_BEGIN_DECLS @@ -116,7 +115,7 @@ struct _GstBus GstObject object; /*< private >*/ - GstAtomicQueue *queue; + GQueue *queue; GMutex *queue_lock; GstBusSyncHandler sync_handler;