/*
 * collectpads.c - GstCollectPads testsuite
 * Copyright (C) 2006 Alessandro Decina <alessandro.d@gmail.com>
 *
 * Authors:
 *   Alessandro Decina <alessandro.d@gmail.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <gst/check/gstcheck.h>
#include <gst/base/gstcollectpads.h>

/* Dummy GstCollectPads-based element used by the tests in this file. */
/* GObject type boilerplate for the test element: the type id accessor and the
 * checked instance / class cast helpers built on top of it. */
#define GST_TYPE_AGGREGATOR (gst_aggregator_get_type ())
#define GST_AGGREGATOR(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_AGGREGATOR, GstAggregator))
#define GST_AGGREGATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_AGGREGATOR, GstAggregatorClass))
#define GST_AGGREGATOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_AGGREGATOR, GstAggregatorClass))
/* Short names for the instance and class structures. */
typedef struct _GstAggregator GstAggregator;
typedef struct _GstAggregatorClass GstAggregatorClass;
/* Instance structure of the dummy collectpads-based test element. */
struct _GstAggregator
{
/* parent instance; must stay the first member */
GstElement parent;
/* collectpads helper, created in gst_aggregator_init() and unreffed in
 * gst_aggregator_dispose() */
GstCollectPads *collect;
/* the "src" pad, created and added in gst_aggregator_init() */
GstPad *srcpad;
/* NOTE(review): not referenced in the visible part of this file */
GstPad *sinkpad[2];
/* counter used to number requested sink pads; bumped atomically in
 * gst_aggregator_request_new_pad() */
gint padcount;
/* TRUE until stream-start and segment have been pushed on srcpad */
gboolean first;
};
/* Class structure; adds nothing on top of GstElementClass. */
struct _GstAggregatorClass
{
GstElementClass parent_class;
};
/* Declared static up front so the accessor defined by G_DEFINE_TYPE stays
 * private to this file. */
static GType gst_aggregator_get_type (void);
/* Defines gst_aggregator_get_type() and gst_aggregator_parent_class, and hooks
 * up gst_aggregator_class_init() / gst_aggregator_init(). */
G_DEFINE_TYPE (GstAggregator, gst_aggregator, GST_TYPE_ELEMENT);
/* Always-present source pad named "src", accepting any caps. */
static GstStaticPadTemplate gst_aggregator_src_template =
GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
/* Request sink pads named "sink_%u", accepting any caps. */
static GstStaticPadTemplate gst_aggregator_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
/* Collected callback of the dummy aggregator.
 *
 * Finds the first pad in @pads that has a buffer queued, takes that whole
 * buffer and pushes it on the source pad.  Ahead of the very first buffer a
 * stream-start event and a BYTES segment are sent downstream.
 *
 * Returns the result of gst_pad_push(), or GST_FLOW_EOS (after pushing an
 * EOS event) when no pad has data or no buffer could be taken. */
static GstFlowReturn
gst_agregator_collected (GstCollectPads * pads, gpointer user_data)
{
  GstAggregator *self = GST_AGGREGATOR (user_data);
  GstCollectData *ready = NULL;
  GstBuffer *picked = NULL;
  GSList *node = pads->data;

  /* locate the first pad that is holding a buffer */
  while (node != NULL && ready == NULL) {
    GstCollectData *candidate = (GstCollectData *) node->data;

    if (candidate->buffer)
      ready = candidate;
    node = node->next;
  }

  /* ready stays NULL only when there are no pads to collect or all are EOS */
  if (ready != NULL) {
    guint nbytes = gst_buffer_get_size (ready->buffer);

    picked = gst_collect_pads_take_buffer (pads, ready, nbytes);
  }

  if (picked == NULL) {
    GST_DEBUG_OBJECT (self, "no data available, must be EOS");
    gst_pad_push_event (self->srcpad, gst_event_new_eos ());
    return GST_FLOW_EOS;
  }

  if (self->first) {
    GstSegment bytes_segment;

    gst_segment_init (&bytes_segment, GST_FORMAT_BYTES);
    gst_pad_push_event (self->srcpad, gst_event_new_stream_start ("test"));
    gst_pad_push_event (self->srcpad,
        gst_event_new_segment (&bytes_segment));
    self->first = FALSE;
  }

  /* just forward the first buffer */
  GST_DEBUG_OBJECT (self, "forward buffer %p", picked);
  return gst_pad_push (self->srcpad, picked);
}
/* GstElement::request_new_pad implementation.
 *
 * Creates a sink pad named "sink_<n>" from @templ, registers it with the
 * element's collectpads and adds it to the element.  The requested name
 * (@unused) and @caps are ignored.
 *
 * Returns the new pad, or NULL when @templ is not a sink template or the pad
 * could not be added to the element. */
static GstPad *
gst_aggregator_request_new_pad (GstElement * element, GstPadTemplate * templ,
    const gchar * unused, const GstCaps * caps)
{
  GstAggregator *self = GST_AGGREGATOR (element);
  GstPad *sinkpad;
  gchar *padname;
  gint index;

  /* only sink pads can be requested */
  if (templ->direction != GST_PAD_SINK)
    return NULL;

  /* the previous counter value becomes this pad's number */
  index = g_atomic_int_add (&self->padcount, 1);
  padname = g_strdup_printf ("sink_%u", index);
  sinkpad = gst_pad_new_from_template (templ, padname);
  g_free (padname);

  gst_collect_pads_add_pad (self->collect, sinkpad,
      sizeof (GstCollectData), NULL, TRUE);

  /* on success the element takes ownership of the pad */
  if (gst_element_add_pad (GST_ELEMENT (self), sinkpad)) {
    GST_DEBUG_OBJECT (self, "added new pad %s", GST_OBJECT_NAME (sinkpad));
    return sinkpad;
  }

  /* undo the collectpads registration and drop the pad again */
  GST_DEBUG_OBJECT (self, "could not add pad");
  gst_collect_pads_remove_pad (self->collect, sinkpad);
  gst_object_unref (sinkpad);
  return NULL;
}
/* GstElement::release_pad implementation: unregisters @pad from the
 * collectpads (if the element still has one) and removes it from @element. */
