From cc3f4185162493c0b4f88daecfc3377e4e3adaeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20Cr=C3=AAte?= Date: Wed, 14 Jan 2015 14:35:15 -0500 Subject: [PATCH] aggregator: Replace event lock with pad's object lock Reduce the number of locks simplify code, what is protects is exposed, but the lock was not. Also means adding an _unlocked version of gst_aggregator_pad_steal_buffer(). https://bugzilla.gnome.org/show_bug.cgi?id=742684 --- gst-libs/gst/base/gstaggregator.c | 108 +++++++++++++++++------------- gst-libs/gst/base/gstaggregator.h | 1 + gst/audiomixer/gstaudiomixer.c | 6 +- 3 files changed, 67 insertions(+), 48 deletions(-) diff --git a/gst-libs/gst/base/gstaggregator.c b/gst-libs/gst/base/gstaggregator.c index 3b95e2b4c4..4f67b85dac 100644 --- a/gst-libs/gst/base/gstaggregator.c +++ b/gst-libs/gst/base/gstaggregator.c @@ -78,19 +78,19 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug); #define GST_CAT_DEFAULT aggregator_debug /* GstAggregatorPad definitions */ -#define PAD_LOCK_EVENT(pad) G_STMT_START { \ - GST_TRACE_OBJECT (pad, "Taking EVENT lock from thread %p", \ +#define PAD_LOCK(pad) G_STMT_START { \ + GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \ g_thread_self()); \ - g_mutex_lock(&pad->priv->event_lock); \ - GST_TRACE_OBJECT (pad, "Took EVENT lock from thread %p", \ + GST_OBJECT_LOCK (pad); \ + GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p", \ g_thread_self()); \ } G_STMT_END -#define PAD_UNLOCK_EVENT(pad) G_STMT_START { \ - GST_TRACE_OBJECT (pad, "Releasing EVENT lock from thread %p", \ +#define PAD_UNLOCK(pad) G_STMT_START { \ + GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p", \ g_thread_self()); \ - g_mutex_unlock(&pad->priv->event_lock); \ - GST_TRACE_OBJECT (pad, "Release EVENT lock from thread %p", \ + GST_OBJECT_UNLOCK (pad); \ + GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p", \ g_thread_self()); \ } G_STMT_END @@ -99,7 +99,7 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug); GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p", \ g_thread_self()); \ g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond), \ - &(pad->priv->event_lock)); \ + GST_OBJECT_GET_LOCK (pad)); \ GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p", \ g_thread_self()); \ } G_STMT_END @@ -181,7 +181,6 @@ struct _GstAggregatorPadPrivate gboolean pending_eos; gboolean flushing; - GMutex event_lock; GCond event_cond; GMutex stream_lock; @@ -847,13 +846,13 @@ gst_aggregator_default_sink_event (GstAggregator * self, * called */ SRC_STREAM_LOCK (self); - PAD_LOCK_EVENT (aggpad); + PAD_LOCK (aggpad); if (!aggpad->buffer) { aggpad->eos = TRUE; } else { aggpad->priv->pending_eos = TRUE; } - PAD_UNLOCK_EVENT (aggpad); + PAD_UNLOCK (aggpad); SRC_STREAM_BROADCAST (self); SRC_STREAM_UNLOCK (self); @@ -861,10 +860,10 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_SEGMENT: { - PAD_LOCK_EVENT (aggpad); + PAD_LOCK (aggpad); gst_event_copy_segment (event, &aggpad->segment); self->priv->seqnum = gst_event_get_seqnum (event); - PAD_UNLOCK_EVENT (aggpad); + PAD_UNLOCK (aggpad); goto eat; } case GST_EVENT_STREAM_START: @@ -1747,13 +1746,13 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE) goto eos; - PAD_LOCK_EVENT (aggpad); + PAD_LOCK (aggpad); while (aggpad->buffer && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) { GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); } - PAD_UNLOCK_EVENT (aggpad); + PAD_UNLOCK (aggpad); if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) goto flushing; @@ -1763,11 +1762,11 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) } SRC_STREAM_LOCK (self); - PAD_LOCK_EVENT (aggpad); + PAD_LOCK (aggpad); if (aggpad->buffer) gst_buffer_unref (aggpad->buffer); aggpad->buffer = actual_buf; - PAD_UNLOCK_EVENT (aggpad); + PAD_UNLOCK (aggpad); PAD_STREAM_UNLOCK (aggpad); if (gst_aggregator_check_pads_ready (self)) @@ -1803,10 +1802,10 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_QUERY_IS_SERIALIZED (query)) { - PAD_LOCK_EVENT (aggpad); + PAD_LOCK (aggpad); if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) { - PAD_UNLOCK_EVENT (aggpad); + PAD_UNLOCK (aggpad); goto flushing; } @@ -1815,7 +1814,7 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); } - PAD_UNLOCK_EVENT (aggpad); + PAD_UNLOCK (aggpad); if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) goto flushing; @@ -1838,11 +1837,11 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) { - PAD_LOCK_EVENT (aggpad); + PAD_LOCK (aggpad); if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { - PAD_UNLOCK_EVENT (aggpad); + PAD_UNLOCK (aggpad); goto flushing; } @@ -1851,7 +1850,7 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); } - PAD_UNLOCK_EVENT (aggpad); + PAD_UNLOCK (aggpad); if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) @@ -1876,16 +1875,16 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad, GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); if (active == FALSE) { - PAD_LOCK_EVENT (aggpad); + PAD_LOCK (aggpad); g_atomic_int_set (&aggpad->priv->flushing, TRUE); gst_buffer_replace (&aggpad->buffer, NULL); PAD_BROADCAST_EVENT (aggpad); - PAD_UNLOCK_EVENT (aggpad); + PAD_UNLOCK (aggpad); } else { - PAD_LOCK_EVENT (aggpad); + PAD_LOCK (aggpad); g_atomic_int_set (&aggpad->priv->flushing, FALSE); PAD_BROADCAST_EVENT (aggpad); - PAD_UNLOCK_EVENT (aggpad); + PAD_UNLOCK (aggpad); } return TRUE; @@ -1916,7 +1915,6 @@ gst_aggregator_pad_finalize (GObject * object) { GstAggregatorPad *pad = (GstAggregatorPad *) object; - g_mutex_clear (&pad->priv->event_lock); g_cond_clear (&pad->priv->event_cond); g_mutex_clear (&pad->priv->stream_lock); @@ -1956,12 +1954,42 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) GstAggregatorPadPrivate); pad->buffer = NULL; - g_mutex_init (&pad->priv->event_lock); g_cond_init (&pad->priv->event_cond); g_mutex_init (&pad->priv->stream_lock); } +/** + * gst_aggregator_pad_steal_buffer_unlocked: + * @pad: the pad to get buffer from + * + * Steal the ref to the buffer currently queued in @pad. + * + * MUST be called with the pad's object lock held. + * + * Returns: (transfer full): The buffer in @pad or NULL if no buffer was + * queued. You should unref the buffer after usage. + */ +GstBuffer * +gst_aggregator_pad_steal_buffer_unlocked (GstAggregatorPad * pad) +{ + GstBuffer *buffer = NULL; + + if (pad->buffer) { + GST_TRACE_OBJECT (pad, "Consuming buffer"); + buffer = pad->buffer; + pad->buffer = NULL; + if (pad->priv->pending_eos) { + pad->priv->pending_eos = FALSE; + pad->eos = TRUE; + } + PAD_BROADCAST_EVENT (pad); + GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); + } + + return buffer; +} + /** * gst_aggregator_pad_steal_buffer: * @pad: the pad to get buffer from @@ -1976,19 +2004,9 @@ gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) { GstBuffer *buffer = NULL; - PAD_LOCK_EVENT (pad); - if (pad->buffer) { - GST_TRACE_OBJECT (pad, "Consuming buffer"); - buffer = pad->buffer; - pad->buffer = NULL; - if (pad->priv->pending_eos) { - pad->priv->pending_eos = FALSE; - pad->eos = TRUE; - } - PAD_BROADCAST_EVENT (pad); - GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); - } - PAD_UNLOCK_EVENT (pad); + PAD_LOCK (pad); + buffer = gst_aggregator_pad_steal_buffer_unlocked (pad); + PAD_UNLOCK (pad); return buffer; } @@ -2006,10 +2024,10 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) { GstBuffer *buffer = NULL; - PAD_LOCK_EVENT (pad); + PAD_LOCK (pad); if (pad->buffer) buffer = gst_buffer_ref (pad->buffer); - PAD_UNLOCK_EVENT (pad); + PAD_UNLOCK (pad); return buffer; } diff --git a/gst-libs/gst/base/gstaggregator.h b/gst-libs/gst/base/gstaggregator.h index 638030add8..33c88cc065 100644 --- a/gst-libs/gst/base/gstaggregator.h +++ b/gst-libs/gst/base/gstaggregator.h @@ -104,6 +104,7 @@ GType gst_aggregator_pad_get_type (void); ***************************/ GstBuffer * gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad); +GstBuffer * gst_aggregator_pad_steal_buffer_unlocked (GstAggregatorPad * pad); GstBuffer * gst_aggregator_pad_get_buffer (GstAggregatorPad * pad); /********************* diff --git a/gst/audiomixer/gstaudiomixer.c b/gst/audiomixer/gstaudiomixer.c index 9f75a8e46c..01521e517a 100644 --- a/gst/audiomixer/gstaudiomixer.c +++ b/gst/audiomixer/gstaudiomixer.c @@ -1199,7 +1199,7 @@ gst_audio_mixer_mix_buffer (GstAudioMixer * audiomixer, GstAudioMixerPad * pad, GstBuffer *buf; /* Buffer done, drop it */ gst_buffer_replace (&pad->buffer, NULL); - buf = gst_aggregator_pad_steal_buffer (aggpad); + buf = gst_aggregator_pad_steal_buffer_unlocked (aggpad); if (buf) gst_buffer_unref (buf); } @@ -1208,7 +1208,7 @@ gst_audio_mixer_mix_buffer (GstAudioMixer * audiomixer, GstAudioMixerPad * pad, } if (GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP)) { - GstBuffer *aggpadbuf = gst_aggregator_pad_steal_buffer (aggpad); + GstBuffer *aggpadbuf = gst_aggregator_pad_steal_buffer_unlocked (aggpad); /* skip gap buffer */ GST_LOG_OBJECT (pad, "skipping GAP buffer"); @@ -1330,7 +1330,7 @@ gst_audio_mixer_mix_buffer (GstAudioMixer * audiomixer, GstAudioMixerPad * pad, /* Buffer done, drop it */ gst_buffer_replace (&pad->buffer, NULL); - buf = gst_aggregator_pad_steal_buffer (aggpad); + buf = gst_aggregator_pad_steal_buffer_unlocked (aggpad); if (buf) gst_buffer_unref (buf); GST_DEBUG_OBJECT (pad, "Finished mixing buffer, waiting for next");