gst/gstbus.c: gst_bus_poll may be called from other threads. Handle this nicely by not making poll_data disappear off...

Original commit message from CVS:
* gst/gstbus.c: (gst_bus_set_flushing), (gst_bus_pop),
(gst_bus_peek), (gst_bus_source_dispatch),
(gst_bus_add_watch_full), (poll_handler), (poll_timeout),
(poll_destroy), (poll_destroy_timeout), (gst_bus_poll):
gst_bus_poll may be called from other threads. Handle
this nicely by not making poll_data disappear off the
stack once gst_bus_poll returns.
gst_bus_peek now increments the refcount on the returned
message.
This commit is contained in:
Jan Schmidt 2005-07-29 15:34:52 +00:00
parent e3b39f2320
commit 9b1b3593e0
2 changed files with 76 additions and 28 deletions

View file

@ -1,3 +1,15 @@
2005-07-29 Jan Schmidt <thaytan@mad.scientist.com>
* gst/gstbus.c: (gst_bus_set_flushing), (gst_bus_pop),
(gst_bus_peek), (gst_bus_source_dispatch),
(gst_bus_add_watch_full), (poll_handler), (poll_timeout),
(poll_destroy), (poll_destroy_timeout), (gst_bus_poll):
gst_bus_poll may be called from other threads. Handle
this nicely by not making poll_data disappear off the
stack once gst_bus_poll returns.
gst_bus_peek now increments the refcount on the returned
message.
2005-07-29 Wim Taymans <wim@fluendo.com> 2005-07-29 Wim Taymans <wim@fluendo.com>
* docs/design/part-gstghostpad.txt: * docs/design/part-gstghostpad.txt:

View file

