bufferpool: fix max_buffers handling

When max_buffers > 0 and the pool is empty, actually try to allocate more
buffers up to the max_buffers limit.
We need to add a counter for this to count how many buffers we allocated and
check this against the max_buffers limit.
Reorganise and clean up some code.

Fixes https://bugzilla.gnome.org/show_bug.cgi?id=681153
This commit is contained in:
Wim Taymans 2012-08-10 12:23:03 +02:00
parent fe082cbe24
commit fb78756679

View file

@ -108,6 +108,7 @@ struct _GstBufferPoolPrivate
guint size;
guint min_buffers;
guint max_buffers;
guint cur_buffers;
GstAllocator *allocator;
GstAllocationParams params;
};
@ -247,6 +248,58 @@ mark_meta_pooled (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
return TRUE;
}
static GstFlowReturn
do_alloc_buffer (GstBufferPool * pool, GstBuffer ** buffer,
GstBufferPoolAcquireParams * params)
{
GstBufferPoolPrivate *priv = pool->priv;
GstFlowReturn result;
gint cur_buffers, max_buffers;
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
if (G_UNLIKELY (!pclass->alloc_buffer))
goto no_function;
max_buffers = priv->max_buffers;
/* increment the allocation counter */
cur_buffers = g_atomic_int_add (&priv->cur_buffers, 1);
if (max_buffers && cur_buffers >= max_buffers)
goto max_reached;
result = pclass->alloc_buffer (pool, buffer, params);
if (G_UNLIKELY (result != GST_FLOW_OK))
goto alloc_failed;
gst_buffer_foreach_meta (*buffer, mark_meta_pooled, pool);
GST_LOG_OBJECT (pool, "allocated buffer %d/%d, %p", cur_buffers,
max_buffers, buffer);
return result;
/* ERRORS */
no_function:
{
GST_ERROR_OBJECT (pool, "no alloc function");
return GST_FLOW_NOT_SUPPORTED;
}
max_reached:
{
GST_DEBUG_OBJECT (pool, "max buffers reached");
g_atomic_int_add (&priv->cur_buffers, -1);
return GST_FLOW_EOS;
}
alloc_failed:
{
GST_WARNING_OBJECT (pool, "alloc function failed");
g_atomic_int_add (&priv->cur_buffers, -1);
return result;
}
}
/* the default implementation for preallocating the buffers
* in the pool */
static gboolean
@ -258,19 +311,13 @@ default_start (GstBufferPool * pool)
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
/* no alloc function, error */
if (G_UNLIKELY (pclass->alloc_buffer == NULL))
goto no_alloc;
/* we need to prealloc buffers */
for (i = 0; i < priv->min_buffers; i++) {
GstBuffer *buffer;
if (pclass->alloc_buffer (pool, &buffer, NULL) != GST_FLOW_OK)
if (do_alloc_buffer (pool, &buffer, NULL) != GST_FLOW_OK)
goto alloc_failed;
gst_buffer_foreach_meta (buffer, mark_meta_pooled, pool);
GST_LOG_OBJECT (pool, "prealloced buffer %d: %p", i, buffer);
/* release to the queue, we call the vmethod directly, we don't need to do
* the other refcount handling right now. */
if (G_LIKELY (pclass->release_buffer))
@ -279,14 +326,9 @@ default_start (GstBufferPool * pool)
return TRUE;
/* ERRORS */
no_alloc:
{
GST_WARNING_OBJECT (pool, "no alloc function");
return FALSE;
}
alloc_failed:
{
GST_WARNING_OBJECT (pool, "alloc function failed");
GST_WARNING_OBJECT (pool, "failed to allocate buffer");
return FALSE;
}
}
@ -295,7 +337,9 @@ alloc_failed:
static gboolean
do_start (GstBufferPool * pool)
{
if (!pool->priv->started) {
GstBufferPoolPrivate *priv = pool->priv;
if (!priv->started) {
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
@ -307,7 +351,7 @@ do_start (GstBufferPool * pool)
if (!pclass->start (pool))
return FALSE;
}
pool->priv->started = TRUE;
priv->started = TRUE;
}
return TRUE;
}
@ -323,19 +367,22 @@ default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
static gboolean
default_stop (GstBufferPool * pool)
{
GstBufferPoolPrivate *priv = pool->priv;
GstBuffer *buffer;
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
/* clear the pool */
while ((buffer = gst_atomic_queue_pop (pool->priv->queue))) {
while ((buffer = gst_atomic_queue_pop (priv->queue))) {
GST_LOG_OBJECT (pool, "freeing %p", buffer);
gst_poll_read_control (pool->priv->poll);
gst_poll_read_control (priv->poll);
if (G_LIKELY (pclass->free_buffer))
pclass->free_buffer (pool, buffer);
}
priv->cur_buffers = 0;
return TRUE;
}
@ -343,7 +390,9 @@ default_stop (GstBufferPool * pool)
static gboolean
do_stop (GstBufferPool * pool)
{
if (pool->priv->started) {
GstBufferPoolPrivate *priv = pool->priv;
if (priv->started) {
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
@ -353,7 +402,7 @@ do_stop (GstBufferPool * pool)
if (!pclass->stop (pool))
return FALSE;
}
pool->priv->started = FALSE;
priv->started = FALSE;
}
return TRUE;
}
@ -380,18 +429,21 @@ gboolean
gst_buffer_pool_set_active (GstBufferPool * pool, gboolean active)
{
gboolean res = TRUE;
GstBufferPoolPrivate *priv;
g_return_val_if_fail (GST_IS_BUFFER_POOL (pool), FALSE);
GST_LOG_OBJECT (pool, "active %d", active);
priv = pool->priv;
GST_BUFFER_POOL_LOCK (pool);
/* just return if we are already in the right state */
if (pool->priv->active == active)
if (priv->active == active)
goto was_ok;
/* we need to be configured */
if (!pool->priv->configured)
if (!priv->configured)
goto not_configured;
if (active) {
@ -399,25 +451,25 @@ gst_buffer_pool_set_active (GstBufferPool * pool, gboolean active)
goto start_failed;
/* unset the flushing state now */
gst_poll_read_control (pool->priv->poll);
gst_poll_read_control (priv->poll);
g_atomic_int_set (&pool->flushing, 0);
} else {
gint outstanding;
/* set to flushing first */
g_atomic_int_set (&pool->flushing, 1);
gst_poll_write_control (pool->priv->poll);
gst_poll_write_control (priv->poll);
/* when all buffers are in the pool, free them. Else they will be
* freed when they are released */
outstanding = g_atomic_int_get (&pool->priv->outstanding);
outstanding = g_atomic_int_get (&priv->outstanding);
GST_LOG_OBJECT (pool, "outstanding buffers %d", outstanding);
if (outstanding == 0) {
if (!do_stop (pool))
goto stop_failed;
}
}
pool->priv->active = active;
priv->active = active;
GST_BUFFER_POOL_UNLOCK (pool);
return res;
@ -491,6 +543,7 @@ default_set_config (GstBufferPool * pool, GstStructure * config)
priv->size = size;
priv->min_buffers = min_buffers;
priv->max_buffers = max_buffers;
priv->cur_buffers = 0;
if (priv->allocator)
gst_object_unref (priv->allocator);
@ -531,17 +584,20 @@ gst_buffer_pool_set_config (GstBufferPool * pool, GstStructure * config)
{
gboolean result;
GstBufferPoolClass *pclass;
GstBufferPoolPrivate *priv;
g_return_val_if_fail (GST_IS_BUFFER_POOL (pool), FALSE);
g_return_val_if_fail (config != NULL, FALSE);
priv = pool->priv;
GST_BUFFER_POOL_LOCK (pool);
/* can't change the settings when active */
if (pool->priv->active)
if (priv->active)
goto was_active;
/* we can't change when outstanding buffers */
if (g_atomic_int_get (&pool->priv->outstanding) != 0)
if (g_atomic_int_get (&priv->outstanding) != 0)
goto have_outstanding;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
@ -553,12 +609,12 @@ gst_buffer_pool_set_config (GstBufferPool * pool, GstStructure * config)
result = FALSE;
if (result) {
if (pool->priv->config)
gst_structure_free (pool->priv->config);
pool->priv->config = config;
if (priv->config)
gst_structure_free (priv->config);
priv->config = config;
/* now we are configured */
pool->priv->configured = TRUE;
priv->configured = TRUE;
} else {
gst_structure_free (config);
}
@ -917,49 +973,41 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
GstBufferPoolAcquireParams * params)
{
GstFlowReturn result;
GstBufferPoolClass *pclass;
GstBufferPoolPrivate *priv = pool->priv;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
while (TRUE) {
if (G_UNLIKELY (GST_BUFFER_POOL_IS_FLUSHING (pool)))
goto flushing;
/* try to get a buffer from the queue */
*buffer = gst_atomic_queue_pop (pool->priv->queue);
*buffer = gst_atomic_queue_pop (priv->queue);
if (G_LIKELY (*buffer)) {
gst_poll_read_control (pool->priv->poll);
gst_poll_read_control (priv->poll);
result = GST_FLOW_OK;
GST_LOG_OBJECT (pool, "acquired buffer %p", *buffer);
break;
}
/* no buffer */
if (priv->max_buffers == 0) {
/* no max_buffers, we allocate some more */
if (G_LIKELY (pclass->alloc_buffer)) {
result = pclass->alloc_buffer (pool, buffer, params);
if (result == GST_FLOW_OK && *buffer)
gst_buffer_foreach_meta (*buffer, mark_meta_pooled, pool);
else
result = GST_FLOW_ERROR;
} else
result = GST_FLOW_NOT_SUPPORTED;
GST_LOG_OBJECT (pool, "alloc buffer %p", *buffer);
/* no buffer, try to allocate some more */
GST_LOG_OBJECT (pool, "no buffer, trying to allocate");
result = do_alloc_buffer (pool, buffer, NULL);
if (G_LIKELY (result == GST_FLOW_OK))
/* we have a buffer, return it */
break;
if (G_UNLIKELY (result != GST_FLOW_EOS))
/* something went wrong, return error */
break;
}
/* check if we need to wait */
if (params && (params->flags & GST_BUFFER_POOL_ACQUIRE_FLAG_DONTWAIT)) {
GST_LOG_OBJECT (pool, "no more buffers");
result = GST_FLOW_EOS;
break;
}
/* now wait */
GST_LOG_OBJECT (pool, "waiting for free buffers");
gst_poll_wait (pool->priv->poll, GST_CLOCK_TIME_NONE);
gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE);
}
return result;