aggregator: Consistently lock some members

Some members sometimes used atomic access, sometimes where not locked at
all. Instead consistently use a mutex to protect them, also document
that.

https://bugzilla.gnome.org/show_bug.cgi?id=742684
This commit is contained in:
Olivier Crête 2015-01-21 18:45:36 -05:00 committed by Tim-Philipp Müller
parent 2b7d4e2404
commit 90f3cb4af0

View file

@ -391,10 +391,12 @@ pad_not_ready:
static void static void
gst_aggregator_reset_flow_values (GstAggregator * self) gst_aggregator_reset_flow_values (GstAggregator * self)
{ {
GST_OBJECT_LOCK (self);
self->priv->flow_return = GST_FLOW_FLUSHING; self->priv->flow_return = GST_FLOW_FLUSHING;
self->priv->send_stream_start = TRUE; self->priv->send_stream_start = TRUE;
self->priv->send_segment = TRUE; self->priv->send_segment = TRUE;
gst_segment_init (&self->segment, GST_FORMAT_TIME); gst_segment_init (&self->segment, GST_FORMAT_TIME);
GST_OBJECT_UNLOCK (self);
} }
static inline void static inline void
@ -426,19 +428,21 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
self->priv->srccaps = NULL; self->priv->srccaps = NULL;
} }
if (g_atomic_int_get (&self->priv->send_segment)) { GST_OBJECT_LOCK (self);
if (!g_atomic_int_get (&self->priv->flush_seeking)) { if (self->priv->send_segment && !self->priv->flush_seeking) {
GstEvent *segev = gst_event_new_segment (&self->segment); GstEvent *segev = gst_event_new_segment (&self->segment);
if (!self->priv->seqnum) if (!self->priv->seqnum)
self->priv->seqnum = gst_event_get_seqnum (segev); self->priv->seqnum = gst_event_get_seqnum (segev);
else else
gst_event_set_seqnum (segev, self->priv->seqnum); gst_event_set_seqnum (segev, self->priv->seqnum);
self->priv->send_segment = FALSE;
GST_OBJECT_UNLOCK (self);
GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev); GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev);
gst_pad_push_event (self->srcpad, segev); gst_pad_push_event (self->srcpad, segev);
g_atomic_int_set (&self->priv->send_segment, FALSE); } else {
} GST_OBJECT_UNLOCK (self);
} }
if (priv->tags && priv->tags_changed) { if (priv->tags && priv->tags_changed) {
@ -478,14 +482,15 @@ gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
{ {
gst_aggregator_push_mandatory_events (self); gst_aggregator_push_mandatory_events (self);
if (!g_atomic_int_get (&self->priv->flush_seeking) && GST_OBJECT_LOCK (self);
gst_pad_is_active (self->srcpad)) { if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer); GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
GST_OBJECT_UNLOCK (self);
return gst_pad_push (self->srcpad, buffer); return gst_pad_push (self->srcpad, buffer);
} else { } else {
GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)", GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
g_atomic_int_get (&self->priv->flush_seeking), self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
gst_pad_is_active (self->srcpad)); GST_OBJECT_UNLOCK (self);
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
return GST_FLOW_OK; return GST_FLOW_OK;
} }
@ -641,9 +646,10 @@ gst_aggregator_aggregate_func (GstAggregator * self)
gst_aggregator_push_eos (self); gst_aggregator_push_eos (self);
} }
if (priv->flow_return == GST_FLOW_FLUSHING && GST_OBJECT_LOCK (self);
g_atomic_int_get (&priv->flush_seeking)) if (priv->flow_return == GST_FLOW_FLUSHING && priv->flush_seeking)
priv->flow_return = GST_FLOW_OK; priv->flow_return = GST_FLOW_OK;
GST_OBJECT_UNLOCK (self);
GST_LOG_OBJECT (self, "flow return is %s", GST_LOG_OBJECT (self, "flow return is %s",
gst_flow_get_name (priv->flow_return)); gst_flow_get_name (priv->flow_return));
@ -722,33 +728,35 @@ gst_aggregator_flush (GstAggregator * self)
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
GST_DEBUG_OBJECT (self, "Flushing everything"); GST_DEBUG_OBJECT (self, "Flushing everything");
g_atomic_int_set (&priv->send_segment, TRUE); GST_OBJECT_LOCK (self);
g_atomic_int_set (&priv->flush_seeking, FALSE); priv->send_segment = TRUE;
priv->flush_seeking = FALSE;
g_atomic_int_set (&priv->tags_changed, FALSE); g_atomic_int_set (&priv->tags_changed, FALSE);
GST_OBJECT_UNLOCK (self);
if (klass->flush) if (klass->flush)
ret = klass->flush (self); ret = klass->flush (self);
return ret; return ret;
} }
/* Called with GstAggregator's object lock held */
static gboolean static gboolean
gst_aggregator_all_flush_stop_received (GstAggregator * self) gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
{ {
GList *tmp; GList *tmp;
GstAggregatorPad *tmppad; GstAggregatorPad *tmppad;
GST_OBJECT_LOCK (self);
for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) { for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
tmppad = (GstAggregatorPad *) tmp->data; tmppad = (GstAggregatorPad *) tmp->data;
if (_check_pending_flush_stop (tmppad) == FALSE) { if (_check_pending_flush_stop (tmppad) == FALSE) {
GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i", GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop); tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
GST_OBJECT_UNLOCK (self);
return FALSE; return FALSE;
} }
} }
GST_OBJECT_UNLOCK (self);
return TRUE; return TRUE;
} }
@ -772,10 +780,12 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
g_atomic_int_set (&padpriv->pending_flush_stop, TRUE); g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
} }
if (g_atomic_int_get (&priv->flush_seeking)) { GST_OBJECT_LOCK (self);
if (priv->flush_seeking) {
/* If flush_seeking we forward the first FLUSH_START */ /* If flush_seeking we forward the first FLUSH_START */
if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start, if (priv->pending_flush_start) {
TRUE, FALSE) == TRUE) { priv->pending_flush_start = FALSE;
GST_OBJECT_UNLOCK (self);
GST_INFO_OBJECT (self, "Flushing, pausing srcpad task"); GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
gst_aggregator_stop_srcpad_task (self, event); gst_aggregator_stop_srcpad_task (self, event);
@ -786,9 +796,11 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
GST_LOG_OBJECT (self, "GOT STREAM_LOCK"); GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
event = NULL; event = NULL;
} else { } else {
GST_OBJECT_UNLOCK (self);
gst_event_unref (event); gst_event_unref (event);
} }
} else { } else {
GST_OBJECT_UNLOCK (self);
gst_event_unref (event); gst_event_unref (event);
} }
PAD_STREAM_UNLOCK (aggpad); PAD_STREAM_UNLOCK (aggpad);
@ -819,26 +831,29 @@ gst_aggregator_default_sink_event (GstAggregator * self,
GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP"); GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
gst_aggregator_pad_flush (aggpad, self); gst_aggregator_pad_flush (aggpad, self);
if (g_atomic_int_get (&priv->flush_seeking)) { GST_OBJECT_LOCK (self);
if (priv->flush_seeking) {
g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE); g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
if (gst_aggregator_all_flush_stop_received_locked (self)) {
GST_OBJECT_UNLOCK (self);
/* That means we received FLUSH_STOP/FLUSH_STOP on
* all sinkpads -- Seeking is Done... sending FLUSH_STOP */
gst_aggregator_flush (self);
gst_pad_push_event (self->srcpad, event);
event = NULL;
SRC_STREAM_LOCK (self);
priv->send_eos = TRUE;
SRC_STREAM_BROADCAST (self);
SRC_STREAM_UNLOCK (self);
if (g_atomic_int_get (&priv->flush_seeking)) { GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
if (gst_aggregator_all_flush_stop_received (self)) { GST_PAD_STREAM_UNLOCK (self->srcpad);
/* That means we received FLUSH_STOP/FLUSH_STOP on gst_aggregator_start_srcpad_task (self);
* all sinkpads -- Seeking is Done... sending FLUSH_STOP */ } else {
gst_aggregator_flush (self); GST_OBJECT_UNLOCK (self);
gst_pad_push_event (self->srcpad, event);
event = NULL;
SRC_STREAM_LOCK (self);
priv->send_eos = TRUE;
SRC_STREAM_BROADCAST (self);
SRC_STREAM_UNLOCK (self);
GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
GST_PAD_STREAM_UNLOCK (self->srcpad);
gst_aggregator_start_srcpad_task (self);
}
} }
} else {
GST_OBJECT_UNLOCK (self);
} }
/* We never forward the event */ /* We never forward the event */
@ -1391,21 +1406,25 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
flush = flags & GST_SEEK_FLAG_FLUSH; flush = flags & GST_SEEK_FLAG_FLUSH;
GST_OBJECT_LOCK (self);
if (flush) { if (flush) {
g_atomic_int_set (&priv->pending_flush_start, TRUE); priv->pending_flush_start = TRUE;
g_atomic_int_set (&priv->flush_seeking, TRUE); priv->flush_seeking = TRUE;
} }
gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
stop_type, stop, NULL); stop_type, stop, NULL);
GST_OBJECT_UNLOCK (self);
/* forward the seek upstream */ /* forward the seek upstream */
evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush); evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
event = NULL; event = NULL;
if (!evdata.result || !evdata.one_actually_seeked) { if (!evdata.result || !evdata.one_actually_seeked) {
g_atomic_int_set (&priv->flush_seeking, FALSE); GST_OBJECT_LOCK (self);
g_atomic_int_set (&priv->pending_flush_start, FALSE); priv->flush_seeking = FALSE;
priv->pending_flush_start = FALSE;
GST_OBJECT_UNLOCK (self);
} }
GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result); GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);