From 0986f3a64ad50ad5c4e2e791f13d22a1d26b655a Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 21 Feb 2011 18:43:19 +0100 Subject: [PATCH] bufferpool: Refactor stopping of the pool Move some methods around. Make sure we check for config parsing errors. Increment the outstanding buffers before calling acquire so that we can be sure that set_active() doesn't free the pool from under us. --- gst/gstbufferpool.c | 165 +++++++++++++++++++++++++------------------- 1 file changed, 93 insertions(+), 72 deletions(-) diff --git a/gst/gstbufferpool.c b/gst/gstbufferpool.c index 7ab221a02a..8ac4cb6ca0 100644 --- a/gst/gstbufferpool.c +++ b/gst/gstbufferpool.c @@ -49,9 +49,9 @@ struct _GstBufferPoolPrivate { GStaticRecMutex rec_lock; + guint size; guint min_buffers; guint max_buffers; - guint size; guint prefix; guint postfix; guint align; @@ -149,6 +149,30 @@ gst_buffer_pool_new (void) return result; } +static GstFlowReturn +default_alloc_buffer (GstBufferPool * pool, GstBuffer ** buffer, + GstBufferPoolParams * params) +{ + guint size, align; + GstBufferPoolPrivate *priv = pool->priv; + + *buffer = gst_buffer_new (); + + align = priv->align - 1; + size = priv->prefix + priv->postfix + priv->size + align; + if (size > 0) { + guint8 *memptr; + + memptr = g_malloc (size); + GST_BUFFER_MALLOCDATA (*buffer) = memptr; + memptr = (guint8 *) ((guintptr) (memptr + align) & ~align); + GST_BUFFER_DATA (*buffer) = memptr + priv->prefix; + GST_BUFFER_SIZE (*buffer) = priv->size; + } + + return GST_FLOW_OK; +} + /* the default implementation for preallocating the buffers * in the pool */ static gboolean @@ -190,12 +214,34 @@ alloc_failed: } } +/* must be called with the lock */ +static gboolean +do_start (GstBufferPool * pool) +{ + if (!pool->started) { + GstBufferPoolClass *pclass; + + pclass = GST_BUFFER_POOL_GET_CLASS (pool); + + /* start the pool, subclasses should allocate buffers and put them + * in the queue */ + if (G_LIKELY (pclass->start)) { + if (!pclass->start (pool)) + return FALSE; + } + pool->started = TRUE; + } + return TRUE; +} + + static void default_free_buffer (GstBufferPool * pool, GstBuffer * buffer) { gst_buffer_unref (buffer); } +/* must be called with the lock */ static gboolean default_stop (GstBufferPool * pool) { @@ -214,26 +260,7 @@ default_stop (GstBufferPool * pool) return TRUE; } -static gboolean -do_start (GstBufferPool * pool) -{ - - if (!pool->started) { - GstBufferPoolClass *pclass; - - pclass = GST_BUFFER_POOL_GET_CLASS (pool); - - /* start the pool, subclasses should allocate buffers and put them - * in the queue */ - if (G_LIKELY (pclass->start)) { - if (!pclass->start (pool)) - return FALSE; - } - pool->started = TRUE; - } - return TRUE; -} - +/* must be called with the lock */ static gboolean do_stop (GstBufferPool * pool) { @@ -335,12 +362,28 @@ static gboolean default_set_config (GstBufferPool * pool, GstStructure * config) { GstBufferPoolPrivate *priv = pool->priv; + guint size, min_buffers, max_buffers; + guint prefix, postfix, align; /* parse the config and keep around */ - gst_buffer_pool_config_get (config, &priv->size, &priv->min_buffers, - &priv->max_buffers, &priv->prefix, &priv->postfix, &priv->align); + if (!gst_buffer_pool_config_get (config, &size, &min_buffers, + &max_buffers, &prefix, &postfix, &align)) + goto wrong_config; + + priv->size = size; + priv->min_buffers = min_buffers; + priv->max_buffers = max_buffers; + priv->prefix = prefix; + priv->postfix = postfix; + priv->align = align; return TRUE; + +wrong_config: + { + GST_WARNING_OBJECT (pool, "invalid config"); + return FALSE; + } } /** @@ -492,30 +535,6 @@ gst_buffer_pool_config_get (GstStructure * config, guint * size, GST_QUARK (ALIGN), G_TYPE_UINT, align, NULL); } -static GstFlowReturn -default_alloc_buffer (GstBufferPool * pool, GstBuffer ** buffer, - GstBufferPoolParams * params) -{ - guint size, align; - GstBufferPoolPrivate *priv = pool->priv; - - *buffer = gst_buffer_new (); - - align = priv->align - 1; - size = priv->prefix + priv->postfix + priv->size + align; - if (size > 0) { - guint8 *memptr; - - memptr = g_malloc (size); - GST_BUFFER_MALLOCDATA (*buffer) = memptr; - memptr = (guint8 *) ((guintptr) (memptr + align) & ~align); - GST_BUFFER_DATA (*buffer) = memptr + priv->prefix; - GST_BUFFER_SIZE (*buffer) = priv->size; - } - - return GST_FLOW_OK; -} - static GstFlowReturn default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, GstBufferPoolParams * params) @@ -527,12 +546,12 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, pclass = GST_BUFFER_POOL_GET_CLASS (pool); while (TRUE) { - if (g_atomic_int_get (&pool->flushing)) + if (G_UNLIKELY (g_atomic_int_get (&pool->flushing))) return GST_FLOW_WRONG_STATE; /* try to get a buffer from the queue */ *buffer = gst_atomic_queue_pop (pool->queue); - if (*buffer) { + if (G_LIKELY (*buffer)) { gst_poll_read_control (pool->poll); result = GST_FLOW_OK; break; @@ -561,6 +580,24 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, return result; } +static inline void +dec_outstanding (GstBufferPool * pool) +{ + if (g_atomic_int_dec_and_test (&pool->outstanding)) { + /* all buffers are returned to the pool, see if we need to free them */ + if (g_atomic_int_get (&pool->flushing)) { + /* take the lock so that set_active is not run concurrently */ + GST_BUFFER_POOL_LOCK (pool); + /* recheck the flushing state in the lock, the pool could have been + * set to active again */ + if (g_atomic_int_get (&pool->flushing)) + do_stop (pool); + + GST_BUFFER_POOL_UNLOCK (pool); + } + } +} + /** * gst_buffer_pool_acquire_buffer: * @pool: a #GstBufferPool @@ -587,13 +624,17 @@ gst_buffer_pool_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, pclass = GST_BUFFER_POOL_GET_CLASS (pool); + /* assume we'll have one more outstanding buffer we need to do that so + * that concurrent set_active doesn't clear the buffers */ + g_atomic_int_inc (&pool->outstanding); + if (G_LIKELY (pclass->acquire_buffer)) result = pclass->acquire_buffer (pool, buffer, params); else result = GST_FLOW_NOT_SUPPORTED; - if (G_LIKELY (result == GST_FLOW_OK && *buffer)) - g_atomic_int_inc (&pool->outstanding); + if (G_UNLIKELY (result != GST_FLOW_OK)) + dec_outstanding (pool); return result; } @@ -630,25 +671,5 @@ gst_buffer_pool_release_buffer (GstBufferPool * pool, GstBuffer * buffer) if (G_LIKELY (pclass->release_buffer)) pclass->release_buffer (pool, buffer); - if (G_UNLIKELY (g_atomic_int_get (&pool->flushing))) { - if (g_atomic_int_dec_and_test (&pool->outstanding)) { - /* take the lock so that set_active is not run concurrently */ - GST_BUFFER_POOL_LOCK (pool); - /* recheck the flushing state in the lock, the pool could have been - * set to active again */ - if (g_atomic_int_get (&pool->flushing)) { - if (!do_stop (pool)) - goto stop_failed; - } - GST_BUFFER_POOL_UNLOCK (pool); - } - } - return; - -stop_failed: - { - GST_WARNING_OBJECT (pool, "stop failed"); - GST_BUFFER_POOL_UNLOCK (pool); - return; - } + dec_outstanding (pool); }