collectpads: add two more tests using collectpads within an element

Add a static plugin with a rudimentary element using collectpads and do some
pipeline based tests.
This commit is contained in:
Stefan Sauer 2013-02-18 20:47:04 +01:00
parent f1df4c13a1
commit 03e81ca8a2

View file

@ -21,9 +21,232 @@
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <gst/check/gstcheck.h>
#include <gst/base/gstcollectpads.h>
/* dummy collectpads based element */
#define GST_TYPE_AGGREGATOR (gst_aggregator_get_type ())
#define GST_AGGREGATOR(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_AGGREGATOR, GstAggregator))
#define GST_AGGREGATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_AGGREGATOR, GstAggregatorClass))
#define GST_AGGREGATOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_AGGREGATOR, GstAggregatorClass))
typedef struct _GstAggregator GstAggregator;
typedef struct _GstAggregatorClass GstAggregatorClass;
struct _GstAggregator
{
GstElement parent;
GstCollectPads *collect;
GstPad *srcpad;
GstPad *sinkpad[2];
gint padcount;
};
struct _GstAggregatorClass
{
GstElementClass parent_class;
};
static GType gst_aggregator_get_type (void);
G_DEFINE_TYPE (GstAggregator, gst_aggregator, GST_TYPE_ELEMENT);
static GstStaticPadTemplate gst_aggregator_src_template =
GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate gst_aggregator_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
static GstFlowReturn
gst_agregator_collected (GstCollectPads * pads, gpointer user_data)
{
GstAggregator *aggregator = GST_AGGREGATOR (user_data);
GstBuffer *inbuf;
GstCollectData *collect_data = (GstCollectData *) pads->data->data;
guint outsize = gst_collect_pads_available (pads);
/* can only happen when no pads to collect or all EOS */
if (outsize == 0)
goto eos;
inbuf = gst_collect_pads_take_buffer (pads, collect_data, outsize);
if (!inbuf)
goto eos;
/* just forward the first buffer */
GST_DEBUG_OBJECT (aggregator, "forward buffer %p", inbuf);
return gst_pad_push (aggregator->srcpad, inbuf);
/* ERRORS */
eos:
{
GST_DEBUG_OBJECT (aggregator, "no data available, must be EOS");
gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ());
return GST_FLOW_EOS;
}
}
static GstPad *
gst_aggregator_request_new_pad (GstElement * element, GstPadTemplate * templ,
const gchar * unused, const GstCaps * caps)
{
GstAggregator *aggregator = GST_AGGREGATOR (element);
gchar *name;
GstPad *newpad;
gint padcount;
if (templ->direction != GST_PAD_SINK)
return NULL;
/* create new pad */
padcount = g_atomic_int_add (&aggregator->padcount, 1);
name = g_strdup_printf ("sink_%u", padcount);
newpad = gst_pad_new_from_template (templ, name);
g_free (name);
gst_collect_pads_add_pad (aggregator->collect, newpad,
sizeof (GstCollectData), NULL, TRUE);
/* takes ownership of the pad */
if (!gst_element_add_pad (GST_ELEMENT (aggregator), newpad))
goto could_not_add;
GST_DEBUG_OBJECT (aggregator, "added new pad %s", GST_OBJECT_NAME (newpad));
return newpad;
/* errors */
could_not_add:
{
GST_DEBUG_OBJECT (aggregator, "could not add pad");
gst_collect_pads_remove_pad (aggregator->collect, newpad);
gst_object_unref (newpad);
return NULL;
}
}
static void
gst_aggregator_release_pad (GstElement * element, GstPad * pad)
{
GstAggregator *aggregator = GST_AGGREGATOR (element);
if (aggregator->collect)
gst_collect_pads_remove_pad (aggregator->collect, pad);
gst_element_remove_pad (element, pad);
}
static GstStateChangeReturn
gst_aggregator_change_state (GstElement * element, GstStateChange transition)
{
GstAggregator *aggregator = GST_AGGREGATOR (element);
GstStateChangeReturn ret;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
gst_collect_pads_start (aggregator->collect);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
/* need to unblock the collectpads before calling the
* parent change_state so that streaming can finish */
gst_collect_pads_stop (aggregator->collect);
break;
default:
break;
}
ret =
GST_ELEMENT_CLASS (gst_aggregator_parent_class)->change_state (element,
transition);
switch (transition) {
default:
break;
}
return ret;
}
static void
gst_aggregator_dispose (GObject * object)
{
GstAggregator *aggregator = GST_AGGREGATOR (object);
if (aggregator->collect) {
gst_object_unref (aggregator->collect);
aggregator->collect = NULL;
}
G_OBJECT_CLASS (gst_aggregator_parent_class)->dispose (object);
}
static void
gst_aggregator_class_init (GstAggregatorClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
GstElementClass *gstelement_class = (GstElementClass *) klass;
gobject_class->dispose = gst_aggregator_dispose;
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&gst_aggregator_src_template));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&gst_aggregator_sink_template));
gst_element_class_set_static_metadata (gstelement_class, "Aggregator",
"Testing", "Combine N buffers", "Stefan Sauer <ensonic@users.sf.net>");
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
gstelement_class->release_pad =
GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
}
static void
gst_aggregator_init (GstAggregator * agregator)
{
GstPadTemplate *template;
template = gst_static_pad_template_get (&gst_aggregator_src_template);
agregator->srcpad = gst_pad_new_from_template (template, "src");
gst_object_unref (template);
GST_PAD_SET_PROXY_CAPS (agregator->srcpad);
gst_element_add_pad (GST_ELEMENT (agregator), agregator->srcpad);
/* keep track of the sinkpads requested */
agregator->collect = gst_collect_pads_new ();
gst_collect_pads_set_function (agregator->collect,
GST_DEBUG_FUNCPTR (gst_agregator_collected), agregator);
}
static gboolean
gst_agregator_plugin_init (GstPlugin * plugin)
{
return gst_element_register (plugin, "aggregator", GST_RANK_NONE,
GST_TYPE_AGGREGATOR);
}
static gboolean
gst_agregator_plugin_register (void)
{
return gst_plugin_register_static (GST_VERSION_MAJOR,
GST_VERSION_MINOR,
"aggregator",
"Combine buffers",
gst_agregator_plugin_init,
VERSION, GST_LICENSE, PACKAGE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN);
}
#define fail_unless_collected(expected) \
G_STMT_START { \
g_mutex_lock (&lock); \
@ -430,11 +653,115 @@ GST_START_TEST (test_collect_default)
GST_END_TEST;
#define NUM_BUFFERS 3
static void
handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count)
{
*count = *count + 1;
}
/* Test a linear pipeline using aggregator */
GST_START_TEST (test_linear_pipeline)
{
GstElement *pipeline, *src, *agg, *sink;
GstBus *bus;
GstMessage *msg;
gint count = 0;
pipeline = gst_pipeline_new ("pipeline");
src = gst_check_setup_element ("fakesrc");
g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4,
NULL);
agg = gst_check_setup_element ("aggregator");
sink = gst_check_setup_element ("fakesink");
g_object_set (sink, "signal-handoffs", TRUE, NULL);
g_signal_connect (sink, "handoff", (GCallback) handoff, &count);
fail_unless (gst_bin_add (GST_BIN (pipeline), src));
fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
fail_unless (gst_element_link (src, agg));
fail_unless (gst_element_link (agg, sink));
bus = gst_element_get_bus (pipeline);
fail_if (bus == NULL);
gst_element_set_state (pipeline, GST_STATE_PLAYING);
msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
gst_message_unref (msg);
fail_unless_equals_int (count, NUM_BUFFERS);
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (bus);
gst_object_unref (pipeline);
}
GST_END_TEST;
/* Test a linear pipeline using aggregator */
GST_START_TEST (test_branched_pipeline)
{
GstElement *pipeline, *src, *tee, *queue[2], *agg, *sink;
GstBus *bus;
GstMessage *msg;
gint count = 0;
pipeline = gst_pipeline_new ("pipeline");
src = gst_check_setup_element ("fakesrc");
g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4,
NULL);
tee = gst_check_setup_element ("tee");
queue[0] = gst_check_setup_element ("queue");
gst_object_set_name (GST_OBJECT (queue[0]), "queue0");
queue[1] = gst_check_setup_element ("queue");
gst_object_set_name (GST_OBJECT (queue[1]), "queue1");
agg = gst_check_setup_element ("aggregator");
sink = gst_check_setup_element ("fakesink");
g_object_set (sink, "signal-handoffs", TRUE, NULL);
g_signal_connect (sink, "handoff", (GCallback) handoff, &count);
fail_unless (gst_bin_add (GST_BIN (pipeline), src));
fail_unless (gst_bin_add (GST_BIN (pipeline), tee));
fail_unless (gst_bin_add (GST_BIN (pipeline), queue[0]));
fail_unless (gst_bin_add (GST_BIN (pipeline), queue[1]));
fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
fail_unless (gst_element_link (src, tee));
fail_unless (gst_element_link (tee, queue[0]));
fail_unless (gst_element_link (tee, queue[1]));
fail_unless (gst_element_link (queue[0], agg));
fail_unless (gst_element_link (queue[1], agg));
fail_unless (gst_element_link (agg, sink));
bus = gst_element_get_bus (pipeline);
fail_if (bus == NULL);
gst_element_set_state (pipeline, GST_STATE_PLAYING);
msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
gst_message_unref (msg);
/* we have two branches, but we still only forward buffers from one branch */
fail_unless_equals_int (count, NUM_BUFFERS);
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (bus);
gst_object_unref (pipeline);
}
GST_END_TEST;
static Suite *
gst_collect_pads_suite (void)
{
Suite *suite;
TCase *general, *buffers;
TCase *general, *buffers, *pipeline;
gst_agregator_plugin_register ();
suite = suite_create ("GstCollectPads");
general = tcase_create ("general");
@ -450,6 +777,11 @@ gst_collect_pads_suite (void)
tcase_add_checked_fixture (buffers, setup_buffer_cb, teardown);
tcase_add_test (buffers, test_collect_default);
pipeline = tcase_create ("pipeline");
suite_add_tcase (suite, pipeline);
tcase_add_test (pipeline, test_linear_pipeline);
tcase_add_test (pipeline, test_branched_pipeline);
return suite;
}