static void
gst_aggregator_release_pad (GstElement * element, GstPad * pad)
{
  GstCollectPads *pads = GST_AGGREGATOR (element)->collect;

  if (pads != NULL)
    gst_collect_pads_remove_pad (pads, pad);

  gst_element_remove_pad (element, pad);
}
/* GstElement::change_state implementation.
 *
 * Starts the collectpads on READY->PAUSED and stops them on PAUSED->READY,
 * both before chaining up; all other transitions only chain up.  Returns
 * whatever the parent class returns. */
static GstStateChangeReturn
gst_aggregator_change_state (GstElement * element, GstStateChange transition)
{
  GstAggregator *self = GST_AGGREGATOR (element);

  if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
    gst_collect_pads_start (self->collect);
  } else if (transition == GST_STATE_CHANGE_PAUSED_TO_READY) {
    /* the collectpads must be unblocked before the parent's change_state
     * runs, so that streaming can finish */
    gst_collect_pads_stop (self->collect);
  }

  return GST_ELEMENT_CLASS (gst_aggregator_parent_class)->change_state
      (element, transition);
}
/* GObject::dispose implementation: releases the collectpads reference once
 * (the field is cleared afterwards) and chains up to the parent class. */
static void
gst_aggregator_dispose (GObject * object)
{
  GstAggregator *self = GST_AGGREGATOR (object);
  GstCollectPads *pads = self->collect;

  if (pads != NULL) {
    gst_object_unref (pads);
    /* a NULL field makes any later dispose call skip the unref */
    self->collect = NULL;
  }

  G_OBJECT_CLASS (gst_aggregator_parent_class)->dispose (object);
}
/* Class initialiser: installs the dispose, request_new_pad, release_pad and
 * change_state implementations, registers the src and sink pad templates and
 * sets the element metadata. */
static void
gst_aggregator_class_init (GstAggregatorClass * klass)
{
  GstElementClass *element_class = (GstElementClass *) klass;
  GObjectClass *object_class = (GObjectClass *) klass;

  /* virtual methods */
  object_class->dispose = gst_aggregator_dispose;
  element_class->request_new_pad =
      GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
  element_class->release_pad =
      GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
  element_class->change_state =
      GST_DEBUG_FUNCPTR (gst_aggregator_change_state);

  /* pad templates: src first, then sink */
  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&gst_aggregator_src_template));
  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&gst_aggregator_sink_template));

  gst_element_class_set_static_metadata (element_class, "Aggregator",
      "Testing", "Combine N buffers", "Stefan Sauer <ensonic@users.sf.net>");
}
/* Instance initialiser: creates the proxy-caps "src" pad and adds it to the
 * element, creates the collectpads with gst_agregator_collected() as its
 * collected function, and marks that no buffer has been forwarded yet. */
static void
gst_aggregator_init (GstAggregator * agregator)
{
  GstPadTemplate *src_templ =
      gst_static_pad_template_get (&gst_aggregator_src_template);
  GstPad *src = gst_pad_new_from_template (src_templ, "src");

  /* the template reference is only needed to create the pad */
  gst_object_unref (src_templ);

  agregator->srcpad = src;
  GST_PAD_SET_PROXY_CAPS (src);
  gst_element_add_pad (GST_ELEMENT (agregator), src);

  agregator->first = TRUE;

  /* the collectpads keep track of the sinkpads requested */
  agregator->collect = gst_collect_pads_new ();
  gst_collect_pads_set_function (agregator->collect,
      GST_DEBUG_FUNCPTR (gst_agregator_collected), agregator);
}
/* Plugin init function: registers the test element under the name
 * "aggregator" with rank NONE.  Returns the result of the registration. */
static gboolean
gst_agregator_plugin_init (GstPlugin * plugin)
{
  gboolean registered;

  registered = gst_element_register (plugin, "aggregator", GST_RANK_NONE,
      GST_TYPE_AGGREGATOR);

  return registered;
}
/* Registers the "aggregator" plugin statically, with
 * gst_agregator_plugin_init() as its init function.  Returns the result of
 * gst_plugin_register_static(). */
static gboolean
gst_agregator_plugin_register (void)
{
  gboolean registered;

  registered = gst_plugin_register_static (GST_VERSION_MAJOR,
      GST_VERSION_MINOR, "aggregator", "Combine buffers",
      gst_agregator_plugin_init, VERSION, GST_LICENSE, PACKAGE,
      GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN);

  return registered;
}
/* Asserts the state of the "collected" flag set by the collect callbacks.
 *
 * With expected == TRUE this blocks on cond (holding lock) until a callback
 * has set collected; the loop re-checks the flag after every wake-up.  With
 * expected == FALSE nothing is waited for.  In both cases the flag is then
 * required to equal expected.
 *
 * The trailing ';' after G_STMT_END is kept so that existing call sites keep
 * expanding exactly as before. */
#define fail_unless_collected(expected)                 \
G_STMT_START {                                          \
  g_mutex_lock (&lock);                                 \
  while ((expected) == TRUE && collected == FALSE)      \
    g_cond_wait (&cond, &lock);                         \
  fail_unless_equals_int (collected, expected);         \
  g_mutex_unlock (&lock);                               \
} G_STMT_END;
/* Collect-data type that does not embed a GstCollectData.  Its size is passed
 * to gst_collect_pads_add_pad() by test_pad_add_remove, which expects that
 * call to raise a critical. */
