mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-26 00:58:12 +00:00
aggregator: Replace event lock with pad's object lock
Reduce the number of locks simplify code, what is protects is exposed, but the lock was not. Also means adding an _unlocked version of gst_aggregator_pad_steal_buffer(). https://bugzilla.gnome.org/show_bug.cgi?id=742684
This commit is contained in:
parent
0a2dc1185c
commit
cc3f418516
3 changed files with 67 additions and 48 deletions
|
@ -78,19 +78,19 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
|
||||||
#define GST_CAT_DEFAULT aggregator_debug
|
#define GST_CAT_DEFAULT aggregator_debug
|
||||||
|
|
||||||
/* GstAggregatorPad definitions */
|
/* GstAggregatorPad definitions */
|
||||||
#define PAD_LOCK_EVENT(pad) G_STMT_START { \
|
#define PAD_LOCK(pad) G_STMT_START { \
|
||||||
GST_TRACE_OBJECT (pad, "Taking EVENT lock from thread %p", \
|
GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \
|
||||||
g_thread_self()); \
|
g_thread_self()); \
|
||||||
g_mutex_lock(&pad->priv->event_lock); \
|
GST_OBJECT_LOCK (pad); \
|
||||||
GST_TRACE_OBJECT (pad, "Took EVENT lock from thread %p", \
|
GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p", \
|
||||||
g_thread_self()); \
|
g_thread_self()); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
#define PAD_UNLOCK_EVENT(pad) G_STMT_START { \
|
#define PAD_UNLOCK(pad) G_STMT_START { \
|
||||||
GST_TRACE_OBJECT (pad, "Releasing EVENT lock from thread %p", \
|
GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p", \
|
||||||
g_thread_self()); \
|
g_thread_self()); \
|
||||||
g_mutex_unlock(&pad->priv->event_lock); \
|
GST_OBJECT_UNLOCK (pad); \
|
||||||
GST_TRACE_OBJECT (pad, "Release EVENT lock from thread %p", \
|
GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p", \
|
||||||
g_thread_self()); \
|
g_thread_self()); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
|
||||||
|
@ -99,7 +99,7 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
|
||||||
GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p", \
|
GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p", \
|
||||||
g_thread_self()); \
|
g_thread_self()); \
|
||||||
g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond), \
|
g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond), \
|
||||||
&(pad->priv->event_lock)); \
|
GST_OBJECT_GET_LOCK (pad)); \
|
||||||
GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p", \
|
GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p", \
|
||||||
g_thread_self()); \
|
g_thread_self()); \
|
||||||
} G_STMT_END
|
} G_STMT_END
|
||||||
|
@ -181,7 +181,6 @@ struct _GstAggregatorPadPrivate
|
||||||
gboolean pending_eos;
|
gboolean pending_eos;
|
||||||
gboolean flushing;
|
gboolean flushing;
|
||||||
|
|
||||||
GMutex event_lock;
|
|
||||||
GCond event_cond;
|
GCond event_cond;
|
||||||
|
|
||||||
GMutex stream_lock;
|
GMutex stream_lock;
|
||||||
|
@ -847,13 +846,13 @@ gst_aggregator_default_sink_event (GstAggregator * self,
|
||||||
* called
|
* called
|
||||||
*/
|
*/
|
||||||
SRC_STREAM_LOCK (self);
|
SRC_STREAM_LOCK (self);
|
||||||
PAD_LOCK_EVENT (aggpad);
|
PAD_LOCK (aggpad);
|
||||||
if (!aggpad->buffer) {
|
if (!aggpad->buffer) {
|
||||||
aggpad->eos = TRUE;
|
aggpad->eos = TRUE;
|
||||||
} else {
|
} else {
|
||||||
aggpad->priv->pending_eos = TRUE;
|
aggpad->priv->pending_eos = TRUE;
|
||||||
}
|
}
|
||||||
PAD_UNLOCK_EVENT (aggpad);
|
PAD_UNLOCK (aggpad);
|
||||||
|
|
||||||
SRC_STREAM_BROADCAST (self);
|
SRC_STREAM_BROADCAST (self);
|
||||||
SRC_STREAM_UNLOCK (self);
|
SRC_STREAM_UNLOCK (self);
|
||||||
|
@ -861,10 +860,10 @@ gst_aggregator_default_sink_event (GstAggregator * self,
|
||||||
}
|
}
|
||||||
case GST_EVENT_SEGMENT:
|
case GST_EVENT_SEGMENT:
|
||||||
{
|
{
|
||||||
PAD_LOCK_EVENT (aggpad);
|
PAD_LOCK (aggpad);
|
||||||
gst_event_copy_segment (event, &aggpad->segment);
|
gst_event_copy_segment (event, &aggpad->segment);
|
||||||
self->priv->seqnum = gst_event_get_seqnum (event);
|
self->priv->seqnum = gst_event_get_seqnum (event);
|
||||||
PAD_UNLOCK_EVENT (aggpad);
|
PAD_UNLOCK (aggpad);
|
||||||
goto eat;
|
goto eat;
|
||||||
}
|
}
|
||||||
case GST_EVENT_STREAM_START:
|
case GST_EVENT_STREAM_START:
|
||||||
|
@ -1747,13 +1746,13 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
|
||||||
if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE)
|
if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE)
|
||||||
goto eos;
|
goto eos;
|
||||||
|
|
||||||
PAD_LOCK_EVENT (aggpad);
|
PAD_LOCK (aggpad);
|
||||||
|
|
||||||
while (aggpad->buffer && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
|
while (aggpad->buffer && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
|
||||||
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
|
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
|
||||||
PAD_WAIT_EVENT (aggpad);
|
PAD_WAIT_EVENT (aggpad);
|
||||||
}
|
}
|
||||||
PAD_UNLOCK_EVENT (aggpad);
|
PAD_UNLOCK (aggpad);
|
||||||
|
|
||||||
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
|
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
|
||||||
goto flushing;
|
goto flushing;
|
||||||
|
@ -1763,11 +1762,11 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
SRC_STREAM_LOCK (self);
|
SRC_STREAM_LOCK (self);
|
||||||
PAD_LOCK_EVENT (aggpad);
|
PAD_LOCK (aggpad);
|
||||||
if (aggpad->buffer)
|
if (aggpad->buffer)
|
||||||
gst_buffer_unref (aggpad->buffer);
|
gst_buffer_unref (aggpad->buffer);
|
||||||
aggpad->buffer = actual_buf;
|
aggpad->buffer = actual_buf;
|
||||||
PAD_UNLOCK_EVENT (aggpad);
|
PAD_UNLOCK (aggpad);
|
||||||
PAD_STREAM_UNLOCK (aggpad);
|
PAD_STREAM_UNLOCK (aggpad);
|
||||||
|
|
||||||
if (gst_aggregator_check_pads_ready (self))
|
if (gst_aggregator_check_pads_ready (self))
|
||||||
|
@ -1803,10 +1802,10 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
|
||||||
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
|
GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
|
||||||
|
|
||||||
if (GST_QUERY_IS_SERIALIZED (query)) {
|
if (GST_QUERY_IS_SERIALIZED (query)) {
|
||||||
PAD_LOCK_EVENT (aggpad);
|
PAD_LOCK (aggpad);
|
||||||
|
|
||||||
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) {
|
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) {
|
||||||
PAD_UNLOCK_EVENT (aggpad);
|
PAD_UNLOCK (aggpad);
|
||||||
goto flushing;
|
goto flushing;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1815,7 +1814,7 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
|
||||||
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
|
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
|
||||||
PAD_WAIT_EVENT (aggpad);
|
PAD_WAIT_EVENT (aggpad);
|
||||||
}
|
}
|
||||||
PAD_UNLOCK_EVENT (aggpad);
|
PAD_UNLOCK (aggpad);
|
||||||
|
|
||||||
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
|
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
|
||||||
goto flushing;
|
goto flushing;
|
||||||
|
@ -1838,11 +1837,11 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
|
||||||
|
|
||||||
if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
|
if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
|
||||||
&& GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
|
&& GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
|
||||||
PAD_LOCK_EVENT (aggpad);
|
PAD_LOCK (aggpad);
|
||||||
|
|
||||||
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
|
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
|
||||||
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
|
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
|
||||||
PAD_UNLOCK_EVENT (aggpad);
|
PAD_UNLOCK (aggpad);
|
||||||
goto flushing;
|
goto flushing;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1851,7 +1850,7 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
|
||||||
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
|
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
|
||||||
PAD_WAIT_EVENT (aggpad);
|
PAD_WAIT_EVENT (aggpad);
|
||||||
}
|
}
|
||||||
PAD_UNLOCK_EVENT (aggpad);
|
PAD_UNLOCK (aggpad);
|
||||||
|
|
||||||
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
|
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
|
||||||
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
|
&& GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
|
||||||
|
@ -1876,16 +1875,16 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad,
|
||||||
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
|
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
|
||||||
|
|
||||||
if (active == FALSE) {
|
if (active == FALSE) {
|
||||||
PAD_LOCK_EVENT (aggpad);
|
PAD_LOCK (aggpad);
|
||||||
g_atomic_int_set (&aggpad->priv->flushing, TRUE);
|
g_atomic_int_set (&aggpad->priv->flushing, TRUE);
|
||||||
gst_buffer_replace (&aggpad->buffer, NULL);
|
gst_buffer_replace (&aggpad->buffer, NULL);
|
||||||
PAD_BROADCAST_EVENT (aggpad);
|
PAD_BROADCAST_EVENT (aggpad);
|
||||||
PAD_UNLOCK_EVENT (aggpad);
|
PAD_UNLOCK (aggpad);
|
||||||
} else {
|
} else {
|
||||||
PAD_LOCK_EVENT (aggpad);
|
PAD_LOCK (aggpad);
|
||||||
g_atomic_int_set (&aggpad->priv->flushing, FALSE);
|
g_atomic_int_set (&aggpad->priv->flushing, FALSE);
|
||||||
PAD_BROADCAST_EVENT (aggpad);
|
PAD_BROADCAST_EVENT (aggpad);
|
||||||
PAD_UNLOCK_EVENT (aggpad);
|
PAD_UNLOCK (aggpad);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TRUE;
|
return TRUE;
|
||||||
|
@ -1916,7 +1915,6 @@ gst_aggregator_pad_finalize (GObject * object)
|
||||||
{
|
{
|
||||||
GstAggregatorPad *pad = (GstAggregatorPad *) object;
|
GstAggregatorPad *pad = (GstAggregatorPad *) object;
|
||||||
|
|
||||||
g_mutex_clear (&pad->priv->event_lock);
|
|
||||||
g_cond_clear (&pad->priv->event_cond);
|
g_cond_clear (&pad->priv->event_cond);
|
||||||
g_mutex_clear (&pad->priv->stream_lock);
|
g_mutex_clear (&pad->priv->stream_lock);
|
||||||
|
|
||||||
|
@ -1956,12 +1954,42 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
|
||||||
GstAggregatorPadPrivate);
|
GstAggregatorPadPrivate);
|
||||||
|
|
||||||
pad->buffer = NULL;
|
pad->buffer = NULL;
|
||||||
g_mutex_init (&pad->priv->event_lock);
|
|
||||||
g_cond_init (&pad->priv->event_cond);
|
g_cond_init (&pad->priv->event_cond);
|
||||||
|
|
||||||
g_mutex_init (&pad->priv->stream_lock);
|
g_mutex_init (&pad->priv->stream_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* gst_aggregator_pad_steal_buffer_unlocked:
|
||||||
|
* @pad: the pad to get buffer from
|
||||||
|
*
|
||||||
|
* Steal the ref to the buffer currently queued in @pad.
|
||||||
|
*
|
||||||
|
* MUST be called with the pad's object lock held.
|
||||||
|
*
|
||||||
|
* Returns: (transfer full): The buffer in @pad or NULL if no buffer was
|
||||||
|
* queued. You should unref the buffer after usage.
|
||||||
|
*/
|
||||||
|
GstBuffer *
|
||||||
|
gst_aggregator_pad_steal_buffer_unlocked (GstAggregatorPad * pad)
|
||||||
|
{
|
||||||
|
GstBuffer *buffer = NULL;
|
||||||
|
|
||||||
|
if (pad->buffer) {
|
||||||
|
GST_TRACE_OBJECT (pad, "Consuming buffer");
|
||||||
|
buffer = pad->buffer;
|
||||||
|
pad->buffer = NULL;
|
||||||
|
if (pad->priv->pending_eos) {
|
||||||
|
pad->priv->pending_eos = FALSE;
|
||||||
|
pad->eos = TRUE;
|
||||||
|
}
|
||||||
|
PAD_BROADCAST_EVENT (pad);
|
||||||
|
GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* gst_aggregator_pad_steal_buffer:
|
* gst_aggregator_pad_steal_buffer:
|
||||||
* @pad: the pad to get buffer from
|
* @pad: the pad to get buffer from
|
||||||
|
@ -1976,19 +2004,9 @@ gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
|
||||||
{
|
{
|
||||||
GstBuffer *buffer = NULL;
|
GstBuffer *buffer = NULL;
|
||||||
|
|
||||||
PAD_LOCK_EVENT (pad);
|
PAD_LOCK (pad);
|
||||||
if (pad->buffer) {
|
buffer = gst_aggregator_pad_steal_buffer_unlocked (pad);
|
||||||
GST_TRACE_OBJECT (pad, "Consuming buffer");
|
PAD_UNLOCK (pad);
|
||||||
buffer = pad->buffer;
|
|
||||||
pad->buffer = NULL;
|
|
||||||
if (pad->priv->pending_eos) {
|
|
||||||
pad->priv->pending_eos = FALSE;
|
|
||||||
pad->eos = TRUE;
|
|
||||||
}
|
|
||||||
PAD_BROADCAST_EVENT (pad);
|
|
||||||
GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
|
|
||||||
}
|
|
||||||
PAD_UNLOCK_EVENT (pad);
|
|
||||||
|
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
@ -2006,10 +2024,10 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
|
||||||
{
|
{
|
||||||
GstBuffer *buffer = NULL;
|
GstBuffer *buffer = NULL;
|
||||||
|
|
||||||
PAD_LOCK_EVENT (pad);
|
PAD_LOCK (pad);
|
||||||
if (pad->buffer)
|
if (pad->buffer)
|
||||||
buffer = gst_buffer_ref (pad->buffer);
|
buffer = gst_buffer_ref (pad->buffer);
|
||||||
PAD_UNLOCK_EVENT (pad);
|
PAD_UNLOCK (pad);
|
||||||
|
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,6 +104,7 @@ GType gst_aggregator_pad_get_type (void);
|
||||||
***************************/
|
***************************/
|
||||||
|
|
||||||
GstBuffer * gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad);
|
GstBuffer * gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad);
|
||||||
|
GstBuffer * gst_aggregator_pad_steal_buffer_unlocked (GstAggregatorPad * pad);
|
||||||
GstBuffer * gst_aggregator_pad_get_buffer (GstAggregatorPad * pad);
|
GstBuffer * gst_aggregator_pad_get_buffer (GstAggregatorPad * pad);
|
||||||
|
|
||||||
/*********************
|
/*********************
|
||||||
|
|
|
@ -1199,7 +1199,7 @@ gst_audio_mixer_mix_buffer (GstAudioMixer * audiomixer, GstAudioMixerPad * pad,
|
||||||
GstBuffer *buf;
|
GstBuffer *buf;
|
||||||
/* Buffer done, drop it */
|
/* Buffer done, drop it */
|
||||||
gst_buffer_replace (&pad->buffer, NULL);
|
gst_buffer_replace (&pad->buffer, NULL);
|
||||||
buf = gst_aggregator_pad_steal_buffer (aggpad);
|
buf = gst_aggregator_pad_steal_buffer_unlocked (aggpad);
|
||||||
if (buf)
|
if (buf)
|
||||||
gst_buffer_unref (buf);
|
gst_buffer_unref (buf);
|
||||||
}
|
}
|
||||||
|
@ -1208,7 +1208,7 @@ gst_audio_mixer_mix_buffer (GstAudioMixer * audiomixer, GstAudioMixerPad * pad,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP)) {
|
if (GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP)) {
|
||||||
GstBuffer *aggpadbuf = gst_aggregator_pad_steal_buffer (aggpad);
|
GstBuffer *aggpadbuf = gst_aggregator_pad_steal_buffer_unlocked (aggpad);
|
||||||
|
|
||||||
/* skip gap buffer */
|
/* skip gap buffer */
|
||||||
GST_LOG_OBJECT (pad, "skipping GAP buffer");
|
GST_LOG_OBJECT (pad, "skipping GAP buffer");
|
||||||
|
@ -1330,7 +1330,7 @@ gst_audio_mixer_mix_buffer (GstAudioMixer * audiomixer, GstAudioMixerPad * pad,
|
||||||
|
|
||||||
/* Buffer done, drop it */
|
/* Buffer done, drop it */
|
||||||
gst_buffer_replace (&pad->buffer, NULL);
|
gst_buffer_replace (&pad->buffer, NULL);
|
||||||
buf = gst_aggregator_pad_steal_buffer (aggpad);
|
buf = gst_aggregator_pad_steal_buffer_unlocked (aggpad);
|
||||||
if (buf)
|
if (buf)
|
||||||
gst_buffer_unref (buf);
|
gst_buffer_unref (buf);
|
||||||
GST_DEBUG_OBJECT (pad, "Finished mixing buffer, waiting for next");
|
GST_DEBUG_OBJECT (pad, "Finished mixing buffer, waiting for next");
|
||||||
|
|
Loading…
Reference in a new issue