diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 28bbdeb624..e9056e3525 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -161,18 +161,19 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug); struct _GstAggregatorPadPrivate { + /* To always be used atomically */ + gboolean flushing; + + /* Following fields are protected by the PAD_LOCK */ gboolean pending_flush_start; gboolean pending_flush_stop; gboolean pending_eos; - gboolean flushing; - /* Protected by the pad lock */ GstBuffer *buffer; gboolean eos; GMutex lock; GCond event_cond; - GMutex stream_lock; }; @@ -687,7 +688,13 @@ gst_aggregator_start (GstAggregator * self) static gboolean _check_pending_flush_stop (GstAggregatorPad * pad) { - return (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start); + gboolean res; + + PAD_LOCK (pad); + res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start); + PAD_UNLOCK (pad); + + return res; } static gboolean @@ -775,12 +782,16 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, /* Remove pad buffer and wake up the streaming thread */ tmpbuf = gst_aggregator_pad_steal_buffer (aggpad); gst_buffer_replace (&tmpbuf, NULL); + PAD_STREAM_LOCK (aggpad); - if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start, - TRUE, FALSE) == TRUE) { + PAD_LOCK (aggpad); + if (padpriv->pending_flush_start) { GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now"); - g_atomic_int_set (&padpriv->pending_flush_stop, TRUE); + + padpriv->pending_flush_start = FALSE; + padpriv->pending_flush_stop = TRUE; } + PAD_UNLOCK (aggpad); GST_OBJECT_LOCK (self); if (priv->flush_seeking) { @@ -1319,7 +1330,7 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) EventData *evdata = user_data; gboolean ret = TRUE; GstPad *peer = gst_pad_get_peer (pad); - GstAggregatorPadPrivate *padpriv = GST_AGGREGATOR_PAD (pad)->priv; + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); if (peer) { ret = gst_pad_send_event (peer, gst_event_ref (evdata->event)); @@ -1353,8 +1364,10 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) } if (evdata->flush) { - padpriv->pending_flush_start = FALSE; - padpriv->pending_flush_stop = FALSE; + PAD_LOCK (aggpad); + aggpad->priv->pending_flush_start = FALSE; + aggpad->priv->pending_flush_stop = FALSE; + PAD_UNLOCK (aggpad); } } else { evdata->one_actually_seeked = TRUE; @@ -1387,8 +1400,10 @@ gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self, for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) { GstAggregatorPad *pad = l->data; + PAD_LOCK (pad); pad->priv->pending_flush_start = TRUE; pad->priv->pending_flush_stop = FALSE; + PAD_UNLOCK (pad); } GST_OBJECT_UNLOCK (self); } @@ -1781,10 +1796,10 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) goto flushing; - if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE) + PAD_LOCK (aggpad); + if (aggpad->priv->pending_eos == TRUE) goto eos; - PAD_LOCK (aggpad); while (aggpad->priv->buffer && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) { GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); @@ -1827,6 +1842,7 @@ flushing: return GST_FLOW_FLUSHING; eos: + PAD_UNLOCK (aggpad); PAD_STREAM_UNLOCK (aggpad); gst_buffer_unref (buffer);