@ -306,12 +306,12 @@ gst_bus_set_flushing (GstBus * bus, gboolean flushing)
if (flushing) { if (flushing) {
GST_FLAG_SET (bus, GST_BUS_FLUSHING); GST_FLAG_SET (bus, GST_BUS_FLUSHING);
GST_DEBUG ("set bus flushing"); GST_DEBUG_OBJECT (bus, "set bus flushing");
while ((message = gst_bus_pop (bus))) while ((message = gst_bus_pop (bus)))
gst_message_unref (message); gst_message_unref (message);
} else { } else {
GST_DEBUG ("unset bus flushing"); GST_DEBUG_OBJECT (bus, "unset bus flushing");
GST_FLAG_UNSET (bus, GST_BUS_FLUSHING); GST_FLAG_UNSET (bus, GST_BUS_FLUSHING);
} }
@ -339,7 +339,7 @@ gst_bus_pop (GstBus * bus)
message = g_queue_pop_head (bus->queue); message = g_queue_pop_head (bus->queue);
g_mutex_unlock (bus->queue_lock); g_mutex_unlock (bus->queue_lock);
GST_DEBUG ("pop on bus, got message %p", message); GST_DEBUG_OBJECT (bus, "pop on bus, got message %p", message);
return message; return message;
} }
@ -348,15 +348,14 @@ gst_bus_pop (GstBus * bus)
* gst_bus_peek: * gst_bus_peek:
* @bus: a #GstBus * @bus: a #GstBus
* *
* Peek the message on the top of the bus' queue. The bus maintains ownership of * Peek the message on the top of the bus' queue. The message will remain
* the message, and the message will remain on the bus' message queue. * on the bus' message queue. A reference is returned, and needs to be freed
* by the caller.
* *
* Returns: The #GstMessage that is on the bus, or NULL if the bus is empty. * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
* *
* MT safe. * MT safe.
*/ */
/* FIXME, dangerous as the bus could be set to flushing while the app holds
* a ref to the message */
GstMessage * GstMessage *
gst_bus_peek (GstBus * bus) gst_bus_peek (GstBus * bus)
{ {
@ -366,9 +365,11 @@ gst_bus_peek (GstBus * bus)
g_mutex_lock (bus->queue_lock); g_mutex_lock (bus->queue_lock);
message = g_queue_peek_head (bus->queue); message = g_queue_peek_head (bus->queue);
if (message)
gst_message_ref (message);
g_mutex_unlock (bus->queue_lock); g_mutex_unlock (bus->queue_lock);
GST_DEBUG ("peek on bus, got message %p", message); GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
return message; return message;
} }
@ -433,18 +434,19 @@ gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
message = gst_bus_peek (bus); message = gst_bus_peek (bus);
GST_DEBUG ("have message %p", message); GST_DEBUG_OBJECT (bus, "source %p have message %p", source, message);
g_return_val_if_fail (message != NULL, TRUE); g_return_val_if_fail (message != NULL, TRUE);
if (!handler) if (!handler)
goto no_handler; goto no_handler;
GST_DEBUG ("calling dispatch with %p", message); GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
needs_pop = handler (bus, message, user_data); needs_pop = handler (bus, message, user_data);
gst_message_unref (message);
GST_DEBUG ("handler returns %d", needs_pop); GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, needs_pop);
if (needs_pop) { if (needs_pop) {
message = gst_bus_pop (bus); message = gst_bus_pop (bus);
if (message) { if (message) {
@ -463,6 +465,7 @@ no_handler:
{ {
g_warning ("GstBus watch dispatched without callback\n" g_warning ("GstBus watch dispatched without callback\n"
"You must call g_source_connect()."); "You must call g_source_connect().");
gst_message_unref (message);
return FALSE; return FALSE;
} }
} }
@ -542,6 +545,7 @@ gst_bus_add_watch_full (GstBus * bus, gint priority,
id = g_source_attach (source, NULL); id = g_source_attach (source, NULL);
g_source_unref (source); g_source_unref (source);
GST_DEBUG_OBJECT (bus, "New source %p\n", source);
return id; return id;
} }
@ -568,6 +572,7 @@ typedef struct
{ {
GMainLoop *loop; GMainLoop *loop;
guint timeout_id; guint timeout_id;
gboolean source_running;
GstMessageType events; GstMessageType events;
GstMessageType revent; GstMessageType revent;
} GstBusPollData; } GstBusPollData;
@ -575,10 +580,13 @@ typedef struct
static gboolean static gboolean
poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data) poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
{ {
if (!g_main_loop_is_running (poll_data->loop))
return FALSE;
if (GST_MESSAGE_TYPE (message) & poll_data->events) { if (GST_MESSAGE_TYPE (message) & poll_data->events) {
poll_data->revent = GST_MESSAGE_TYPE (message); poll_data->revent = GST_MESSAGE_TYPE (message);
if (g_main_loop_is_running (poll_data->loop))
g_main_loop_quit (poll_data->loop); g_main_loop_quit (poll_data->loop);
/* keep the message on the queue */ /* keep the message on the queue */
return FALSE; return FALSE;
} else { } else {
@ -590,12 +598,32 @@ poll_handler (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
static gboolean static gboolean
poll_timeout (GstBusPollData * poll_data) poll_timeout (GstBusPollData * poll_data)
{ {
poll_data->timeout_id = 0;
g_main_loop_quit (poll_data->loop); g_main_loop_quit (poll_data->loop);
/* returning FALSE will remove the source id */ /* returning FALSE will remove the source id */
return FALSE; return FALSE;
} }
static void
poll_destroy (GstBusPollData * poll_data)
{
poll_data->source_running = FALSE;
if (!poll_data->timeout_id) {
g_main_loop_unref (poll_data->loop);
g_free (poll_data);
}
}
static void
poll_destroy_timeout (GstBusPollData * poll_data)
{
poll_data->timeout_id = 0;
if (!poll_data->source_running) {
g_main_loop_unref (poll_data->loop);
g_free (poll_data);
}
}
/** /**
* gst_bus_poll: * gst_bus_poll:
* @bus: a #GstBus * @bus: a #GstBus
@ -616,29 +644,37 @@ poll_timeout (GstBusPollData * poll_data)
GstMessageType GstMessageType
gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout) gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
{ {
GstBusPollData poll_data; GstBusPollData *poll_data;
GstMessageType ret; GstMessageType ret;
guint id; guint id;
poll_data = g_new0 (GstBusPollData, 1);
g_return_val_if_fail (poll_data != NULL, GST_MESSAGE_UNKNOWN);
poll_data->source_running = TRUE;
poll_data->loop = g_main_loop_new (NULL, FALSE);
poll_data->events = events;
poll_data->revent = GST_MESSAGE_UNKNOWN;
if (timeout >= 0) if (timeout >= 0)
poll_data.timeout_id = g_timeout_add (timeout / GST_MSECOND, poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
(GSourceFunc) poll_timeout, &poll_data); timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
(GDestroyNotify) poll_destroy_timeout);
else else
poll_data.timeout_id = 0; poll_data->timeout_id = 0;
poll_data.loop = g_main_loop_new (NULL, FALSE); id = gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT_IDLE,
poll_data.events = events; (GstBusHandler) poll_handler, poll_data, (GDestroyNotify) poll_destroy);
poll_data.revent = GST_MESSAGE_UNKNOWN; g_main_loop_run (poll_data->loop);
ret = poll_data->revent;
id = gst_bus_add_watch (bus, (GstBusHandler) poll_handler, &poll_data); if (poll_data->timeout_id)
g_main_loop_run (poll_data.loop); g_source_remove (poll_data->timeout_id);
/* poll_data may get destroyed at any time now */
g_source_remove (id); g_source_remove (id);
ret = poll_data.revent; GST_DEBUG_OBJECT (bus, "finished poll with messagetype %d", ret);
if (poll_data.timeout_id)
g_source_remove (poll_data.timeout_id);
g_main_loop_unref (poll_data.loop);
return ret; return ret;
} }