typedef struct
{
  char foo;
} BadCollectData;
typedef struct
{
2012-04-17 12:38:01 +00:00
GstCollectData data;
GstPad *pad;
GstBuffer *buffer;
GstEvent *event;
GstFlowReturn expected_result;
} TestData;
/* Template for the test's own source pads (srcpad1 / srcpad2): any caps. */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
/* Template for the sink pads handed to the collectpads (sinkpad1 / sinkpad2):
 * any caps. */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
2012-04-17 12:38:01 +00:00
static GstCollectPads *collect;
static gboolean collected;
static GstPad *agg_srcpad, *srcpad1, *srcpad2;
static GstPad *sinkpad1, *sinkpad2;
static TestData *data1, *data2;
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
static GstBuffer *outbuf1, *outbuf2;
static GstElement *agg;
gboolean fail_seek;
gint flush_start_events, flush_stop_events;
2012-09-12 09:52:25 +00:00
static GMutex lock;
static GCond cond;
static GstFlowReturn
2012-04-17 12:38:01 +00:00
collected_cb (GstCollectPads * pads, gpointer user_data)
{
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
outbuf1 = gst_collect_pads_pop (pads, (GstCollectData *) data1);
outbuf2 = gst_collect_pads_pop (pads, (GstCollectData *) data2);
2012-09-12 09:52:25 +00:00
g_mutex_lock (&lock);
collected = TRUE;
2012-09-12 09:52:25 +00:00
g_cond_signal (&cond);
g_mutex_unlock (&lock);
return GST_FLOW_OK;
}
/* Buffer function installed by setup_buffer_cb().
 *
 * Pops the queued buffer of both pads into outbuf1 / outbuf2, then sets
 * collected under lock and signals cond.  Always returns GST_FLOW_OK.
 *
 * NOTE(review): @data, @buf and @user_data are not used here; confirm who
 * owns @buf when this callback is invoked. */
static GstFlowReturn
handle_buffer_cb (GstCollectPads * pads, GstCollectData * data,
    GstBuffer * buf, gpointer user_data)
{
  GST_DEBUG ("collected buffers via callback");

  outbuf1 = gst_collect_pads_pop (pads, (GstCollectData *) data1);
  outbuf2 = gst_collect_pads_pop (pads, (GstCollectData *) data2);

  g_mutex_lock (&lock);
  collected = TRUE;
  g_cond_signal (&cond);
  g_mutex_unlock (&lock);

  return GST_FLOW_OK;
}
/* Thread entry point: pushes one buffer on a test source pad.
 *
 * @user_data is a TestData.  Stream-start, caps ("foo/x-bar") and a TIME
 * segment are pushed on test_data->pad first, followed by
 * test_data->buffer.  The test fails unless gst_pad_push() returns
 * test_data->expected_result.  Always returns NULL. */
static gpointer
push_buffer (gpointer user_data)
{
  GstFlowReturn flow;
  GstCaps *caps;
  TestData *test_data = (TestData *) user_data;
  GstSegment segment;

  gst_pad_push_event (test_data->pad, gst_event_new_stream_start ("test"));

  caps = gst_caps_new_empty_simple ("foo/x-bar");
  gst_pad_push_event (test_data->pad, gst_event_new_caps (caps));
  gst_caps_unref (caps);

  gst_segment_init (&segment, GST_FORMAT_TIME);
  gst_pad_push_event (test_data->pad, gst_event_new_segment (&segment));

  flow = gst_pad_push (test_data->pad, test_data->buffer);
  /* report the flow that was actually expected, which need not be OK */
  fail_unless (flow == test_data->expected_result,
      "got flow %s instead of %s", gst_flow_get_name (flow),
      gst_flow_get_name (test_data->expected_result));

  return NULL;
}
/* Thread entry point: pushes test_data->event on test_data->pad and fails the
 * test unless the push returns TRUE.  @user_data is a TestData.  Always
 * returns NULL. */
static gpointer
push_event (gpointer user_data)
{
TestData *test_data = (TestData *) user_data;
fail_unless (gst_pad_push_event (test_data->pad, test_data->event) == TRUE);
return NULL;
}
static void
setup_default (void)
{
2012-04-17 12:38:01 +00:00
collect = gst_collect_pads_new ();
srcpad1 = gst_pad_new_from_static_template (&srctemplate, "src1");
srcpad2 = gst_pad_new_from_static_template (&srctemplate, "src2");
sinkpad1 = gst_pad_new_from_static_template (&sinktemplate, "sink1");
sinkpad2 = gst_pad_new_from_static_template (&sinktemplate, "sink2");
fail_unless (gst_pad_link (srcpad1, sinkpad1) == GST_PAD_LINK_OK);
fail_unless (gst_pad_link (srcpad2, sinkpad2) == GST_PAD_LINK_OK);
gst_pad_set_active (sinkpad1, TRUE);
gst_pad_set_active (sinkpad2, TRUE);
gst_pad_set_active (srcpad1, TRUE);
gst_pad_set_active (srcpad2, TRUE);
data1 = NULL;
data2 = NULL;
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
outbuf1 = NULL;
outbuf2 = NULL;
collected = FALSE;
}
/* Fixture setup that runs setup_default() and installs collected_cb() as the
 * collected function (no user data). */
static void
setup (void)
{
setup_default ();
gst_collect_pads_set_function (collect, collected_cb, NULL);
}
/* Fixture setup that runs setup_default() and installs handle_buffer_cb() as
 * the buffer function (no user data). */
