diff --git a/tests/check/libs/aggregator.c b/tests/check/libs/aggregator.c index 692d86706f..24d7a95947 100644 --- a/tests/check/libs/aggregator.c +++ b/tests/check/libs/aggregator.c @@ -194,8 +194,7 @@ gst_test_aggregator_plugin_register (void) typedef struct { - GstEvent *event; - GstBuffer *buffer; + GQueue *queue; GstElement *aggregator; GstPad *sinkpad, *srcpad; GstFlowReturn expected_result; @@ -254,48 +253,44 @@ start_flow (ChainData * chain_data) } static gpointer -push_buffer (gpointer user_data) -{ - GstFlowReturn flow; - ChainData *chain_data = (ChainData *) user_data; - - start_flow (chain_data); - - GST_INFO_OBJECT (chain_data->sinkpad, "Pushing buffer %" GST_PTR_FORMAT, - chain_data->buffer); - flow = gst_pad_push (chain_data->srcpad, chain_data->buffer); - fail_unless (flow == chain_data->expected_result, - "got flow %s instead of %s on %s:%s", gst_flow_get_name (flow), - gst_flow_get_name (chain_data->expected_result), - GST_DEBUG_PAD_NAME (chain_data->sinkpad)); - chain_data->buffer = NULL; - - return NULL; -} - -static gpointer -push_event (gpointer user_data) +push_data (gpointer user_data) { ChainData *chain_data = (ChainData *) user_data; GstTestAggregator *aggregator = (GstTestAggregator *) chain_data->aggregator; - GstEventType event_type = GST_EVENT_TYPE (chain_data->event); + GstPad *sinkpad = chain_data->sinkpad; + GstPad *srcpad = chain_data->srcpad; + gpointer data; start_flow (chain_data); - GST_INFO_OBJECT (chain_data->sinkpad, "Pushing event: %" GST_PTR_FORMAT, - chain_data->event); + while ((data = g_queue_pop_head (chain_data->queue))) { + GST_DEBUG_OBJECT (sinkpad, "Pushing %" GST_PTR_FORMAT, data); - switch (event_type) { - case GST_EVENT_GAP: - aggregator->gap_expected = TRUE; - break; - default: - break; + /* switch on the data type and push */ + if (GST_IS_BUFFER (data)) { + GstFlowReturn flow = gst_pad_push (srcpad, GST_BUFFER_CAST (data)); + fail_unless (flow == chain_data->expected_result, + "got flow %s instead of %s on %s:%s", gst_flow_get_name (flow), + gst_flow_get_name (chain_data->expected_result), + GST_DEBUG_PAD_NAME (sinkpad)); + } else if (GST_IS_EVENT (data)) { + switch (GST_EVENT_TYPE (data)) { + case GST_EVENT_GAP: + aggregator->gap_expected = TRUE; + break; + default: + break; + } + fail_unless (gst_pad_push_event (srcpad, GST_EVENT_CAST (data))); + } else if (GST_IS_QUERY (data)) { + /* we don't care whether the query actualy got handled */ + gst_pad_peer_query (srcpad, GST_QUERY_CAST (data)); + gst_query_unref (GST_QUERY_CAST (data)); + } else { + GST_WARNING_OBJECT (sinkpad, "bad queue entry: %" GST_PTR_FORMAT, data); + } } - - fail_unless (gst_pad_push_event (chain_data->srcpad, - chain_data->event) == TRUE); - chain_data->event = NULL; + GST_DEBUG_OBJECT (sinkpad, "All data from queue sent"); return NULL; } @@ -352,10 +347,12 @@ _downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, TestData * test) * clear with _chain_data_clear after. */ static void -_chain_data_init (ChainData * data, GstElement * agg) +_chain_data_init (ChainData * data, GstElement * agg, ...) { static gint num_src_pads = 0; gchar *pad_name = g_strdup_printf ("src%d", num_src_pads); + va_list var_args; + gpointer d; num_src_pads += 1; @@ -363,21 +360,46 @@ _chain_data_init (ChainData * data, GstElement * agg) g_free (pad_name); gst_pad_set_active (data->srcpad, TRUE); data->aggregator = agg; - data->buffer = gst_buffer_new (); data->sinkpad = gst_element_get_request_pad (agg, "sink_%u"); fail_unless (GST_IS_PAD (data->sinkpad)); fail_unless (gst_pad_link (data->srcpad, data->sinkpad) == GST_PAD_LINK_OK); + + /* add data items */ + data->queue = g_queue_new (); + va_start (var_args, agg); + while (TRUE) { + if (!(d = va_arg (var_args, gpointer))) + break; + g_queue_push_tail (data->queue, d); + GST_DEBUG_OBJECT (data->sinkpad, "Adding to queue: %" GST_PTR_FORMAT, d); + } + va_end (var_args); } static void -_chain_data_clear (ChainData * data) +_chain_data_clear (ChainData * chain_data) { - gst_buffer_replace (&data->buffer, NULL); - gst_event_replace (&data->event, NULL); - if (data->srcpad) - gst_object_unref (data->srcpad); - if (data->sinkpad) - gst_object_unref (data->sinkpad); + gpointer data; + + while ((data = g_queue_pop_head (chain_data->queue))) { + /* switch on the data type and free */ + if (GST_IS_BUFFER (data)) { + gst_buffer_unref (GST_BUFFER_CAST (data)); + } else if (GST_IS_EVENT (data)) { + gst_event_unref (GST_EVENT_CAST (data)); + } else if (GST_IS_QUERY (data)) { + gst_query_unref (GST_QUERY_CAST (data)); + } else { + GST_WARNING_OBJECT (chain_data->sinkpad, "bad queue entry: %" + GST_PTR_FORMAT, data); + } + } + g_queue_free (chain_data->queue); + + if (chain_data->srcpad) + gst_object_unref (chain_data->srcpad); + if (chain_data->sinkpad) + gst_object_unref (chain_data->sinkpad); } static GstFlowReturn @@ -441,11 +463,11 @@ GST_START_TEST (test_aggregate) TestData test = { 0, }; _test_data_init (&test, FALSE); - _chain_data_init (&data1, test.aggregator); - _chain_data_init (&data2, test.aggregator); + _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL); + _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL); - thread1 = g_thread_try_new ("gst-check", push_buffer, &data1, NULL); - thread2 = g_thread_try_new ("gst-check", push_buffer, &data2, NULL); + thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL); + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); g_main_loop_run (test.ml); g_source_remove (test.timeout_id); @@ -470,13 +492,11 @@ GST_START_TEST (test_aggregate_eos) TestData test = { 0, }; _test_data_init (&test, FALSE); - _chain_data_init (&data1, test.aggregator); - _chain_data_init (&data2, test.aggregator); + _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL); + _chain_data_init (&data2, test.aggregator, gst_event_new_eos (), NULL); - data2.event = gst_event_new_eos (); - - thread1 = g_thread_try_new ("gst-check", push_buffer, &data1, NULL); - thread2 = g_thread_try_new ("gst-check", push_event, &data2, NULL); + thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL); + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); g_main_loop_run (test.ml); g_source_remove (test.timeout_id); @@ -500,11 +520,10 @@ GST_START_TEST (test_aggregate_gap) TestData test = { 0, }; _test_data_init (&test, FALSE); - _chain_data_init (&data, test.aggregator); + _chain_data_init (&data, test.aggregator, + gst_event_new_gap (TEST_GAP_PTS, TEST_GAP_DURATION), NULL); - data.event = gst_event_new_gap (TEST_GAP_PTS, TEST_GAP_DURATION); - - thread = g_thread_try_new ("gst-check", push_event, &data, NULL); + thread = g_thread_try_new ("gst-check", push_data, &data, NULL); g_main_loop_run (test.ml); g_source_remove (test.timeout_id); @@ -708,6 +727,7 @@ GST_START_TEST (test_flushing_seek) ChainData data1 = { 0, }; ChainData data2 = { 0, }; TestData test = { 0, }; + GstBuffer *buf; _test_data_init (&test, TRUE); @@ -716,9 +736,11 @@ GST_START_TEST (test_flushing_seek) * buffers queued in collectpads should get flushed. Only one FLUSH_START and * one FLUSH_STOP should be forwarded downstream. */ - _chain_data_init (&data1, test.aggregator); - _chain_data_init (&data2, test.aggregator); - GST_BUFFER_TIMESTAMP (data2.buffer) = 0; + _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL); + + buf = gst_buffer_new (); + GST_BUFFER_TIMESTAMP (buf) = 0; + _chain_data_init (&data2, test.aggregator, buf, NULL); gst_segment_init (&GST_AGGREGATOR (test.aggregator)->segment, GST_FORMAT_TIME); @@ -740,7 +762,7 @@ GST_START_TEST (test_flushing_seek) /* expect this buffer to be flushed */ data2.expected_result = GST_FLOW_FLUSHING; - thread2 = g_thread_try_new ("gst-check", push_buffer, &data2, NULL); + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); fail_unless (gst_pad_push_event (data1.srcpad, gst_event_new_flush_start ())); fail_unless_equals_int (test.flush_start_events, 1); @@ -758,7 +780,7 @@ GST_START_TEST (test_flushing_seek) /* push a buffer on agg:sink_0 to trigger one collect after flushing to verify * that flushing completes once all the pads have been flushed */ - thread1 = g_thread_try_new ("gst-check", push_buffer, &data1, NULL); + thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL); /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is * sent downstream */ @@ -771,8 +793,8 @@ GST_START_TEST (test_flushing_seek) gst_pad_add_probe (test.srcpad, GST_PAD_PROBE_TYPE_BUFFER, (GstPadProbeCallback) _aggregated_cb, test.ml, NULL); - data2.event = gst_event_new_eos (); - thread2 = g_thread_try_new ("gst-check", push_event, &data2, NULL); + g_queue_push_tail (data2.queue, gst_event_new_eos ()); + thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL); g_main_loop_run (test.ml); g_source_remove (test.timeout_id);