appsrc: fix deadlock setting pipeline in NULL state with block=true

This commit is contained in:
Nicola Murino 2013-03-12 10:32:44 +01:00 committed by Sebastian Dröge
parent f05a95ea3c
commit 2a1dc7ca56
2 changed files with 156 additions and 0 deletions

View file

@ -746,6 +746,7 @@ gst_app_src_stop (GstBaseSrc * bsrc)
priv->flushing = TRUE; priv->flushing = TRUE;
priv->started = FALSE; priv->started = FALSE;
gst_app_src_flush_queued (appsrc); gst_app_src_flush_queued (appsrc);
g_cond_broadcast (&priv->cond);
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
return TRUE; return TRUE;

View file

@ -20,6 +20,7 @@
#include <gst/check/gstcheck.h> #include <gst/check/gstcheck.h>
#include <gst/app/gstappsrc.h> #include <gst/app/gstappsrc.h>
#include <gst/app/gstappsink.h>
#define SAMPLE_CAPS "application/x-gst-check-test" #define SAMPLE_CAPS "application/x-gst-check-test"
@ -110,6 +111,158 @@ GST_START_TEST (test_appsrc_non_null_caps)
GST_END_TEST; GST_END_TEST;
static GstAppSinkCallbacks app_callbacks;
typedef struct
{
GMainLoop *loop;
GstElement *source;
GstElement *sink;
} ProgramData;
static GstFlowReturn
on_new_sample_from_source (GstAppSink * elt, gpointer user_data)
{
ProgramData *data = (ProgramData *) user_data;
GstSample *sample;
GstBuffer *buffer;
GstElement *source;
sample = gst_app_sink_pull_sample (GST_APP_SINK (elt));
buffer = gst_sample_get_buffer (sample);
source = gst_bin_get_by_name (GST_BIN (data->sink), "testsource");
gst_app_src_push_buffer (GST_APP_SRC (source), gst_buffer_ref (buffer));
gst_sample_unref (sample);
g_object_unref (source);
return GST_FLOW_OK;
}
/* called when we get a GstMessage from the source pipeline when we get EOS, we
* notify the appsrc of it. */
static gboolean
on_source_message (GstBus * bus, GstMessage * message, ProgramData * data)
{
GstElement *source;
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
source = gst_bin_get_by_name (GST_BIN (data->sink), "testsource");
fail_unless (gst_app_src_end_of_stream (GST_APP_SRC (source)) ==
GST_FLOW_OK);
break;
case GST_MESSAGE_ERROR:
g_main_loop_quit (data->loop);
break;
default:
break;
}
return TRUE;
}
static gboolean
on_sink_message (GstBus * bus, GstMessage * message, ProgramData * data)
{
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
g_main_loop_quit (data->loop);
break;
case GST_MESSAGE_ERROR:
ASSERT_SET_STATE (data->sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
ASSERT_SET_STATE (data->source, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
g_main_loop_quit (data->loop);
break;
default:
break;
}
return TRUE;
}
static gboolean
error_timeout (ProgramData * data)
{
GstBus *bus;
bus = gst_element_get_bus (data->sink);
gst_bus_post (bus, gst_message_new_error (GST_OBJECT (data->sink), NULL,
"test error"));
gst_object_unref (bus);
return FALSE;
}
/*
* appsink => appsrc pipelines executed 100 times:
* - appsink pipeline has sync=false
* - appsrc pipeline has sync=true
* - appsrc has block=true
* after 1 second an error message is posted on appsink pipeline bus
* when the error is received the appsrc pipeline is set to NULL
* and then the appsink pipeline is
* set to NULL too, this must not deadlock
*/
GST_START_TEST (test_appsrc_block_deadlock)
{
int i = 0;
int num_iteration = 100;
while (i < num_iteration) {
ProgramData *data = NULL;
GstBus *bus = NULL;
GstElement *testsink = NULL;
gint tout;
data = g_new0 (ProgramData, 1);
data->loop = g_main_loop_new (NULL, FALSE);
data->source =
gst_parse_launch ("videotestsrc ! appsink sync=false name=testsink",
NULL);
fail_unless (data->source != NULL);
bus = gst_element_get_bus (data->source);
gst_bus_add_watch (bus, (GstBusFunc) on_source_message, data);
gst_object_unref (bus);
app_callbacks.new_sample = on_new_sample_from_source;
testsink = gst_bin_get_by_name (GST_BIN (data->source), "testsink");
gst_app_sink_set_callbacks (GST_APP_SINK_CAST (testsink), &app_callbacks,
data, NULL);
gst_object_unref (testsink);
data->sink =
gst_parse_launch
("appsrc name=testsource block=1 max-bytes=1000 is-live=true ! fakesink sync=true",
NULL);
fail_unless (data->sink != NULL);
bus = gst_element_get_bus (data->sink);
gst_bus_add_watch (bus, (GstBusFunc) on_sink_message, data);
gst_object_unref (bus);
tout = g_timeout_add (150, (GSourceFunc) error_timeout, data);
ASSERT_SET_STATE (data->sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
ASSERT_SET_STATE (data->source, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
g_main_loop_run (data->loop);
ASSERT_SET_STATE (data->sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
ASSERT_SET_STATE (data->source, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
g_source_remove (tout);
gst_object_unref (data->source);
gst_object_unref (data->sink);
g_main_loop_unref (data->loop);
g_free (data);
i++;
g_print ("appsrc deadlock test iteration number %d/%d\n", i, num_iteration);
}
}
GST_END_TEST;
static Suite * static Suite *
appsrc_suite (void) appsrc_suite (void)
@ -118,7 +271,9 @@ appsrc_suite (void)
TCase *tc_chain = tcase_create ("general"); TCase *tc_chain = tcase_create ("general");
tcase_add_test (tc_chain, test_appsrc_non_null_caps); tcase_add_test (tc_chain, test_appsrc_non_null_caps);
tcase_add_test (tc_chain, test_appsrc_block_deadlock);
tcase_set_timeout (tc_chain, 20);
suite_add_tcase (s, tc_chain); suite_add_tcase (s, tc_chain);
return s; return s;