diff --git a/gst/gstbus.c b/gst/gstbus.c index 78396f44b9..ff6e87d66b 100644 --- a/gst/gstbus.c +++ b/gst/gstbus.c @@ -97,13 +97,34 @@ static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; struct _GstBusPrivate { guint num_sync_message_emitters; - GCond *queue_cond; GSource *watch_id; GstPoll *poll; GPollFD pollfd; }; +static void +ensure_poll (GstBus * bus) +{ + if (g_atomic_pointer_get (&bus->priv->poll) == NULL) { + g_mutex_lock (bus->queue_lock); + if (!bus->priv->poll) { + GstPoll *poll; + + poll = gst_poll_new_timer (); + gst_poll_get_read_gpollfd (poll, &bus->priv->pollfd); + g_atomic_pointer_set (&bus->priv->poll, poll); + } + g_mutex_unlock (bus->queue_lock); + } +} + +static GstPoll * +get_poll (GstBus * bus) +{ + return g_atomic_pointer_get (&bus->priv->poll); +} + G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT); /* fixme: do something about this */ @@ -189,10 +210,9 @@ gst_bus_init (GstBus * bus) bus->queue_lock = g_mutex_new (); bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate); - bus->priv->queue_cond = g_cond_new (); - bus->priv->poll = gst_poll_new_timer (); - gst_poll_get_read_gpollfd (bus->priv->poll, &bus->priv->pollfd); + /* Created on demand */ + bus->priv->poll = NULL; GST_DEBUG_OBJECT (bus, "created"); } @@ -216,10 +236,10 @@ gst_bus_dispose (GObject * object) g_mutex_unlock (bus->queue_lock); g_mutex_free (bus->queue_lock); bus->queue_lock = NULL; - g_cond_free (bus->priv->queue_cond); - bus->priv->queue_cond = NULL; - gst_poll_free (bus->priv->poll); + if (bus->priv->poll) + gst_poll_free (bus->priv->poll); + bus->priv->poll = NULL; } G_OBJECT_CLASS (parent_class)->dispose (object); @@ -262,6 +282,7 @@ gst_bus_post (GstBus * bus, GstMessage * message) GstBusSyncHandler handler; gboolean emit_sync_message; gpointer handler_data; + GstPoll *poll; g_return_val_if_fail (GST_IS_BUS (bus), FALSE); g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE); @@ -281,6 +302,8 @@ gst_bus_post (GstBus * bus, GstMessage * message) emit_sync_message = bus->priv->num_sync_message_emitters > 0; GST_OBJECT_UNLOCK (bus); + poll = get_poll (bus); + /* first call the sync handler if it is installed */ if (handler) reply = handler (bus, message, handler_data); @@ -301,7 +324,8 @@ gst_bus_post (GstBus * bus, GstMessage * message) /* pass the message to the async queue, refcount passed in the queue */ GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message); gst_atomic_queue_push (bus->queue, message); - gst_poll_write_control (bus->priv->poll); + if (poll) + gst_poll_write_control (poll); GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message); break; @@ -323,7 +347,8 @@ gst_bus_post (GstBus * bus, GstMessage * message) g_mutex_lock (lock); gst_atomic_queue_push (bus->queue, message); - gst_poll_write_control (bus->priv->poll); + if (poll) + gst_poll_write_control (poll); /* now block till the message is freed */ g_cond_wait (cond, lock); @@ -444,6 +469,8 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, g_return_val_if_fail (GST_IS_BUS (bus), NULL); g_return_val_if_fail (types != 0, NULL); + ensure_poll (bus); + g_mutex_lock (bus->queue_lock); while (TRUE) { @@ -453,7 +480,7 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, gst_atomic_queue_length (bus->queue)); while ((message = gst_atomic_queue_pop (bus->queue))) { - gst_poll_read_control (bus->priv->poll); + gst_poll_read_control (get_poll (bus)); GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u", message, GST_MESSAGE_TYPE_NAME (message), (guint) types); if ((GST_MESSAGE_TYPE (message) & types) != 0) { @@ -488,7 +515,7 @@ gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout, } g_mutex_unlock (bus->queue_lock); - ret = gst_poll_wait (bus->priv->poll, timeout); + ret = gst_poll_wait (get_poll (bus), timeout); g_mutex_lock (bus->queue_lock); if (ret == 0) { @@ -769,6 +796,7 @@ gst_bus_create_watch (GstBus * bus) source = (GstBusSource *) g_source_new (&gst_bus_source_funcs, sizeof (GstBusSource)); source->bus = gst_object_ref (bus); + ensure_poll (bus); g_source_add_poll ((GSource *) source, &bus->priv->pollfd); return (GSource *) source;