mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-28 04:31:06 +00:00
aggregator: Consistently lock some members
Some members sometimes used atomic access, sometimes where not locked at all. Instead consistently use a mutex to protect them, also document that. https://bugzilla.gnome.org/show_bug.cgi?id=742684
This commit is contained in:
parent
067b44e0b8
commit
cc605f4560
1 changed files with 66 additions and 47 deletions
|
@ -391,10 +391,12 @@ pad_not_ready:
|
||||||
static void
|
static void
|
||||||
gst_aggregator_reset_flow_values (GstAggregator * self)
|
gst_aggregator_reset_flow_values (GstAggregator * self)
|
||||||
{
|
{
|
||||||
|
GST_OBJECT_LOCK (self);
|
||||||
self->priv->flow_return = GST_FLOW_FLUSHING;
|
self->priv->flow_return = GST_FLOW_FLUSHING;
|
||||||
self->priv->send_stream_start = TRUE;
|
self->priv->send_stream_start = TRUE;
|
||||||
self->priv->send_segment = TRUE;
|
self->priv->send_segment = TRUE;
|
||||||
gst_segment_init (&self->segment, GST_FORMAT_TIME);
|
gst_segment_init (&self->segment, GST_FORMAT_TIME);
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void
|
static inline void
|
||||||
|
@ -426,19 +428,21 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
|
||||||
self->priv->srccaps = NULL;
|
self->priv->srccaps = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (g_atomic_int_get (&self->priv->send_segment)) {
|
GST_OBJECT_LOCK (self);
|
||||||
if (!g_atomic_int_get (&self->priv->flush_seeking)) {
|
if (self->priv->send_segment && !self->priv->flush_seeking) {
|
||||||
GstEvent *segev = gst_event_new_segment (&self->segment);
|
GstEvent *segev = gst_event_new_segment (&self->segment);
|
||||||
|
|
||||||
if (!self->priv->seqnum)
|
if (!self->priv->seqnum)
|
||||||
self->priv->seqnum = gst_event_get_seqnum (segev);
|
self->priv->seqnum = gst_event_get_seqnum (segev);
|
||||||
else
|
else
|
||||||
gst_event_set_seqnum (segev, self->priv->seqnum);
|
gst_event_set_seqnum (segev, self->priv->seqnum);
|
||||||
|
self->priv->send_segment = FALSE;
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev);
|
GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev);
|
||||||
gst_pad_push_event (self->srcpad, segev);
|
gst_pad_push_event (self->srcpad, segev);
|
||||||
g_atomic_int_set (&self->priv->send_segment, FALSE);
|
} else {
|
||||||
}
|
GST_OBJECT_UNLOCK (self);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (priv->tags && priv->tags_changed) {
|
if (priv->tags && priv->tags_changed) {
|
||||||
|
@ -478,14 +482,15 @@ gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
|
||||||
{
|
{
|
||||||
gst_aggregator_push_mandatory_events (self);
|
gst_aggregator_push_mandatory_events (self);
|
||||||
|
|
||||||
if (!g_atomic_int_get (&self->priv->flush_seeking) &&
|
GST_OBJECT_LOCK (self);
|
||||||
gst_pad_is_active (self->srcpad)) {
|
if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
|
||||||
GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
|
GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
return gst_pad_push (self->srcpad, buffer);
|
return gst_pad_push (self->srcpad, buffer);
|
||||||
} else {
|
} else {
|
||||||
GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
|
GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
|
||||||
g_atomic_int_get (&self->priv->flush_seeking),
|
self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
|
||||||
gst_pad_is_active (self->srcpad));
|
GST_OBJECT_UNLOCK (self);
|
||||||
gst_buffer_unref (buffer);
|
gst_buffer_unref (buffer);
|
||||||
return GST_FLOW_OK;
|
return GST_FLOW_OK;
|
||||||
}
|
}
|
||||||
|
@ -641,9 +646,10 @@ gst_aggregator_aggregate_func (GstAggregator * self)
|
||||||
gst_aggregator_push_eos (self);
|
gst_aggregator_push_eos (self);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (priv->flow_return == GST_FLOW_FLUSHING &&
|
GST_OBJECT_LOCK (self);
|
||||||
g_atomic_int_get (&priv->flush_seeking))
|
if (priv->flow_return == GST_FLOW_FLUSHING && priv->flush_seeking)
|
||||||
priv->flow_return = GST_FLOW_OK;
|
priv->flow_return = GST_FLOW_OK;
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
|
|
||||||
GST_LOG_OBJECT (self, "flow return is %s",
|
GST_LOG_OBJECT (self, "flow return is %s",
|
||||||
gst_flow_get_name (priv->flow_return));
|
gst_flow_get_name (priv->flow_return));
|
||||||
|
@ -722,33 +728,35 @@ gst_aggregator_flush (GstAggregator * self)
|
||||||
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
|
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (self, "Flushing everything");
|
GST_DEBUG_OBJECT (self, "Flushing everything");
|
||||||
g_atomic_int_set (&priv->send_segment, TRUE);
|
GST_OBJECT_LOCK (self);
|
||||||
g_atomic_int_set (&priv->flush_seeking, FALSE);
|
priv->send_segment = TRUE;
|
||||||
|
priv->flush_seeking = FALSE;
|
||||||
g_atomic_int_set (&priv->tags_changed, FALSE);
|
g_atomic_int_set (&priv->tags_changed, FALSE);
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
if (klass->flush)
|
if (klass->flush)
|
||||||
ret = klass->flush (self);
|
ret = klass->flush (self);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* Called with GstAggregator's object lock held */
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
gst_aggregator_all_flush_stop_received (GstAggregator * self)
|
gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
|
||||||
{
|
{
|
||||||
GList *tmp;
|
GList *tmp;
|
||||||
GstAggregatorPad *tmppad;
|
GstAggregatorPad *tmppad;
|
||||||
|
|
||||||
GST_OBJECT_LOCK (self);
|
|
||||||
for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
|
for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
|
||||||
tmppad = (GstAggregatorPad *) tmp->data;
|
tmppad = (GstAggregatorPad *) tmp->data;
|
||||||
|
|
||||||
if (_check_pending_flush_stop (tmppad) == FALSE) {
|
if (_check_pending_flush_stop (tmppad) == FALSE) {
|
||||||
GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
|
GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
|
||||||
tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
|
tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
|
||||||
GST_OBJECT_UNLOCK (self);
|
|
||||||
return FALSE;
|
return FALSE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
GST_OBJECT_UNLOCK (self);
|
|
||||||
|
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
@ -772,10 +780,12 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
|
||||||
g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
|
g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (g_atomic_int_get (&priv->flush_seeking)) {
|
GST_OBJECT_LOCK (self);
|
||||||
|
if (priv->flush_seeking) {
|
||||||
/* If flush_seeking we forward the first FLUSH_START */
|
/* If flush_seeking we forward the first FLUSH_START */
|
||||||
if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
|
if (priv->pending_flush_start) {
|
||||||
TRUE, FALSE) == TRUE) {
|
priv->pending_flush_start = FALSE;
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
|
|
||||||
GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
|
GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
|
||||||
gst_aggregator_stop_srcpad_task (self, event);
|
gst_aggregator_stop_srcpad_task (self, event);
|
||||||
|
@ -786,9 +796,11 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
|
||||||
GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
|
GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
|
||||||
event = NULL;
|
event = NULL;
|
||||||
} else {
|
} else {
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
gst_event_unref (event);
|
gst_event_unref (event);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
gst_event_unref (event);
|
gst_event_unref (event);
|
||||||
}
|
}
|
||||||
PAD_STREAM_UNLOCK (aggpad);
|
PAD_STREAM_UNLOCK (aggpad);
|
||||||
|
@ -819,11 +831,11 @@ gst_aggregator_default_sink_event (GstAggregator * self,
|
||||||
GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
|
GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
|
||||||
|
|
||||||
gst_aggregator_pad_flush (aggpad, self);
|
gst_aggregator_pad_flush (aggpad, self);
|
||||||
if (g_atomic_int_get (&priv->flush_seeking)) {
|
GST_OBJECT_LOCK (self);
|
||||||
|
if (priv->flush_seeking) {
|
||||||
g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
|
g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
|
||||||
|
if (gst_aggregator_all_flush_stop_received_locked (self)) {
|
||||||
if (g_atomic_int_get (&priv->flush_seeking)) {
|
GST_OBJECT_UNLOCK (self);
|
||||||
if (gst_aggregator_all_flush_stop_received (self)) {
|
|
||||||
/* That means we received FLUSH_STOP/FLUSH_STOP on
|
/* That means we received FLUSH_STOP/FLUSH_STOP on
|
||||||
* all sinkpads -- Seeking is Done... sending FLUSH_STOP */
|
* all sinkpads -- Seeking is Done... sending FLUSH_STOP */
|
||||||
gst_aggregator_flush (self);
|
gst_aggregator_flush (self);
|
||||||
|
@ -837,8 +849,11 @@ gst_aggregator_default_sink_event (GstAggregator * self,
|
||||||
GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
|
GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
|
||||||
GST_PAD_STREAM_UNLOCK (self->srcpad);
|
GST_PAD_STREAM_UNLOCK (self->srcpad);
|
||||||
gst_aggregator_start_srcpad_task (self);
|
gst_aggregator_start_srcpad_task (self);
|
||||||
|
} else {
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* We never forward the event */
|
/* We never forward the event */
|
||||||
|
@ -1391,21 +1406,25 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
|
||||||
|
|
||||||
flush = flags & GST_SEEK_FLAG_FLUSH;
|
flush = flags & GST_SEEK_FLAG_FLUSH;
|
||||||
|
|
||||||
|
GST_OBJECT_LOCK (self);
|
||||||
if (flush) {
|
if (flush) {
|
||||||
g_atomic_int_set (&priv->pending_flush_start, TRUE);
|
priv->pending_flush_start = TRUE;
|
||||||
g_atomic_int_set (&priv->flush_seeking, TRUE);
|
priv->flush_seeking = TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
|
gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
|
||||||
stop_type, stop, NULL);
|
stop_type, stop, NULL);
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
|
|
||||||
/* forward the seek upstream */
|
/* forward the seek upstream */
|
||||||
evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
|
evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
|
||||||
event = NULL;
|
event = NULL;
|
||||||
|
|
||||||
if (!evdata.result || !evdata.one_actually_seeked) {
|
if (!evdata.result || !evdata.one_actually_seeked) {
|
||||||
g_atomic_int_set (&priv->flush_seeking, FALSE);
|
GST_OBJECT_LOCK (self);
|
||||||
g_atomic_int_set (&priv->pending_flush_start, FALSE);
|
priv->flush_seeking = FALSE;
|
||||||
|
priv->pending_flush_start = FALSE;
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
}
|
}
|
||||||
|
|
||||||
GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
|
GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
|
||||||
|
|
Loading…
Reference in a new issue