From 90f3cb4af0409dc33a5c1526bffa1ae6237b4c8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20Cr=C3=AAte?= Date: Wed, 21 Jan 2015 18:45:36 -0500 Subject: [PATCH] aggregator: Consistently lock some members Some members sometimes used atomic access, sometimes where not locked at all. Instead consistently use a mutex to protect them, also document that. https://bugzilla.gnome.org/show_bug.cgi?id=742684 --- libs/gst/base/gstaggregator.c | 113 ++++++++++++++++++++-------------- 1 file changed, 66 insertions(+), 47 deletions(-) diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index adc70dcb1f..0b9d8a25be 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -391,10 +391,12 @@ pad_not_ready: static void gst_aggregator_reset_flow_values (GstAggregator * self) { + GST_OBJECT_LOCK (self); self->priv->flow_return = GST_FLOW_FLUSHING; self->priv->send_stream_start = TRUE; self->priv->send_segment = TRUE; gst_segment_init (&self->segment, GST_FORMAT_TIME); + GST_OBJECT_UNLOCK (self); } static inline void @@ -426,19 +428,21 @@ gst_aggregator_push_mandatory_events (GstAggregator * self) self->priv->srccaps = NULL; } - if (g_atomic_int_get (&self->priv->send_segment)) { - if (!g_atomic_int_get (&self->priv->flush_seeking)) { - GstEvent *segev = gst_event_new_segment (&self->segment); + GST_OBJECT_LOCK (self); + if (self->priv->send_segment && !self->priv->flush_seeking) { + GstEvent *segev = gst_event_new_segment (&self->segment); - if (!self->priv->seqnum) - self->priv->seqnum = gst_event_get_seqnum (segev); - else - gst_event_set_seqnum (segev, self->priv->seqnum); + if (!self->priv->seqnum) + self->priv->seqnum = gst_event_get_seqnum (segev); + else + gst_event_set_seqnum (segev, self->priv->seqnum); + self->priv->send_segment = FALSE; + GST_OBJECT_UNLOCK (self); - GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev); - gst_pad_push_event (self->srcpad, segev); - g_atomic_int_set (&self->priv->send_segment, FALSE); - } + GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev); + gst_pad_push_event (self->srcpad, segev); + } else { + GST_OBJECT_UNLOCK (self); } if (priv->tags && priv->tags_changed) { @@ -478,14 +482,15 @@ gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) { gst_aggregator_push_mandatory_events (self); - if (!g_atomic_int_get (&self->priv->flush_seeking) && - gst_pad_is_active (self->srcpad)) { + GST_OBJECT_LOCK (self); + if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) { GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer); + GST_OBJECT_UNLOCK (self); return gst_pad_push (self->srcpad, buffer); } else { GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)", - g_atomic_int_get (&self->priv->flush_seeking), - gst_pad_is_active (self->srcpad)); + self->priv->flush_seeking, gst_pad_is_active (self->srcpad)); + GST_OBJECT_UNLOCK (self); gst_buffer_unref (buffer); return GST_FLOW_OK; } @@ -641,9 +646,10 @@ gst_aggregator_aggregate_func (GstAggregator * self) gst_aggregator_push_eos (self); } - if (priv->flow_return == GST_FLOW_FLUSHING && - g_atomic_int_get (&priv->flush_seeking)) + GST_OBJECT_LOCK (self); + if (priv->flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) priv->flow_return = GST_FLOW_OK; + GST_OBJECT_UNLOCK (self); GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (priv->flow_return)); @@ -722,33 +728,35 @@ gst_aggregator_flush (GstAggregator * self) GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); GST_DEBUG_OBJECT (self, "Flushing everything"); - g_atomic_int_set (&priv->send_segment, TRUE); - g_atomic_int_set (&priv->flush_seeking, FALSE); + GST_OBJECT_LOCK (self); + priv->send_segment = TRUE; + priv->flush_seeking = FALSE; g_atomic_int_set (&priv->tags_changed, FALSE); + GST_OBJECT_UNLOCK (self); if (klass->flush) ret = klass->flush (self); return ret; } + +/* Called with GstAggregator's object lock held */ + static gboolean -gst_aggregator_all_flush_stop_received (GstAggregator * self) +gst_aggregator_all_flush_stop_received_locked (GstAggregator * self) { GList *tmp; GstAggregatorPad *tmppad; - GST_OBJECT_LOCK (self); for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) { tmppad = (GstAggregatorPad *) tmp->data; if (_check_pending_flush_stop (tmppad) == FALSE) { GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i", tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop); - GST_OBJECT_UNLOCK (self); return FALSE; } } - GST_OBJECT_UNLOCK (self); return TRUE; } @@ -772,10 +780,12 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, g_atomic_int_set (&padpriv->pending_flush_stop, TRUE); } - if (g_atomic_int_get (&priv->flush_seeking)) { + GST_OBJECT_LOCK (self); + if (priv->flush_seeking) { /* If flush_seeking we forward the first FLUSH_START */ - if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start, - TRUE, FALSE) == TRUE) { + if (priv->pending_flush_start) { + priv->pending_flush_start = FALSE; + GST_OBJECT_UNLOCK (self); GST_INFO_OBJECT (self, "Flushing, pausing srcpad task"); gst_aggregator_stop_srcpad_task (self, event); @@ -786,9 +796,11 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GST_LOG_OBJECT (self, "GOT STREAM_LOCK"); event = NULL; } else { + GST_OBJECT_UNLOCK (self); gst_event_unref (event); } } else { + GST_OBJECT_UNLOCK (self); gst_event_unref (event); } PAD_STREAM_UNLOCK (aggpad); @@ -819,26 +831,29 @@ gst_aggregator_default_sink_event (GstAggregator * self, GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP"); gst_aggregator_pad_flush (aggpad, self); - if (g_atomic_int_get (&priv->flush_seeking)) { + GST_OBJECT_LOCK (self); + if (priv->flush_seeking) { g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE); + if (gst_aggregator_all_flush_stop_received_locked (self)) { + GST_OBJECT_UNLOCK (self); + /* That means we received FLUSH_STOP/FLUSH_STOP on + * all sinkpads -- Seeking is Done... sending FLUSH_STOP */ + gst_aggregator_flush (self); + gst_pad_push_event (self->srcpad, event); + event = NULL; + SRC_STREAM_LOCK (self); + priv->send_eos = TRUE; + SRC_STREAM_BROADCAST (self); + SRC_STREAM_UNLOCK (self); - if (g_atomic_int_get (&priv->flush_seeking)) { - if (gst_aggregator_all_flush_stop_received (self)) { - /* That means we received FLUSH_STOP/FLUSH_STOP on - * all sinkpads -- Seeking is Done... sending FLUSH_STOP */ - gst_aggregator_flush (self); - gst_pad_push_event (self->srcpad, event); - event = NULL; - SRC_STREAM_LOCK (self); - priv->send_eos = TRUE; - SRC_STREAM_BROADCAST (self); - SRC_STREAM_UNLOCK (self); - - GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK"); - GST_PAD_STREAM_UNLOCK (self->srcpad); - gst_aggregator_start_srcpad_task (self); - } + GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK"); + GST_PAD_STREAM_UNLOCK (self->srcpad); + gst_aggregator_start_srcpad_task (self); + } else { + GST_OBJECT_UNLOCK (self); } + } else { + GST_OBJECT_UNLOCK (self); } /* We never forward the event */ @@ -1391,21 +1406,25 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) flush = flags & GST_SEEK_FLAG_FLUSH; + GST_OBJECT_LOCK (self); if (flush) { - g_atomic_int_set (&priv->pending_flush_start, TRUE); - g_atomic_int_set (&priv->flush_seeking, TRUE); + priv->pending_flush_start = TRUE; + priv->flush_seeking = TRUE; } gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, stop_type, stop, NULL); + GST_OBJECT_UNLOCK (self); /* forward the seek upstream */ evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush); event = NULL; if (!evdata.result || !evdata.one_actually_seeked) { - g_atomic_int_set (&priv->flush_seeking, FALSE); - g_atomic_int_set (&priv->pending_flush_start, FALSE); + GST_OBJECT_LOCK (self); + priv->flush_seeking = FALSE; + priv->pending_flush_start = FALSE; + GST_OBJECT_UNLOCK (self); } GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);