aggregator: Cleanup locking around AggregatorPad flush related fields

And document the locking

https://bugzilla.gnome.org/show_bug.cgi?id=742684
This commit is contained in:
Thibault Saunier 2015-01-26 17:06:29 +01:00 committed by Tim-Philipp Müller
parent 14f35e8a78
commit 24abac9fce

View file

@ -161,18 +161,19 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
struct _GstAggregatorPadPrivate struct _GstAggregatorPadPrivate
{ {
/* To always be used atomically */
gboolean flushing;
/* Following fields are protected by the PAD_LOCK */
gboolean pending_flush_start; gboolean pending_flush_start;
gboolean pending_flush_stop; gboolean pending_flush_stop;
gboolean pending_eos; gboolean pending_eos;
gboolean flushing;
/* Protected by the pad lock */
GstBuffer *buffer; GstBuffer *buffer;
gboolean eos; gboolean eos;
GMutex lock; GMutex lock;
GCond event_cond; GCond event_cond;
GMutex stream_lock; GMutex stream_lock;
}; };
@ -687,7 +688,13 @@ gst_aggregator_start (GstAggregator * self)
static gboolean static gboolean
_check_pending_flush_stop (GstAggregatorPad * pad) _check_pending_flush_stop (GstAggregatorPad * pad)
{ {
return (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start); gboolean res;
PAD_LOCK (pad);
res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
PAD_UNLOCK (pad);
return res;
} }
static gboolean static gboolean
@ -775,12 +782,16 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
/* Remove pad buffer and wake up the streaming thread */ /* Remove pad buffer and wake up the streaming thread */
tmpbuf = gst_aggregator_pad_steal_buffer (aggpad); tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
gst_buffer_replace (&tmpbuf, NULL); gst_buffer_replace (&tmpbuf, NULL);
PAD_STREAM_LOCK (aggpad); PAD_STREAM_LOCK (aggpad);
if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start, PAD_LOCK (aggpad);
TRUE, FALSE) == TRUE) { if (padpriv->pending_flush_start) {
GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now"); GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
padpriv->pending_flush_start = FALSE;
padpriv->pending_flush_stop = TRUE;
} }
PAD_UNLOCK (aggpad);
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
if (priv->flush_seeking) { if (priv->flush_seeking) {
@ -1319,7 +1330,7 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
EventData *evdata = user_data; EventData *evdata = user_data;
gboolean ret = TRUE; gboolean ret = TRUE;
GstPad *peer = gst_pad_get_peer (pad); GstPad *peer = gst_pad_get_peer (pad);
GstAggregatorPadPrivate *padpriv = GST_AGGREGATOR_PAD (pad)->priv; GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
if (peer) { if (peer) {
ret = gst_pad_send_event (peer, gst_event_ref (evdata->event)); ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
@ -1353,8 +1364,10 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
} }
if (evdata->flush) { if (evdata->flush) {
padpriv->pending_flush_start = FALSE; PAD_LOCK (aggpad);
padpriv->pending_flush_stop = FALSE; aggpad->priv->pending_flush_start = FALSE;
aggpad->priv->pending_flush_stop = FALSE;
PAD_UNLOCK (aggpad);
} }
} else { } else {
evdata->one_actually_seeked = TRUE; evdata->one_actually_seeked = TRUE;
@ -1387,8 +1400,10 @@ gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) { for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
GstAggregatorPad *pad = l->data; GstAggregatorPad *pad = l->data;
PAD_LOCK (pad);
pad->priv->pending_flush_start = TRUE; pad->priv->pending_flush_start = TRUE;
pad->priv->pending_flush_stop = FALSE; pad->priv->pending_flush_stop = FALSE;
PAD_UNLOCK (pad);
} }
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
} }
@ -1781,10 +1796,10 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
goto flushing; goto flushing;
if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE) PAD_LOCK (aggpad);
if (aggpad->priv->pending_eos == TRUE)
goto eos; goto eos;
PAD_LOCK (aggpad);
while (aggpad->priv->buffer while (aggpad->priv->buffer
&& g_atomic_int_get (&aggpad->priv->flushing) == FALSE) { && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
@ -1827,6 +1842,7 @@ flushing:
return GST_FLOW_FLUSHING; return GST_FLOW_FLUSHING;
eos: eos:
PAD_UNLOCK (aggpad);
PAD_STREAM_UNLOCK (aggpad); PAD_STREAM_UNLOCK (aggpad);
gst_buffer_unref (buffer); gst_buffer_unref (buffer);