static void
setup_buffer_cb (void)
{
setup_default ();
gst_collect_pads_set_buffer_function (collect, handle_buffer_cb, NULL);
}
static void
Correct all relevant warnings found by the sparse semantic code analyzer. This include marking several symbols static... Original commit message from CVS: * gst/gstconfig.h.in: * libs/gst/base/gstcollectpads.c: (gst_collect_pads_read_buffer): * libs/gst/check/gstcheck.c: (gst_check_log_message_func), (gst_check_log_critical_func), (gst_check_drop_buffers), (gst_check_element_push_buffer_list): * libs/gst/controller/gstcontroller.c: (gst_controller_get), (gst_controller_get_type): * libs/gst/controller/gsthelper.c: (gst_object_control_properties), (gst_object_get_controller), (gst_object_get_control_source): * libs/gst/controller/gstinterpolationcontrolsource.c: (gst_interpolation_control_source_new): * libs/gst/controller/gstlfocontrolsource.c: (gst_lfo_control_source_new): * libs/gst/dataprotocol/dataprotocol.c: (gst_dp_event_from_packet_0_2): * plugins/elements/gstfdsrc.c: * plugins/elements/gstmultiqueue.c: * plugins/elements/gsttee.c: * plugins/elements/gsttypefindelement.c: * plugins/indexers/gstfileindex.c: (_file_index_id_save_xml), (gst_file_index_add_association): * plugins/indexers/gstmemindex.c: * tests/benchmarks/gstpollstress.c: (mess_some_more): * tests/check/elements/queue.c: (setup_queue): * tests/check/gst/gstpipeline.c: * tests/check/libs/collectpads.c: (setup), (teardown), (gst_collect_pads_suite): * tests/examples/adapter/adapter_test.c: * tests/examples/metadata/read-metadata.c: (make_pipeline): * tests/examples/xml/createxml.c: * tests/examples/xml/runxml.c: * tools/gst-inspect.c: * tools/gst-run.c: Correct all relevant warnings found by the sparse semantic code analyzer. This include marking several symbols static, using NULL instead of 0 for pointers, not using variable sized arrays on the stack, moving variable declarations to the beginning of a block and using "foo (void)" instead of "foo ()" for declarations.
2008-02-29 12:41:33 +00:00
teardown (void)
{
gst_object_unref (sinkpad1);
gst_object_unref (sinkpad2);
gst_object_unref (collect);
}
GST_START_TEST (test_pad_add_remove)
{
2012-04-17 12:38:01 +00:00
ASSERT_CRITICAL (gst_collect_pads_add_pad (collect, sinkpad1,
2012-09-12 21:12:14 +00:00
sizeof (BadCollectData), NULL, TRUE));
2012-04-17 12:38:01 +00:00
data1 = (TestData *) gst_collect_pads_add_pad (collect,
sinkpad1, sizeof (TestData), NULL, TRUE);
fail_unless (data1 != NULL);
2012-04-17 12:38:01 +00:00
fail_unless (gst_collect_pads_remove_pad (collect, sinkpad2) == FALSE);
fail_unless (gst_collect_pads_remove_pad (collect, sinkpad1) == TRUE);
}
GST_END_TEST;
GST_START_TEST (test_collect)
{
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
GstBuffer *buf1, *buf2;
GThread *thread1, *thread2;
2012-04-17 12:38:01 +00:00
data1 = (TestData *) gst_collect_pads_add_pad (collect,
sinkpad1, sizeof (TestData), NULL, TRUE);
fail_unless (data1 != NULL);
2012-04-17 12:38:01 +00:00
data2 = (TestData *) gst_collect_pads_add_pad (collect,
sinkpad2, sizeof (TestData), NULL, TRUE);
fail_unless (data2 != NULL);
buf1 = gst_buffer_new ();
buf2 = gst_buffer_new ();
/* start collect pads */
2012-04-17 12:38:01 +00:00
gst_collect_pads_start (collect);
/* push buffers on the pads */
data1->pad = srcpad1;
data1->buffer = buf1;
2012-09-12 09:52:25 +00:00
thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
/* here thread1 is blocked and srcpad1 has a queued buffer */
fail_unless_collected (FALSE);
data2->pad = srcpad2;
data2->buffer = buf2;
2012-09-12 09:52:25 +00:00
thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
/* now both pads have a buffer */
fail_unless_collected (TRUE);
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
fail_unless (outbuf1 == buf1);
fail_unless (outbuf2 == buf2);
/* these will return immediately as at this point the threads have been
* unlocked and are finished */
g_thread_join (thread1);
g_thread_join (thread2);
2012-04-17 12:38:01 +00:00
gst_collect_pads_stop (collect);
gst_buffer_unref (buf1);
gst_buffer_unref (buf2);
}
GST_END_TEST;
GST_START_TEST (test_collect_eos)
{
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
GstBuffer *buf1;
GThread *thread1, *thread2;
2012-04-17 12:38:01 +00:00
data1 = (TestData *) gst_collect_pads_add_pad (collect,
sinkpad1, sizeof (TestData), NULL, TRUE);
fail_unless (data1 != NULL);
2012-04-17 12:38:01 +00:00
data2 = (TestData *) gst_collect_pads_add_pad (collect,
sinkpad2, sizeof (TestData), NULL, TRUE);
fail_unless (data2 != NULL);
buf1 = gst_buffer_new ();
/* start collect pads */
2012-04-17 12:38:01 +00:00
gst_collect_pads_start (collect);
/* push a buffer on srcpad1 and EOS on srcpad2 */
data1->pad = srcpad1;
data1->buffer = buf1;
2012-09-12 09:52:25 +00:00
thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
/* here thread1 is blocked and srcpad1 has a queued buffer */
fail_unless_collected (FALSE);
data2->pad = srcpad2;
data2->event = gst_event_new_eos ();
2012-09-12 09:52:25 +00:00
thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
/* now sinkpad1 has a buffer and sinkpad2 has EOS */
fail_unless_collected (TRUE);
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
fail_unless (outbuf1 == buf1);
/* sinkpad2 has EOS so a NULL buffer is returned */
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
fail_unless (outbuf2 == NULL);
/* these will return immediately as when the data is popped the threads are
* unlocked and will terminate */
g_thread_join (thread1);
g_thread_join (thread2);
2012-04-17 12:38:01 +00:00
gst_collect_pads_stop (collect);
gst_buffer_unref (buf1);
}
GST_END_TEST;
GST_START_TEST (test_collect_twice)
{
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
GstBuffer *buf1, *buf2;
GThread *thread1, *thread2;
2012-04-17 12:38:01 +00:00
data1 = (TestData *) gst_collect_pads_add_pad (collect,
sinkpad1, sizeof (TestData), NULL, TRUE);
fail_unless (data1 != NULL);
2012-04-17 12:38:01 +00:00
data2 = (TestData *) gst_collect_pads_add_pad (collect,
sinkpad2, sizeof (TestData), NULL, TRUE);
fail_unless (data2 != NULL);
GST_INFO ("round 1");
buf1 = gst_buffer_new ();
/* start collect pads */
2012-04-17 12:38:01 +00:00
gst_collect_pads_start (collect);
/* queue a buffer */
data1->pad = srcpad1;
data1->buffer = buf1;
2012-09-12 09:52:25 +00:00
thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
/* here thread1 is blocked and srcpad1 has a queued buffer */
fail_unless_collected (FALSE);
/* push EOS on the other pad */
data2->pad = srcpad2;
data2->event = gst_event_new_eos ();
2012-09-12 09:52:25 +00:00
thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
/* one of the pads has a buffer, the other has EOS */
fail_unless_collected (TRUE);
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
fail_unless (outbuf1 == buf1);
/* there's nothing to pop from the one which received EOS */
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
fail_unless (outbuf2 == NULL);
/* these will return immediately as at this point the threads have been
* unlocked and are finished */
g_thread_join (thread1);
g_thread_join (thread2);
2012-04-17 12:38:01 +00:00
gst_collect_pads_stop (collect);
collected = FALSE;
GST_INFO ("round 2");
buf2 = gst_buffer_new ();
/* clear EOS from pads */
gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE));
gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE));
/* start collect pads */
2012-04-17 12:38:01 +00:00
gst_collect_pads_start (collect);
/* push buffers on the pads */
data1->pad = srcpad1;
data1->buffer = buf1;
2012-09-12 09:52:25 +00:00
thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
/* here thread1 is blocked and srcpad1 has a queued buffer */
fail_unless_collected (FALSE);
data2->pad = srcpad2;
data2->buffer = buf2;
2012-09-12 09:52:25 +00:00
thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
/* now both pads have a buffer */
fail_unless_collected (TRUE);
/* these will return immediately as at this point the threads have been
* unlocked and are finished */
g_thread_join (thread1);
g_thread_join (thread2);
2012-04-17 12:38:01 +00:00
gst_collect_pads_stop (collect);
gst_buffer_unref (buf1);
gst_buffer_unref (buf2);
}
GST_END_TEST;
/* Test the default collected buffer func */
GST_START_TEST (test_collect_default)
{
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
GstBuffer *buf1, *buf2;
GThread *thread1, *thread2;
data1 = (TestData *) gst_collect_pads_add_pad (collect,
sinkpad1, sizeof (TestData), NULL, TRUE);
fail_unless (data1 != NULL);
data2 = (TestData *) gst_collect_pads_add_pad (collect,
sinkpad2, sizeof (TestData), NULL, TRUE);
fail_unless (data2 != NULL);
buf1 = gst_buffer_new ();
GST_BUFFER_TIMESTAMP (buf1) = 0;
buf2 = gst_buffer_new ();
GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND;
/* start collect pads */
gst_collect_pads_start (collect);
/* push buffers on the pads */
data1->pad = srcpad1;
data1->buffer = buf1;
thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
/* here thread1 is blocked and srcpad1 has a queued buffer */
fail_unless_collected (FALSE);
data2->pad = srcpad2;
data2->buffer = buf2;
thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
/* now both pads have a buffer */
fail_unless_collected (TRUE);
/* The default callback should have popped the buffer with lower timestamp,
* and this should therefore be NULL: */
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
fail_unless (outbuf1 == NULL);
/* While this one should still be pending: */
tests: fix spurious failure in test_collect collectpads test pop() in collected callback. There were three threads in the test cases that hanged: the test thread and two threads that push buffers. Each thread push one buffer on one pad. There are two pads in the collectpads so the second buffer will trigger the collect-callback. This is what happens when the hang occurs: The first thread pushes a buffer and initializes a cookie to the value of a counter in the collectpads object and waits on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). The second thread pushes a buffer and calls the collected callback, which signals the cond that the test thread is waiting for. The test thread pops both buffers (without holding any lock). Each call to _pop() increases the counter broadcasts the condition that the first thread is now waiting for. It then joins both threads (hangs). The first thread wakes up and returns, since its buffer has been consumed. The second thread starts executing again. When the callback, called by the second thread, has returned it initializes a cookie to the value of a counter, which has already prematurely been increased by the test thread when it popped the buffers, and wait's on a cond for the counter to change and for someone to consume the buffer (i.e. _pop() it). Since the buffer has already been poped and the counter has already been increased it will be stuck forever. https://bugzilla.gnome.org/show_bug.cgi?id=685555
2013-03-07 11:11:30 +00:00
fail_unless (outbuf2 == buf2);
/* these will return immediately as at this point the threads have been
* unlocked and are finished */
g_thread_join (thread1);
g_thread_join (thread2);
gst_collect_pads_stop (collect);
gst_buffer_unref (buf1);
gst_buffer_unref (buf2);
}
GST_END_TEST;
#define NUM_BUFFERS 3
/* "handoff" signal handler connected to the fakesink in the pipeline tests:
 * increments the counter that @count points to once per call. The element,
 * buffer and pad arguments are not used. */
