bufferpool: Refactor stopping of the pool

Move some methods around.
Make sure we check for config parsing errors.
Increment the outstanding buffers before calling acquire so that we can be sure
that set_active() doesn't free the pool from under us.
This commit is contained in:
Wim Taymans 2011-02-21 18:43:19 +01:00
parent e566910a18
commit 0986f3a64a

View file

@ -49,9 +49,9 @@
struct _GstBufferPoolPrivate
{
GStaticRecMutex rec_lock;
guint size;
guint min_buffers;
guint max_buffers;
guint size;
guint prefix;
guint postfix;
guint align;
@ -149,6 +149,30 @@ gst_buffer_pool_new (void)
return result;
}
static GstFlowReturn
default_alloc_buffer (GstBufferPool * pool, GstBuffer ** buffer,
GstBufferPoolParams * params)
{
guint size, align;
GstBufferPoolPrivate *priv = pool->priv;
*buffer = gst_buffer_new ();
align = priv->align - 1;
size = priv->prefix + priv->postfix + priv->size + align;
if (size > 0) {
guint8 *memptr;
memptr = g_malloc (size);
GST_BUFFER_MALLOCDATA (*buffer) = memptr;
memptr = (guint8 *) ((guintptr) (memptr + align) & ~align);
GST_BUFFER_DATA (*buffer) = memptr + priv->prefix;
GST_BUFFER_SIZE (*buffer) = priv->size;
}
return GST_FLOW_OK;
}
/* the default implementation for preallocating the buffers
* in the pool */
static gboolean
@ -190,12 +214,34 @@ alloc_failed:
}
}
/* must be called with the lock */
static gboolean
do_start (GstBufferPool * pool)
{
if (!pool->started) {
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
/* start the pool, subclasses should allocate buffers and put them
* in the queue */
if (G_LIKELY (pclass->start)) {
if (!pclass->start (pool))
return FALSE;
}
pool->started = TRUE;
}
return TRUE;
}
static void
default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
{
gst_buffer_unref (buffer);
}
/* must be called with the lock */
static gboolean
default_stop (GstBufferPool * pool)
{
@ -214,26 +260,7 @@ default_stop (GstBufferPool * pool)
return TRUE;
}
static gboolean
do_start (GstBufferPool * pool)
{
if (!pool->started) {
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
/* start the pool, subclasses should allocate buffers and put them
* in the queue */
if (G_LIKELY (pclass->start)) {
if (!pclass->start (pool))
return FALSE;
}
pool->started = TRUE;
}
return TRUE;
}
/* must be called with the lock */
static gboolean
do_stop (GstBufferPool * pool)
{
@ -335,12 +362,28 @@ static gboolean
default_set_config (GstBufferPool * pool, GstStructure * config)
{
GstBufferPoolPrivate *priv = pool->priv;
guint size, min_buffers, max_buffers;
guint prefix, postfix, align;
/* parse the config and keep around */
gst_buffer_pool_config_get (config, &priv->size, &priv->min_buffers,
&priv->max_buffers, &priv->prefix, &priv->postfix, &priv->align);
if (!gst_buffer_pool_config_get (config, &size, &min_buffers,
&max_buffers, &prefix, &postfix, &align))
goto wrong_config;
priv->size = size;
priv->min_buffers = min_buffers;
priv->max_buffers = max_buffers;
priv->prefix = prefix;
priv->postfix = postfix;
priv->align = align;
return TRUE;
wrong_config:
{
GST_WARNING_OBJECT (pool, "invalid config");
return FALSE;
}
}
/**
@ -492,30 +535,6 @@ gst_buffer_pool_config_get (GstStructure * config, guint * size,
GST_QUARK (ALIGN), G_TYPE_UINT, align, NULL);
}
static GstFlowReturn
default_alloc_buffer (GstBufferPool * pool, GstBuffer ** buffer,
GstBufferPoolParams * params)
{
guint size, align;
GstBufferPoolPrivate *priv = pool->priv;
*buffer = gst_buffer_new ();
align = priv->align - 1;
size = priv->prefix + priv->postfix + priv->size + align;
if (size > 0) {
guint8 *memptr;
memptr = g_malloc (size);
GST_BUFFER_MALLOCDATA (*buffer) = memptr;
memptr = (guint8 *) ((guintptr) (memptr + align) & ~align);
GST_BUFFER_DATA (*buffer) = memptr + priv->prefix;
GST_BUFFER_SIZE (*buffer) = priv->size;
}
return GST_FLOW_OK;
}
static GstFlowReturn
default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
GstBufferPoolParams * params)
@ -527,12 +546,12 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
while (TRUE) {
if (g_atomic_int_get (&pool->flushing))
if (G_UNLIKELY (g_atomic_int_get (&pool->flushing)))
return GST_FLOW_WRONG_STATE;
/* try to get a buffer from the queue */
*buffer = gst_atomic_queue_pop (pool->queue);
if (*buffer) {
if (G_LIKELY (*buffer)) {
gst_poll_read_control (pool->poll);
result = GST_FLOW_OK;
break;
@ -561,6 +580,24 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
return result;
}
static inline void
dec_outstanding (GstBufferPool * pool)
{
if (g_atomic_int_dec_and_test (&pool->outstanding)) {
/* all buffers are returned to the pool, see if we need to free them */
if (g_atomic_int_get (&pool->flushing)) {
/* take the lock so that set_active is not run concurrently */
GST_BUFFER_POOL_LOCK (pool);
/* recheck the flushing state in the lock, the pool could have been
* set to active again */
if (g_atomic_int_get (&pool->flushing))
do_stop (pool);
GST_BUFFER_POOL_UNLOCK (pool);
}
}
}
/**
* gst_buffer_pool_acquire_buffer:
* @pool: a #GstBufferPool
@ -587,13 +624,17 @@ gst_buffer_pool_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
/* assume we'll have one more outstanding buffer we need to do that so
* that concurrent set_active doesn't clear the buffers */
g_atomic_int_inc (&pool->outstanding);
if (G_LIKELY (pclass->acquire_buffer))
result = pclass->acquire_buffer (pool, buffer, params);
else
result = GST_FLOW_NOT_SUPPORTED;
if (G_LIKELY (result == GST_FLOW_OK && *buffer))
g_atomic_int_inc (&pool->outstanding);
if (G_UNLIKELY (result != GST_FLOW_OK))
dec_outstanding (pool);
return result;
}
@ -630,25 +671,5 @@ gst_buffer_pool_release_buffer (GstBufferPool * pool, GstBuffer * buffer)
if (G_LIKELY (pclass->release_buffer))
pclass->release_buffer (pool, buffer);
if (G_UNLIKELY (g_atomic_int_get (&pool->flushing))) {
if (g_atomic_int_dec_and_test (&pool->outstanding)) {
/* take the lock so that set_active is not run concurrently */
GST_BUFFER_POOL_LOCK (pool);
/* recheck the flushing state in the lock, the pool could have been
* set to active again */
if (g_atomic_int_get (&pool->flushing)) {
if (!do_stop (pool))
goto stop_failed;
}
GST_BUFFER_POOL_UNLOCK (pool);
}
}
return;
stop_failed:
{
GST_WARNING_OBJECT (pool, "stop failed");
GST_BUFFER_POOL_UNLOCK (pool);
return;
}
dec_outstanding (pool);
}