message, bus: fix async message delivery

Async message delivery (where the posting thread gets blocked
until the message has been processed and/or freed) was pretty
much completely broken.

For one, don't use GMutex implementation details to check
whether a mutex has been initialized or not, esp. not
implementation details that don't hold true any more with
newer GLib versions where atomic ops and futexes are used
(spotted by Josep Torras). This led to async message
delivery no longer blocking with newer GLib versions on
Linux.

Secondly, after async delivery don't free mutex/GCond
embedded inside the just-freed message structure.

Use a new (private) mini object flag to signal GstMessage
that the message being freed is part of an async delivery
on the bus so that the dispose handler can keep the message
alive and the bus can free it once it's done cleaning up
stuff.
This commit is contained in:
Tim-Philipp Müller 2015-02-16 22:39:42 +00:00
parent 9f58aa080a
commit da7847d1ad
3 changed files with 37 additions and 7 deletions

View file

@ -426,5 +426,8 @@ struct _GstDeviceProviderFactoryClass {
gpointer _gst_reserved[GST_PADDING];
};
/* privat flag used by GstBus / GstMessage */
#define GST_MESSAGE_FLAG_ASYNC_DELIVERY (GST_MINI_OBJECT_FLAG_LAST << 0)
G_END_DECLS
#endif /* __GST_PRIVATE_H__ */

View file

@ -308,6 +308,10 @@ gst_bus_post (GstBus * bus, GstMessage * message)
GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus %" GST_PTR_FORMAT, message,
message);
/* check we didn't accidentally add a public flag that maps to same value */
g_assert (!GST_MINI_OBJECT_FLAG_IS_SET (message,
GST_MESSAGE_FLAG_ASYNC_DELIVERY));
GST_OBJECT_LOCK (bus);
/* check if the bus is flushing */
if (GST_OBJECT_FLAG_IS_SET (bus, GST_BUS_FLUSHING))
@ -357,6 +361,8 @@ gst_bus_post (GstBus * bus, GstMessage * message)
g_cond_init (cond);
g_mutex_init (lock);
GST_MINI_OBJECT_FLAG_SET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY);
GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
/* now we lock the message mutex, send the message to the async
@ -369,12 +375,18 @@ gst_bus_post (GstBus * bus, GstMessage * message)
/* now block till the message is freed */
g_cond_wait (cond, lock);
/* we acquired a new ref from gst_message_dispose() so we can clean up */
g_mutex_unlock (lock);
GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
GST_MINI_OBJECT_FLAG_UNSET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY);
g_mutex_clear (lock);
g_cond_clear (cond);
gst_message_unref (message);
break;
}
default:

View file

@ -166,6 +166,26 @@ gst_message_type_to_quark (GstMessageType type)
return 0;
}
static gboolean
_gst_message_dispose (GstMessage * message)
{
gboolean do_free = TRUE;
if (GST_MINI_OBJECT_FLAG_IS_SET (message, GST_MESSAGE_FLAG_ASYNC_DELIVERY)) {
GST_INFO ("[msg %p] signalling async free", message);
GST_MESSAGE_LOCK (message);
GST_MESSAGE_SIGNAL (message);
GST_MESSAGE_UNLOCK (message);
/* don't free it yet, let bus finish with it first */
gst_message_ref (message);
do_free = FALSE;
}
return do_free;
}
static void
_gst_message_free (GstMessage * message)
{
@ -181,12 +201,6 @@ _gst_message_free (GstMessage * message)
GST_MESSAGE_SRC (message) = NULL;
}
if (message->lock.p) {
GST_MESSAGE_LOCK (message);
GST_MESSAGE_SIGNAL (message);
GST_MESSAGE_UNLOCK (message);
}
structure = GST_MESSAGE_STRUCTURE (message);
if (structure) {
gst_structure_set_parent_refcount (structure, NULL);
@ -235,7 +249,8 @@ gst_message_init (GstMessageImpl * message, GstMessageType type,
GstObject * src)
{
gst_mini_object_init (GST_MINI_OBJECT_CAST (message), 0, _gst_message_type,
(GstMiniObjectCopyFunction) _gst_message_copy, NULL,
(GstMiniObjectCopyFunction) _gst_message_copy,
(GstMiniObjectDisposeFunction) _gst_message_dispose,
(GstMiniObjectFreeFunction) _gst_message_free);
GST_MESSAGE_TYPE (message) = type;