static void
handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count)
{
  (*count)++;
}
/* Linear pipeline check: fakesrc ! aggregator ! fakesink.
 *
 * The pipeline is run until EOS is posted on the bus; the fakesink handoff
 * counter must then equal the number of buffers the source was configured
 * to produce (NUM_BUFFERS). */
GST_START_TEST (test_linear_pipeline)
{
  GstElement *pipeline;
  GstElement *src, *agg, *sink;
  GstBus *bus;
  GstMessage *msg;
  gint count = 0;

  pipeline = gst_pipeline_new ("pipeline");

  /* source: stops after NUM_BUFFERS buffers */
  src = gst_check_setup_element ("fakesrc");
  g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4,
      NULL);

  agg = gst_check_setup_element ("aggregator");

  /* sink: every handoff bumps count */
  sink = gst_check_setup_element ("fakesink");
  g_object_set (sink, "signal-handoffs", TRUE, NULL);
  g_signal_connect (sink, "handoff", (GCallback) handoff, &count);

  /* assemble src -> agg -> sink */
  fail_unless (gst_bin_add (GST_BIN (pipeline), src));
  fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
  fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
  fail_unless (gst_element_link (src, agg));
  fail_unless (gst_element_link (agg, sink));

  bus = gst_element_get_bus (pipeline);
  fail_if (bus == NULL);

  /* run and block until either EOS or an error shows up on the bus */
  gst_element_set_state (pipeline, GST_STATE_PLAYING);
  msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
  fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
  gst_message_unref (msg);

  fail_unless_equals_int (count, NUM_BUFFERS);

  /* shut down and release */
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_object_unref (bus);
  gst_object_unref (pipeline);
}

