bin: Fix race conditions in tests

The latency messages are non-deterministic and can arrive before/after
async-done or during state-changes as they are posted by e.g. sinks from
their streaming thread but bins are finishing asynchronous state changes
from a secondary helper thread.

To solve this, expect latency messages at any time and assert that we
receive one at some point during the test.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2643>
This commit is contained in:
Sebastian Dröge 2022-06-21 11:51:35 +03:00 committed by GStreamer Marge Bot
parent f0d4135b3a
commit e1e2d8d58c

View file

@ -27,50 +27,95 @@
#include <gst/base/gstbasesrc.h> #include <gst/base/gstbasesrc.h>
static void static void
pop_async_done (GstBus * bus) pop_async_done (GstBus * bus, gboolean * had_latency)
{ {
GstMessage *message; GstMessage *message;
GstMessageType types = GST_MESSAGE_ASYNC_DONE;
if (!*had_latency)
types |= GST_MESSAGE_LATENCY;
GST_DEBUG ("popping async-done message"); GST_DEBUG ("popping async-done message");
message = gst_bus_poll (bus, GST_MESSAGE_ASYNC_DONE, -1);
fail_unless (message && GST_MESSAGE_TYPE (message) do {
message = gst_bus_poll (bus, types, -1);
fail_unless (message);
GST_DEBUG ("popped message %s",
gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_LATENCY) {
fail_unless (*had_latency == FALSE);
*had_latency = TRUE;
gst_clear_message (&message);
types &= ~GST_MESSAGE_LATENCY;
continue;
}
fail_unless (GST_MESSAGE_TYPE (message)
== GST_MESSAGE_ASYNC_DONE, "did not get GST_MESSAGE_ASYNC_DONE"); == GST_MESSAGE_ASYNC_DONE, "did not get GST_MESSAGE_ASYNC_DONE");
gst_message_unref (message); gst_clear_message (&message);
GST_DEBUG ("popped message"); break;
} while (TRUE);
} }
static void static void
pop_latency (GstBus * bus) pop_latency (GstBus * bus, gboolean * had_latency)
{ {
GstMessage *message; GstMessage *message;
GST_DEBUG ("popping async-done message"); if (*had_latency)
return;
GST_DEBUG ("popping latency message");
message = gst_bus_poll (bus, GST_MESSAGE_LATENCY, -1); message = gst_bus_poll (bus, GST_MESSAGE_LATENCY, -1);
fail_unless (message && GST_MESSAGE_TYPE (message) fail_unless (message);
fail_unless (GST_MESSAGE_TYPE (message)
== GST_MESSAGE_LATENCY, "did not get GST_MESSAGE_LATENCY"); == GST_MESSAGE_LATENCY, "did not get GST_MESSAGE_LATENCY");
gst_message_unref (message); GST_DEBUG ("popped message %s",
GST_DEBUG ("popped message"); gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
gst_clear_message (&message);
*had_latency = TRUE;
} }
static void static void
pop_state_changed (GstBus * bus, int count) pop_state_changed (GstBus * bus, int count, gboolean * had_latency)
{ {
GstMessage *message; GstMessage *message;
GstMessageType types = GST_MESSAGE_STATE_CHANGED;
int i; int i;
if (!*had_latency)
types |= GST_MESSAGE_LATENCY;
GST_DEBUG ("popping %d messages", count); GST_DEBUG ("popping %d messages", count);
for (i = 0; i < count; ++i) { for (i = 0; i < count; ++i) {
message = gst_bus_poll (bus, GST_MESSAGE_STATE_CHANGED, -1); do {
message = gst_bus_poll (bus, types, -1);
fail_unless (message && GST_MESSAGE_TYPE (message) fail_unless (message);
== GST_MESSAGE_STATE_CHANGED, "did not get GST_MESSAGE_STATE_CHANGED"); GST_DEBUG ("popped message %s",
gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_LATENCY) {
fail_unless (*had_latency == FALSE);
*had_latency = TRUE;
gst_clear_message (&message);
types &= ~GST_MESSAGE_LATENCY;
continue;
}
fail_unless (GST_MESSAGE_TYPE (message)
== GST_MESSAGE_STATE_CHANGED,
"did not get GST_MESSAGE_STATE_CHANGED");
gst_message_unref (message); gst_message_unref (message);
break;
} while (TRUE);
} }
GST_DEBUG ("popped %d messages", count); GST_DEBUG ("popped %d messages", count);
} }
@ -538,6 +583,7 @@ GST_START_TEST (test_message_state_changed_children)
GstBus *bus; GstBus *bus;
GstStateChangeReturn ret; GstStateChangeReturn ret;
GstState current, pending; GstState current, pending;
gboolean had_latency = FALSE;
pipeline = GST_PIPELINE (gst_pipeline_new (NULL)); pipeline = GST_PIPELINE (gst_pipeline_new (NULL));
fail_unless (pipeline != NULL, "Could not create pipeline"); fail_unless (pipeline != NULL, "Could not create pipeline");
@ -576,7 +622,7 @@ GST_START_TEST (test_message_state_changed_children)
ASSERT_OBJECT_REFCOUNT (sink, "sink", 2); ASSERT_OBJECT_REFCOUNT (sink, "sink", 2);
ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 2); ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 2);
pop_state_changed (bus, 3); pop_state_changed (bus, 3, &had_latency);
fail_if (gst_bus_have_pending (bus), "unexpected pending messages"); fail_if (gst_bus_have_pending (bus), "unexpected pending messages");
ASSERT_OBJECT_REFCOUNT (bus, "bus", 2); ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
@ -619,9 +665,9 @@ GST_START_TEST (test_message_state_changed_children)
* its state_change message */ * its state_change message */
ASSERT_OBJECT_REFCOUNT_BETWEEN (pipeline, "pipeline", 3, 4); ASSERT_OBJECT_REFCOUNT_BETWEEN (pipeline, "pipeline", 3, 4);
pop_state_changed (bus, 3); pop_state_changed (bus, 3, &had_latency);
pop_async_done (bus); pop_async_done (bus, &had_latency);
pop_latency (bus); pop_latency (bus, &had_latency);
fail_if ((gst_bus_pop (bus)) != NULL); fail_if ((gst_bus_pop (bus)) != NULL);
ASSERT_OBJECT_REFCOUNT_BETWEEN (bus, "bus", 2, 3); ASSERT_OBJECT_REFCOUNT_BETWEEN (bus, "bus", 2, 3);
@ -648,7 +694,7 @@ GST_START_TEST (test_message_state_changed_children)
ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 4); ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 2, 4);
ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3); ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3);
pop_state_changed (bus, 3); pop_state_changed (bus, 3, &had_latency);
fail_if ((gst_bus_pop (bus)) != NULL); fail_if ((gst_bus_pop (bus)) != NULL);
ASSERT_OBJECT_REFCOUNT (bus, "bus", 2); ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
@ -669,7 +715,7 @@ GST_START_TEST (test_message_state_changed_children)
ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 3, 4); ASSERT_OBJECT_REFCOUNT_BETWEEN (sink, "sink", 3, 4);
ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3); ASSERT_OBJECT_REFCOUNT (pipeline, "pipeline", 3);
pop_state_changed (bus, 6); pop_state_changed (bus, 6, &had_latency);
fail_if ((gst_bus_pop (bus)) != NULL); fail_if ((gst_bus_pop (bus)) != NULL);
ASSERT_OBJECT_REFCOUNT (src, "src", 1); ASSERT_OBJECT_REFCOUNT (src, "src", 1);
@ -696,6 +742,7 @@ GST_START_TEST (test_watch_for_state_change)
GstElement *src, *sink, *bin; GstElement *src, *sink, *bin;
GstBus *bus; GstBus *bus;
GstStateChangeReturn ret; GstStateChangeReturn ret;
gboolean had_latency = FALSE;
bin = gst_element_factory_make ("bin", NULL); bin = gst_element_factory_make ("bin", NULL);
fail_unless (bin != NULL, "Could not create bin"); fail_unless (bin != NULL, "Could not create bin");
@ -722,9 +769,9 @@ GST_START_TEST (test_watch_for_state_change)
GST_CLOCK_TIME_NONE); GST_CLOCK_TIME_NONE);
fail_unless (ret == GST_STATE_CHANGE_SUCCESS); fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
pop_state_changed (bus, 6); pop_state_changed (bus, 6, &had_latency);
pop_async_done (bus); pop_async_done (bus, &had_latency);
pop_latency (bus); pop_latency (bus, &had_latency);
fail_unless (gst_bus_have_pending (bus) == FALSE, fail_unless (gst_bus_have_pending (bus) == FALSE,
"Unexpected messages on bus"); "Unexpected messages on bus");
@ -732,16 +779,17 @@ GST_START_TEST (test_watch_for_state_change)
ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING); ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PLAYING);
fail_unless (ret == GST_STATE_CHANGE_SUCCESS); fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
pop_state_changed (bus, 3); pop_state_changed (bus, 3, &had_latency);
had_latency = FALSE;
/* this one might return either SUCCESS or ASYNC, likely SUCCESS */ /* this one might return either SUCCESS or ASYNC, likely SUCCESS */
ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED); ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED);
gst_element_get_state (GST_ELEMENT (bin), NULL, NULL, GST_CLOCK_TIME_NONE); gst_element_get_state (GST_ELEMENT (bin), NULL, NULL, GST_CLOCK_TIME_NONE);
pop_state_changed (bus, 3); pop_state_changed (bus, 3, &had_latency);
if (ret == GST_STATE_CHANGE_ASYNC) { if (ret == GST_STATE_CHANGE_ASYNC) {
pop_async_done (bus); pop_async_done (bus, &had_latency);
pop_latency (bus); pop_latency (bus, &had_latency);
} }
fail_unless (gst_bus_have_pending (bus) == FALSE, fail_unless (gst_bus_have_pending (bus) == FALSE,
@ -898,6 +946,7 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
GstStateChangeReturn ret; GstStateChangeReturn ret;
GstState current, pending; GstState current, pending;
GstBus *bus; GstBus *bus;
gboolean had_latency = FALSE;
pipeline = gst_pipeline_new (NULL); pipeline = gst_pipeline_new (NULL);
fail_unless (pipeline != NULL, "Could not create pipeline"); fail_unless (pipeline != NULL, "Could not create pipeline");
@ -951,10 +1000,11 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 107); ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 107);
#else #else
pop_state_changed (bus, 2); /* pop remaining ready => paused messages off the bus */ pop_state_changed (bus, 2, &had_latency); /* pop remaining ready => paused messages off the bus */
ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED, ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
108); 108);
pop_async_done (bus); pop_async_done (bus, &had_latency);
pop_latency (bus, &had_latency);
#endif #endif
/* PAUSED => PLAYING */ /* PAUSED => PLAYING */
GST_DEBUG ("popping PAUSED -> PLAYING messages"); GST_DEBUG ("popping PAUSED -> PLAYING messages");
@ -972,8 +1022,8 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed"); fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed");
/* TODO: do we need to check downwards state change order as well? */ /* TODO: do we need to check downwards state change order as well? */
pop_state_changed (bus, 4); /* pop playing => paused messages off the bus */ pop_state_changed (bus, 4, &had_latency); /* pop playing => paused messages off the bus */
pop_state_changed (bus, 4); /* pop paused => ready messages off the bus */ pop_state_changed (bus, 4, &had_latency); /* pop paused => ready messages off the bus */
while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1) while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1)
THREAD_SWITCH (); THREAD_SWITCH ();
@ -1002,6 +1052,7 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
GstStateChangeReturn ret; GstStateChangeReturn ret;
GstState current, pending; GstState current, pending;
GstBus *bus; GstBus *bus;
gboolean had_latency = FALSE;
/* (2) Now again, but check other code path where we don't have /* (2) Now again, but check other code path where we don't have
* a proper sink correctly flagged as such, but a 'semi-sink' */ * a proper sink correctly flagged as such, but a 'semi-sink' */
@ -1056,10 +1107,11 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 206); ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 206);
ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 207); ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 207);
#else #else
pop_state_changed (bus, 2); /* pop remaining ready => paused messages off the bus */ pop_state_changed (bus, 2, &had_latency); /* pop remaining ready => paused messages off the bus */
ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED, ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
208); 208);
pop_async_done (bus); pop_async_done (bus, &had_latency);
pop_latency (bus, &had_latency);
/* PAUSED => PLAYING */ /* PAUSED => PLAYING */
GST_DEBUG ("popping PAUSED -> PLAYING messages"); GST_DEBUG ("popping PAUSED -> PLAYING messages");
@ -1076,8 +1128,8 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed"); fail_if (ret != GST_STATE_CHANGE_SUCCESS, "State change to READY failed");
/* TODO: do we need to check downwards state change order as well? */ /* TODO: do we need to check downwards state change order as well? */
pop_state_changed (bus, 4); /* pop playing => paused messages off the bus */ pop_state_changed (bus, 4, &had_latency); /* pop playing => paused messages off the bus */
pop_state_changed (bus, 4); /* pop paused => ready messages off the bus */ pop_state_changed (bus, 4, &had_latency); /* pop paused => ready messages off the bus */
GST_DEBUG ("waiting for pipeline to reach refcount 1"); GST_DEBUG ("waiting for pipeline to reach refcount 1");
while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1) while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 1)