bus: clear bus->priv->signal_watch immediately when the source is removed

There is a race-condition that can trigger the assertion in
gst_bus_add_signal_watch_full():

If gst_bus_add_signal_watch_full() is called immediately after
gst_bus_remove_signal_watch() then bus->priv->signal_watch may still be set
because gst_bus_source_dispose() or gst_bus_source_finalize() was not yet
called.
This happens if the corresponding GMainContext has the source queued for
dispatch. In this case, the following dispatch will only unref and delete
the signal_watch because it was already destroyed. Any pending messages
will remain until a new watch is installed.

So bus->priv->signal_watch can be cleared immediately when the watch is
removed. This avoid the race condition.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/543>
This commit is contained in:
Michael Olbrich 2020-06-24 07:54:42 +02:00
parent 996743ed39
commit 1206a60bac
2 changed files with 91 additions and 0 deletions

View file

@ -1096,6 +1096,7 @@ gst_bus_remove_watch (GstBus * bus)
source =
bus->priv->signal_watch ? g_source_ref (bus->priv->signal_watch) : NULL;
bus->priv->signal_watch = NULL;
GST_OBJECT_UNLOCK (bus);
@ -1498,6 +1499,7 @@ gst_bus_remove_signal_watch (GstBus * bus)
source =
bus->priv->signal_watch ? g_source_ref (bus->priv->signal_watch) : NULL;
bus->priv->signal_watch = NULL;
done:
GST_OBJECT_UNLOCK (bus);

View file

@ -429,6 +429,94 @@ GST_START_TEST (test_watch_with_poll)
GST_END_TEST;
static gint check_messages_seen;
static void
send_check_messages (guint index)
{
GstMessage *m;
GstStructure *s;
s = gst_structure_new ("test_message", "index", G_TYPE_INT, index, NULL);
m = gst_message_new_application (NULL, s);
gst_bus_post (test_bus, m);
}
static void
message_check_func (GstBus * bus, GstMessage * message, gpointer data)
{
const GstStructure *s;
int index;
g_return_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION);
s = gst_message_get_structure (message);
if (!gst_structure_get_int (s, "index", &index))
g_critical ("Invalid message");
check_messages_seen |= 1 << index;
}
/* test that removing and adding the signal watch again works */
GST_START_TEST (test_watch_twice)
{
GMainContext *ctx;
guint expect = 0;
test_bus = gst_bus_new ();
ctx = g_main_context_new ();
g_main_context_push_thread_default (ctx);
send_check_messages (0);
expect |= 1 << 0;
gst_bus_add_signal_watch (test_bus);
g_signal_connect (test_bus, "message::application",
(GCallback) message_check_func, NULL);
send_check_messages (1);
expect |= 1 << 1;
fail_unless (g_main_context_pending (ctx));
gst_bus_remove_signal_watch (test_bus);
send_check_messages (2);
expect |= 1 << 2;
gst_bus_add_signal_watch (test_bus);
send_check_messages (3);
expect |= 1 << 3;
while (g_main_context_pending (ctx))
g_main_context_iteration (ctx, FALSE);
/* this message will not arrive */
send_check_messages (4);
expect |= 0 << 4;
fail_unless (g_main_context_pending (ctx));
gst_bus_remove_signal_watch (test_bus);
/* this message should not arrive */
send_check_messages (5);
expect |= 0 << 5;
while (g_main_context_pending (ctx))
g_main_context_iteration (ctx, FALSE);
fail_unless (check_messages_seen == expect);
g_main_context_unref (ctx);
gst_object_unref (test_bus);
}
GST_END_TEST;
/* test that you get the messages with pop. */
GST_START_TEST (test_timed_pop)
{
@ -824,6 +912,7 @@ gst_bus_suite (void)
tcase_add_test (tc_chain, test_hammer_bus);
tcase_add_test (tc_chain, test_watch);
tcase_add_test (tc_chain, test_watch_with_poll);
tcase_add_test (tc_chain, test_watch_twice);
tcase_add_test (tc_chain, test_watch_with_custom_context);
tcase_add_test (tc_chain, test_add_watch_with_custom_context);
tcase_add_test (tc_chain, test_remove_watch);