GST_END_TEST;
/* Branched pipeline check:
 *
 *   fakesrc ! tee ! queue0 ! aggregator ! fakesink
 *             tee ! queue1 ! aggregator
 *
 * The pipeline is run until EOS is posted on the bus; the fakesink handoff
 * counter must then equal NUM_BUFFERS * 2. */
GST_START_TEST (test_branched_pipeline)
{
  GstElement *pipeline;
  GstElement *src, *tee, *queue[2], *agg, *sink;
  GstBus *bus;
  GstMessage *msg;
  gint count = 0;

  pipeline = gst_pipeline_new ("pipeline");

  /* source: stops after NUM_BUFFERS buffers */
  src = gst_check_setup_element ("fakesrc");
  g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4,
      NULL);

  tee = gst_check_setup_element ("tee");

  /* one queue per branch, renamed so both can live in the same bin */
  queue[0] = gst_check_setup_element ("queue");
  gst_object_set_name (GST_OBJECT (queue[0]), "queue0");
  queue[1] = gst_check_setup_element ("queue");
  gst_object_set_name (GST_OBJECT (queue[1]), "queue1");

  agg = gst_check_setup_element ("aggregator");

  /* sink: every handoff bumps count */
  sink = gst_check_setup_element ("fakesink");
  g_object_set (sink, "signal-handoffs", TRUE, NULL);
  g_signal_connect (sink, "handoff", (GCallback) handoff, &count);

  fail_unless (gst_bin_add (GST_BIN (pipeline), src));
  fail_unless (gst_bin_add (GST_BIN (pipeline), tee));
  fail_unless (gst_bin_add (GST_BIN (pipeline), queue[0]));
  fail_unless (gst_bin_add (GST_BIN (pipeline), queue[1]));
  fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
  fail_unless (gst_bin_add (GST_BIN (pipeline), sink));

  /* src -> tee, tee fans out to both queues, both queues feed agg */
  fail_unless (gst_element_link (src, tee));
  fail_unless (gst_element_link (tee, queue[0]));
  fail_unless (gst_element_link (tee, queue[1]));
  fail_unless (gst_element_link (queue[0], agg));
  fail_unless (gst_element_link (queue[1], agg));
  fail_unless (gst_element_link (agg, sink));

  bus = gst_element_get_bus (pipeline);
  fail_if (bus == NULL);

  /* run and block until either EOS or an error shows up on the bus */
  gst_element_set_state (pipeline, GST_STATE_PLAYING);
  msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
  fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
  gst_message_unref (msg);

  /* we have two branches, so the sink is expected to see each source buffer
   * twice (presumably once per branch -- confirm against the aggregator's
   * collected callback) */
  fail_unless_equals_int (count, NUM_BUFFERS * 2);

  /* shut down and release */
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_object_unref (bus);
  gst_object_unref (pipeline);
}

GST_END_TEST;
/* Pad probe callback, installed on the aggregator source pad by
 * flush_setup().
 *
 * For probes flagged as flush events it counts FLUSH_START and FLUSH_STOP
 * events in the corresponding atomic counters. For any other downstream data
 * it sets the global "collected" flag under the lock and signals the
 * condition variable. The probed item is always dropped. */
