diff --git a/gst-libs/gst/app/gstappsink.c b/gst-libs/gst/app/gstappsink.c index b3c110ec8d..87ae643e22 100644 --- a/gst-libs/gst/app/gstappsink.c +++ b/gst-libs/gst/app/gstappsink.c @@ -73,6 +73,13 @@ #include "gstappsink.h" +typedef enum +{ + NOONE_WAITING, + STREAM_WAITING, /* streaming thread is waiting for application thread */ + APP_WAITING, /* application thread is waiting for streaming thread */ +} GstAppSinkWaitStatus; + struct _GstAppSinkPrivate { GstCaps *caps; @@ -81,6 +88,7 @@ struct _GstAppSinkPrivate guint max_buffers; gboolean drop; gboolean wait_on_eos; + GstAppSinkWaitStatus wait_status; GCond cond; GMutex mutex; @@ -459,6 +467,7 @@ gst_app_sink_init (GstAppSink * appsink) priv->drop = DEFAULT_PROP_DROP; priv->wait_on_eos = DEFAULT_PROP_WAIT_ON_EOS; priv->buffer_lists_supported = DEFAULT_PROP_BUFFER_LIST; + priv->wait_status = NOONE_WAITING; } static void @@ -632,6 +641,7 @@ gst_app_sink_start (GstBaseSink * psink) g_mutex_lock (&priv->mutex); GST_DEBUG_OBJECT (appsink, "starting"); + priv->wait_status = NOONE_WAITING; priv->flushing = FALSE; priv->started = TRUE; gst_segment_init (&priv->preroll_segment, GST_FORMAT_TIME); @@ -651,6 +661,7 @@ gst_app_sink_stop (GstBaseSink * psink) GST_DEBUG_OBJECT (appsink, "stopping"); priv->flushing = TRUE; priv->started = FALSE; + priv->wait_status = NOONE_WAITING; gst_app_sink_flush_unlocked (appsink); gst_buffer_replace (&priv->preroll_buffer, NULL); gst_caps_replace (&priv->preroll_caps, NULL); @@ -707,8 +718,11 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event) * Otherwise we might signal EOS before all buffers are * consumed, which is a bit confusing for the application */ - while (priv->num_buffers > 0 && !priv->flushing && priv->wait_on_eos) + while (priv->num_buffers > 0 && !priv->flushing && priv->wait_on_eos) { + priv->wait_status = STREAM_WAITING; g_cond_wait (&priv->cond, &priv->mutex); + priv->wait_status = NOONE_WAITING; + } if (priv->flushing) emit = FALSE; g_mutex_unlock (&priv->mutex); @@ -755,7 +769,9 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer) GST_DEBUG_OBJECT (appsink, "setting preroll buffer %p", buffer); gst_buffer_replace (&priv->preroll_buffer, buffer); - g_cond_signal (&priv->cond); + if (priv->wait_status == APP_WAITING) + g_cond_signal (&priv->cond); + emit = priv->emit_signals; g_mutex_unlock (&priv->mutex); @@ -869,7 +885,10 @@ restart: } /* wait for a buffer to be removed or flush */ + priv->wait_status = STREAM_WAITING; g_cond_wait (&priv->cond, &priv->mutex); + priv->wait_status = NOONE_WAITING; + if (priv->flushing) goto flushing; } @@ -877,7 +896,10 @@ restart: /* we need to ref the buffer/list when pushing it in the queue */ gst_queue_array_push_tail (priv->queue, gst_mini_object_ref (data)); priv->num_buffers++; - g_cond_signal (&priv->cond); + + if (priv->wait_status == APP_WAITING) + g_cond_signal (&priv->cond); + emit = priv->emit_signals; g_mutex_unlock (&priv->mutex); @@ -972,8 +994,11 @@ gst_app_sink_query (GstBaseSink * bsink, GstQuery * query) { g_mutex_lock (&priv->mutex); GST_DEBUG_OBJECT (appsink, "waiting buffers to be consumed"); - while (priv->num_buffers > 0 || priv->preroll_buffer) + while (priv->num_buffers > 0 || priv->preroll_buffer) { + priv->wait_status = STREAM_WAITING; g_cond_wait (&priv->cond, &priv->mutex); + priv->wait_status = NOONE_WAITING; + } g_mutex_unlock (&priv->mutex); ret = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query); break; @@ -1486,12 +1511,14 @@ gst_app_sink_try_pull_preroll (GstAppSink * appsink, GstClockTime timeout) /* nothing to return, wait */ GST_DEBUG_OBJECT (appsink, "waiting for the preroll buffer"); + priv->wait_status = APP_WAITING; if (timeout_valid) { if (!g_cond_wait_until (&priv->cond, &priv->mutex, end_time)) goto expired; } else { g_cond_wait (&priv->cond, &priv->mutex); } + priv->wait_status = NOONE_WAITING; } sample = gst_sample_new (priv->preroll_buffer, priv->preroll_caps, @@ -1506,6 +1533,7 @@ gst_app_sink_try_pull_preroll (GstAppSink * appsink, GstClockTime timeout) expired: { GST_DEBUG_OBJECT (appsink, "timeout expired, return NULL"); + priv->wait_status = NOONE_WAITING; g_mutex_unlock (&priv->mutex); return NULL; } @@ -1581,12 +1609,14 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout) /* nothing to return, wait */ GST_DEBUG_OBJECT (appsink, "waiting for a buffer"); + priv->wait_status = APP_WAITING; if (timeout_valid) { if (!g_cond_wait_until (&priv->cond, &priv->mutex, end_time)) goto expired; } else { g_cond_wait (&priv->cond, &priv->mutex); } + priv->wait_status = NOONE_WAITING; } obj = dequeue_buffer (appsink); @@ -1601,7 +1631,9 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout) } gst_mini_object_unref (obj); - g_cond_signal (&priv->cond); + if (priv->wait_status == STREAM_WAITING) + g_cond_signal (&priv->cond); + g_mutex_unlock (&priv->mutex); return sample; @@ -1610,6 +1642,7 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout) expired: { GST_DEBUG_OBJECT (appsink, "timeout expired, return NULL"); + priv->wait_status = NOONE_WAITING; g_mutex_unlock (&priv->mutex); return NULL; }