tests/webrtc: fix a race in the tests related to state tracking

If things progress fast enough, some state changes may not be seen be
the waiting code.

Fix by:
1. keeping a list of all the state changes
2. waiting checks each entry and if the relevant state is found, all
   states up to and including then are removed.

This ensures that any waits will see all the state sets.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1664>
This commit is contained in:
Matthew Waters 2021-11-26 21:51:57 +11:00 committed by GStreamer Marge Bot
parent 5257093268
commit f11e0e76c6

View file

@ -76,7 +76,7 @@ struct test_webrtc
GstElement *webrtc2;
GMutex lock;
GCond cond;
TestState state;
GArray *states;
guint offerror;
gpointer user_data;
GDestroyNotify data_notify;
@ -144,7 +144,8 @@ struct test_webrtc
static void
test_webrtc_signal_state_unlocked (struct test_webrtc *t, TestState state)
{
t->state = state;
GST_TRACE ("signal state 0x%x", state);
g_array_append_val (t->states, state);
g_cond_broadcast (&t->cond);
}
@ -156,6 +157,20 @@ test_webrtc_signal_state (struct test_webrtc *t, TestState state)
g_mutex_unlock (&t->lock);
}
#if 0
static gboolean
test_webrtc_state_find_unlocked (struct test_webrtc *t, TestState state,
guint * idx)
{
guint i;
for (i = 0; i < t->states->len; i++) {
if (state == g_array_index (t->states, TestState, i))
return TRUE;
}
return FALSE;
}
#endif
static void
_on_answer_set (GstPromise * promise, gpointer user_data)
{
@ -166,8 +181,7 @@ _on_answer_set (GstPromise * promise, gpointer user_data)
if (++t->answer_set_count >= 2) {
if (t->on_answer_set)
t->on_answer_set (t, answerer, promise, t->answer_set_data);
if (t->state == STATE_ANSWER_CREATED)
t->state = STATE_ANSWER_SET;
test_webrtc_signal_state_unlocked (t, STATE_ANSWER_SET);
g_cond_broadcast (&t->cond);
}
gst_promise_unref (promise);
@ -224,8 +238,7 @@ _on_answer_received (GstPromise * promise, gpointer user_data)
error:
g_clear_error (&error);
if (t->state < STATE_ERROR)
test_webrtc_signal_state_unlocked (t, STATE_ERROR);
test_webrtc_signal_state_unlocked (t, STATE_ERROR);
g_mutex_unlock (&t->lock);
return;
}
@ -240,8 +253,7 @@ _on_offer_set (GstPromise * promise, gpointer user_data)
if (++t->offer_set_count >= 2) {
if (t->on_offer_set)
t->on_offer_set (t, offeror, promise, t->offer_set_data);
if (t->state == STATE_OFFER_CREATED)
t->state = STATE_OFFER_SET;
test_webrtc_signal_state_unlocked (t, STATE_OFFER_SET);
g_cond_broadcast (&t->cond);
}
gst_promise_unref (promise);
@ -283,6 +295,12 @@ _on_offer_received (GstPromise * promise, gpointer user_data)
if (error)
goto error;
test_webrtc_signal_state_unlocked (t, STATE_OFFER_CREATED);
gst_object_ref (offeror);
gst_object_ref (answerer);
g_mutex_unlock (&t->lock);
if (t->offer_desc) {
promise = gst_promise_new_with_change_func (_on_offer_set, t, NULL);
g_signal_emit_by_name (offeror, "set-local-description", t->offer_desc,
@ -295,14 +313,14 @@ _on_offer_received (GstPromise * promise, gpointer user_data)
g_signal_emit_by_name (answerer, "create-answer", NULL, promise);
}
test_webrtc_signal_state_unlocked (t, STATE_OFFER_CREATED);
g_mutex_unlock (&t->lock);
gst_clear_object (&offeror);
gst_clear_object (&answerer);
return;
error:
g_clear_error (&error);
if (t->state < STATE_ERROR)
test_webrtc_signal_state_unlocked (t, STATE_ERROR);
test_webrtc_signal_state_unlocked (t, STATE_ERROR);
g_mutex_unlock (&t->lock);
return;
}
@ -388,8 +406,7 @@ _on_negotiation_needed (GstElement * webrtc, struct test_webrtc *t)
g_mutex_lock (&t->lock);
if (t->on_negotiation_needed)
t->on_negotiation_needed (t, webrtc, t->negotiation_data);
if (t->state == STATE_NEW)
t->state = STATE_NEGOTIATION_NEEDED;
test_webrtc_signal_state_unlocked (t, STATE_NEGOTIATION_NEEDED);
g_cond_broadcast (&t->cond);
g_mutex_unlock (&t->lock);
}
@ -549,6 +566,8 @@ test_webrtc_new (void)
g_mutex_init (&ret->lock);
g_cond_init (&ret->cond);
ret->states = g_array_new (FALSE, TRUE, sizeof (TestState));
ret->test_clock = GST_TEST_CLOCK (gst_test_clock_new ());
ret->thread = g_thread_new ("test-webrtc", (GThreadFunc) _bus_thread, ret);
@ -605,6 +624,7 @@ test_webrtc_new (void)
static void
test_webrtc_reset_negotiation (struct test_webrtc *t)
{
GST_DEBUG ("resetting negotiation");
if (t->offer_desc)
gst_webrtc_session_description_free (t->offer_desc);
t->offer_desc = NULL;
@ -678,6 +698,9 @@ test_webrtc_free (struct test_webrtc *t)
g_mutex_clear (&t->lock);
g_cond_clear (&t->cond);
g_array_free (t->states, TRUE);
t->states = NULL;
g_free (t);
}
@ -691,26 +714,38 @@ test_webrtc_create_offer (struct test_webrtc *t)
g_signal_emit_by_name (offeror, "create-offer", NULL, promise);
}
static void
static TestState
test_webrtc_wait_for_state_mask (struct test_webrtc *t, TestState state)
{
guint i;
g_mutex_lock (&t->lock);
while (((1 << t->state) & state) == 0) {
GST_INFO ("test state 0x%x, current 0x%x", state, (1 << t->state));
GST_LOG ("attempting to wait for state mask 0x%x", state);
while (TRUE) {
for (i = 0; i < t->states->len; i++) {
TestState val = g_array_index (t->states, TestState, i);
if (((1 << val) & state) != 0) {
GST_DEBUG ("found state 0x%x in wait mask 0x%x at idx %u", val, state,
i);
g_array_remove_range (t->states, 0, i + 1);
g_mutex_unlock (&t->lock);
return val;
}
}
g_cond_wait (&t->cond, &t->lock);
}
GST_INFO ("have test state 0x%x, current 0x%x", state, 1 << t->state);
g_mutex_unlock (&t->lock);
}
static void
static TestState
test_webrtc_wait_for_answer_error_eos (struct test_webrtc *t)
{
TestState states = 0;
states |= (1 << STATE_ANSWER_SET);
states |= (1 << STATE_EOS);
states |= (1 << STATE_ERROR);
test_webrtc_wait_for_state_mask (t, states);
return test_webrtc_wait_for_state_mask (t, states);
}
static void
@ -835,8 +870,8 @@ test_validate_sdp_full (struct test_webrtc *t, struct validate_sdp *offer,
test_webrtc_create_offer (t);
if (wait_mask == 0) {
test_webrtc_wait_for_answer_error_eos (t);
fail_unless (t->state == STATE_ANSWER_SET);
fail_unless_equals_int (test_webrtc_wait_for_answer_error_eos (t),
STATE_ANSWER_SET);
} else {
test_webrtc_wait_for_state_mask (t, wait_mask);
}
@ -1314,8 +1349,8 @@ GST_START_TEST (test_no_nice_elements_request_pad)
pad = gst_element_request_pad_simple (t->webrtc1, "sink_0");
fail_unless (pad == NULL);
test_webrtc_wait_for_answer_error_eos (t);
fail_unless_equals_int (STATE_ERROR, t->state);
fail_unless_equals_int (STATE_ERROR,
test_webrtc_wait_for_answer_error_eos (t));
test_webrtc_free (t);
if (nicesrc)
@ -1346,8 +1381,8 @@ GST_START_TEST (test_no_nice_elements_state_change)
t->bus_message = NULL;
gst_element_set_state (t->webrtc1, GST_STATE_READY);
test_webrtc_wait_for_answer_error_eos (t);
fail_unless_equals_int (STATE_ERROR, t->state);
fail_unless_equals_int (STATE_ERROR,
test_webrtc_wait_for_answer_error_eos (t));
test_webrtc_free (t);
if (nicesrc)
@ -4241,8 +4276,6 @@ _pad_added_harness (struct test_webrtc *t, GstElement * element,
if (GST_PAD_DIRECTION (pad) != GST_PAD_SRC)
return;
GST_ERROR ("new pad %" GST_PTR_FORMAT, pad);
h = gst_harness_new_with_element (element, NULL, GST_OBJECT_NAME (pad));
t->harnesses = g_list_prepend (t->harnesses, h);
@ -5119,7 +5152,6 @@ GST_START_TEST (test_simulcast)
fail_unless (gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp));
ssrc = gst_rtp_buffer_get_ssrc (&rtp);
GST_ERROR ("received ssrc %u", ssrc);
for (i = 0; i < ssrcs_received->len; i++) {
if (g_array_index (ssrcs_received, guint, i) == ssrc)
break;