static GstPadProbeReturn
downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
  if (info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
    switch (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info))) {
      case GST_EVENT_FLUSH_START:
        g_atomic_int_inc (&flush_start_events);
        break;
      case GST_EVENT_FLUSH_STOP:
        g_atomic_int_inc (&flush_stop_events);
        break;
      default:
        /* other event types are not counted */
        break;
    }
    return GST_PAD_PROBE_DROP;
  }

  if (info->type & GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM) {
    /* tell whoever waits on the condition that data reached the pad */
    g_mutex_lock (&lock);
    collected = TRUE;
    g_cond_signal (&cond);
    g_mutex_unlock (&lock);
  }

  return GST_PAD_PROBE_DROP;
}
/* Event function set on the test source pads by setup_src_pad().
 *
 * Returns FALSE for a SEEK event if fail_seek was TRUE at that moment (the
 * flag is atomically flipped back to FALSE, so only one seek fails per
 * arming); returns TRUE in every other case. The event is always unreffed. */
static gboolean
src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
  gboolean handled = TRUE;

  /* the compare-and-exchange only runs for seek events */
  if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
      g_atomic_int_compare_and_exchange (&fail_seek, TRUE, FALSE))
    handled = FALSE;

  gst_event_unref (event);

  return handled;
}
/* Event function for the aggregator source pad: hands the event to the
 * default collectpads source-event handler of the parent aggregator and
 * returns that handler's result. */
static gboolean
agg_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
  gboolean handled;

  handled =
      gst_collect_pads_src_event_default (GST_AGGREGATOR (parent)->collect,
      pad, event);

  return handled;
}
/* Create a source pad from @tmpl, link it to the request pad @name of
 * @element, install src_event() as its event function and activate it.
 *
 * Returns the new source pad; the caller owns it and must unref it (see
 * flush_teardown()). Fails the running test if the request pad cannot be
 * obtained or the link fails. */
static GstPad *
setup_src_pad (GstElement * element,
    GstStaticPadTemplate * tmpl, const char *name)
{
  GstPad *srcpad, *sinkpad;

  srcpad = gst_pad_new_from_static_template (tmpl, "src");

  sinkpad = gst_element_get_request_pad (element, name);
  fail_unless (sinkpad != NULL, "Could not request pad %s from %s", name,
      GST_ELEMENT_NAME (element));

  fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK,
      "Could not link source and %s sink pads", GST_ELEMENT_NAME (element));
  gst_pad_set_event_function (srcpad, src_event);
  gst_pad_set_active (srcpad, TRUE);

  /* gst_element_get_request_pad() hands back a reference that was never
   * released here; drop it now that the link is made. The element is
   * expected to keep its own reference to the pad (it remains the pad's
   * parent). */
  gst_object_unref (sinkpad);

  return srcpad;
}
/* Checked-fixture setup for the "flush" test case.
 *
 * Resets the flush event counters, allocates zeroed TestData for both pads,
 * creates an aggregator with two linked test source pads, installs
 * downstream_probe_cb() and agg_src_event() on the aggregator source pad and
 * finally sets the aggregator to PLAYING. */
static void
flush_setup (void)
{
  /* per-test bookkeeping */
  g_atomic_int_set (&flush_start_events, 0);
  g_atomic_int_set (&flush_stop_events, 0);
  data1 = g_new0 (TestData, 1);
  data2 = g_new0 (TestData, 1);

  /* element under test and the pads feeding it */
  agg = gst_check_setup_element ("aggregator");
  agg_srcpad = gst_element_get_static_pad (agg, "src");
  srcpad1 = setup_src_pad (agg, &srctemplate, "sink_0");
  srcpad2 = setup_src_pad (agg, &srctemplate, "sink_1");

  /* watch (and drop) everything leaving the aggregator */
  gst_pad_add_probe (agg_srcpad,
      GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
      GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM |
      GST_PAD_PROBE_TYPE_EVENT_FLUSH, downstream_probe_cb, NULL, NULL);
  gst_pad_set_event_function (agg_srcpad, agg_src_event);

  gst_element_set_state (agg, GST_STATE_PLAYING);
}
/* Checked-fixture teardown for the "flush" test case: shuts the aggregator
 * down and releases everything flush_setup() created. */
static void
flush_teardown (void)
{
/* stop the element before dropping references */
gst_element_set_state (agg, GST_STATE_NULL);
gst_object_unref (agg);
/* reference obtained via gst_element_get_static_pad() in flush_setup() */
gst_object_unref (agg_srcpad);
/* source pads returned by setup_src_pad() */
gst_object_unref (srcpad1);
gst_object_unref (srcpad2);
/* TestData allocated with g_new0() in flush_setup() */
g_free (data1);
g_free (data2);
}
/* Flushing seek where one upstream refuses the seek: the buffer already
 * queued on agg:sink_1 must survive and still be collected afterwards. */
