tests: collectpads: add flushing seek tests

https://bugzilla.gnome.org/show_bug.cgi?id=708416
This commit is contained in:
Alessandro Decina 2013-09-16 08:35:37 +02:00 committed by Sebastian Dröge
parent d063623851
commit f52b5ddcd2

View file

@ -291,6 +291,7 @@ typedef struct
GstPad *pad;
GstBuffer *buffer;
GstEvent *event;
GstFlowReturn expected_result;
} TestData;
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
@ -305,10 +306,13 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
static GstCollectPads *collect;
static gboolean collected;
static GstPad *srcpad1, *srcpad2;
static GstPad *agg_srcpad, *srcpad1, *srcpad2;
static GstPad *sinkpad1, *sinkpad2;
static TestData *data1, *data2;
static GstBuffer *outbuf1, *outbuf2;
static GstElement *agg;
gboolean fail_seek;
gint flush_start_events, flush_stop_events;
static GMutex lock;
static GCond cond;
@ -362,7 +366,7 @@ push_buffer (gpointer user_data)
gst_pad_push_event (test_data->pad, gst_event_new_segment (&segment));
flow = gst_pad_push (test_data->pad, test_data->buffer);
fail_unless (flow == GST_FLOW_OK, "got flow %s instead of OK",
fail_unless (flow == test_data->expected_result, "got flow %s instead of OK",
gst_flow_get_name (flow));
return NULL;
@ -779,16 +783,237 @@ GST_START_TEST (test_branched_pipeline)
GST_END_TEST;
static GstPadProbeReturn
downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
if (info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
GST_EVENT_FLUSH_START)
g_atomic_int_inc (&flush_start_events);
else if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
GST_EVENT_FLUSH_STOP)
g_atomic_int_inc (&flush_stop_events);
} else if (info->type & GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM) {
g_mutex_lock (&lock);
collected = TRUE;
g_cond_signal (&cond);
g_mutex_unlock (&lock);
}
return GST_PAD_PROBE_DROP;
}
static gboolean
src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
gboolean ret = TRUE;
if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK) {
if (g_atomic_int_compare_and_exchange (&fail_seek, TRUE, FALSE) == TRUE) {
ret = FALSE;
}
}
gst_event_unref (event);
return ret;
}
static gboolean
agg_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
return gst_collect_pads_src_event_default (GST_AGGREGATOR (parent)->collect,
pad, event);
}
static GstPad *
setup_src_pad (GstElement * element,
GstStaticPadTemplate * tmpl, const char *name)
{
GstPad *srcpad, *sinkpad;
srcpad = gst_pad_new_from_static_template (tmpl, "src");
sinkpad = gst_element_get_request_pad (element, name);
fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK,
"Could not link source and %s sink pads", GST_ELEMENT_NAME (element));
gst_pad_set_event_function (srcpad, src_event);
gst_pad_set_active (srcpad, TRUE);
return srcpad;
}
static void
flush_setup (void)
{
agg = gst_check_setup_element ("aggregator");
agg_srcpad = gst_element_get_static_pad (agg, "src");
srcpad1 = setup_src_pad (agg, &srctemplate, "sink_0");
srcpad2 = setup_src_pad (agg, &srctemplate, "sink_1");
gst_pad_add_probe (agg_srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM |
GST_PAD_PROBE_TYPE_EVENT_FLUSH, downstream_probe_cb, NULL, NULL);
gst_pad_set_event_function (agg_srcpad, agg_src_event);
data1 = g_new0 (TestData, 1);
data2 = g_new0 (TestData, 1);
g_atomic_int_set (&flush_start_events, 0);
g_atomic_int_set (&flush_stop_events, 0);
gst_element_set_state (agg, GST_STATE_PLAYING);
}
static void
flush_teardown (void)
{
gst_element_set_state (agg, GST_STATE_NULL);
gst_object_unref (agg);
gst_object_unref (agg_srcpad);
gst_object_unref (srcpad1);
gst_object_unref (srcpad2);
g_free (data1);
g_free (data2);
}
GST_START_TEST (test_flushing_seek_failure)
{
GstBuffer *buf1, *buf2;
GThread *thread1, *thread2;
GstEvent *event;
/* Queue a buffer in agg:sink_1. Do a flushing seek and simulate one upstream
* element failing to handle the seek (see src_event()). Check that the
* flushing seek logic doesn't get triggered by checking that the buffer
* queued on agg:sink_1 doesn't get flushed.
*/
/* queue a buffer in agg:sink_1 */
buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND;
data2->pad = srcpad2;
data2->buffer = buf2;
thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
fail_unless_collected (FALSE);
/* do the seek */
event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
g_atomic_int_set (&fail_seek, TRUE);
fail_if (gst_pad_send_event (agg_srcpad, event));
/* flush srcpad1 (pretending it's the upstream that didn't fail to seek) */
fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
/* check that the flush events reached agg:src */
fail_unless_equals_int (flush_start_events, 1);
fail_unless_equals_int (flush_stop_events, 1);
/* push a buffer on agg:sink_0. This should trigger a collect since agg:sink_1
* should not have been flushed at this point */
buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
GST_BUFFER_TIMESTAMP (buf1) = 0;
data1->pad = srcpad1;
data1->buffer = buf1;
thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
fail_unless_collected (TRUE);
collected = FALSE;
/* at this point thread1 must have returned */
g_thread_join (thread1);
/* push eos on agg:sink_0 so the buffer queued in agg:sink_1 is collected and
* the pushing thread returns */
data1->pad = srcpad1;
data1->event = gst_event_new_eos ();
thread1 = g_thread_try_new ("gst-check", push_event, data1, NULL);
fail_unless_collected (TRUE);
g_thread_join (thread1);
g_thread_join (thread2);
}
GST_END_TEST;
GST_START_TEST (test_flushing_seek)
{
GstBuffer *buf1, *buf2;
GThread *thread1, *thread2;
GstEvent *event;
/* Queue a buffer in agg:sink_1. Then do a flushing seek and check that the
* new flushing seek logic is triggered. On the first FLUSH_START call the
* buffers queued in collectpads should get flushed. Only one FLUSH_START and
* one FLUSH_STOP should be forwarded downstream.
*/
buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
GST_BUFFER_TIMESTAMP (buf2) = 0;
data2->pad = srcpad2;
data2->buffer = buf2;
/* expect this buffer to be flushed */
data2->expected_result = GST_FLOW_FLUSHING;
thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
/* now do a successful flushing seek */
event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
g_atomic_int_set (&fail_seek, FALSE);
fail_unless (gst_pad_send_event (agg_srcpad, event));
/* flushing starts once one of the upstream elements sends the first
* FLUSH_START */
fail_unless_equals_int (flush_start_events, 0);
fail_unless_equals_int (flush_stop_events, 0);
/* flush ogg:sink_0. This flushs collectpads, calls ::flush() and sends
* FLUSH_START downstream */
fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
fail_unless_equals_int (flush_start_events, 1);
fail_unless_equals_int (flush_stop_events, 0);
/* the first FLUSH_STOP is forwarded downstream */
fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
fail_unless_equals_int (flush_start_events, 1);
fail_unless_equals_int (flush_stop_events, 1);
/* at this point even the other pad agg:sink_1 should be flushing so thread2
* should have stopped */
g_thread_join (thread2);
/* push a buffer on agg:sink_0 to trigger one collect after flushing to verify
* that flushing completes once all the pads have been flushed */
buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
GST_BUFFER_TIMESTAMP (buf1) = GST_SECOND;
data1->pad = srcpad1;
data1->buffer = buf1;
thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
/* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is
* sent downstream */
gst_pad_push_event (srcpad2, gst_event_new_flush_start ());
gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE));
/* still, only one FLUSH_START and one FLUSH_STOP are forwarded downstream */
fail_unless_equals_int (flush_start_events, 1);
fail_unless_equals_int (flush_stop_events, 1);
/* EOS agg:sink_1 so the buffer queued in agg:sink_0 is collected */
data2->pad = srcpad2;
data2->event = gst_event_new_eos ();
thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
fail_unless_collected (TRUE);
/* these will return immediately as at this point the threads have been
* unlocked and are finished */
g_thread_join (thread1);
g_thread_join (thread2);
}
GST_END_TEST;
static Suite *
gst_collect_pads_suite (void)
{
Suite *suite;
TCase *general, *buffers, *pipeline;
TCase *general, *buffers, *pipeline, *flush;
gst_agregator_plugin_register ();
suite = suite_create ("GstCollectPads");
general = tcase_create ("general");
suite_add_tcase (suite, general);
tcase_add_checked_fixture (general, setup, teardown);
@ -807,6 +1032,12 @@ gst_collect_pads_suite (void)
tcase_add_test (pipeline, test_linear_pipeline);
tcase_add_test (pipeline, test_branched_pipeline);
flush = tcase_create ("flush");
suite_add_tcase (suite, flush);
tcase_add_checked_fixture (flush, flush_setup, flush_teardown);
tcase_add_test (flush, test_flushing_seek_failure);
tcase_add_test (flush, test_flushing_seek);
return suite;
}