examples: fix a race condition when seeking

Fixes a race condition that caused pipeline deadlock during seeks.
This commit is contained in:
Josep Torra 2013-09-20 17:09:52 +02:00
parent 995e3af89a
commit aa21522afb

View file

@ -162,11 +162,13 @@ typedef struct
/* GStreamer related resources */ /* GStreamer related resources */
GstElement *pipeline; GstElement *pipeline;
GstElement *vsink;
GstEGLDisplay *gst_display; GstEGLDisplay *gst_display;
/* Interthread comunication */ /* Interthread comunication */
GAsyncQueue *queue; GAsyncQueue *queue;
GMutex *lock; GMutex *queue_lock;
GMutex *flow_lock;
GCond *cond; GCond *cond;
gboolean flushing; gboolean flushing;
GstMiniObject *popped_obj; GstMiniObject *popped_obj;
@ -954,14 +956,11 @@ update_image (APP_STATE_T * state, GstBuffer * buffer)
{ {
GstMemory *mem = gst_buffer_peek_memory (buffer, 0); GstMemory *mem = gst_buffer_peek_memory (buffer, 0);
g_mutex_lock (state->lock);
if (state->current_mem) { if (state->current_mem) {
gst_memory_unref (state->current_mem); gst_memory_unref (state->current_mem);
} }
state->current_mem = gst_memory_ref (mem); state->current_mem = gst_memory_ref (mem);
g_mutex_unlock (state->lock);
TRACE_VC_MEMORY_ONCE_FOR_ID ("before glEGLImageTargetTexture2DOES", gid0); TRACE_VC_MEMORY_ONCE_FOR_ID ("before glEGLImageTargetTexture2DOES", gid0);
glBindTexture (GL_TEXTURE_2D, state->tex); glBindTexture (GL_TEXTURE_2D, state->tex);
@ -969,8 +968,6 @@ update_image (APP_STATE_T * state, GstBuffer * buffer)
gst_egl_image_memory_get_image (mem)); gst_egl_image_memory_get_image (mem));
TRACE_VC_MEMORY_ONCE_FOR_ID ("after glEGLImageTargetTexture2DOES", gid1); TRACE_VC_MEMORY_ONCE_FOR_ID ("after glEGLImageTargetTexture2DOES", gid1);
render_scene (state);
} }
static void static void
@ -978,7 +975,8 @@ init_intercom (APP_STATE_T * state)
{ {
state->queue = state->queue =
g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref); g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
state->lock = g_mutex_new (); state->queue_lock = g_mutex_new ();
state->flow_lock = g_mutex_new ();
state->cond = g_cond_new (); state->cond = g_cond_new ();
} }
@ -990,8 +988,12 @@ terminate_intercom (APP_STATE_T * state)
g_async_queue_unref (state->queue); g_async_queue_unref (state->queue);
} }
if (state->lock) { if (state->queue_lock) {
g_mutex_free (state->lock); g_mutex_free (state->queue_lock);
}
if (state->flow_lock) {
g_mutex_free (state->flow_lock);
} }
if (state->cond) { if (state->cond) {
@ -1013,33 +1015,47 @@ flush_start (APP_STATE_T * state)
{ {
GstMiniObject *object = NULL; GstMiniObject *object = NULL;
g_mutex_lock (state->lock); g_mutex_lock (state->queue_lock);
state->flushing = TRUE; state->flushing = TRUE;
g_cond_broadcast (state->cond); g_cond_broadcast (state->cond);
g_mutex_unlock (state->lock); g_mutex_unlock (state->queue_lock);
g_mutex_lock (state->flow_lock);
while ((object = g_async_queue_try_pop (state->queue))) { while ((object = g_async_queue_try_pop (state->queue))) {
gst_mini_object_unref (object); gst_mini_object_unref (object);
} }
g_mutex_lock (state->queue_lock);
flush_internal (state); flush_internal (state);
state->popped_obj = NULL;
g_mutex_unlock (state->queue_lock);
g_mutex_unlock (state->flow_lock);
} }
static void static void
flush_stop (APP_STATE_T * state) flush_stop (APP_STATE_T * state)
{ {
g_mutex_lock (state->lock); GstMiniObject *object = NULL;
g_mutex_lock (state->queue_lock);
while ((object = GST_MINI_OBJECT_CAST (g_async_queue_try_pop (state->queue)))) {
gst_mini_object_unref (object);
}
flush_internal (state);
state->popped_obj = NULL; state->popped_obj = NULL;
state->flushing = FALSE; state->flushing = FALSE;
g_mutex_unlock (state->lock); g_mutex_unlock (state->queue_lock);
} }
static void static void
pipeline_pause (APP_STATE_T * state) pipeline_pause (APP_STATE_T * state)
{ {
flush_start (state);
gst_element_set_state (state->pipeline, GST_STATE_PAUSED); gst_element_set_state (state->pipeline, GST_STATE_PAUSED);
flush_stop (state); }
static void
pipeline_play (APP_STATE_T * state)
{
gst_element_set_state (state->pipeline, GST_STATE_PLAYING);
} }
static gint64 static gint64
@ -1048,7 +1064,7 @@ pipeline_get_position (APP_STATE_T * state)
gint64 position = -1; gint64 position = -1;
if (state->pipeline) { if (state->pipeline) {
gst_element_query_position (state->pipeline, GST_FORMAT_TIME, &position); gst_element_query_position (state->vsink, GST_FORMAT_TIME, &position);
} }
return position; return position;
@ -1074,7 +1090,7 @@ pipeline_seek (APP_STATE_T * state, gint64 position)
event = gst_event_new_seek (1.0, event = gst_event_new_seek (1.0,
GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT,
GST_SEEK_TYPE_SET, position, GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE); GST_SEEK_TYPE_SET, position, GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE);
if (!gst_element_send_event (state->pipeline, event)) { if (!gst_element_send_event (state->vsink, event)) {
g_print ("seek failed\n"); g_print ("seek failed\n");
} }
} }
@ -1084,32 +1100,33 @@ static gboolean
handle_queued_objects (APP_STATE_T * state) handle_queued_objects (APP_STATE_T * state)
{ {
GstMiniObject *object = NULL; GstMiniObject *object = NULL;
gboolean done = FALSE;
if (g_async_queue_length (state->queue) == 0) { if (g_async_queue_length (state->queue) == 0) {
return FALSE; return FALSE;
} }
while ((object = g_async_queue_try_pop (state->queue))) { g_mutex_lock (state->queue_lock);
g_mutex_lock (state->lock);
if (state->flushing) { if (state->flushing) {
state->popped_obj = object;
gst_mini_object_unref (object);
g_cond_broadcast (state->cond); g_cond_broadcast (state->cond);
g_mutex_unlock (state->lock); done = TRUE;
continue;
} }
g_mutex_unlock (state->lock); g_mutex_unlock (state->queue_lock);
while (!done && (object = g_async_queue_try_pop (state->queue))) {
if (GST_IS_BUFFER (object)) { if (GST_IS_BUFFER (object)) {
GstBuffer *buffer = GST_BUFFER_CAST (object); GstBuffer *buffer = GST_BUFFER_CAST (object);
g_mutex_lock (state->queue_lock);
update_image (state, buffer); update_image (state, buffer);
render_scene (state);
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
g_mutex_unlock (state->queue_lock);
} else if (GST_IS_QUERY (object)) { } else if (GST_IS_QUERY (object)) {
GstQuery *query = GST_QUERY_CAST (object); GstQuery *query = GST_QUERY_CAST (object);
GstStructure *s = (GstStructure *) gst_query_get_structure (query); GstStructure *s = (GstStructure *) gst_query_get_structure (query);
g_mutex_lock (state->lock); g_mutex_lock (state->queue_lock);
if (gst_structure_has_name (s, "eglglessink-allocate-eglimage")) { if (gst_structure_has_name (s, "eglglessink-allocate-eglimage")) {
GstBuffer *buffer; GstBuffer *buffer;
@ -1139,7 +1156,7 @@ handle_queued_objects (APP_STATE_T * state)
state->popped_obj = object; state->popped_obj = object;
g_cond_broadcast (state->cond); g_cond_broadcast (state->cond);
g_mutex_unlock (state->lock); g_mutex_unlock (state->queue_lock);
return TRUE; return TRUE;
} else if (GST_IS_EVENT (object)) { } else if (GST_IS_EVENT (object)) {
@ -1147,7 +1164,7 @@ handle_queued_objects (APP_STATE_T * state)
g_print ("\nevent %p %s\n", event, g_print ("\nevent %p %s\n", event,
gst_event_type_get_name (GST_EVENT_TYPE (event))); gst_event_type_get_name (GST_EVENT_TYPE (event)));
g_mutex_lock (state->lock); g_mutex_lock (state->queue_lock);
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS: case GST_EVENT_EOS:
flush_internal (state); flush_internal (state);
@ -1155,14 +1172,14 @@ handle_queued_objects (APP_STATE_T * state)
default: default:
break; break;
} }
g_mutex_unlock (state->lock); g_mutex_unlock (state->queue_lock);
gst_event_unref (event); gst_event_unref (event);
} }
g_mutex_lock (state->lock); g_mutex_lock (state->queue_lock);
state->popped_obj = object; state->popped_obj = object;
g_cond_broadcast (state->cond); g_cond_broadcast (state->cond);
g_mutex_unlock (state->lock); g_mutex_unlock (state->queue_lock);
} }
return FALSE; return FALSE;
@ -1171,24 +1188,29 @@ handle_queued_objects (APP_STATE_T * state)
static gboolean static gboolean
queue_object (APP_STATE_T * state, GstMiniObject * obj, gboolean synchronous) queue_object (APP_STATE_T * state, GstMiniObject * obj, gboolean synchronous)
{ {
g_mutex_lock (state->lock); gboolean res = TRUE;
g_mutex_lock (state->flow_lock);
if (state->flushing) { if (state->flushing) {
g_mutex_unlock (state->lock);
gst_mini_object_unref (obj); gst_mini_object_unref (obj);
return FALSE; res = FALSE;
goto beach;
} }
g_async_queue_push (state->queue, obj); g_async_queue_push (state->queue, obj);
if (synchronous) { if (synchronous) {
/* Waiting for object to be handled */ /* Waiting for object to be handled */
g_mutex_lock (state->queue_lock);
do { do {
g_cond_wait (state->cond, state->lock); g_cond_wait (state->cond, state->queue_lock);
} while (!state->flushing && state->popped_obj != obj); } while (!state->flushing && state->popped_obj != obj);
g_mutex_unlock (state->queue_lock);
} }
g_mutex_unlock (state->lock);
return TRUE; beach:
g_mutex_unlock (state->flow_lock);
return res;
} }
static void static void
@ -1261,9 +1283,9 @@ query_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
return GST_PAD_PROBE_OK; return GST_PAD_PROBE_OK;
} }
g_mutex_lock (state->lock); g_mutex_lock (state->queue_lock);
pool = state->pool ? gst_object_ref (state->pool) : NULL; pool = state->pool ? gst_object_ref (state->pool) : NULL;
g_mutex_unlock (state->lock); g_mutex_unlock (state->queue_lock);
if (pool) { if (pool) {
GstCaps *pcaps; GstCaps *pcaps;
@ -1373,6 +1395,7 @@ init_playbin_player (APP_STATE_T * state, const gchar * uri)
"video-sink", vsink, "flags", "video-sink", vsink, "flags",
GST_PLAY_FLAG_NATIVE_VIDEO | GST_PLAY_FLAG_AUDIO, NULL); GST_PLAY_FLAG_NATIVE_VIDEO | GST_PLAY_FLAG_AUDIO, NULL);
state->vsink = gst_object_ref (vsink);
return TRUE; return TRUE;
} }
@ -1409,6 +1432,7 @@ init_parse_launch_player (APP_STATE_T * state, const gchar * spipeline)
gst_pad_add_probe (gst_element_get_static_pad (vsink, "sink"), gst_pad_add_probe (gst_element_get_static_pad (vsink, "sink"),
GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, query_cb, state, NULL); GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, query_cb, state, NULL);
state->vsink = gst_object_ref (vsink);
return TRUE; return TRUE;
} }
@ -1502,7 +1526,7 @@ handle_keyboard (GIOChannel * source, GIOCondition cond, APP_STATE_T * state)
pipeline_pause (state); pipeline_pause (state);
break; break;
case 'r': case 'r':
gst_element_set_state (state->pipeline, GST_STATE_PLAYING); pipeline_play (state);
break; break;
case 'l': case 'l':
report_position_duration (state); report_position_duration (state);
@ -1555,10 +1579,10 @@ buffering_cb (GstBus * bus, GstMessage * msg, APP_STATE_T * state)
gst_message_parse_buffering (msg, &percent); gst_message_parse_buffering (msg, &percent);
g_print ("Buffering %3d%%\r", percent); g_print ("Buffering %3d%%\r", percent);
if (percent < 100) if (percent < 100)
gst_element_set_state (state->pipeline, GST_STATE_PAUSED); pipeline_pause (state);
else { else {
g_print ("\n"); g_print ("\n");
gst_element_set_state (state->pipeline, GST_STATE_PLAYING); pipeline_play (state);
} }
} }
@ -1774,6 +1798,11 @@ done:
/* Release pipeline */ /* Release pipeline */
if (state->pipeline) { if (state->pipeline) {
gst_element_set_state (state->pipeline, GST_STATE_NULL); gst_element_set_state (state->pipeline, GST_STATE_NULL);
if (state->vsink) {
gst_object_unref (state->vsink);
state->vsink = NULL;
}
gst_object_unref (state->pipeline); gst_object_unref (state->pipeline);
} }