GST_START_TEST (test_flushing_seek_failure)
{
GstBuffer *buf1, *buf2;
GThread *thread1, *thread2;
GstEvent *event;
/* Queue a buffer in agg:sink_1. Do a flushing seek and simulate one upstream
* element failing to handle the seek (see src_event()). Check that the
* flushing seek logic doesn't get triggered by checking that the buffer
* queued on agg:sink_1 doesn't get flushed.
*/
/* queue a buffer in agg:sink_1 */
buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND;
data2->pad = srcpad2;
data2->buffer = buf2;
/* NOTE(review): the result of g_thread_try_new() is not checked here or
* below -- confirm a NULL thread cannot occur in practice */
thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
/* only one pad has data so far, so nothing may have been collected */
fail_unless_collected (FALSE);
/* do the seek */
event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
/* arm src_event() so that a seek is reported as failed */
g_atomic_int_set (&fail_seek, TRUE);
fail_if (gst_pad_send_event (agg_srcpad, event));
/* flush srcpad1 (pretending it's the upstream that didn't fail to seek) */
fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
/* check that the flush events reached agg:src */
fail_unless_equals_int (flush_start_events, 1);
fail_unless_equals_int (flush_stop_events, 1);
/* push a buffer on agg:sink_0. This should trigger a collect since agg:sink_1
* should not have been flushed at this point */
buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
GST_BUFFER_TIMESTAMP (buf1) = 0;
data1->pad = srcpad1;
data1->buffer = buf1;
thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
fail_unless_collected (TRUE);
/* re-arm the flag for the second collect checked below */
collected = FALSE;
/* at this point thread1 must have returned */
g_thread_join (thread1);
/* push eos on agg:sink_0 so the buffer queued in agg:sink_1 is collected and
* the pushing thread returns */
data1->pad = srcpad1;
data1->event = gst_event_new_eos ();
thread1 = g_thread_try_new ("gst-check", push_event, data1, NULL);
fail_unless_collected (TRUE);
g_thread_join (thread1);
g_thread_join (thread2);
}
GST_END_TEST;
/* Successful flushing seek: a buffer queued on agg:sink_1 must be flushed
 * and exactly one FLUSH_START / FLUSH_STOP pair must reach agg:src. */
GST_START_TEST (test_flushing_seek)
{
GstBuffer *buf1, *buf2;
GThread *thread1, *thread2;
GstEvent *event;
/* Queue a buffer in agg:sink_1. Then do a flushing seek and check that the
* new flushing seek logic is triggered. On the first FLUSH_START call the
* buffers queued in collectpads should get flushed. Only one FLUSH_START and
* one FLUSH_STOP should be forwarded downstream.
*/
buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
GST_BUFFER_TIMESTAMP (buf2) = 0;
data2->pad = srcpad2;
data2->buffer = buf2;
/* expect this buffer to be flushed */
data2->expected_result = GST_FLOW_FLUSHING;
/* NOTE(review): the result of g_thread_try_new() is not checked here or
* below -- confirm a NULL thread cannot occur in practice */
thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
/* now do a successful flushing seek */
event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
/* make sure src_event() does not reject the seek */
g_atomic_int_set (&fail_seek, FALSE);
fail_unless (gst_pad_send_event (agg_srcpad, event));
/* flushing starts once one of the upstream elements sends the first
* FLUSH_START */
fail_unless_equals_int (flush_start_events, 0);
fail_unless_equals_int (flush_stop_events, 0);
/* flush agg:sink_0. This flushes collectpads, calls ::flush() and sends
* FLUSH_START downstream */
fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
fail_unless_equals_int (flush_start_events, 1);
fail_unless_equals_int (flush_stop_events, 0);
/* the first FLUSH_STOP is forwarded downstream */
fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
fail_unless_equals_int (flush_start_events, 1);
fail_unless_equals_int (flush_stop_events, 1);
/* at this point even the other pad agg:sink_1 should be flushing so thread2
* should have stopped */
g_thread_join (thread2);
/* push a buffer on agg:sink_0 to trigger one collect after flushing to verify
* that flushing completes once all the pads have been flushed */
buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
GST_BUFFER_TIMESTAMP (buf1) = GST_SECOND;
data1->pad = srcpad1;
data1->buffer = buf1;
thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
/* flush agg:sink_1 as well. This completes the flushing seek; no additional
* flush events are expected downstream (checked right below) */
gst_pad_push_event (srcpad2, gst_event_new_flush_start ());
gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE));
/* still, only one FLUSH_START and one FLUSH_STOP are forwarded downstream */
fail_unless_equals_int (flush_start_events, 1);
fail_unless_equals_int (flush_stop_events, 1);
/* EOS agg:sink_1 so the buffer queued in agg:sink_0 is collected */
data2->pad = srcpad2;
data2->event = gst_event_new_eos ();
thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
fail_unless_collected (TRUE);
/* these will return immediately as at this point the threads have been
* unlocked and are finished */
g_thread_join (thread1);
g_thread_join (thread2);
}
GST_END_TEST;
static Suite *
2012-04-17 12:38:01 +00:00
gst_collect_pads_suite (void)
{
Suite *suite;
TCase *general, *buffers, *pipeline, *flush;
gst_agregator_plugin_register ();
2012-04-17 12:38:01 +00:00
suite = suite_create ("GstCollectPads");
general = tcase_create ("general");
suite_add_tcase (suite, general);
tcase_add_checked_fixture (general, setup, teardown);
tcase_add_test (general, test_pad_add_remove);
tcase_add_test (general, test_collect);
tcase_add_test (general, test_collect_eos);
tcase_add_test (general, test_collect_twice);
buffers = tcase_create ("buffers");
suite_add_tcase (suite, buffers);
tcase_add_checked_fixture (buffers, setup_buffer_cb, teardown);
tcase_add_test (buffers, test_collect_default);
pipeline = tcase_create ("pipeline");
suite_add_tcase (suite, pipeline);
tcase_add_test (pipeline, test_linear_pipeline);
tcase_add_test (pipeline, test_branched_pipeline);
flush = tcase_create ("flush");
suite_add_tcase (suite, flush);
tcase_add_checked_fixture (flush, flush_setup, flush_teardown);
tcase_add_test (flush, test_flushing_seek_failure);
tcase_add_test (flush, test_flushing_seek);
return suite;
}
/* Test program entry point; the macro argument is the suite constructor's
 * name without its _suite suffix (gst_collect_pads_suite()). */
GST_CHECK_MAIN (gst_collect_pads);