appsink: on drain wait for buffers to be consumed

So that an upstream element can claim all buffers to return to its buffer pool.

Added unit test 'test_query_drain'
  make elements/appsink.check

https://bugzilla.gnome.org/show_bug.cgi?id=786739
This commit is contained in:
Julien Isorce 2017-08-24 10:02:31 +01:00
parent 7b1056b946
commit fc86194595
2 changed files with 121 additions and 0 deletions

View file

@ -965,9 +965,21 @@ gst_app_sink_getcaps (GstBaseSink * psink, GstCaps * filter)
static gboolean static gboolean
gst_app_sink_query (GstBaseSink * bsink, GstQuery * query) gst_app_sink_query (GstBaseSink * bsink, GstQuery * query)
{ {
GstAppSink *appsink = GST_APP_SINK_CAST (bsink);
GstAppSinkPrivate *priv = appsink->priv;
gboolean ret; gboolean ret;
switch (GST_QUERY_TYPE (query)) { switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_DRAIN:
{
g_mutex_lock (&priv->mutex);
GST_DEBUG_OBJECT (appsink, "waiting buffers to be consumed");
while (priv->num_buffers > 0 && priv->preroll_buffer)
g_cond_wait (&priv->cond, &priv->mutex);
g_mutex_unlock (&priv->mutex);
ret = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query);
break;
}
case GST_QUERY_SEEKING:{ case GST_QUERY_SEEKING:{
GstFormat fmt; GstFormat fmt;

View file

@ -553,6 +553,114 @@ GST_START_TEST (test_do_not_care_preroll)
GST_END_TEST; GST_END_TEST;
typedef struct
{
GMutex mutex;
GCond cond;
GstAppSink *appsink;
gboolean check;
} TestQeuryDrainContext;
static gpointer
my_app_thread (TestQeuryDrainContext * ctx)
{
GstSample *pulled_preroll = NULL;
GstSample *pulled_sample = NULL;
/* Wait for the query to reach appsink. */
g_mutex_lock (&ctx->mutex);
while (!ctx->check)
g_cond_wait (&ctx->cond, &ctx->mutex);
g_mutex_unlock (&ctx->mutex);
pulled_preroll = gst_app_sink_pull_preroll (ctx->appsink);
fail_unless (pulled_preroll);
gst_sample_unref (pulled_preroll);
pulled_sample = gst_app_sink_pull_sample (ctx->appsink);
fail_unless (pulled_sample);
gst_sample_unref (pulled_sample);
pulled_sample = gst_app_sink_pull_sample (ctx->appsink);
fail_unless (pulled_sample);
gst_sample_unref (pulled_sample);
return NULL;
}
static GstPadProbeReturn
query_handler (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
TestQeuryDrainContext *ctx = (TestQeuryDrainContext *) user_data;
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_DRAIN:
{
if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_PUSH) {
g_mutex_lock (&ctx->mutex);
ctx->check = TRUE;
g_cond_signal (&ctx->cond);
g_mutex_unlock (&ctx->mutex);
} else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_PULL) {
/* Check that there is no pending buffers when drain query is done. */
fail_if (gst_app_sink_try_pull_preroll (ctx->appsink, 0));
fail_if (gst_app_sink_try_pull_sample (ctx->appsink, 0));
}
break;
}
default:
break;
}
return GST_PAD_PROBE_OK;
}
GST_START_TEST (test_query_drain)
{
GstElement *sink = NULL;
GstBuffer *buffer = NULL;
GstPad *sinkpad = NULL;
GThread *thread = NULL;
GstQuery *query = NULL;
TestQeuryDrainContext ctx = { 0 };
sink = setup_appsink ();
g_mutex_init (&ctx.mutex);
g_cond_init (&ctx.cond);
ctx.appsink = GST_APP_SINK (sink);
ctx.check = FALSE;
sinkpad = gst_element_get_static_pad (sink, "sink");
gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
query_handler, (gpointer) & ctx, NULL);
gst_object_unref (sinkpad);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
buffer = gst_buffer_new_and_alloc (4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
thread = g_thread_new ("appthread", (GThreadFunc) my_app_thread, &ctx);
fail_unless (thread != NULL);
query = gst_query_new_drain ();
fail_unless (gst_pad_peer_query (mysrcpad, query));
gst_query_unref (query);
g_thread_join (thread);
g_mutex_clear (&ctx.mutex);
g_cond_clear (&ctx.cond);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
static Suite * static Suite *
appsink_suite (void) appsink_suite (void)
{ {
@ -570,6 +678,7 @@ appsink_suite (void)
tcase_add_test (tc_chain, test_buffer_list_signal); tcase_add_test (tc_chain, test_buffer_list_signal);
tcase_add_test (tc_chain, test_segment); tcase_add_test (tc_chain, test_segment);
tcase_add_test (tc_chain, test_pull_with_timeout); tcase_add_test (tc_chain, test_pull_with_timeout);
tcase_add_test (tc_chain, test_query_drain);
tcase_add_test (tc_chain, test_pull_preroll); tcase_add_test (tc_chain, test_pull_preroll);
tcase_add_test (tc_chain, test_do_not_care_preroll); tcase_add_test (tc_chain, test_do_not_care_preroll);