adaptivedemux: tests: made adaptive demux test thread safe

https://bugzilla.gnome.org/show_bug.cgi?id=757361
This commit is contained in:
Florin Apostol 2015-10-20 15:46:36 +01:00 committed by Vincent Penquerc'h
parent 7254747bbe
commit 695ec674c6

View file

@ -169,18 +169,29 @@ typedef struct _GstDashDemuxTestData
/* test level scratchData data to be used by test */ /* test level scratchData data to be used by test */
GstDashDemuxTestScratchData *scratchData; GstDashDemuxTestScratchData *scratchData;
/* mutex to lock accesses to this structure when data is shared between threads */
GMutex lockTestData;
/* the number of streams that finished. /* the number of streams that finished.
* Main thread will stop the pipeline when all streams are finished * Main thread will stop the pipeline when all streams are finished
* (countStreamFinished == outputStreamArraySize) * (countStreamFinished == outputStreamArraySize)
*/ */
guint countStreamFinished; guint countStreamFinished;
GMutex lockStreamFinished;
/* true if an error is expected on pipeline
* If this is not set and an error is received, the test is failed
*/
gboolean expectError;
/* callbacks to be registered by test to influence the pipeline */ /* callbacks to be registered by test to influence the pipeline */
const Callbacks *callbacks; const Callbacks *callbacks;
} GstDashDemuxTestData; } GstDashDemuxTestData;
#define GST_TEST_GET_LOCK(d) (&(((GstDashDemuxTestData*)(d))->lockTestData))
#define GST_TEST_LOCK(d) g_mutex_lock (GST_TEST_GET_LOCK (d))
#define GST_TEST_UNLOCK(d) g_mutex_unlock (GST_TEST_GET_LOCK (d))
/* global pointer to test data, set before each test. /* global pointer to test data, set before each test.
* We need it global so that gst_fake_soup_http_src element can get the data. * We need it global so that gst_fake_soup_http_src element can get the data.
* Due to this, tests cannot be run in parallel. * Due to this, tests cannot be run in parallel.
@ -256,6 +267,9 @@ on_appSinkNewSample (GstAppSink * appsink, gpointer user_data)
gboolean ret = TRUE; gboolean ret = TRUE;
fail_unless (testData != NULL); fail_unless (testData != NULL);
GST_TEST_LOCK (testData);
testOutputStreamData = getTestOutputDataByAppsink (testData, appsink); testOutputStreamData = getTestOutputDataByAppsink (testData, appsink);
sample = gst_app_sink_pull_sample (appsink); sample = gst_app_sink_pull_sample (appsink);
@ -271,6 +285,8 @@ on_appSinkNewSample (GstAppSink * appsink, gpointer user_data)
gst_sample_unref (sample); gst_sample_unref (sample);
GST_TEST_UNLOCK (testData);
if (!ret) if (!ret)
return GST_FLOW_EOS; return GST_FLOW_EOS;
@ -286,6 +302,9 @@ on_appSinkEOS (GstAppSink * appsink, gpointer user_data)
gboolean ret = TRUE; gboolean ret = TRUE;
fail_unless (testData != NULL); fail_unless (testData != NULL);
GST_TEST_LOCK (testData);
testOutputStreamData = getTestOutputDataByAppsink (testData, appsink); testOutputStreamData = getTestOutputDataByAppsink (testData, appsink);
testOutputStreamData->scratchData->totalReceivedSize += testOutputStreamData->scratchData->totalReceivedSize +=
@ -298,17 +317,17 @@ on_appSinkEOS (GstAppSink * appsink, gpointer user_data)
if (ret) { if (ret) {
/* signal to the application that another stream has finished */ /* signal to the application that another stream has finished */
g_mutex_lock (&testData->lockStreamFinished);
testData->countStreamFinished++; testData->countStreamFinished++;
if (testData->countStreamFinished == testData->outputStreamArraySize) { if (testData->countStreamFinished == testData->outputStreamArraySize) {
g_main_loop_quit (testData->scratchData->loop); g_main_loop_quit (testData->scratchData->loop);
} }
g_mutex_unlock (&testData->lockStreamFinished);
} else { } else {
/* ignore this eos event, the stream is not finished yet */ /* ignore this eos event, the stream is not finished yet */
} }
GST_TEST_UNLOCK (testData);
} }
/* callback called when dash sends data to AppSink */ /* callback called when dash sends data to AppSink */
@ -322,6 +341,8 @@ on_dashSendsData (GstPad * pad, GstPadProbeInfo * info, gpointer data)
buffer = GST_PAD_PROBE_INFO_BUFFER (info); buffer = GST_PAD_PROBE_INFO_BUFFER (info);
GST_TEST_LOCK (testData);
streamName = gst_pad_get_name (pad); streamName = gst_pad_get_name (pad);
testOutputStreamData = getTestOutputDataByName (testData, streamName); testOutputStreamData = getTestOutputDataByName (testData, streamName);
g_free (streamName); g_free (streamName);
@ -331,6 +352,8 @@ on_dashSendsData (GstPad * pad, GstPadProbeInfo * info, gpointer data)
testOutputStreamData, buffer); testOutputStreamData, buffer);
} }
GST_TEST_UNLOCK (testData);
return GST_PAD_PROBE_OK; return GST_PAD_PROBE_OK;
} }
@ -352,12 +375,16 @@ on_dashReceivesEvent (GstPad * pad, GstPadProbeInfo * info, gpointer data)
*/ */
gst_event_parse_segment (event, &segment); gst_event_parse_segment (event, &segment);
GST_TEST_LOCK (testData);
testOutputStreamData = getTestOutputDataByPad (testData, pad); testOutputStreamData = getTestOutputDataByPad (testData, pad);
testOutputStreamData->scratchData->totalReceivedSize += testOutputStreamData->scratchData->totalReceivedSize +=
testOutputStreamData->scratchData->segmentReceivedSize; testOutputStreamData->scratchData->segmentReceivedSize;
testOutputStreamData->scratchData->segmentReceivedSize = 0; testOutputStreamData->scratchData->segmentReceivedSize = 0;
testOutputStreamData->scratchData->segmentStart = segment->start; testOutputStreamData->scratchData->segmentStart = segment->start;
GST_TEST_UNLOCK (testData);
} }
return GST_PAD_PROBE_OK; return GST_PAD_PROBE_OK;
@ -386,6 +413,8 @@ on_dashNewPad (GstElement * dashdemux, GstPad * pad, gpointer data)
sink = gst_element_factory_make ("appsink", name); sink = gst_element_factory_make ("appsink", name);
fail_unless (sink != NULL); fail_unless (sink != NULL);
GST_TEST_LOCK (testData);
testOutputStreamData = getTestOutputDataByName (testData, name); testOutputStreamData = getTestOutputDataByName (testData, name);
g_free (name); g_free (name);
@ -413,6 +442,8 @@ on_dashNewPad (GstElement * dashdemux, GstPad * pad, gpointer data)
*/ */
testOutputStreamData->scratchData->internalPad = internalPad; testOutputStreamData->scratchData->internalPad = internalPad;
GST_TEST_UNLOCK (testData);
pipeline = GST_ELEMENT (gst_element_get_parent (dashdemux)); pipeline = GST_ELEMENT (gst_element_get_parent (dashdemux));
fail_unless (pipeline != NULL); fail_unless (pipeline != NULL);
ret = gst_bin_add (GST_BIN (pipeline), sink); ret = gst_bin_add (GST_BIN (pipeline), sink);
@ -425,6 +456,32 @@ on_dashNewPad (GstElement * dashdemux, GstPad * pad, gpointer data)
} }
/* callback called when main_loop detects an error message
* We will signal main loop to quit
*/
static void
on_ErrorMessageOnBus (GstBus * bus, GstMessage * msg, gpointer data)
{
GstDashDemuxTestData *testData = (GstDashDemuxTestData *) data;
GError *err = NULL;
gchar *dbg_info = NULL;
gst_message_parse_error (msg, &err, &dbg_info);
GST_DEBUG ("ERROR from element %s: '%s'. Debugging info: %s",
GST_OBJECT_NAME (msg->src), err->message, (dbg_info) ? dbg_info : "none");
g_error_free (err);
g_free (dbg_info);
GST_TEST_LOCK (testData);
fail_unless (testData->expectError == TRUE,
"unexpected error detected on bus");
g_main_loop_quit (testData->scratchData->loop);
GST_TEST_UNLOCK (testData);
}
/* /*
* Create a dashdemux element, run a test using the input data and check * Create a dashdemux element, run a test using the input data and check
* the output data * the output data
@ -435,6 +492,7 @@ runTest (const GstFakeHttpSrcInputData * inputStreamArray,
guint outputStreamArraySize, const Callbacks * callbacks) guint outputStreamArraySize, const Callbacks * callbacks)
{ {
GstElement *pipeline; GstElement *pipeline;
GstBus *bus;
GstElement *dashdemux; GstElement *dashdemux;
GstElement *mpd_source; GstElement *mpd_source;
gboolean ret; gboolean ret;
@ -445,7 +503,7 @@ runTest (const GstFakeHttpSrcInputData * inputStreamArray,
testData->inputStreamArray = inputStreamArray; testData->inputStreamArray = inputStreamArray;
testData->outputStreamArray = outputStreamArray; testData->outputStreamArray = outputStreamArray;
testData->outputStreamArraySize = outputStreamArraySize; testData->outputStreamArraySize = outputStreamArraySize;
g_mutex_init (&testData->lockStreamFinished); g_mutex_init (&testData->lockTestData);
testData->callbacks = callbacks; testData->callbacks = callbacks;
/* allocate the scratchData space */ /* allocate the scratchData space */
@ -461,11 +519,19 @@ runTest (const GstFakeHttpSrcInputData * inputStreamArray,
testData->scratchData->loop = g_main_loop_new (NULL, TRUE); testData->scratchData->loop = g_main_loop_new (NULL, TRUE);
GST_TEST_LOCK (testData);
pipeline = gst_pipeline_new ("pipeline"); pipeline = gst_pipeline_new ("pipeline");
fail_unless (pipeline != NULL); fail_unless (pipeline != NULL);
testData->scratchData->pipeline = pipeline; testData->scratchData->pipeline = pipeline;
GST_DEBUG ("created pipeline %p", pipeline); GST_DEBUG ("created pipeline %p", pipeline);
/* register a callback to listen for error messages */
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
gst_bus_add_signal_watch (bus);
g_signal_connect (bus, "message::error",
G_CALLBACK (on_ErrorMessageOnBus), testData);
dashdemux = gst_check_setup_element ("dashdemux"); dashdemux = gst_check_setup_element ("dashdemux");
fail_unless (dashdemux != NULL); fail_unless (dashdemux != NULL);
testData->scratchData->dashdemux = dashdemux; testData->scratchData->dashdemux = dashdemux;
@ -494,6 +560,8 @@ runTest (const GstFakeHttpSrcInputData * inputStreamArray,
if (callbacks->preTestCallback) if (callbacks->preTestCallback)
(*callbacks->preTestCallback) (testData); (*callbacks->preTestCallback) (testData);
GST_TEST_UNLOCK (testData);
GST_DEBUG ("starting pipeline"); GST_DEBUG ("starting pipeline");
gst_element_set_state (pipeline, GST_STATE_PLAYING); gst_element_set_state (pipeline, GST_STATE_PLAYING);
stateChange = gst_element_get_state (pipeline, NULL, NULL, stateChange = gst_element_get_state (pipeline, NULL, NULL,
@ -512,10 +580,14 @@ runTest (const GstFakeHttpSrcInputData * inputStreamArray,
GST_DEBUG ("main thread pipeline stopped"); GST_DEBUG ("main thread pipeline stopped");
gst_object_unref (pipeline); gst_object_unref (pipeline);
GST_TEST_LOCK (testData);
/* call a test callback after the stop of the pipeline */ /* call a test callback after the stop of the pipeline */
if (callbacks->postTestCallback) if (callbacks->postTestCallback)
(*callbacks->postTestCallback) (testData); (*callbacks->postTestCallback) (testData);
GST_TEST_UNLOCK (testData);
for (guint i = 0; i < outputStreamArraySize; ++i) { for (guint i = 0; i < outputStreamArraySize; ++i) {
if (outputStreamArray[i].scratchData->appsink) if (outputStreamArray[i].scratchData->appsink)
gst_object_unref (outputStreamArray[i].scratchData->appsink); gst_object_unref (outputStreamArray[i].scratchData->appsink);
@ -528,7 +600,7 @@ runTest (const GstFakeHttpSrcInputData * inputStreamArray,
} }
g_slice_free (GstDashDemuxTestScratchData, testData->scratchData); g_slice_free (GstDashDemuxTestScratchData, testData->scratchData);
g_mutex_clear (&testData->lockStreamFinished); g_mutex_clear (&testData->lockTestData);
g_testData = NULL; g_testData = NULL;
g_slice_free (GstDashDemuxTestData, testData); g_slice_free (GstDashDemuxTestData, testData);
} }
@ -1151,36 +1223,11 @@ GST_START_TEST (testSeek)
GST_END_TEST; GST_END_TEST;
/* callback called when main_loop detects an error message
* The test is passed. We will signal main loop to quit
*/
static void
testDownloadErrorOnErrorMessage (GstBus * bus, GstMessage * msg, gpointer data)
{
GstDashDemuxTestData *testData = (GstDashDemuxTestData *) data;
GError *err = NULL;
gchar *dbg_info = NULL;
gst_message_parse_error (msg, &err, &dbg_info);
GST_DEBUG ("ERROR from element %s: '%s'. Debugging info: %s",
GST_OBJECT_NAME (msg->src), err->message, (dbg_info) ? dbg_info : "none");
g_error_free (err);
g_free (dbg_info);
g_main_loop_quit (testData->scratchData->loop);
}
static void static void
testDownloadErrorPreTestCallback (GstDashDemuxTestData * testData) testDownloadErrorPreTestCallback (GstDashDemuxTestData * testData)
{ {
GstBus *bus; /* expect error on pipeline */
testData->expectError = TRUE;
/* register a callback to listen for error messages */
bus = gst_pipeline_get_bus (GST_PIPELINE (testData->scratchData->pipeline));
gst_bus_add_signal_watch (bus);
g_signal_connect (bus, "message::error",
G_CALLBACK (testDownloadErrorOnErrorMessage), testData);
} }
/* /*