/* * collectpads.c - GstCollectPads testsuite * Copyright (C) 2006 Alessandro Decina * * Authors: * Alessandro Decina * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include #include /* dummy collectpads based element */ #define GST_TYPE_AGGREGATOR (gst_aggregator_get_type ()) #define GST_AGGREGATOR(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_AGGREGATOR, GstAggregator)) #define GST_AGGREGATOR_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_AGGREGATOR, GstAggregatorClass)) #define GST_AGGREGATOR_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_AGGREGATOR, GstAggregatorClass)) typedef struct _GstAggregator GstAggregator; typedef struct _GstAggregatorClass GstAggregatorClass; struct _GstAggregator { GstElement parent; GstCollectPads *collect; GstPad *srcpad; GstPad *sinkpad[2]; gint padcount; gboolean first; }; struct _GstAggregatorClass { GstElementClass parent_class; }; static GType gst_aggregator_get_type (void); G_DEFINE_TYPE (GstAggregator, gst_aggregator, GST_TYPE_ELEMENT); static GstStaticPadTemplate gst_aggregator_src_template = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); static GstStaticPadTemplate gst_aggregator_sink_template = GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST, GST_STATIC_CAPS_ANY); static GstFlowReturn gst_agregator_collected (GstCollectPads * pads, gpointer user_data) { GstAggregator *aggregator = GST_AGGREGATOR (user_data); GstBuffer *inbuf; GstCollectData *collect_data = NULL; guint outsize = 0; GSList *walk; walk = pads->data; for (walk = pads->data; walk; walk = walk->next) { GstCollectData *tmp = (GstCollectData *) walk->data; if (tmp->buffer) { collect_data = tmp; break; } } /* can only happen when no pads to collect or all EOS */ if (collect_data == NULL) goto eos; outsize = gst_buffer_get_size (collect_data->buffer); inbuf = gst_collect_pads_take_buffer (pads, collect_data, outsize); if (!inbuf) goto eos; if (aggregator->first) { GstSegment segment; gst_segment_init (&segment, GST_FORMAT_BYTES); gst_pad_push_event (aggregator->srcpad, gst_event_new_stream_start ("test")); gst_pad_push_event (aggregator->srcpad, gst_event_new_segment (&segment)); aggregator->first = FALSE; } /* just forward the first buffer */ GST_DEBUG_OBJECT (aggregator, "forward buffer %p", inbuf); return gst_pad_push (aggregator->srcpad, inbuf); /* ERRORS */ eos: { GST_DEBUG_OBJECT (aggregator, "no data available, must be EOS"); gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ()); return GST_FLOW_EOS; } } static GstPad * gst_aggregator_request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * unused, const GstCaps * caps) { GstAggregator *aggregator = GST_AGGREGATOR (element); gchar *name; GstPad *newpad; gint padcount; if (templ->direction != GST_PAD_SINK) return NULL; /* create new pad */ padcount = g_atomic_int_add (&aggregator->padcount, 1); name = g_strdup_printf ("sink_%u", padcount); newpad = gst_pad_new_from_template (templ, name); g_free (name); gst_collect_pads_add_pad (aggregator->collect, newpad, sizeof (GstCollectData), NULL, TRUE); /* takes ownership of the pad */ if (!gst_element_add_pad (GST_ELEMENT (aggregator), newpad)) goto could_not_add; GST_DEBUG_OBJECT (aggregator, "added new pad %s", GST_OBJECT_NAME (newpad)); return newpad; /* errors */ could_not_add: { GST_DEBUG_OBJECT (aggregator, "could not add pad"); gst_collect_pads_remove_pad (aggregator->collect, newpad); gst_object_unref (newpad); return NULL; } } static void gst_aggregator_release_pad (GstElement * element, GstPad * pad) { GstAggregator *aggregator = GST_AGGREGATOR (element); if (aggregator->collect) gst_collect_pads_remove_pad (aggregator->collect, pad); gst_element_remove_pad (element, pad); } static GstStateChangeReturn gst_aggregator_change_state (GstElement * element, GstStateChange transition) { GstAggregator *aggregator = GST_AGGREGATOR (element); GstStateChangeReturn ret; switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: gst_collect_pads_start (aggregator->collect); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: break; case GST_STATE_CHANGE_PAUSED_TO_READY: /* need to unblock the collectpads before calling the * parent change_state so that streaming can finish */ gst_collect_pads_stop (aggregator->collect); break; default: break; } ret = GST_ELEMENT_CLASS (gst_aggregator_parent_class)->change_state (element, transition); switch (transition) { default: break; } return ret; } static void gst_aggregator_dispose (GObject * object) { GstAggregator *aggregator = GST_AGGREGATOR (object); if (aggregator->collect) { gst_object_unref (aggregator->collect); aggregator->collect = NULL; } G_OBJECT_CLASS (gst_aggregator_parent_class)->dispose (object); } static void gst_aggregator_class_init (GstAggregatorClass * klass) { GObjectClass *gobject_class = (GObjectClass *) klass; GstElementClass *gstelement_class = (GstElementClass *) klass; gobject_class->dispose = gst_aggregator_dispose; gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&gst_aggregator_src_template)); gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&gst_aggregator_sink_template)); gst_element_class_set_static_metadata (gstelement_class, "Aggregator", "Testing", "Combine N buffers", "Stefan Sauer "); gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad); gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_aggregator_release_pad); gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_aggregator_change_state); } static void gst_aggregator_init (GstAggregator * agregator) { GstPadTemplate *template; template = gst_static_pad_template_get (&gst_aggregator_src_template); agregator->srcpad = gst_pad_new_from_template (template, "src"); gst_object_unref (template); GST_PAD_SET_PROXY_CAPS (agregator->srcpad); gst_element_add_pad (GST_ELEMENT (agregator), agregator->srcpad); /* keep track of the sinkpads requested */ agregator->collect = gst_collect_pads_new (); gst_collect_pads_set_function (agregator->collect, GST_DEBUG_FUNCPTR (gst_agregator_collected), agregator); agregator->first = TRUE; } static gboolean gst_agregator_plugin_init (GstPlugin * plugin) { return gst_element_register (plugin, "aggregator", GST_RANK_NONE, GST_TYPE_AGGREGATOR); } static gboolean gst_agregator_plugin_register (void) { return gst_plugin_register_static (GST_VERSION_MAJOR, GST_VERSION_MINOR, "aggregator", "Combine buffers", gst_agregator_plugin_init, VERSION, GST_LICENSE, PACKAGE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN); } #define fail_unless_collected(expected) \ G_STMT_START { \ g_mutex_lock (&lock); \ while (expected == TRUE && collected == FALSE) \ g_cond_wait (&cond, &lock); \ fail_unless_equals_int (collected, expected); \ g_mutex_unlock (&lock); \ } G_STMT_END; typedef struct { char foo; } BadCollectData; typedef struct { GstCollectData data; GstPad *pad; GstBuffer *buffer; GstEvent *event; GstFlowReturn expected_result; } TestData; static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); static GstCollectPads *collect; static gboolean collected; static GstPad *agg_srcpad, *srcpad1, *srcpad2; static GstPad *sinkpad1, *sinkpad2; static TestData *data1, *data2; static GstBuffer *outbuf1, *outbuf2; static GstElement *agg; gboolean fail_seek; gint flush_start_events, flush_stop_events; static GMutex lock; static GCond cond; static GstFlowReturn collected_cb (GstCollectPads * pads, gpointer user_data) { outbuf1 = gst_collect_pads_pop (pads, (GstCollectData *) data1); outbuf2 = gst_collect_pads_pop (pads, (GstCollectData *) data2); g_mutex_lock (&lock); collected = TRUE; g_cond_signal (&cond); g_mutex_unlock (&lock); return GST_FLOW_OK; } static GstFlowReturn handle_buffer_cb (GstCollectPads * pads, GstCollectData * data, GstBuffer * buf, gpointer user_data) { GST_DEBUG ("collected buffers via callback"); outbuf1 = gst_collect_pads_pop (pads, (GstCollectData *) data1); outbuf2 = gst_collect_pads_pop (pads, (GstCollectData *) data2); g_mutex_lock (&lock); collected = TRUE; g_cond_signal (&cond); g_mutex_unlock (&lock); return GST_FLOW_OK; } static gpointer push_buffer (gpointer user_data) { GstFlowReturn flow; GstCaps *caps; TestData *test_data = (TestData *) user_data; GstSegment segment; gst_pad_push_event (test_data->pad, gst_event_new_stream_start ("test")); caps = gst_caps_new_empty_simple ("foo/x-bar"); gst_pad_push_event (test_data->pad, gst_event_new_caps (caps)); gst_caps_unref (caps); gst_segment_init (&segment, GST_FORMAT_TIME); gst_pad_push_event (test_data->pad, gst_event_new_segment (&segment)); flow = gst_pad_push (test_data->pad, test_data->buffer); fail_unless (flow == test_data->expected_result, "got flow %s instead of OK", gst_flow_get_name (flow)); return NULL; } static gpointer push_event (gpointer user_data) { TestData *test_data = (TestData *) user_data; fail_unless (gst_pad_push_event (test_data->pad, test_data->event) == TRUE); return NULL; } static void setup_default (void) { collect = gst_collect_pads_new (); srcpad1 = gst_pad_new_from_static_template (&srctemplate, "src1"); srcpad2 = gst_pad_new_from_static_template (&srctemplate, "src2"); sinkpad1 = gst_pad_new_from_static_template (&sinktemplate, "sink1"); sinkpad2 = gst_pad_new_from_static_template (&sinktemplate, "sink2"); fail_unless (gst_pad_link (srcpad1, sinkpad1) == GST_PAD_LINK_OK); fail_unless (gst_pad_link (srcpad2, sinkpad2) == GST_PAD_LINK_OK); gst_pad_set_active (sinkpad1, TRUE); gst_pad_set_active (sinkpad2, TRUE); gst_pad_set_active (srcpad1, TRUE); gst_pad_set_active (srcpad2, TRUE); data1 = NULL; data2 = NULL; outbuf1 = NULL; outbuf2 = NULL; collected = FALSE; } static void setup (void) { setup_default (); gst_collect_pads_set_function (collect, collected_cb, NULL); } static void setup_buffer_cb (void) { setup_default (); gst_collect_pads_set_buffer_function (collect, handle_buffer_cb, NULL); } static void teardown (void) { gst_object_unref (sinkpad1); gst_object_unref (sinkpad2); gst_object_unref (collect); } GST_START_TEST (test_pad_add_remove) { ASSERT_CRITICAL (gst_collect_pads_add_pad (collect, sinkpad1, sizeof (BadCollectData), NULL, TRUE)); data1 = (TestData *) gst_collect_pads_add_pad (collect, sinkpad1, sizeof (TestData), NULL, TRUE); fail_unless (data1 != NULL); fail_unless (gst_collect_pads_remove_pad (collect, sinkpad2) == FALSE); fail_unless (gst_collect_pads_remove_pad (collect, sinkpad1) == TRUE); } GST_END_TEST; GST_START_TEST (test_collect) { GstBuffer *buf1, *buf2; GThread *thread1, *thread2; data1 = (TestData *) gst_collect_pads_add_pad (collect, sinkpad1, sizeof (TestData), NULL, TRUE); fail_unless (data1 != NULL); data2 = (TestData *) gst_collect_pads_add_pad (collect, sinkpad2, sizeof (TestData), NULL, TRUE); fail_unless (data2 != NULL); buf1 = gst_buffer_new (); buf2 = gst_buffer_new (); /* start collect pads */ gst_collect_pads_start (collect); /* push buffers on the pads */ data1->pad = srcpad1; data1->buffer = buf1; thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL); /* here thread1 is blocked and srcpad1 has a queued buffer */ fail_unless_collected (FALSE); data2->pad = srcpad2; data2->buffer = buf2; thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL); /* now both pads have a buffer */ fail_unless_collected (TRUE); fail_unless (outbuf1 == buf1); fail_unless (outbuf2 == buf2); /* these will return immediately as at this point the threads have been * unlocked and are finished */ g_thread_join (thread1); g_thread_join (thread2); gst_collect_pads_stop (collect); gst_buffer_unref (buf1); gst_buffer_unref (buf2); } GST_END_TEST; GST_START_TEST (test_collect_eos) { GstBuffer *buf1; GThread *thread1, *thread2; data1 = (TestData *) gst_collect_pads_add_pad (collect, sinkpad1, sizeof (TestData), NULL, TRUE); fail_unless (data1 != NULL); data2 = (TestData *) gst_collect_pads_add_pad (collect, sinkpad2, sizeof (TestData), NULL, TRUE); fail_unless (data2 != NULL); buf1 = gst_buffer_new (); /* start collect pads */ gst_collect_pads_start (collect); /* push a buffer on srcpad1 and EOS on srcpad2 */ data1->pad = srcpad1; data1->buffer = buf1; thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL); /* here thread1 is blocked and srcpad1 has a queued buffer */ fail_unless_collected (FALSE); data2->pad = srcpad2; data2->event = gst_event_new_eos (); thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL); /* now sinkpad1 has a buffer and sinkpad2 has EOS */ fail_unless_collected (TRUE); fail_unless (outbuf1 == buf1); /* sinkpad2 has EOS so a NULL buffer is returned */ fail_unless (outbuf2 == NULL); /* these will return immediately as when the data is popped the threads are * unlocked and will terminate */ g_thread_join (thread1); g_thread_join (thread2); gst_collect_pads_stop (collect); gst_buffer_unref (buf1); } GST_END_TEST; GST_START_TEST (test_collect_twice) { GstBuffer *buf1, *buf2; GThread *thread1, *thread2; data1 = (TestData *) gst_collect_pads_add_pad (collect, sinkpad1, sizeof (TestData), NULL, TRUE); fail_unless (data1 != NULL); data2 = (TestData *) gst_collect_pads_add_pad (collect, sinkpad2, sizeof (TestData), NULL, TRUE); fail_unless (data2 != NULL); GST_INFO ("round 1"); buf1 = gst_buffer_new (); /* start collect pads */ gst_collect_pads_start (collect); /* queue a buffer */ data1->pad = srcpad1; data1->buffer = buf1; thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL); /* here thread1 is blocked and srcpad1 has a queued buffer */ fail_unless_collected (FALSE); /* push EOS on the other pad */ data2->pad = srcpad2; data2->event = gst_event_new_eos (); thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL); /* one of the pads has a buffer, the other has EOS */ fail_unless_collected (TRUE); fail_unless (outbuf1 == buf1); /* there's nothing to pop from the one which received EOS */ fail_unless (outbuf2 == NULL); /* these will return immediately as at this point the threads have been * unlocked and are finished */ g_thread_join (thread1); g_thread_join (thread2); gst_collect_pads_stop (collect); collected = FALSE; GST_INFO ("round 2"); buf2 = gst_buffer_new (); /* clear EOS from pads */ gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)); gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE)); /* start collect pads */ gst_collect_pads_start (collect); /* push buffers on the pads */ data1->pad = srcpad1; data1->buffer = buf1; thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL); /* here thread1 is blocked and srcpad1 has a queued buffer */ fail_unless_collected (FALSE); data2->pad = srcpad2; data2->buffer = buf2; thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL); /* now both pads have a buffer */ fail_unless_collected (TRUE); /* these will return immediately as at this point the threads have been * unlocked and are finished */ g_thread_join (thread1); g_thread_join (thread2); gst_collect_pads_stop (collect); gst_buffer_unref (buf1); gst_buffer_unref (buf2); } GST_END_TEST; /* Test the default collected buffer func */ GST_START_TEST (test_collect_default) { GstBuffer *buf1, *buf2; GThread *thread1, *thread2; data1 = (TestData *) gst_collect_pads_add_pad (collect, sinkpad1, sizeof (TestData), NULL, TRUE); fail_unless (data1 != NULL); data2 = (TestData *) gst_collect_pads_add_pad (collect, sinkpad2, sizeof (TestData), NULL, TRUE); fail_unless (data2 != NULL); buf1 = gst_buffer_new (); GST_BUFFER_TIMESTAMP (buf1) = 0; buf2 = gst_buffer_new (); GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND; /* start collect pads */ gst_collect_pads_start (collect); /* push buffers on the pads */ data1->pad = srcpad1; data1->buffer = buf1; thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL); /* here thread1 is blocked and srcpad1 has a queued buffer */ fail_unless_collected (FALSE); data2->pad = srcpad2; data2->buffer = buf2; thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL); /* now both pads have a buffer */ fail_unless_collected (TRUE); /* The default callback should have popped the buffer with lower timestamp, * and this should therefore be NULL: */ fail_unless (outbuf1 == NULL); /* While this one should still be pending: */ fail_unless (outbuf2 == buf2); /* these will return immediately as at this point the threads have been * unlocked and are finished */ g_thread_join (thread1); g_thread_join (thread2); gst_collect_pads_stop (collect); gst_buffer_unref (buf1); gst_buffer_unref (buf2); } GST_END_TEST; #define NUM_BUFFERS 3 static void handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count) { *count = *count + 1; } /* Test a linear pipeline using aggregator */ GST_START_TEST (test_linear_pipeline) { GstElement *pipeline, *src, *agg, *sink; GstBus *bus; GstMessage *msg; gint count = 0; pipeline = gst_pipeline_new ("pipeline"); src = gst_check_setup_element ("fakesrc"); g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4, NULL); agg = gst_check_setup_element ("aggregator"); sink = gst_check_setup_element ("fakesink"); g_object_set (sink, "signal-handoffs", TRUE, NULL); g_signal_connect (sink, "handoff", (GCallback) handoff, &count); fail_unless (gst_bin_add (GST_BIN (pipeline), src)); fail_unless (gst_bin_add (GST_BIN (pipeline), agg)); fail_unless (gst_bin_add (GST_BIN (pipeline), sink)); fail_unless (gst_element_link (src, agg)); fail_unless (gst_element_link (agg, sink)); bus = gst_element_get_bus (pipeline); fail_if (bus == NULL); gst_element_set_state (pipeline, GST_STATE_PLAYING); msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS); gst_message_unref (msg); fail_unless_equals_int (count, NUM_BUFFERS); gst_element_set_state (pipeline, GST_STATE_NULL); gst_object_unref (bus); gst_object_unref (pipeline); } GST_END_TEST; /* Test a linear pipeline using aggregator */ GST_START_TEST (test_branched_pipeline) { GstElement *pipeline, *src, *tee, *queue[2], *agg, *sink; GstBus *bus; GstMessage *msg; gint count = 0; pipeline = gst_pipeline_new ("pipeline"); src = gst_check_setup_element ("fakesrc"); g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4, NULL); tee = gst_check_setup_element ("tee"); queue[0] = gst_check_setup_element ("queue"); gst_object_set_name (GST_OBJECT (queue[0]), "queue0"); queue[1] = gst_check_setup_element ("queue"); gst_object_set_name (GST_OBJECT (queue[1]), "queue1"); agg = gst_check_setup_element ("aggregator"); sink = gst_check_setup_element ("fakesink"); g_object_set (sink, "signal-handoffs", TRUE, NULL); g_signal_connect (sink, "handoff", (GCallback) handoff, &count); fail_unless (gst_bin_add (GST_BIN (pipeline), src)); fail_unless (gst_bin_add (GST_BIN (pipeline), tee)); fail_unless (gst_bin_add (GST_BIN (pipeline), queue[0])); fail_unless (gst_bin_add (GST_BIN (pipeline), queue[1])); fail_unless (gst_bin_add (GST_BIN (pipeline), agg)); fail_unless (gst_bin_add (GST_BIN (pipeline), sink)); fail_unless (gst_element_link (src, tee)); fail_unless (gst_element_link (tee, queue[0])); fail_unless (gst_element_link (tee, queue[1])); fail_unless (gst_element_link (queue[0], agg)); fail_unless (gst_element_link (queue[1], agg)); fail_unless (gst_element_link (agg, sink)); bus = gst_element_get_bus (pipeline); fail_if (bus == NULL); gst_element_set_state (pipeline, GST_STATE_PLAYING); msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS); gst_message_unref (msg); /* we have two branches, but we still only forward buffers from one branch */ fail_unless_equals_int (count, NUM_BUFFERS * 2); gst_element_set_state (pipeline, GST_STATE_NULL); gst_object_unref (bus); gst_object_unref (pipeline); } GST_END_TEST; static GstPadProbeReturn downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { if (info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) { if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) == GST_EVENT_FLUSH_START) g_atomic_int_inc (&flush_start_events); else if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) == GST_EVENT_FLUSH_STOP) g_atomic_int_inc (&flush_stop_events); } else if (info->type & GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM) { g_mutex_lock (&lock); collected = TRUE; g_cond_signal (&cond); g_mutex_unlock (&lock); } return GST_PAD_PROBE_DROP; } static gboolean src_event (GstPad * pad, GstObject * parent, GstEvent * event) { gboolean ret = TRUE; if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK) { if (g_atomic_int_compare_and_exchange (&fail_seek, TRUE, FALSE) == TRUE) { ret = FALSE; } } gst_event_unref (event); return ret; } static gboolean agg_src_event (GstPad * pad, GstObject * parent, GstEvent * event) { return gst_collect_pads_src_event_default (GST_AGGREGATOR (parent)->collect, pad, event); } static GstPad * setup_src_pad (GstElement * element, GstStaticPadTemplate * tmpl, const char *name) { GstPad *srcpad, *sinkpad; srcpad = gst_pad_new_from_static_template (tmpl, "src"); sinkpad = gst_element_get_request_pad (element, name); fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK, "Could not link source and %s sink pads", GST_ELEMENT_NAME (element)); gst_pad_set_event_function (srcpad, src_event); gst_pad_set_active (srcpad, TRUE); return srcpad; } static void flush_setup (void) { agg = gst_check_setup_element ("aggregator"); agg_srcpad = gst_element_get_static_pad (agg, "src"); srcpad1 = setup_src_pad (agg, &srctemplate, "sink_0"); srcpad2 = setup_src_pad (agg, &srctemplate, "sink_1"); gst_pad_add_probe (agg_srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH, downstream_probe_cb, NULL, NULL); gst_pad_set_event_function (agg_srcpad, agg_src_event); data1 = g_new0 (TestData, 1); data2 = g_new0 (TestData, 1); g_atomic_int_set (&flush_start_events, 0); g_atomic_int_set (&flush_stop_events, 0); gst_element_set_state (agg, GST_STATE_PLAYING); } static void flush_teardown (void) { gst_element_set_state (agg, GST_STATE_NULL); gst_object_unref (agg); gst_object_unref (agg_srcpad); gst_object_unref (srcpad1); gst_object_unref (srcpad2); g_free (data1); g_free (data2); } GST_START_TEST (test_flushing_seek_failure) { GstBuffer *buf1, *buf2; GThread *thread1, *thread2; GstEvent *event; /* Queue a buffer in agg:sink_1. Do a flushing seek and simulate one upstream * element failing to handle the seek (see src_event()). Check that the * flushing seek logic doesn't get triggered by checking that the buffer * queued on agg:sink_1 doesn't get flushed. */ /* queue a buffer in agg:sink_1 */ buf2 = gst_buffer_new_allocate (NULL, 1, NULL); GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND; data2->pad = srcpad2; data2->buffer = buf2; thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL); fail_unless_collected (FALSE); /* do the seek */ event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND); g_atomic_int_set (&fail_seek, TRUE); fail_if (gst_pad_send_event (agg_srcpad, event)); /* flush srcpad1 (pretending it's the upstream that didn't fail to seek) */ fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ())); fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE))); /* check that the flush events reached agg:src */ fail_unless_equals_int (flush_start_events, 1); fail_unless_equals_int (flush_stop_events, 1); /* push a buffer on agg:sink_0. This should trigger a collect since agg:sink_1 * should not have been flushed at this point */ buf1 = gst_buffer_new_allocate (NULL, 1, NULL); GST_BUFFER_TIMESTAMP (buf1) = 0; data1->pad = srcpad1; data1->buffer = buf1; thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL); fail_unless_collected (TRUE); collected = FALSE; /* at this point thread1 must have returned */ g_thread_join (thread1); /* push eos on agg:sink_0 so the buffer queued in agg:sink_1 is collected and * the pushing thread returns */ data1->pad = srcpad1; data1->event = gst_event_new_eos (); thread1 = g_thread_try_new ("gst-check", push_event, data1, NULL); fail_unless_collected (TRUE); g_thread_join (thread1); g_thread_join (thread2); } GST_END_TEST; GST_START_TEST (test_flushing_seek) { GstBuffer *buf1, *buf2; GThread *thread1, *thread2; GstEvent *event; /* Queue a buffer in agg:sink_1. Then do a flushing seek and check that the * new flushing seek logic is triggered. On the first FLUSH_START call the * buffers queued in collectpads should get flushed. Only one FLUSH_START and * one FLUSH_STOP should be forwarded downstream. */ buf2 = gst_buffer_new_allocate (NULL, 1, NULL); GST_BUFFER_TIMESTAMP (buf2) = 0; data2->pad = srcpad2; data2->buffer = buf2; /* expect this buffer to be flushed */ data2->expected_result = GST_FLOW_FLUSHING; thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL); /* now do a successful flushing seek */ event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND); g_atomic_int_set (&fail_seek, FALSE); fail_unless (gst_pad_send_event (agg_srcpad, event)); /* flushing starts once one of the upstream elements sends the first * FLUSH_START */ fail_unless_equals_int (flush_start_events, 0); fail_unless_equals_int (flush_stop_events, 0); /* flush ogg:sink_0. This flushs collectpads, calls ::flush() and sends * FLUSH_START downstream */ fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ())); fail_unless_equals_int (flush_start_events, 1); fail_unless_equals_int (flush_stop_events, 0); /* the first FLUSH_STOP is forwarded downstream */ fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE))); fail_unless_equals_int (flush_start_events, 1); fail_unless_equals_int (flush_stop_events, 1); /* at this point even the other pad agg:sink_1 should be flushing so thread2 * should have stopped */ g_thread_join (thread2); /* push a buffer on agg:sink_0 to trigger one collect after flushing to verify * that flushing completes once all the pads have been flushed */ buf1 = gst_buffer_new_allocate (NULL, 1, NULL); GST_BUFFER_TIMESTAMP (buf1) = GST_SECOND; data1->pad = srcpad1; data1->buffer = buf1; thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL); /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is * sent downstream */ gst_pad_push_event (srcpad2, gst_event_new_flush_start ()); gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE)); /* still, only one FLUSH_START and one FLUSH_STOP are forwarded downstream */ fail_unless_equals_int (flush_start_events, 1); fail_unless_equals_int (flush_stop_events, 1); /* EOS agg:sink_1 so the buffer queued in agg:sink_0 is collected */ data2->pad = srcpad2; data2->event = gst_event_new_eos (); thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL); fail_unless_collected (TRUE); /* these will return immediately as at this point the threads have been * unlocked and are finished */ g_thread_join (thread1); g_thread_join (thread2); } GST_END_TEST; static Suite * gst_collect_pads_suite (void) { Suite *suite; TCase *general, *buffers, *pipeline, *flush; gst_agregator_plugin_register (); suite = suite_create ("GstCollectPads"); general = tcase_create ("general"); suite_add_tcase (suite, general); tcase_add_checked_fixture (general, setup, teardown); tcase_add_test (general, test_pad_add_remove); tcase_add_test (general, test_collect); tcase_add_test (general, test_collect_eos); tcase_add_test (general, test_collect_twice); buffers = tcase_create ("buffers"); suite_add_tcase (suite, buffers); tcase_add_checked_fixture (buffers, setup_buffer_cb, teardown); tcase_add_test (buffers, test_collect_default); pipeline = tcase_create ("pipeline"); suite_add_tcase (suite, pipeline); tcase_add_test (pipeline, test_linear_pipeline); tcase_add_test (pipeline, test_branched_pipeline); flush = tcase_create ("flush"); suite_add_tcase (suite, flush); tcase_add_checked_fixture (flush, flush_setup, flush_teardown); tcase_add_test (flush, test_flushing_seek_failure); tcase_add_test (flush, test_flushing_seek); return suite; } GST_CHECK_MAIN (gst_collect_pads);