diff --git a/gst-libs/gst/base/gstaggregator.c b/gst-libs/gst/base/gstaggregator.c index 5f6de3ea72..99767d0e9d 100644 --- a/gst-libs/gst/base/gstaggregator.c +++ b/gst-libs/gst/base/gstaggregator.c @@ -122,6 +122,22 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug); g_thread_self()); \ } G_STMT_END +#define PAD_STREAM_LOCK(pad) G_STMT_START { \ + GST_LOG_OBJECT (pad, "Taking lock from thread %p", \ + g_thread_self()); \ + g_mutex_lock(&pad->priv->stream_lock); \ + GST_LOG_OBJECT (pad, "Took lock from thread %p", \ + g_thread_self()); \ + } G_STMT_END + +#define PAD_STREAM_UNLOCK(pad) G_STMT_START { \ + GST_LOG_OBJECT (pad, "Releasing lock from thread %p", \ + g_thread_self()); \ + g_mutex_unlock(&pad->priv->stream_lock); \ + GST_LOG_OBJECT (pad, "Release lock from thread %p", \ + g_thread_self()); \ + } G_STMT_END + struct _GstAggregatorPadPrivate { gboolean pending_flush_start; @@ -131,6 +147,8 @@ struct _GstAggregatorPadPrivate GMutex event_lock; GCond event_cond; + + GMutex stream_lock; }; static gboolean @@ -581,6 +599,47 @@ _all_flush_stop_received (GstAggregator * self) return TRUE; } +static void +_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) +{ + GstBuffer *tmpbuf; + GstAggregatorPrivate *priv = self->priv; + GstAggregatorPadPrivate *padpriv = aggpad->priv; + + g_atomic_int_set (&aggpad->priv->flushing, TRUE); + /* Remove pad buffer and wake up the streaming thread */ + tmpbuf = gst_aggregator_pad_steal_buffer (aggpad); + gst_buffer_replace (&tmpbuf, NULL); + PAD_STREAM_LOCK (aggpad); + if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start, + TRUE, FALSE) == TRUE) { + GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now"); + g_atomic_int_set (&padpriv->pending_flush_stop, TRUE); + } + + if (g_atomic_int_get (&priv->flush_seeking)) { + /* If flush_seeking we forward the first FLUSH_START */ + if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start, + TRUE, FALSE) == TRUE) { + + GST_INFO_OBJECT (self, "Flushing, pausing srcpad task"); + _stop_srcpad_task (self, event); + priv->flow_return = GST_FLOW_OK; + + GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking"); + GST_PAD_STREAM_LOCK (self->srcpad); + GST_LOG_OBJECT (self, "GOT STREAM_LOCK"); + event = NULL; + } + } else { + gst_event_unref (event); + } + PAD_STREAM_UNLOCK (aggpad); + + tmpbuf = gst_aggregator_pad_steal_buffer (aggpad); + gst_buffer_replace (&tmpbuf, NULL); +} + /* GstAggregator vmethods default implementations */ static gboolean _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) @@ -588,41 +647,13 @@ _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event) gboolean res = TRUE; GstPad *pad = GST_PAD (aggpad); GstAggregatorPrivate *priv = self->priv; - GstAggregatorPadPrivate *padpriv = aggpad->priv; switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: { - GstBuffer *tmpbuf; - - g_atomic_int_set (&aggpad->priv->flushing, TRUE); - /* Remove pad buffer and wake up the streaming thread */ - tmpbuf = gst_aggregator_pad_steal_buffer (aggpad); - gst_buffer_replace (&tmpbuf, NULL); - if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start, - TRUE, FALSE) == TRUE) { - GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now"); - g_atomic_int_set (&padpriv->pending_flush_stop, TRUE); - } - - if (g_atomic_int_get (&priv->flush_seeking)) { - /* If flush_seeking we forward the first FLUSH_START */ - if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start, - TRUE, FALSE) == TRUE) { - - GST_DEBUG_OBJECT (self, "Flushing, pausing srcpad task"); - _stop_srcpad_task (self, event); - priv->flow_return = GST_FLOW_OK; - - GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking"); - GST_PAD_STREAM_LOCK (self->srcpad); - GST_LOG_OBJECT (self, "GOT STREAM_LOCK"); - event = NULL; - goto eat; - } - } - + _flush_start (self, aggpad, event); /* We forward only in one case: right after flush_seeking */ + event = NULL; goto eat; } case GST_EVENT_FLUSH_STOP: @@ -1258,6 +1289,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); + PAD_STREAM_LOCK (aggpad); if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) goto flushing; @@ -1274,7 +1306,6 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) goto flushing; - if (aggclass->clip) { aggclass->clip (self, aggpad, buffer, &actual_buf); } @@ -1284,6 +1315,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) gst_buffer_unref (aggpad->buffer); aggpad->buffer = actual_buf; PAD_UNLOCK_EVENT (aggpad); + PAD_STREAM_UNLOCK (aggpad); _add_aggregate_gsource (self); @@ -1292,6 +1324,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) return priv->flow_return; flushing: + PAD_STREAM_UNLOCK (aggpad); gst_buffer_unref (buffer); GST_DEBUG_OBJECT (aggpad, "We are flushing"); @@ -1299,6 +1332,7 @@ flushing: return GST_FLOW_FLUSHING; eos: + PAD_STREAM_UNLOCK (aggpad); gst_buffer_unref (buffer); GST_DEBUG_OBJECT (pad, "We are EOS already..."); @@ -1374,6 +1408,7 @@ gst_aggregator_pad_finalize (GObject * object) g_mutex_clear (&pad->priv->event_lock); g_cond_clear (&pad->priv->event_cond); + g_mutex_clear (&pad->priv->stream_lock); G_OBJECT_CLASS (aggregator_pad_parent_class)->finalize (object); } @@ -1415,6 +1450,7 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) g_mutex_init (&pad->priv->event_lock); g_cond_init (&pad->priv->event_cond); + g_mutex_init (&pad->priv->stream_lock); } /**