From fadabe8b787d53c90dc06e1fcdaff3f2f04e990c Mon Sep 17 00:00:00 2001 From: HoonHee Lee Date: Tue, 4 Mar 2014 19:40:05 +0900 Subject: [PATCH] streamiddemux: Add streamiddemux element Demultiplex a stream to multiple source pads based on the stream ids from the stream-start events. This basically reverses the behaviour of funnel. https://bugzilla.gnome.org/show_bug.cgi?id=707605 --- configure.ac | 1 + plugins/elements/Makefile.am | 2 + plugins/elements/gstelements.c | 5 + plugins/elements/gststreamiddemux.c | 401 ++++++++++++++ plugins/elements/gststreamiddemux.h | 66 +++ tests/check/Makefile.am | 1 + tests/check/elements/streamiddemux.c | 514 ++++++++++++++++++ tests/examples/Makefile.am | 1 + tests/examples/streamiddemux/Makefile.am | 6 + .../streamiddemux/streamiddemux-stream.c | 241 ++++++++ 10 files changed, 1238 insertions(+) create mode 100644 plugins/elements/gststreamiddemux.c create mode 100644 plugins/elements/gststreamiddemux.h create mode 100644 tests/check/elements/streamiddemux.c create mode 100644 tests/examples/streamiddemux/Makefile.am create mode 100644 tests/examples/streamiddemux/streamiddemux-stream.c diff --git a/configure.ac b/configure.ac index e637239dfd..9e4582662b 100644 --- a/configure.ac +++ b/configure.ac @@ -839,6 +839,7 @@ tests/examples/memory/Makefile tests/examples/metadata/Makefile tests/examples/netclock/Makefile tests/examples/queue/Makefile +tests/examples/streamiddemux/Makefile tests/examples/streams/Makefile tests/examples/typefind/Makefile tools/Makefile diff --git a/plugins/elements/Makefile.am b/plugins/elements/Makefile.am index 6d3857cc7d..ee3c9fb2eb 100644 --- a/plugins/elements/Makefile.am +++ b/plugins/elements/Makefile.am @@ -24,6 +24,7 @@ libgstcoreelements_la_SOURCES = \ gstsparsefile.c \ gsttee.c \ gsttypefindelement.c \ + gststreamiddemux.c \ gstvalve.c libgstcoreelements_la_CFLAGS = $(GST_OBJ_CFLAGS) @@ -54,6 +55,7 @@ noinst_HEADERS = \ gstsparsefile.h \ gsttee.h \ gsttypefindelement.h \ + gststreamiddemux.h \ gstvalve.h EXTRA_DIST = gstfdsrc.c \ diff --git a/plugins/elements/gstelements.c b/plugins/elements/gstelements.c index 2b1781da74..5c07855794 100644 --- a/plugins/elements/gstelements.c +++ b/plugins/elements/gstelements.c @@ -46,6 +46,7 @@ #include "gsttee.h" #include "gsttypefindelement.h" #include "gstvalve.h" +#include "gststreamiddemux.h" static gboolean plugin_init (GstPlugin * plugin) @@ -109,6 +110,10 @@ plugin_init (GstPlugin * plugin) gst_valve_get_type ())) return FALSE; + if (!gst_element_register (plugin, "streamiddemux", GST_RANK_PRIMARY, + gst_streamid_demux_get_type ())) + return FALSE; + return TRUE; } diff --git a/plugins/elements/gststreamiddemux.c b/plugins/elements/gststreamiddemux.c new file mode 100644 index 0000000000..8528acb5e4 --- /dev/null +++ b/plugins/elements/gststreamiddemux.c @@ -0,0 +1,401 @@ +/* GStreamer streamiddemux element + * + * Copyright 2013 LGE Corporation. + * @author: Hoonhee Lee + * @author: Jeongseok Kim + * @author: Wonchul Lee + * + * gststreamiddemux.c: Simple stream-id-demultiplexer element + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * SECTION:element-streamid-demux + * @see_also: #GstFunnel + * + * Direct input stream to one out of N output pads by stream-id. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include + +#include "gststreamiddemux.h" + +GST_DEBUG_CATEGORY_STATIC (streamid_demux_debug); +#define GST_CAT_DEFAULT streamid_demux_debug + +enum +{ + PROP_0, + PROP_ACTIVE_PAD, + PROP_LAST +}; + +static GstStaticPadTemplate gst_streamid_demux_sink_factory = +GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +static GstStaticPadTemplate gst_streamid_demux_src_factory = +GST_STATIC_PAD_TEMPLATE ("src_%u", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS_ANY); + +#define _do_init \ +GST_DEBUG_CATEGORY_INIT (streamid_demux_debug, \ + "streamiddemux", 0, "Streamid demuxer"); +#define gst_streamid_demux_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE (GstStreamidDemux, gst_streamid_demux, + GST_TYPE_ELEMENT, _do_init); + +static void gst_streamid_demux_dispose (GObject * object); +static void gst_streamid_demux_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static GstFlowReturn gst_streamid_demux_chain (GstPad * pad, + GstObject * parent, GstBuffer * buf); +static gboolean gst_streamid_demux_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static GstStateChangeReturn gst_streamid_demux_change_state (GstElement * + element, GstStateChange transition); +static GstPad *gst_streamid_demux_get_srcpad_by_stream_id (GstStreamidDemux * + demux, const gchar * stream_id); +static gboolean gst_streamid_demux_srcpad_create (GstStreamidDemux * demux, + GstPad * pad, const gchar * stream_id); +static void gst_streamid_demux_reset (GstStreamidDemux * demux); +static void gst_streamid_demux_release_srcpad (const GValue * item, + GstStreamidDemux * demux); + +static void +gst_streamid_demux_class_init (GstStreamidDemuxClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + + gobject_class->get_property = gst_streamid_demux_get_property; + gobject_class->dispose = gst_streamid_demux_dispose; + + g_object_class_install_property (gobject_class, PROP_ACTIVE_PAD, + g_param_spec_object ("active-pad", "Active pad", + "The currently active src pad", GST_TYPE_PAD, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + gst_element_class_set_static_metadata (gstelement_class, "Streamid Demux", + "Generic", "1-to-N output stream by stream-id", + "HoonHee Lee "); + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&gst_streamid_demux_sink_factory)); + + gst_element_class_add_pad_template (gstelement_class, + gst_static_pad_template_get (&gst_streamid_demux_src_factory)); + + gstelement_class->change_state = gst_streamid_demux_change_state; +} + +static void +gst_streamid_demux_init (GstStreamidDemux * demux) +{ + demux->sinkpad = + gst_pad_new_from_static_template (&gst_streamid_demux_sink_factory, + "sink"); + gst_pad_set_chain_function (demux->sinkpad, + GST_DEBUG_FUNCPTR (gst_streamid_demux_chain)); + gst_pad_set_event_function (demux->sinkpad, + GST_DEBUG_FUNCPTR (gst_streamid_demux_event)); + + gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad); + + /* srcpad management */ + demux->active_srcpad = NULL; + demux->nb_srcpads = 0; + + /* initialize hash table for srcpad */ + demux->stream_id_pairs = + g_hash_table_new_full (g_str_hash, g_str_equal, (GDestroyNotify) g_free, + (GDestroyNotify) gst_object_unref); +} + +static void +gst_streamid_demux_dispose (GObject * object) +{ + GstStreamidDemux *demux = GST_STREAMID_DEMUX (object); + + gst_streamid_demux_reset (demux); + + G_OBJECT_CLASS (parent_class)->dispose (object); +} + +static void +gst_streamid_demux_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstStreamidDemux *demux = GST_STREAMID_DEMUX (object); + + switch (prop_id) { + case PROP_ACTIVE_PAD: + GST_OBJECT_LOCK (demux); + g_value_set_object (value, demux->active_srcpad); + GST_OBJECT_UNLOCK (demux); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static gboolean +forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) +{ + GstPad *srcpad = GST_PAD_CAST (user_data); + + gst_pad_push_event (srcpad, gst_event_ref (*event)); + + return TRUE; +} + +static gboolean +gst_streamid_demux_srcpad_create (GstStreamidDemux * demux, GstPad * pad, + const gchar * stream_id) +{ + gchar *padname = NULL; + GstPad *srcpad = NULL; + GstPadTemplate *pad_tmpl = NULL; + + padname = g_strdup_printf ("src_%u", demux->nb_srcpads++); + pad_tmpl = gst_static_pad_template_get (&gst_streamid_demux_src_factory); + + GST_LOG_OBJECT (demux, "generating a srcpad:%s", padname); + srcpad = gst_pad_new_from_template (pad_tmpl, padname); + gst_object_unref (pad_tmpl); + g_free (padname); + g_return_val_if_fail (srcpad != NULL, FALSE); + + demux->active_srcpad = srcpad; + g_hash_table_insert (demux->stream_id_pairs, g_strdup (stream_id), + gst_object_ref (srcpad)); + + return TRUE; +} + +static GstFlowReturn +gst_streamid_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) +{ + GstFlowReturn res = GST_FLOW_OK; + GstStreamidDemux *demux = NULL; + GstPad *srcpad = NULL; + + demux = GST_STREAMID_DEMUX (parent); + + GST_LOG_OBJECT (demux, "pushing buffer to %" GST_PTR_FORMAT, + demux->active_srcpad); + + GST_OBJECT_LOCK (demux); + if (demux->active_srcpad) { + srcpad = gst_object_ref (demux->active_srcpad); + GST_OBJECT_UNLOCK (demux); + res = gst_pad_push (srcpad, buf); + gst_object_unref (srcpad); + } else { + GST_OBJECT_UNLOCK (demux); + goto no_active_srcpad; + } + + GST_LOG_OBJECT (demux, "handled buffer %s", gst_flow_get_name (res)); + return res; + +/* ERROR */ +no_active_srcpad: + { + GST_WARNING_OBJECT (demux, "srcpad is not initialized"); + return GST_FLOW_NOT_NEGOTIATED; + } +} + +static GstPad * +gst_streamid_demux_get_srcpad_by_stream_id (GstStreamidDemux * demux, + const gchar * stream_id) +{ + GstPad *srcpad = NULL; + + GST_DEBUG_OBJECT (demux, "stream_id = %s", stream_id); + if (demux->stream_id_pairs == NULL || stream_id == NULL) { + goto done; + } + + srcpad = g_hash_table_lookup (demux->stream_id_pairs, stream_id); + + if (srcpad) { + GST_DEBUG_OBJECT (demux, "srcpad = %s:%s matched", + GST_DEBUG_PAD_NAME (srcpad)); + } + +done: + return srcpad; +} + +static gboolean +gst_streamid_demux_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + gboolean res = TRUE; + GstStreamidDemux *demux; + const gchar *stream_id = NULL; + GstPad *active_srcpad = NULL; + + demux = GST_STREAMID_DEMUX (parent); + + GST_DEBUG_OBJECT (demux, "event = %s, sticky = %d", + GST_EVENT_TYPE_NAME (event), GST_EVENT_IS_STICKY (event)); + + if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) { + gst_event_parse_stream_start (event, &stream_id); + if (!stream_id) + goto no_stream_id; + + GST_OBJECT_LOCK (demux); + active_srcpad = + gst_streamid_demux_get_srcpad_by_stream_id (demux, stream_id); + if (!active_srcpad) { + /* try to generate a srcpad */ + if (gst_streamid_demux_srcpad_create (demux, pad, stream_id)) { + GST_OBJECT_UNLOCK (demux); + + gst_pad_set_active (demux->active_srcpad, TRUE); + /* Forward sticky events to the new srcpad */ + gst_pad_sticky_events_foreach (demux->sinkpad, forward_sticky_events, + demux->active_srcpad); + gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->active_srcpad); + } else { + GST_OBJECT_UNLOCK (demux); + goto fail_create_srcpad; + } + } else if (demux->active_srcpad != active_srcpad) { + demux->active_srcpad = active_srcpad; + GST_OBJECT_UNLOCK (demux); + + g_object_notify (G_OBJECT (demux), "active-pad"); + } else + GST_OBJECT_UNLOCK (demux); + } + + if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START + || GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP + || GST_EVENT_TYPE (event) == GST_EVENT_EOS) { + res = gst_pad_event_default (pad, parent, event); + } else if (demux->active_srcpad) { + GstPad *srcpad = NULL; + GST_OBJECT_LOCK (demux); + srcpad = gst_object_ref (demux->active_srcpad); + GST_OBJECT_UNLOCK (demux); + res = gst_pad_push_event (srcpad, event); + gst_object_unref (srcpad); + } else { + gst_event_unref (event); + } + return res; + + /* ERRORS */ +no_stream_id: + { + GST_ELEMENT_ERROR (demux, STREAM, DEMUX, + ("Error occurred trying to get stream-id to create a srcpad"), + ("no stream-id found at %s", GST_EVENT_TYPE_NAME (event))); + + gst_event_unref (event); + return FALSE; + } + +fail_create_srcpad: + { + GST_ELEMENT_ERROR (demux, STREAM, FAILED, + ("Error occurred trying to create a srcpad"), + ("Failed to create a srcpad via stream-id:%s", stream_id)); + gst_event_unref (event); + return FALSE; + } +} + +static void +gst_streamid_demux_release_srcpad (const GValue * item, + GstStreamidDemux * demux) +{ + GstPad *pad = g_value_get_object (item); + + if (pad != NULL) { + gst_pad_set_active (pad, FALSE); + gst_element_remove_pad (GST_ELEMENT_CAST (demux), pad); + } +} + +static void +gst_streamid_demux_reset (GstStreamidDemux * demux) +{ + GstIterator *it = NULL; + GstIteratorResult itret = GST_ITERATOR_OK; + + GST_OBJECT_LOCK (demux); + if (demux->active_srcpad != NULL) + demux->active_srcpad = NULL; + + GST_OBJECT_UNLOCK (demux); + + if (demux->stream_id_pairs != NULL) { + g_hash_table_unref (demux->stream_id_pairs); + demux->stream_id_pairs = NULL; + } + + it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (demux)); + while (itret == GST_ITERATOR_OK || itret == GST_ITERATOR_RESYNC) { + itret = + gst_iterator_foreach (it, + (GstIteratorForeachFunction) gst_streamid_demux_release_srcpad, demux); + if (itret == GST_ITERATOR_RESYNC) + gst_iterator_resync (it); + } + gst_iterator_free (it); +} + +static GstStateChangeReturn +gst_streamid_demux_change_state (GstElement * element, + GstStateChange transition) +{ + GstStreamidDemux *demux; + GstStateChangeReturn result; + + demux = GST_STREAMID_DEMUX (element); + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + break; + default: + break; + } + + result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_streamid_demux_reset (demux); + break; + default: + break; + } + + return result; +} diff --git a/plugins/elements/gststreamiddemux.h b/plugins/elements/gststreamiddemux.h new file mode 100644 index 0000000000..ec5383b2c1 --- /dev/null +++ b/plugins/elements/gststreamiddemux.h @@ -0,0 +1,66 @@ +/* + * GStreamer streamiddemux eleement + * + * Copyright 2013 LGE Corporation. + * @author: Hoonhee Lee + * @author: Jeongseok Kim + * @author: Wonchul Lee + * + * gststreamiddemux.h: Simple stream-id-demultiplexer element + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef __GST_STREAMID_DEMUX_H__ +#define __GST_STREAMID_DEMUX_H__ + +#include + +G_BEGIN_DECLS +#define GST_TYPE_STREAMID_DEMUX \ + (gst_streamid_demux_get_type()) +#define GST_STREAMID_DEMUX(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_STREAMID_DEMUX, GstStreamidDemux)) +#define GST_STREAMID_DEMUX_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_STREAMID_DEMUX, GstStreamidDemuxClass)) +#define GST_IS_STREAMID_DEMUX(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_STREAMID_DEMUX)) +#define GST_IS_STREAMID_DEMUX_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_STREAMID_DEMUX)) +typedef struct _GstStreamidDemux GstStreamidDemux; +typedef struct _GstStreamidDemuxClass GstStreamidDemuxClass; + +struct _GstStreamidDemux +{ + GstElement element; + + GstPad *sinkpad; + + guint nb_srcpads; + GstPad *active_srcpad; + + /* This table contains srcpad and stream-id */ + GHashTable *stream_id_pairs; +}; + +struct _GstStreamidDemuxClass +{ + GstElementClass parent_class; +}; + +G_GNUC_INTERNAL GType gst_streamid_demux_get_type (void); + +G_END_DECLS +#endif /* __GST_STREAMID_DEMUX_H__ */ diff --git a/tests/check/Makefile.am b/tests/check/Makefile.am index 36d0ebc03d..5e7e5ab8b3 100644 --- a/tests/check/Makefile.am +++ b/tests/check/Makefile.am @@ -95,6 +95,7 @@ REGISTRY_CHECKS = \ elements/queue \ elements/queue2 \ elements/valve \ + elements/streamiddemux \ libs/baseparse \ libs/basesrc \ libs/basesink \ diff --git a/tests/check/elements/streamiddemux.c b/tests/check/elements/streamiddemux.c new file mode 100644 index 0000000000..8c10bce72b --- /dev/null +++ b/tests/check/elements/streamiddemux.c @@ -0,0 +1,514 @@ +/* GStreamer unit tests for the streamiddemux + * + * Copyright 2013 LGE Corporation. + * @author: Hoonhee Lee + * @author: Jeongseok Kim + * @author: Wonchul Lee + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include + +#define NUM_SUBSTREAMS 100 +#define NUM_BUFFER 1000 + +static GstPad *active_srcpad; + +struct TestData +{ + GstElement *demux; + GstPad *mysrc, *mysink[NUM_SUBSTREAMS]; + GstPad *demuxsink, *demuxsrc[NUM_SUBSTREAMS]; + gint srcpad_cnt; + GstCaps *mycaps; + GstCaps *caps[NUM_SUBSTREAMS]; + GstSegment segment[NUM_SUBSTREAMS]; + gchar *stream_ids[NUM_SUBSTREAMS]; +}; + +static void +set_active_srcpad (struct TestData *td) +{ + if (active_srcpad) + gst_object_unref (active_srcpad); + + g_object_get (td->demux, "active-pad", &active_srcpad, NULL); +} + +static void +release_test_objects (struct TestData *td) +{ + fail_unless (gst_element_set_state (td->demux, GST_STATE_NULL) == + GST_STATE_CHANGE_SUCCESS); + + gst_object_unref (td->demuxsink); + + gst_caps_unref (td->mycaps); + + if (active_srcpad) + gst_object_unref (active_srcpad); + + gst_object_unref (td->demux); +} + +static void +src_pad_added_cb (GstElement * demux, GstPad * pad, struct TestData *td) +{ + if (td->srcpad_cnt < NUM_SUBSTREAMS) { + td->demuxsrc[td->srcpad_cnt] = pad; + fail_unless (gst_pad_link (pad, + td->mysink[td->srcpad_cnt++]) == GST_PAD_LINK_OK); + } +} + +static void +setup_test_objects (struct TestData *td) +{ + td->mycaps = gst_caps_new_empty_simple ("test/test"); + td->srcpad_cnt = 0; + + td->demux = gst_element_factory_make ("streamiddemux", NULL); + fail_unless (td->demux != NULL); + g_signal_connect (td->demux, "pad-added", G_CALLBACK (src_pad_added_cb), td); + td->demuxsink = gst_element_get_static_pad (td->demux, "sink"); + fail_unless (td->demuxsink != NULL); + + fail_unless (gst_element_set_state (td->demux, GST_STATE_PLAYING) == + GST_STATE_CHANGE_SUCCESS); +} + +static GstFlowReturn +chain_ok (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstPad *peer_pad = NULL; + gchar *pad_stream_id, *active_srcpad_stream_id; + + peer_pad = gst_pad_get_peer (active_srcpad); + pad_stream_id = gst_pad_get_stream_id (pad); + active_srcpad_stream_id = gst_pad_get_stream_id (active_srcpad); + fail_unless (pad == peer_pad); + fail_unless (g_strcmp0 (pad_stream_id, active_srcpad_stream_id) == 0); + + g_free (pad_stream_id); + g_free (active_srcpad_stream_id); + gst_object_unref (peer_pad); + gst_buffer_unref (buffer); + + return GST_FLOW_OK; +} + +GST_START_TEST (test_simple_create_destroy) +{ + GstElement *demux; + + demux = gst_element_factory_make ("streamiddemux", NULL); + gst_object_unref (demux); +} + +GST_END_TEST; + +GST_START_TEST (test_streamiddemux_with_stream_start) +{ + struct TestData td; + + setup_test_objects (&td); + + GST_DEBUG ("Creating mysink"); + td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK); + gst_pad_set_active (td.mysink[0], TRUE); + + GST_DEBUG ("Creating mysrc"); + td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC); + fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink))); + gst_pad_set_active (td.mysrc, TRUE); + + GST_DEBUG ("Pushing stream-start event"); + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_stream_start ("test0"))); + + g_object_get (td.demux, "active-pad", &active_srcpad, NULL); + fail_unless (active_srcpad != NULL, "Failed to generate a srcpad"); + fail_unless (td.srcpad_cnt == 1, "pad-added signal has not emmited"); + + GST_DEBUG ("Releasing mysink and mysrc"); + gst_pad_set_active (td.mysink[0], FALSE); + gst_pad_set_active (td.mysrc, FALSE); + + gst_object_unref (td.mysink[0]); + gst_object_unref (td.mysrc); + + GST_DEBUG ("Releasing streamiddemux"); + release_test_objects (&td); +} + +GST_END_TEST; + +GST_START_TEST (test_streamiddemux_without_stream_start) +{ + struct TestData td; + GstSegment segment; + + setup_test_objects (&td); + + GST_DEBUG ("Creating mysink"); + td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK); + gst_pad_set_active (td.mysink[0], TRUE); + + GST_DEBUG ("Creating mysrc"); + td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC); + fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink))); + gst_pad_set_active (td.mysrc, TRUE); + + GST_DEBUG ("Pushing caps and segment event without stream-start"); + fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_caps (td.mycaps))); + gst_segment_init (&segment, GST_FORMAT_BYTES); + fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_segment (&segment))); + + g_object_get (td.demux, "active-pad", &active_srcpad, NULL); + fail_unless (active_srcpad == NULL, "srcpad has created unexpectedly"); + fail_unless (td.srcpad_cnt == 0, "pad-added signal is emmited unexpectedly"); + + GST_DEBUG ("Releasing mysink and mysrc"); + gst_pad_set_active (td.mysink[0], FALSE); + gst_pad_set_active (td.mysrc, FALSE); + + gst_object_unref (td.mysink[0]); + gst_object_unref (td.mysrc); + + GST_DEBUG ("Releasing streamiddemux"); + release_test_objects (&td); +} + +GST_END_TEST; + +GST_START_TEST (test_streamiddemux_simple) +{ + struct TestData td; + + setup_test_objects (&td); + + GST_DEBUG ("Creating mysink"); + td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK); + td.mysink[0]->chaindata = &td; + gst_pad_set_chain_function (td.mysink[0], chain_ok); + gst_pad_set_active (td.mysink[0], TRUE); + + td.mysink[1] = gst_pad_new ("mysink1", GST_PAD_SINK); + td.mysink[1]->chaindata = &td; + gst_pad_set_chain_function (td.mysink[1], chain_ok); + gst_pad_set_active (td.mysink[1], TRUE); + + GST_DEBUG ("Creating mysrc"); + td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC); + fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink))); + gst_pad_set_active (td.mysrc, TRUE); + + GST_DEBUG ("Pushing stream-start, caps and segment event"); + gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps, + GST_FORMAT_BYTES, "test0"); + set_active_srcpad (&td); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + + gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps, + GST_FORMAT_BYTES, "test1"); + set_active_srcpad (&td); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + + GST_DEBUG ("Pushing buffer"); + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_stream_start ("test0"))); + set_active_srcpad (&td); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_stream_start ("test1"))); + set_active_srcpad (&td); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + + GST_DEBUG ("Releasing mysink and mysrc"); + gst_pad_set_active (td.mysink[0], FALSE); + gst_pad_set_active (td.mysink[1], FALSE); + gst_pad_set_active (td.mysrc, FALSE); + + gst_object_unref (td.mysink[0]); + gst_object_unref (td.mysink[1]); + gst_object_unref (td.mysrc); + + GST_DEBUG ("Releasing streamiddemux"); + release_test_objects (&td); +} + +GST_END_TEST; + +GList *expected[NUM_SUBSTREAMS]; + +static gboolean +sink_event_func (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GList **expected = GST_PAD_ELEMENT_PRIVATE (pad); + GstEvent *exp; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CAPS:{ + GstCaps *recvcaps, *expectcaps; + + *expected = g_list_first (*expected); + exp = GST_EVENT ((*expected)->data); + + gst_event_parse_caps (event, &recvcaps); + gst_event_parse_caps (exp, &expectcaps); + + fail_unless (gst_caps_is_equal (recvcaps, expectcaps)); + break; + } + case GST_EVENT_SEGMENT:{ + const GstSegment *recvseg, *expectseg; + + *expected = g_list_last (*expected); + exp = GST_EVENT ((*expected)->data); + + gst_event_parse_segment (event, &recvseg); + gst_event_parse_segment (exp, &expectseg); + + fail_unless_equals_uint64 (recvseg->position, expectseg->position); + break; + } + default: + break; + } + + return gst_pad_event_default (pad, parent, event); +} + +GST_START_TEST (test_streamiddemux_num_buffers) +{ + struct TestData td; + gint buffer_cnt = 0; + gint stream_cnt = 0; + GstEvent *event; + + setup_test_objects (&td); + + GST_DEBUG ("Creating mysink"); + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) { + gchar *name; + name = g_strdup_printf ("mysink%d", stream_cnt); + td.mysink[stream_cnt] = gst_pad_new (name, GST_PAD_SINK); + g_free (name); + gst_pad_set_chain_function (td.mysink[stream_cnt], chain_ok); + gst_pad_set_event_function (td.mysink[stream_cnt], sink_event_func); + gst_pad_set_active (td.mysink[stream_cnt], TRUE); + GST_PAD_ELEMENT_PRIVATE (td.mysink[stream_cnt]) = &expected[stream_cnt]; + } + + GST_DEBUG ("Creating mysrc"); + td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC); + fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink))); + gst_pad_set_active (td.mysrc, TRUE); + + GST_DEBUG ("Creating caps"); + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) { + gchar *caps_name; + caps_name = g_strdup_printf ("test/test%d", stream_cnt); + td.caps[stream_cnt] = gst_caps_new_empty_simple (caps_name); + + g_free (caps_name); + } + + GST_DEBUG ("Creating segment"); + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) { + gst_segment_init (&td.segment[stream_cnt], GST_FORMAT_BYTES); + td.segment[stream_cnt].position = stream_cnt * GST_SECOND; + } + + GST_DEBUG ("Pushing stream-start, caps and segment event"); + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) { + gchar *name; + name = g_strdup_printf ("test%d", stream_cnt); + + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_stream_start (name))); + + event = gst_event_new_caps (td.caps[stream_cnt]); + expected[stream_cnt] = + g_list_append (expected[stream_cnt], gst_event_ref (event)); + fail_unless (gst_pad_push_event (td.mysrc, event)); + + event = gst_event_new_segment (&td.segment[stream_cnt]); + expected[stream_cnt] = + g_list_append (expected[stream_cnt], gst_event_ref (event)); + fail_unless (gst_pad_push_event (td.mysrc, event)); + + g_free (name); + set_active_srcpad (&td); + + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + } + + GST_DEBUG ("Pushing buffers to random srcpad"); + for (buffer_cnt = 0; buffer_cnt < NUM_BUFFER; ++buffer_cnt) { + gchar *name; + gint active_stream = rand () % NUM_SUBSTREAMS; + name = g_strdup_printf ("test%d", active_stream); + + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_stream_start (name))); + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_caps (td.caps[active_stream]))); + fail_unless (gst_pad_push_event (td.mysrc, + gst_event_new_segment (&td.segment[active_stream]))); + + g_free (name); + set_active_srcpad (&td); + + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + } + + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) + gst_caps_unref (td.caps[stream_cnt]); + + GST_DEBUG ("Releasing mysink and mysrc"); + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) { + gst_pad_set_active (td.mysink[stream_cnt], FALSE); + } + gst_pad_set_active (td.mysrc, FALSE); + + for (stream_cnt = 0; stream_cnt < NUM_SUBSTREAMS; ++stream_cnt) { + gst_object_unref (td.mysink[stream_cnt]); + } + gst_object_unref (td.mysrc); + + GST_DEBUG ("Releasing streamiddemux"); + release_test_objects (&td); +} + +GST_END_TEST; + +guint num_eos = 0; +guint num_flush_start = 0; +guint num_flush_stop = 0; + +static gboolean +event_func (GstPad * pad, GstObject * parent, GstEvent * event) +{ + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_STREAM_START: + ++num_flush_start; + break; + case GST_EVENT_FLUSH_STOP: + ++num_flush_stop; + break; + case GST_EVENT_EOS: + ++num_eos; + break; + default: + break; + } + + return gst_pad_event_default (pad, parent, event); +} + +GST_START_TEST (test_streamiddemux_eos) +{ + struct TestData td; + + setup_test_objects (&td); + + num_eos = 0; + + GST_DEBUG ("Creating mysink"); + td.mysink[0] = gst_pad_new ("mysink0", GST_PAD_SINK); + gst_pad_set_chain_function (td.mysink[0], chain_ok); + gst_pad_set_event_function (td.mysink[0], event_func); + gst_pad_set_active (td.mysink[0], TRUE); + + td.mysink[1] = gst_pad_new ("mysink1", GST_PAD_SINK); + gst_pad_set_chain_function (td.mysink[1], chain_ok); + gst_pad_set_event_function (td.mysink[1], event_func); + gst_pad_set_active (td.mysink[1], TRUE); + + GST_DEBUG ("Creating mysrc"); + td.mysrc = gst_pad_new ("mysrc", GST_PAD_SRC); + fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (td.mysrc, td.demuxsink))); + gst_pad_set_active (td.mysrc, TRUE); + + GST_DEBUG ("Pushing stream-start, caps and segment event"); + gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps, + GST_FORMAT_BYTES, "test0"); + set_active_srcpad (&td); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + + gst_check_setup_events_with_stream_id (td.mysrc, td.demux, td.mycaps, + GST_FORMAT_BYTES, "test1"); + set_active_srcpad (&td); + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_OK); + + GST_DEBUG ("Pushing flush event"); + fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_flush_start ())); + fail_unless (num_flush_start == 2, + "Failed to send flush-start event to all pads internally linked"); + fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_flush_stop (TRUE))); + fail_unless (num_flush_stop == 2, + "Failed to send flush-stop event to all pads internally linked"); + + GST_DEBUG ("Pushing eos event"); + fail_unless (gst_pad_push_event (td.mysrc, gst_event_new_eos ())); + fail_unless (num_eos == 2, + "Failed to send eos event to all pads internally linked"); + + fail_unless (gst_pad_push (td.mysrc, gst_buffer_new ()) == GST_FLOW_EOS); + + GST_DEBUG ("Releasing mysink and mysrc"); + gst_pad_set_active (td.mysink[0], FALSE); + gst_pad_set_active (td.mysink[1], FALSE); + gst_pad_set_active (td.mysrc, FALSE); + + gst_object_unref (td.mysink[0]); + gst_object_unref (td.mysink[1]); + gst_object_unref (td.mysrc); + + GST_DEBUG ("Releasing streamiddemux"); + release_test_objects (&td); +} + +GST_END_TEST; + +static Suite * +streamiddemux_suite (void) +{ + Suite *s = suite_create ("streamiddemux"); + TCase *tc_chain; + + tc_chain = tcase_create ("streamiddemux simple"); + tcase_add_test (tc_chain, test_simple_create_destroy); + tcase_add_test (tc_chain, test_streamiddemux_with_stream_start); + tcase_add_test (tc_chain, test_streamiddemux_without_stream_start); + tcase_add_test (tc_chain, test_streamiddemux_simple); + tcase_add_test (tc_chain, test_streamiddemux_num_buffers); + tcase_add_test (tc_chain, test_streamiddemux_eos); + suite_add_tcase (s, tc_chain); + + return s; +} + +GST_CHECK_MAIN (streamiddemux); diff --git a/tests/examples/Makefile.am b/tests/examples/Makefile.am index 376bac8874..fce72a15c2 100644 --- a/tests/examples/Makefile.am +++ b/tests/examples/Makefile.am @@ -20,6 +20,7 @@ always_dirs = \ netclock \ queue \ stepping \ + streamiddemux \ streams \ typefind diff --git a/tests/examples/streamiddemux/Makefile.am b/tests/examples/streamiddemux/Makefile.am new file mode 100644 index 0000000000..e182d29880 --- /dev/null +++ b/tests/examples/streamiddemux/Makefile.am @@ -0,0 +1,6 @@ +noinst_PROGRAMS = streamiddemux-stream + +streamiddemux_stream_SOURCES = streamiddemux-stream.c +streamiddemux_stream_LDADD = $(GST_OBJ_LIBS) +streamiddemux_stream_CFLAGS = $(GST_OBJ_CFLAGS) + diff --git a/tests/examples/streamiddemux/streamiddemux-stream.c b/tests/examples/streamiddemux/streamiddemux-stream.c new file mode 100644 index 0000000000..1ef128b2b9 --- /dev/null +++ b/tests/examples/streamiddemux/streamiddemux-stream.c @@ -0,0 +1,241 @@ +#include + +#define NUM_STREAM 13 + +typedef struct _App App; + +struct _App +{ + GstElement *pipeline; + GstElement *audiotestsrc[NUM_STREAM]; + GstElement *audioconvert[NUM_STREAM]; + GstElement *capsfilter[NUM_STREAM]; + GstElement *vorbisenc[NUM_STREAM]; + GstElement *oggmux[NUM_STREAM]; + GstElement *funnel; + GstElement *demux; + GstElement *stream_synchronizer; + GstElement *queue[NUM_STREAM]; + GstElement *filesink[NUM_STREAM]; + + gboolean pad_blocked[NUM_STREAM]; + GstPad *queue_srcpad[NUM_STREAM]; + gulong blocked_id[NUM_STREAM]; +}; + +App s_app; + +gint pad_added_cnt = 0; + +static gboolean +bus_call (GstBus * bus, GstMessage * msg, gpointer data) +{ + GMainLoop *loop = (GMainLoop *) data; + + switch (GST_MESSAGE_TYPE (msg)) { + case GST_MESSAGE_EOS:{ + g_main_loop_quit (loop); + break; + } + case GST_MESSAGE_ERROR:{ + g_main_loop_quit (loop); + break; + } + default: + break; + } + return TRUE; +} + +static void +set_blocked (App * app, gboolean blocked) +{ + gint i = 0; + + for (i = 0; i < NUM_STREAM; i++) { + gst_pad_remove_probe (app->queue_srcpad[i], app->blocked_id[i]); + } +} + +static void +sink_do_reconfigure (App * app) +{ + gint i = 0; + GstPad *filesink_sinkpad[NUM_STREAM]; + GstPad *sync_sinkpad[NUM_STREAM]; + GstPad *sync_srcpad[NUM_STREAM]; + GstIterator *it; + GValue item = G_VALUE_INIT; + + for (i = 0; i < NUM_STREAM; i++) { + sync_sinkpad[i] = + gst_element_get_request_pad (app->stream_synchronizer, "sink_%u"); + it = gst_pad_iterate_internal_links (sync_sinkpad[i]); + g_assert (it); + gst_iterator_next (it, &item); + sync_srcpad[i] = g_value_dup_object (&item); + g_value_unset (&item); + + filesink_sinkpad[i] = gst_element_get_static_pad (app->filesink[i], "sink"); + + gst_pad_link_full (app->queue_srcpad[i], sync_sinkpad[i], + GST_PAD_LINK_CHECK_NOTHING); + gst_pad_link_full (sync_srcpad[i], filesink_sinkpad[i], + GST_PAD_LINK_CHECK_NOTHING); + } + gst_iterator_free (it); + +} + +static GstPadProbeReturn +blocked_cb (GstPad * blockedpad, GstPadProbeInfo * info, gpointer user_data) +{ + App *app = user_data; + gint i = 0; + gboolean all_pads_blocked = TRUE; + + for (i = 0; i < NUM_STREAM; i++) { + if (blockedpad == app->queue_srcpad[i]) + app->pad_blocked[i] = TRUE; + } + + for (i = 0; i < NUM_STREAM; i++) { + if (app->queue_srcpad[i] == FALSE) { + all_pads_blocked = FALSE; + break; + } + } + + if (all_pads_blocked == TRUE) { + sink_do_reconfigure (app); + set_blocked (app, FALSE); + } + + return GST_PAD_PROBE_OK; +} + +static void +src_pad_added_cb (GstElement * demux, GstPad * pad, App * app) +{ + GstPad *queue_sinkpad[NUM_STREAM]; + + queue_sinkpad[pad_added_cnt] = + gst_element_get_static_pad (app->queue[pad_added_cnt], "sink"); + gst_pad_link_full (pad, queue_sinkpad[pad_added_cnt], + GST_PAD_LINK_CHECK_NOTHING); + + app->queue_srcpad[pad_added_cnt] = + gst_element_get_static_pad (app->queue[pad_added_cnt], "src"); + app->blocked_id[pad_added_cnt] = + gst_pad_add_probe (app->queue_srcpad[pad_added_cnt], + GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, blocked_cb, app, NULL); + + pad_added_cnt++; +} + +gint +main (gint argc, gchar * argv[]) +{ + App *app = &s_app; + + GMainLoop *loop = NULL; + GstBus *bus; + guint bus_watch_id; + + GstPad *funnel_sinkpad[NUM_STREAM]; + GstPad *funnel_srcpad; + GstPad *demux_sinkpad; + GstPad *oggmux_srcpad[NUM_STREAM]; + + guint stream_cnt = 0; + GstCaps *caps; + + gst_init (&argc, &argv); + + app->pipeline = gst_pipeline_new ("pipeline"); + + for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) { + app->audiotestsrc[stream_cnt] = + gst_element_factory_make ("audiotestsrc", NULL); + app->audioconvert[stream_cnt] = + gst_element_factory_make ("audioconvert", NULL); + app->capsfilter[stream_cnt] = gst_element_factory_make ("capsfilter", NULL); + app->vorbisenc[stream_cnt] = gst_element_factory_make ("vorbisenc", NULL); + app->oggmux[stream_cnt] = gst_element_factory_make ("oggmux", NULL); + } + + app->funnel = gst_element_factory_make ("funnel", NULL); + app->demux = gst_element_factory_make ("streamiddemux", NULL); + app->stream_synchronizer = + gst_element_factory_make ("streamsynchronizer", NULL); + + caps = gst_caps_from_string ("audio/x-raw,channels=1;"); + + stream_cnt = 0; + + for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) { + app->queue[stream_cnt] = gst_element_factory_make ("queue", NULL); + app->filesink[stream_cnt] = gst_element_factory_make ("filesink", NULL); + + g_object_set (app->audiotestsrc[stream_cnt], "wave", stream_cnt, + "num-buffers", 2000, NULL); + g_object_set (app->capsfilter[stream_cnt], "caps", caps, NULL); + g_object_set (app->filesink[stream_cnt], "location", + g_strdup_printf ("filesink_%d.ogg", stream_cnt), NULL); + } + + stream_cnt = 0; + + g_signal_connect (app->demux, "pad-added", G_CALLBACK (src_pad_added_cb), + app); + + loop = g_main_loop_new (NULL, FALSE); + + bus = gst_element_get_bus (app->pipeline); + bus_watch_id = gst_bus_add_watch (bus, bus_call, loop); + g_object_unref (bus); + + for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) { + gst_bin_add_many (GST_BIN (app->pipeline), app->audiotestsrc[stream_cnt], + app->audioconvert[stream_cnt], app->capsfilter[stream_cnt], + app->vorbisenc[stream_cnt], app->oggmux[stream_cnt], + app->queue[stream_cnt], app->filesink[stream_cnt], NULL); + if (stream_cnt == 0) { + gst_bin_add_many (GST_BIN (app->pipeline), app->funnel, app->demux, + app->stream_synchronizer, NULL); + } + } + + stream_cnt = 0; + + for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) { + gst_element_link_many (app->audiotestsrc[stream_cnt], + app->audioconvert[stream_cnt], app->capsfilter[stream_cnt], + app->vorbisenc[stream_cnt], app->oggmux[stream_cnt], NULL); + } + + stream_cnt = 0; + + for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) { + funnel_sinkpad[stream_cnt] = + gst_element_get_request_pad (app->funnel, "sink_%u"); + oggmux_srcpad[stream_cnt] = + gst_element_get_static_pad (app->oggmux[stream_cnt], "src"); + gst_pad_link (oggmux_srcpad[stream_cnt], funnel_sinkpad[stream_cnt]); + } + + funnel_srcpad = gst_element_get_static_pad (app->funnel, "src"); + + demux_sinkpad = gst_element_get_static_pad (app->demux, "sink"); + gst_pad_link (funnel_srcpad, demux_sinkpad); + + gst_element_set_state (app->pipeline, GST_STATE_PLAYING); + g_main_loop_run (loop); + + gst_element_set_state (app->pipeline, GST_STATE_NULL); + g_object_unref (app->pipeline); + g_source_remove (bus_watch_id); + g_main_loop_unref (loop); + + return 0; +}