gstreamer/tests/check/elements/appsink.c
Guillaume Desmottes 0a657d6db5 appsink: add API to catch events
There is currently no way for users to receive incoming events from
appsink while keeping them properly serialized with the buffers flow.
This can be especially useful when application is injecting custom
downstream events into the pipeline and needs to know when they reached
appsink.

Solving this by adding a new signal notifying about new incoming events
and a set of action signals and method to pull those events.
The API is actually pulling the samples and events all together as they
are actually fetched from the same queue.
Having a specific API to pull only events would have the side effect of
discarding samples (and pulling samples would discard events) making
this API not convenient for users.

Partially fix #247

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1046>
2021-07-22 13:56:22 +02:00

935 lines
26 KiB
C

/* GStreamer
*
* Copyright (C) 2009, Axis Communications AB, LUND, SWEDEN
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/check/gstcheck.h>
#include <gst/app/gstappsink.h>
gint global_testdata;
static GstPad *mysrcpad;
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/x-gst-check")
);
static GstElement *
setup_appsink (void)
{
GstElement *appsink;
GstCaps *caps;
GST_DEBUG ("setup_appsink");
appsink = gst_check_setup_element ("appsink");
mysrcpad = gst_check_setup_src_pad (appsink, &srctemplate);
gst_pad_set_active (mysrcpad, TRUE);
caps = gst_caps_new_empty_simple ("application/x-gst-check");
gst_check_setup_events (mysrcpad, appsink, caps, GST_FORMAT_TIME);
gst_caps_unref (caps);
return appsink;
}
static void
cleanup_appsink (GstElement * appsink)
{
GST_DEBUG ("cleanup_appsink");
gst_check_teardown_src_pad (appsink);
gst_check_teardown_element (appsink);
}
/* This function does an operation to it's indata argument and returns it.
* The exact operation performed doesn't matter. Currently it multiplies with
* two, but it could do anything. The idea is to use the function to verify
* that the code calling it gets run. */
static gint
operate_on_data (gint indata)
{
return indata * 2;
}
static GstFlowReturn
callback_function (GstAppSink * appsink, gpointer callback_data)
{
global_testdata = operate_on_data (*((gint *) callback_data));
return GST_FLOW_OK;
}
static void
notify_function (gpointer callback_data)
{
global_testdata = operate_on_data (*((gint *) callback_data));
}
GST_START_TEST (test_non_clients)
{
GstElement *sink;
GstBuffer *buffer;
sink = setup_appsink ();
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
GST_DEBUG ("cleaning up appsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
/* Verifies that the handoff callback gets run one time when passing a buffer */
GST_START_TEST (test_handoff_callback)
{
GstElement *sink;
GstBuffer *buffer;
gint testdata;
GstAppSinkCallbacks callbacks = { NULL };
sink = setup_appsink ();
global_testdata = 0;
testdata = 5; /* Arbitrary value */
callbacks.new_sample = callback_function;
gst_app_sink_set_callbacks (GST_APP_SINK (sink), &callbacks, &testdata, NULL);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
buffer = gst_buffer_new_and_alloc (4);
/* Pushing a buffer should run our callback */
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
testdata = operate_on_data (testdata);
/* If both test_data & global_testdata have been operated on, we're happy. */
fail_unless (testdata == global_testdata);
GST_DEBUG ("cleaning up appsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
/* Verifies that the notify function gets executed when the sink is destroyed */
GST_START_TEST (test_notify0)
{
GstElement *sink;
gint testdata;
GstAppSinkCallbacks callbacks = { NULL };
sink = gst_element_factory_make ("appsink", NULL);
global_testdata = 0;
testdata = 17; /* Arbitrary value */
gst_app_sink_set_callbacks (GST_APP_SINK (sink), &callbacks,
&testdata, (*notify_function));
GST_DEBUG ("cleaning up appsink");
/* Destroying sink should call our notify_function */
gst_object_unref (sink);
testdata = operate_on_data (testdata);
/* If both test_data & global_testdata have been operated on, we're happy. */
fail_unless (testdata == global_testdata);
}
GST_END_TEST;
/* Verifies that the notify function gets executed when
* gst_app_sink_set_callbacks () gets called */
GST_START_TEST (test_notify1)
{
GstElement *sink;
gint testdata;
GstAppSinkCallbacks callbacks = { NULL };
sink = gst_element_factory_make ("appsink", NULL);
global_testdata = 0;
testdata = 42; /* Arbitrary value */
gst_app_sink_set_callbacks (GST_APP_SINK (sink), &callbacks,
&testdata, (*notify_function));
/* Setting new callbacks should trigger the destroy of the old data */
gst_app_sink_set_callbacks (GST_APP_SINK (sink), &callbacks, &testdata, NULL);
testdata = operate_on_data (testdata);
/* If both test_data & global_testdata have been operated on, we're happy. */
fail_unless (testdata == global_testdata);
GST_DEBUG ("cleaning up appsink");
gst_object_unref (sink);
}
GST_END_TEST;
static const gint values[] = { 1, 2, 4 };
static GstBufferList *
create_buffer_list (void)
{
guint len;
GstBuffer *buffer;
GstBufferList *mylist;
mylist = gst_buffer_list_new ();
fail_if (mylist == NULL);
len = gst_buffer_list_length (mylist);
fail_if (len != 0);
buffer = gst_buffer_new_and_alloc (sizeof (gint));
gst_buffer_fill (buffer, 0, &values[0], sizeof (gint));
gst_buffer_list_add (mylist, buffer);
buffer = gst_buffer_new_and_alloc (sizeof (gint));
gst_buffer_fill (buffer, 0, &values[1], sizeof (gint));
gst_buffer_list_add (mylist, buffer);
buffer = gst_buffer_new_and_alloc (sizeof (gint));
gst_buffer_fill (buffer, 0, &values[2], sizeof (gint));
gst_buffer_list_add (mylist, buffer);
return mylist;
}
static GstFlowReturn
callback_function_sample_fallback (GstAppSink * appsink, gpointer p_counter)
{
GstSample *sample;
GstBuffer *buf;
gint *p_int_counter = p_counter;
sample = gst_app_sink_pull_sample (appsink);
buf = gst_sample_get_buffer (sample);
fail_unless (GST_IS_BUFFER (buf));
/* buffer list has 3 buffers in two groups */
switch (*p_int_counter) {
case 0:
fail_unless_equals_int (gst_buffer_get_size (buf), sizeof (gint));
gst_check_buffer_data (buf, &values[0], sizeof (gint));
break;
case 1:
fail_unless_equals_int (gst_buffer_get_size (buf), sizeof (gint));
gst_check_buffer_data (buf, &values[1], sizeof (gint));
break;
case 2:
fail_unless_equals_int (gst_buffer_get_size (buf), sizeof (gint));
gst_check_buffer_data (buf, &values[2], sizeof (gint));
break;
default:
g_warn_if_reached ();
break;
}
gst_sample_unref (sample);
*p_int_counter += 1;
return GST_FLOW_OK;
}
static GstFlowReturn
callback_function_sample (GstAppSink * appsink, gpointer p_counter)
{
GstSample *sample;
GstBufferList *list;
gint *p_int_counter = p_counter;
guint len;
gint i;
sample = gst_app_sink_pull_sample (appsink);
list = gst_sample_get_buffer_list (sample);
fail_unless (GST_IS_BUFFER_LIST (list));
len = gst_buffer_list_length (list);
fail_unless_equals_int (len, 3);
for (i = 0; i < len; i++) {
GstBuffer *buf = gst_buffer_list_get (list, i);
fail_unless_equals_int (gst_buffer_get_size (buf), sizeof (gint));
gst_check_buffer_data (buf, &values[i], sizeof (gint));
}
gst_sample_unref (sample);
*p_int_counter += 1;
return GST_FLOW_OK;
}
GST_START_TEST (test_buffer_list_fallback)
{
GstElement *sink;
GstBufferList *list;
GstAppSinkCallbacks callbacks = { NULL };
gint counter = 0;
gboolean buffer_list_support;
sink = setup_appsink ();
/* verify that the buffer list support is disabled per default */
g_object_get (sink, "buffer-list", &buffer_list_support, NULL);
fail_unless (buffer_list_support == FALSE);
callbacks.new_sample = callback_function_sample_fallback;
gst_app_sink_set_callbacks (GST_APP_SINK (sink), &callbacks, &counter, NULL);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
list = create_buffer_list ();
fail_unless (gst_pad_push_list (mysrcpad, list) == GST_FLOW_OK);
fail_unless_equals_int (counter, 3);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
GST_START_TEST (test_buffer_list_support)
{
GstElement *sink;
GstBufferList *list;
GstAppSinkCallbacks callbacks = { NULL };
gint counter = 0;
sink = setup_appsink ();
/* enable buffer list support */
g_object_set (sink, "buffer-list", TRUE, NULL);
callbacks.new_sample = callback_function_sample;
gst_app_sink_set_callbacks (GST_APP_SINK (sink), &callbacks, &counter, NULL);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
list = create_buffer_list ();
fail_unless (gst_pad_push_list (mysrcpad, list) == GST_FLOW_OK);
fail_unless_equals_int (counter, 1);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
GST_START_TEST (test_buffer_list_fallback_signal)
{
GstElement *sink;
GstBufferList *list;
gint counter = 0;
sink = setup_appsink ();
/* C calling convention to the rescue.. */
g_signal_connect (sink, "new-sample",
G_CALLBACK (callback_function_sample_fallback), &counter);
g_object_set (sink, "emit-signals", TRUE, NULL);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
list = create_buffer_list ();
fail_unless (gst_pad_push_list (mysrcpad, list) == GST_FLOW_OK);
fail_unless_equals_int (counter, 3);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
GST_START_TEST (test_buffer_list_signal)
{
GstElement *sink;
GstBufferList *list;
gint counter = 0;
sink = setup_appsink ();
/* enable buffer list support */
g_object_set (sink, "buffer-list", TRUE, NULL);
/* C calling convention to the rescue.. */
g_signal_connect (sink, "new-sample", G_CALLBACK (callback_function_sample),
&counter);
g_object_set (sink, "emit-signals", TRUE, NULL);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
list = create_buffer_list ();
fail_unless (gst_pad_push_list (mysrcpad, list) == GST_FLOW_OK);
fail_unless_equals_int (counter, 1);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
GST_START_TEST (test_segment)
{
GstElement *sink;
GstSegment segment;
GstBuffer *buffer;
GstSample *pulled_preroll;
GstSample *pulled_sample;
sink = setup_appsink ();
gst_segment_init (&segment, GST_FORMAT_TIME);
segment.start = 2 * GST_SECOND;
fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_segment (&segment)));
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
g_signal_emit_by_name (sink, "pull-preroll", &pulled_preroll);
fail_unless (gst_segment_is_equal (&segment,
gst_sample_get_segment (pulled_preroll)));
gst_sample_unref (pulled_preroll);
g_signal_emit_by_name (sink, "pull-sample", &pulled_sample);
fail_unless (gst_segment_is_equal (&segment,
gst_sample_get_segment (pulled_sample)));
gst_sample_unref (pulled_sample);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
GST_START_TEST (test_pull_with_timeout)
{
GstElement *sink;
GstBuffer *buffer;
GstSample *s;
guint64 t1, tdiff;
sink = setup_appsink ();
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
/* Check that it actually waits for a bit */
t1 = gst_util_get_timestamp ();
s = gst_app_sink_try_pull_preroll (GST_APP_SINK (sink), GST_SECOND / 20);
tdiff = gst_util_get_timestamp () - t1;
GST_LOG ("tdiff: %" GST_TIME_FORMAT, GST_TIME_ARGS (tdiff));
fail_unless (s == NULL);
fail_unless (tdiff > (GST_SECOND / (20 * 2)));
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
s = gst_app_sink_try_pull_preroll (GST_APP_SINK (sink), GST_SECOND / 20);
fail_unless (s != NULL);
gst_sample_unref (s);
s = gst_app_sink_try_pull_sample (GST_APP_SINK (sink), 500 * GST_SECOND);
fail_unless (s != NULL);
gst_sample_unref (s);
/* No waiting */
s = gst_app_sink_try_pull_sample (GST_APP_SINK (sink), 0);
fail_unless (s == NULL);
/* Check that it actually waits for a bit */
t1 = gst_util_get_timestamp ();
s = gst_app_sink_try_pull_sample (GST_APP_SINK (sink), GST_SECOND / 20);
tdiff = gst_util_get_timestamp () - t1;
GST_LOG ("tdiff: %" GST_TIME_FORMAT, GST_TIME_ARGS (tdiff));
fail_unless (s == NULL);
fail_unless (tdiff > (GST_SECOND / (20 * 2)));
/* No waiting, with buffer pending */
buffer = gst_buffer_new_and_alloc (5);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
s = gst_app_sink_try_pull_sample (GST_APP_SINK (sink), 0);
fail_unless (s != NULL);
gst_sample_unref (s);
/* With timeout, with buffer pending */
buffer = gst_buffer_new_and_alloc (6);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
s = gst_app_sink_try_pull_sample (GST_APP_SINK (sink), GST_SECOND / 20);
fail_unless (s != NULL);
gst_sample_unref (s);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
GST_START_TEST (test_pull_preroll)
{
GstElement *sink = NULL;
GstBuffer *buffer = NULL;
GstSample *pulled_preroll = NULL;
sink = setup_appsink ();
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
pulled_preroll = gst_app_sink_pull_preroll (GST_APP_SINK (sink));
fail_unless (pulled_preroll);
gst_sample_unref (pulled_preroll);
fail_if (gst_app_sink_try_pull_preroll (GST_APP_SINK (sink), 0));
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
GST_START_TEST (test_do_not_care_preroll)
{
GstElement *sink = NULL;
GstBuffer *buffer = NULL;
GstSample *pulled_sample = NULL;
sink = setup_appsink ();
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
pulled_sample = gst_app_sink_pull_sample (GST_APP_SINK (sink));
fail_unless (pulled_sample);
gst_sample_unref (pulled_sample);
fail_if (gst_app_sink_try_pull_preroll (GST_APP_SINK (sink), 0));
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
typedef struct
{
GMutex mutex;
GCond cond;
GstAppSink *appsink;
gboolean check;
} TestQueryDrainContext;
#define TEST_QUERY_DRAIN_CONTEXT_INIT { { 0, }, }
static gpointer
my_app_thread (TestQueryDrainContext * ctx)
{
GstSample *pulled_preroll = NULL;
GstSample *pulled_sample = NULL;
/* Wait for the query to reach appsink. */
g_mutex_lock (&ctx->mutex);
while (!ctx->check)
g_cond_wait (&ctx->cond, &ctx->mutex);
g_mutex_unlock (&ctx->mutex);
pulled_preroll = gst_app_sink_pull_preroll (ctx->appsink);
fail_unless (pulled_preroll);
gst_sample_unref (pulled_preroll);
pulled_sample = gst_app_sink_pull_sample (ctx->appsink);
fail_unless (pulled_sample);
gst_sample_unref (pulled_sample);
pulled_sample = gst_app_sink_pull_sample (ctx->appsink);
fail_unless (pulled_sample);
gst_sample_unref (pulled_sample);
return NULL;
}
static GstPadProbeReturn
query_handler (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
TestQueryDrainContext *ctx = (TestQueryDrainContext *) user_data;
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_DRAIN:
{
if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_PUSH) {
g_mutex_lock (&ctx->mutex);
ctx->check = TRUE;
g_cond_signal (&ctx->cond);
g_mutex_unlock (&ctx->mutex);
} else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_PULL) {
/* Check that there is no pending buffers when drain query is done. */
fail_if (gst_app_sink_try_pull_preroll (ctx->appsink, 0));
fail_if (gst_app_sink_try_pull_sample (ctx->appsink, 0));
}
break;
}
default:
break;
}
return GST_PAD_PROBE_OK;
}
GST_START_TEST (test_query_drain)
{
GstElement *sink = NULL;
GstBuffer *buffer = NULL;
GstPad *sinkpad = NULL;
GThread *thread = NULL;
GstQuery *query = NULL;
TestQueryDrainContext ctx = TEST_QUERY_DRAIN_CONTEXT_INIT;
sink = setup_appsink ();
g_mutex_init (&ctx.mutex);
g_cond_init (&ctx.cond);
ctx.appsink = GST_APP_SINK (sink);
ctx.check = FALSE;
sinkpad = gst_element_get_static_pad (sink, "sink");
gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
query_handler, (gpointer) & ctx, NULL);
gst_object_unref (sinkpad);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
thread = g_thread_new ("appthread", (GThreadFunc) my_app_thread, &ctx);
fail_unless (thread != NULL);
query = gst_query_new_drain ();
fail_unless (gst_pad_peer_query (mysrcpad, query));
gst_query_unref (query);
g_thread_join (thread);
g_mutex_clear (&ctx.mutex);
g_cond_clear (&ctx.cond);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
GST_START_TEST (test_pull_sample_refcounts)
{
GstElement *sink;
GstBuffer *buffer;
GstSample *s1, *s2, *s3;
sink = setup_appsink ();
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
s1 = gst_app_sink_pull_sample (GST_APP_SINK (sink));
fail_unless (s1 != NULL);
fail_unless (gst_buffer_get_size (gst_sample_get_buffer (s1)) == 4);
gst_sample_unref (s1);
buffer = gst_buffer_new_and_alloc (6);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
s2 = gst_app_sink_pull_sample (GST_APP_SINK (sink));
fail_unless (s2 != NULL);
fail_unless (gst_buffer_get_size (gst_sample_get_buffer (s2)) == 6);
/* We unreffed s1, appsink should thus reuse the same sample,
* avoiding an extra allocation */
fail_unless (s1 == s2);
buffer = gst_buffer_new_and_alloc (8);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
s3 = gst_app_sink_pull_sample (GST_APP_SINK (sink));
fail_unless (s3 != NULL);
fail_unless (gst_buffer_get_size (gst_sample_get_buffer (s2)) == 6);
fail_unless (gst_buffer_get_size (gst_sample_get_buffer (s3)) == 8);
/* We didn't unref s2, appsink should thus have created a new sample */
fail_unless (s2 != s3);
gst_sample_unref (s2);
gst_sample_unref (s3);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
static gboolean
new_event_cb (GstAppSink * appsink, gpointer callback_data)
{
guint *new_event_count = callback_data;
*new_event_count += 1;
return TRUE;
}
/* Verifies that the event callback is called */
GST_START_TEST (test_event_callback)
{
GstElement *sink;
GstPad *sinkpad;
GstBuffer *buffer;
guint new_event_count;
GstAppSinkCallbacks callbacks = { NULL };
GstMiniObject *object;
GstAppSink *app_sink;
sink = setup_appsink ();
app_sink = GST_APP_SINK (sink);
callbacks.new_event = new_event_cb;
gst_app_sink_set_callbacks (app_sink, &callbacks, &new_event_count, NULL);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
/* push a buffer so pending events are pushed */
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
/* flush pending events from the queue */
while ((object = gst_app_sink_try_pull_object (app_sink, 0)))
gst_mini_object_unref (object);
new_event_count = 0;
/* push a buffer */
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
/* push custom event */
sinkpad = gst_element_get_static_pad (sink, "sink");
fail_unless (sinkpad);
fail_unless (gst_pad_send_event (sinkpad,
gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
gst_structure_new ("custom", NULL, NULL))));
fail_unless_equals_int (new_event_count, 1);
gst_object_unref (sinkpad);
/* push a second buffer */
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
/* check if the samples and events are pulled in the right order */
object = gst_app_sink_pull_object (app_sink);
fail_unless (GST_IS_SAMPLE (object));
gst_mini_object_unref (object);
object = gst_app_sink_pull_object (app_sink);
fail_unless (GST_IS_EVENT (object));
fail_unless_equals_int (GST_EVENT_TYPE (object), GST_EVENT_CUSTOM_DOWNSTREAM);
gst_mini_object_unref (object);
object = gst_app_sink_pull_object (app_sink);
fail_unless (GST_IS_SAMPLE (object));
gst_mini_object_unref (object);
GST_DEBUG ("cleaning up appsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
GST_START_TEST (test_event_signals)
{
GstElement *sink;
GstPad *sinkpad;
GstBuffer *buffer;
GstMiniObject *object;
GstAppSink *app_sink;
guint new_event_count = 0;
sink = setup_appsink ();
app_sink = GST_APP_SINK (sink);
g_object_set (sink, "emit-signals", TRUE, NULL);
g_signal_connect (sink, "new-serialized-event", G_CALLBACK (new_event_cb),
&new_event_count);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
/* push a buffer so pending events are pushed */
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
/* flush pending events from the queue */
while ((object = gst_app_sink_try_pull_object (app_sink, 0)))
gst_mini_object_unref (object);
new_event_count = 0;
/* push a buffer */
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
/* push custom event */
sinkpad = gst_element_get_static_pad (sink, "sink");
fail_unless (sinkpad);
fail_unless (gst_pad_send_event (sinkpad,
gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
gst_structure_new ("custom", NULL, NULL))));
fail_unless_equals_int (new_event_count, 1);
gst_object_unref (sinkpad);
/* push a second buffer */
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
/* check if the buffers and events are pulled in the right order */
g_signal_emit_by_name (sink, "try-pull-object", GST_CLOCK_TIME_NONE, &object);
fail_unless (GST_IS_SAMPLE (object));
gst_mini_object_unref (object);
g_signal_emit_by_name (sink, "try-pull-object", GST_CLOCK_TIME_NONE, &object);
fail_unless (GST_IS_EVENT (object));
fail_unless_equals_int (GST_EVENT_TYPE (object), GST_EVENT_CUSTOM_DOWNSTREAM);
gst_mini_object_unref (object);
g_signal_emit_by_name (sink, "try-pull-object", GST_CLOCK_TIME_NONE, &object);
fail_unless (GST_IS_SAMPLE (object));
gst_mini_object_unref (object);
GST_DEBUG ("cleaning up appsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
/* try pulling events when appsink is in PAUSED */
GST_START_TEST (test_event_paused)
{
GstElement *sink;
guint new_event_count = 0;
GstAppSinkCallbacks callbacks = { NULL };
GstMiniObject *object;
GstAppSink *app_sink;
GstCaps *caps;
sink = setup_appsink ();
app_sink = GST_APP_SINK (sink);
callbacks.new_event = new_event_cb;
gst_app_sink_set_callbacks (app_sink, &callbacks, &new_event_count, NULL);
ASSERT_SET_STATE (sink, GST_STATE_PAUSED, GST_STATE_CHANGE_ASYNC);
/* push a couple of events while in PAUSED */
gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
caps = gst_caps_new_simple ("audio/x-raw", NULL, NULL);
gst_pad_push_event (mysrcpad, gst_event_new_caps (caps));
gst_caps_unref (caps);
fail_unless_equals_int (new_event_count, 2);
/* check pulled events */
object = gst_app_sink_pull_object (app_sink);
fail_unless (GST_IS_EVENT (object));
fail_unless_equals_int (GST_EVENT_TYPE (object), GST_EVENT_STREAM_START);
gst_mini_object_unref (object);
object = gst_app_sink_pull_object (app_sink);
fail_unless (GST_IS_EVENT (object));
fail_unless_equals_int (GST_EVENT_TYPE (object), GST_EVENT_CAPS);
gst_mini_object_unref (object);
object = gst_app_sink_try_pull_object (app_sink, 0);
fail_if (object);
GST_DEBUG ("cleaning up appsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
static Suite *
appsink_suite (void)
{
Suite *s = suite_create ("appsink");
TCase *tc_chain = tcase_create ("general");
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_non_clients);
tcase_add_test (tc_chain, test_handoff_callback);
tcase_add_test (tc_chain, test_notify0);
tcase_add_test (tc_chain, test_notify1);
tcase_add_test (tc_chain, test_buffer_list_fallback);
tcase_add_test (tc_chain, test_buffer_list_support);
tcase_add_test (tc_chain, test_buffer_list_fallback_signal);
tcase_add_test (tc_chain, test_buffer_list_signal);
tcase_add_test (tc_chain, test_segment);
tcase_add_test (tc_chain, test_pull_with_timeout);
tcase_add_test (tc_chain, test_query_drain);
tcase_add_test (tc_chain, test_pull_preroll);
tcase_add_test (tc_chain, test_do_not_care_preroll);
tcase_add_test (tc_chain, test_pull_sample_refcounts);
tcase_add_test (tc_chain, test_event_callback);
tcase_add_test (tc_chain, test_event_signals);
tcase_add_test (tc_chain, test_event_paused);
return s;
}
GST_CHECK_MAIN (appsink);