diff --git a/ChangeLog b/ChangeLog index f595ffe8de..cfff567351 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,15 @@ +2005-07-29 Jan Schmidt + + * gst/gstbus.c: (gst_bus_set_flushing), (gst_bus_pop), + (gst_bus_peek), (gst_bus_source_dispatch), + (gst_bus_add_watch_full), (poll_handler), (poll_timeout), + (poll_destroy), (poll_destroy_timeout), (gst_bus_poll): + gst_bus_poll may be called from other threads. Handle + this nicely by not making poll_data disappear off the + stack once gst_bus_poll returns. + gst_bus_peek now increments the refcount on the returned + message. + 2005-07-29 Wim Taymans * docs/design/part-gstghostpad.txt: diff --git a/gst/gstbus.c b/gst/gstbus.c index 3966416869..9551fd57f5 100644 --- a/gst/gstbus.c +++ b/gst/gstbus.c @@ -306,12 +306,12 @@ gst_bus_set_flushing (GstBus * bus, gboolean flushing) if (flushing) { GST_FLAG_SET (bus, GST_BUS_FLUSHING); - GST_DEBUG ("set bus flushing"); + GST_DEBUG_OBJECT (bus, "set bus flushing"); while ((message = gst_bus_pop (bus))) gst_message_unref (message); } else { - GST_DEBUG ("unset bus flushing"); + GST_DEBUG_OBJECT (bus, "unset bus flushing"); GST_FLAG_UNSET (bus, GST_BUS_FLUSHING); } @@ -339,7 +339,7 @@ gst_bus_pop (GstBus * bus) message = g_queue_pop_head (bus->queue); g_mutex_unlock (bus->queue_lock); - GST_DEBUG ("pop on bus, got message %p", message); + GST_DEBUG_OBJECT (bus, "pop on bus, got message %p", message); return message; } @@ -348,15 +348,14 @@ gst_bus_pop (GstBus * bus) * gst_bus_peek: * @bus: a #GstBus * - * Peek the message on the top of the bus' queue. The bus maintains ownership of - * the message, and the message will remain on the bus' message queue. + * Peek the message on the top of the bus' queue. The message will remain + * on the bus' message queue. A reference is returned, and needs to be freed + * by the caller. * * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty. * * MT safe. */ -/* FIXME, dangerous as the bus could be set to flushing while the app holds - * a ref to the message */ GstMessage * gst_bus_peek (GstBus * bus) { @@ -366,9 +365,11 @@ gst_bus_peek (GstBus * bus) g_mutex_lock (bus->queue_lock); message = g_queue_peek_head (bus->queue); + if (message) + gst_message_ref (message); g_mutex_unlock (bus->queue_lock); - GST_DEBUG ("peek on bus, got message %p", message); + GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message); return message; } @@ -433,18 +434,19 @@ gst_bus_source_dispatch (GSource * source, GSourceFunc callback, message = gst_bus_peek (bus); - GST_DEBUG ("have message %p", message); + GST_DEBUG_OBJECT (bus, "source %p have message %p", source, message); g_return_val_if_fail (message != NULL, TRUE); if (!handler) goto no_handler; - GST_DEBUG ("calling dispatch with %p", message); + GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message); needs_pop = handler (bus, message, user_data); + gst_message_unref (message); - GST_DEBUG ("handler returns %d", needs_pop); + GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, needs_pop); if (needs_pop) { message = gst_bus_pop (bus); if (message) { @@ -463,6 +465,7 @@ no_handler: { g_warning ("GstBus watch dispatched without callback\n" "You must call g_source_connect()."); + gst_message_unref (message); return FALSE; } } @@ -542,6 +545,7 @@ gst_bus_add_watch_full (GstBus * bus, gint priority, id = g_source_attach (source, NULL); g_source_unref (source); + GST_DEBUG_OBJECT (bus, "New source %p\n", source); return id; } @@ -568,6 +572,7 @@ typedef struct { GMainLoop *loop; guint timeout_id; + gboolean source_running; GstMessageType events; GstMessageType revent; } GstBusPollData; @@ -575,10 +580,13 @@ typedef struct static gboolean poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data) { + if (!g_main_loop_is_running (poll_data->loop)) + return FALSE; + if (GST_MESSAGE_TYPE (message) & poll_data->events) { poll_data->revent = GST_MESSAGE_TYPE (message); - if (g_main_loop_is_running (poll_data->loop)) - g_main_loop_quit (poll_data->loop); + g_main_loop_quit (poll_data->loop); + /* keep the message on the queue */ return FALSE; } else { @@ -590,12 +598,32 @@ poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data) static gboolean poll_timeout (GstBusPollData * poll_data) { - poll_data->timeout_id = 0; g_main_loop_quit (poll_data->loop); + /* returning FALSE will remove the source id */ return FALSE; } +static void +poll_destroy (GstBusPollData * poll_data) +{ + poll_data->source_running = FALSE; + if (!poll_data->timeout_id) { + g_main_loop_unref (poll_data->loop); + g_free (poll_data); + } +} + +static void +poll_destroy_timeout (GstBusPollData * poll_data) +{ + poll_data->timeout_id = 0; + if (!poll_data->source_running) { + g_main_loop_unref (poll_data->loop); + g_free (poll_data); + } +} + /** * gst_bus_poll: * @bus: a #GstBus @@ -616,29 +644,37 @@ poll_timeout (GstBusPollData * poll_data) GstMessageType gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout) { - GstBusPollData poll_data; + GstBusPollData *poll_data; GstMessageType ret; guint id; + poll_data = g_new0 (GstBusPollData, 1); + g_return_val_if_fail (poll_data != NULL, GST_MESSAGE_UNKNOWN); + + poll_data->source_running = TRUE; + poll_data->loop = g_main_loop_new (NULL, FALSE); + poll_data->events = events; + poll_data->revent = GST_MESSAGE_UNKNOWN; + if (timeout >= 0) - poll_data.timeout_id = g_timeout_add (timeout / GST_MSECOND, - (GSourceFunc) poll_timeout, &poll_data); + poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE, + timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data, + (GDestroyNotify) poll_destroy_timeout); else - poll_data.timeout_id = 0; + poll_data->timeout_id = 0; - poll_data.loop = g_main_loop_new (NULL, FALSE); - poll_data.events = events; - poll_data.revent = GST_MESSAGE_UNKNOWN; + id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE, + (GstBusHandler) poll_handler, poll_data, (GDestroyNotify) poll_destroy); + g_main_loop_run (poll_data->loop); + ret = poll_data->revent; - id = gst_bus_add_watch (bus, (GstBusHandler) poll_handler, &poll_data); - g_main_loop_run (poll_data.loop); + if (poll_data->timeout_id) + g_source_remove (poll_data->timeout_id); + + /* poll_data may get destroyed at any time now */ g_source_remove (id); - ret = poll_data.revent; - - if (poll_data.timeout_id) - g_source_remove (poll_data.timeout_id); - g_main_loop_unref (poll_data.loop); + GST_DEBUG_OBJECT (bus, "finished poll with messagetype %d", ret); return ret; }