aggregator: refactor the test helper

Make the test helpers use a queue. This lets us also test sequences of events,
queries and data.
This commit is contained in:
Stefan Sauer 2017-10-14 13:26:02 +02:00 committed by Tim-Philipp Müller
parent 9efffd05e2
commit fbe82b7fa1

View file

@ -194,8 +194,7 @@ gst_test_aggregator_plugin_register (void)
typedef struct
{
GstEvent *event;
GstBuffer *buffer;
GQueue *queue;
GstElement *aggregator;
GstPad *sinkpad, *srcpad;
GstFlowReturn expected_result;
@ -254,48 +253,44 @@ start_flow (ChainData * chain_data)
}
static gpointer
push_buffer (gpointer user_data)
{
GstFlowReturn flow;
ChainData *chain_data = (ChainData *) user_data;
start_flow (chain_data);
GST_INFO_OBJECT (chain_data->sinkpad, "Pushing buffer %" GST_PTR_FORMAT,
chain_data->buffer);
flow = gst_pad_push (chain_data->srcpad, chain_data->buffer);
fail_unless (flow == chain_data->expected_result,
"got flow %s instead of %s on %s:%s", gst_flow_get_name (flow),
gst_flow_get_name (chain_data->expected_result),
GST_DEBUG_PAD_NAME (chain_data->sinkpad));
chain_data->buffer = NULL;
return NULL;
}
static gpointer
push_event (gpointer user_data)
push_data (gpointer user_data)
{
ChainData *chain_data = (ChainData *) user_data;
GstTestAggregator *aggregator = (GstTestAggregator *) chain_data->aggregator;
GstEventType event_type = GST_EVENT_TYPE (chain_data->event);
GstPad *sinkpad = chain_data->sinkpad;
GstPad *srcpad = chain_data->srcpad;
gpointer data;
start_flow (chain_data);
GST_INFO_OBJECT (chain_data->sinkpad, "Pushing event: %" GST_PTR_FORMAT,
chain_data->event);
while ((data = g_queue_pop_head (chain_data->queue))) {
GST_DEBUG_OBJECT (sinkpad, "Pushing %" GST_PTR_FORMAT, data);
switch (event_type) {
case GST_EVENT_GAP:
aggregator->gap_expected = TRUE;
break;
default:
break;
/* switch on the data type and push */
if (GST_IS_BUFFER (data)) {
GstFlowReturn flow = gst_pad_push (srcpad, GST_BUFFER_CAST (data));
fail_unless (flow == chain_data->expected_result,
"got flow %s instead of %s on %s:%s", gst_flow_get_name (flow),
gst_flow_get_name (chain_data->expected_result),
GST_DEBUG_PAD_NAME (sinkpad));
} else if (GST_IS_EVENT (data)) {
switch (GST_EVENT_TYPE (data)) {
case GST_EVENT_GAP:
aggregator->gap_expected = TRUE;
break;
default:
break;
}
fail_unless (gst_pad_push_event (srcpad, GST_EVENT_CAST (data)));
} else if (GST_IS_QUERY (data)) {
/* we don't care whether the query actualy got handled */
gst_pad_peer_query (srcpad, GST_QUERY_CAST (data));
gst_query_unref (GST_QUERY_CAST (data));
} else {
GST_WARNING_OBJECT (sinkpad, "bad queue entry: %" GST_PTR_FORMAT, data);
}
}
fail_unless (gst_pad_push_event (chain_data->srcpad,
chain_data->event) == TRUE);
chain_data->event = NULL;
GST_DEBUG_OBJECT (sinkpad, "All data from queue sent");
return NULL;
}
@ -352,10 +347,12 @@ _downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, TestData * test)
* clear with _chain_data_clear after.
*/
static void
_chain_data_init (ChainData * data, GstElement * agg)
_chain_data_init (ChainData * data, GstElement * agg, ...)
{
static gint num_src_pads = 0;
gchar *pad_name = g_strdup_printf ("src%d", num_src_pads);
va_list var_args;
gpointer d;
num_src_pads += 1;
@ -363,21 +360,46 @@ _chain_data_init (ChainData * data, GstElement * agg)
g_free (pad_name);
gst_pad_set_active (data->srcpad, TRUE);
data->aggregator = agg;
data->buffer = gst_buffer_new ();
data->sinkpad = gst_element_get_request_pad (agg, "sink_%u");
fail_unless (GST_IS_PAD (data->sinkpad));
fail_unless (gst_pad_link (data->srcpad, data->sinkpad) == GST_PAD_LINK_OK);
/* add data items */
data->queue = g_queue_new ();
va_start (var_args, agg);
while (TRUE) {
if (!(d = va_arg (var_args, gpointer)))
break;
g_queue_push_tail (data->queue, d);
GST_DEBUG_OBJECT (data->sinkpad, "Adding to queue: %" GST_PTR_FORMAT, d);
}
va_end (var_args);
}
static void
_chain_data_clear (ChainData * data)
_chain_data_clear (ChainData * chain_data)
{
gst_buffer_replace (&data->buffer, NULL);
gst_event_replace (&data->event, NULL);
if (data->srcpad)
gst_object_unref (data->srcpad);
if (data->sinkpad)
gst_object_unref (data->sinkpad);
gpointer data;
while ((data = g_queue_pop_head (chain_data->queue))) {
/* switch on the data type and free */
if (GST_IS_BUFFER (data)) {
gst_buffer_unref (GST_BUFFER_CAST (data));
} else if (GST_IS_EVENT (data)) {
gst_event_unref (GST_EVENT_CAST (data));
} else if (GST_IS_QUERY (data)) {
gst_query_unref (GST_QUERY_CAST (data));
} else {
GST_WARNING_OBJECT (chain_data->sinkpad, "bad queue entry: %"
GST_PTR_FORMAT, data);
}
}
g_queue_free (chain_data->queue);
if (chain_data->srcpad)
gst_object_unref (chain_data->srcpad);
if (chain_data->sinkpad)
gst_object_unref (chain_data->sinkpad);
}
static GstFlowReturn
@ -441,11 +463,11 @@ GST_START_TEST (test_aggregate)
TestData test = { 0, };
_test_data_init (&test, FALSE);
_chain_data_init (&data1, test.aggregator);
_chain_data_init (&data2, test.aggregator);
_chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
_chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL);
thread1 = g_thread_try_new ("gst-check", push_buffer, &data1, NULL);
thread2 = g_thread_try_new ("gst-check", push_buffer, &data2, NULL);
thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
g_main_loop_run (test.ml);
g_source_remove (test.timeout_id);
@ -470,13 +492,11 @@ GST_START_TEST (test_aggregate_eos)
TestData test = { 0, };
_test_data_init (&test, FALSE);
_chain_data_init (&data1, test.aggregator);
_chain_data_init (&data2, test.aggregator);
_chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
_chain_data_init (&data2, test.aggregator, gst_event_new_eos (), NULL);
data2.event = gst_event_new_eos ();
thread1 = g_thread_try_new ("gst-check", push_buffer, &data1, NULL);
thread2 = g_thread_try_new ("gst-check", push_event, &data2, NULL);
thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
g_main_loop_run (test.ml);
g_source_remove (test.timeout_id);
@ -500,11 +520,10 @@ GST_START_TEST (test_aggregate_gap)
TestData test = { 0, };
_test_data_init (&test, FALSE);
_chain_data_init (&data, test.aggregator);
_chain_data_init (&data, test.aggregator,
gst_event_new_gap (TEST_GAP_PTS, TEST_GAP_DURATION), NULL);
data.event = gst_event_new_gap (TEST_GAP_PTS, TEST_GAP_DURATION);
thread = g_thread_try_new ("gst-check", push_event, &data, NULL);
thread = g_thread_try_new ("gst-check", push_data, &data, NULL);
g_main_loop_run (test.ml);
g_source_remove (test.timeout_id);
@ -708,6 +727,7 @@ GST_START_TEST (test_flushing_seek)
ChainData data1 = { 0, };
ChainData data2 = { 0, };
TestData test = { 0, };
GstBuffer *buf;
_test_data_init (&test, TRUE);
@ -716,9 +736,11 @@ GST_START_TEST (test_flushing_seek)
* buffers queued in collectpads should get flushed. Only one FLUSH_START and
* one FLUSH_STOP should be forwarded downstream.
*/
_chain_data_init (&data1, test.aggregator);
_chain_data_init (&data2, test.aggregator);
GST_BUFFER_TIMESTAMP (data2.buffer) = 0;
_chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
buf = gst_buffer_new ();
GST_BUFFER_TIMESTAMP (buf) = 0;
_chain_data_init (&data2, test.aggregator, buf, NULL);
gst_segment_init (&GST_AGGREGATOR (test.aggregator)->segment,
GST_FORMAT_TIME);
@ -740,7 +762,7 @@ GST_START_TEST (test_flushing_seek)
/* expect this buffer to be flushed */
data2.expected_result = GST_FLOW_FLUSHING;
thread2 = g_thread_try_new ("gst-check", push_buffer, &data2, NULL);
thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
fail_unless (gst_pad_push_event (data1.srcpad, gst_event_new_flush_start ()));
fail_unless_equals_int (test.flush_start_events, 1);
@ -758,7 +780,7 @@ GST_START_TEST (test_flushing_seek)
/* push a buffer on agg:sink_0 to trigger one collect after flushing to verify
* that flushing completes once all the pads have been flushed */
thread1 = g_thread_try_new ("gst-check", push_buffer, &data1, NULL);
thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
/* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is
* sent downstream */
@ -771,8 +793,8 @@ GST_START_TEST (test_flushing_seek)
gst_pad_add_probe (test.srcpad, GST_PAD_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) _aggregated_cb, test.ml, NULL);
data2.event = gst_event_new_eos ();
thread2 = g_thread_try_new ("gst-check", push_event, &data2, NULL);
g_queue_push_tail (data2.queue, gst_event_new_eos ());
thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
g_main_loop_run (test.ml);
g_source_remove (test.timeout_id);