bufferpool: memory management cleanups

Use a lock to protect concurrect execution of set_config and set_active.
Start freeing the buffers when flushing and all buffers are returned to the
pool.
Make a copy of the config to avoid crashing with concurrent access.
This commit is contained in:
Wim Taymans 2011-02-21 12:18:41 +01:00
parent 1dff415d8a
commit 419a01af0f
2 changed files with 104 additions and 58 deletions

View file

@ -43,8 +43,12 @@
#define GST_BUFFER_POOL_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BUFFER_POOL, GstBufferPoolPrivate))
#define GST_BUFFER_POOL_LOCK(pool) (g_static_rec_mutex_lock(&pool->priv->rec_lock))
#define GST_BUFFER_POOL_UNLOCK(pool) (g_static_rec_mutex_unlock(&pool->priv->rec_lock))
struct _GstBufferPoolPrivate
{
GStaticRecMutex rec_lock;
guint min_buffers;
guint max_buffers;
guint size;
@ -93,20 +97,40 @@ gst_buffer_pool_init (GstBufferPool * pool)
{
pool->priv = GST_BUFFER_POOL_GET_PRIVATE (pool);
pool->config = gst_structure_id_new (GST_QUARK (BUFFER_POOL_CONFIG),
GST_QUARK (SIZE), G_TYPE_UINT, 0,
GST_QUARK (MIN_BUFFERS), G_TYPE_UINT, 0,
GST_QUARK (MAX_BUFFERS), G_TYPE_UINT, 0,
GST_QUARK (PREFIX), G_TYPE_UINT, 0,
GST_QUARK (POSTFIX), G_TYPE_UINT, 0,
GST_QUARK (ALIGN), G_TYPE_UINT, 1, NULL);
g_static_rec_mutex_init (&pool->priv->rec_lock);
pool->poll = gst_poll_new_timer ();
pool->queue = gst_atomic_queue_new (10);
pool->config = gst_structure_id_empty_new (GST_QUARK (BUFFER_POOL_CONFIG));
gst_buffer_pool_config_set (pool->config, 0, 0, 0, 0, 0, 1);
default_set_active (pool, FALSE);
GST_DEBUG_OBJECT (pool, "created");
}
static void
default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
{
gst_buffer_unref (buffer);
}
static void
flush_buffers (GstBufferPool * pool)
{
GstBuffer *buffer;
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
/* clear the pool */
while ((buffer = gst_atomic_queue_pop (pool->queue))) {
gst_poll_read_control (pool->poll);
if (G_LIKELY (pclass->free_buffer))
pclass->free_buffer (pool, buffer);
}
}
static void
gst_buffer_pool_finalize (GObject * object)
{
@ -117,9 +141,11 @@ gst_buffer_pool_finalize (GObject * object)
GST_DEBUG_OBJECT (pool, "finalize");
gst_buffer_pool_set_active (pool, FALSE);
flush_buffers (pool);
gst_atomic_queue_unref (pool->queue);
gst_poll_free (pool->poll);
gst_structure_free (pool->config);
g_static_rec_mutex_free (&pool->priv->rec_lock);
G_OBJECT_CLASS (gst_buffer_pool_parent_class)->finalize (object);
}
@ -142,23 +168,6 @@ gst_buffer_pool_new (void)
return result;
}
static void
default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
{
gst_buffer_unref (buffer);
}
static void
free_buffer (GstBufferPool * pool, GstBuffer * buffer)
{
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
if (G_LIKELY (pclass->free_buffer))
pclass->free_buffer (pool, buffer);
}
/* the default implementation for allocating and freeing the
* buffers when changing the active state */
static gboolean
@ -186,14 +195,6 @@ default_set_active (GstBufferPool * pool, gboolean active)
gst_atomic_queue_push (pool->queue, buffer);
gst_poll_write_control (pool->poll);
}
} else {
GstBuffer *buffer;
/* clear the pool */
while ((buffer = gst_atomic_queue_pop (pool->queue))) {
gst_poll_read_control (pool->poll);
free_buffer (pool, buffer);
}
}
return TRUE;
}
@ -219,26 +220,47 @@ gst_buffer_pool_set_active (GstBufferPool * pool, gboolean active)
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
GST_BUFFER_POOL_LOCK (pool);
/* just return if we are already in the right state */
if (!g_atomic_int_compare_and_exchange (&pool->active, !active, active))
return TRUE;
if (pool->active == active)
goto was_ok;
/* we need to be configured */
if (!g_atomic_int_get (&pool->configured))
return FALSE;
if (!pool->configured)
goto not_configured;
if (!active)
if (!active) {
g_atomic_int_set (&pool->flushing, TRUE);
gst_poll_write_control (pool->poll);
}
if (G_LIKELY (pclass->set_active))
res = pclass->set_active (pool, active);
else
res = TRUE;
if (active)
gst_poll_read_control (pool->poll);
if (res) {
if (active) {
gst_poll_read_control (pool->poll);
g_atomic_int_set (&pool->flushing, FALSE);
}
pool->active = active;
}
GST_BUFFER_POOL_UNLOCK (pool);
return res;
was_ok:
{
GST_BUFFER_POOL_UNLOCK (pool);
return TRUE;
}
not_configured:
{
GST_BUFFER_POOL_UNLOCK (pool);
return FALSE;
}
}
static gboolean
@ -277,13 +299,14 @@ gst_buffer_pool_set_config (GstBufferPool * pool, GstStructure * config)
g_return_val_if_fail (GST_IS_BUFFER_POOL (pool), FALSE);
g_return_val_if_fail (config != NULL, FALSE);
GST_BUFFER_POOL_LOCK (pool);
/* can't change the settings when active */
if (g_atomic_int_get (&pool->active))
return FALSE;
if (pool->active)
goto was_active;
/* we can't change when outstanding buffers */
if (g_atomic_int_get (&pool->outstanding) != 0)
return FALSE;
goto have_outstanding;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
@ -299,25 +322,48 @@ gst_buffer_pool_set_config (GstBufferPool * pool, GstStructure * config)
pool->config = config;
/* now we are configured */
g_atomic_int_set (&pool->configured, 1);
pool->configured = TRUE;
}
GST_BUFFER_POOL_UNLOCK (pool);
return result;
/* ERRORS */
was_active:
{
GST_BUFFER_POOL_UNLOCK (pool);
return FALSE;
}
have_outstanding:
{
GST_BUFFER_POOL_UNLOCK (pool);
return FALSE;
}
}
/**
* gst_buffer_pool_get_config:
* @pool: a #GstBufferPool
*
* Get the current configuration of the pool. This configuration is read-only,
* use gst_structure_copy() to make a writable copy.
* Get a copy of the current configuration of the pool. This configuration
* can either be modified and used for the gst_buffer_pool_set_config() call
* or it must be freed after usage.
*
* Returns: a copy of the current configuration of @pool. use
* gst_structure_free() after usage or gst_buffer_pool_set_config().
*/
const GstStructure *
GstStructure *
gst_buffer_pool_get_config (GstBufferPool * pool)
{
GstStructure *result;
g_return_val_if_fail (GST_IS_BUFFER_POOL (pool), NULL);
return pool->config;
GST_BUFFER_POOL_UNLOCK (pool);
result = gst_structure_copy (pool->config);
GST_BUFFER_POOL_UNLOCK (pool);
return result;
}
/**
@ -411,7 +457,7 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
while (TRUE) {
if (!g_atomic_int_get (&pool->active))
if (g_atomic_int_get (&pool->flushing))
return GST_FLOW_WRONG_STATE;
/* try to get a buffer from the queue */
@ -485,14 +531,9 @@ gst_buffer_pool_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
static void
default_release_buffer (GstBufferPool * pool, GstBuffer * buffer)
{
if (G_UNLIKELY (!g_atomic_int_get (&pool->active))) {
/* we are inactive, remove the buffer again */
free_buffer (pool, buffer);
} else {
/* keep it around in our queue */
gst_atomic_queue_push (pool->queue, buffer);
gst_poll_write_control (pool->poll);
}
/* keep it around in our queue */
gst_atomic_queue_push (pool->queue, buffer);
gst_poll_write_control (pool->poll);
}
/**
@ -519,5 +560,9 @@ gst_buffer_pool_release_buffer (GstBufferPool * pool, GstBuffer * buffer)
if (G_LIKELY (pclass->release_buffer))
pclass->release_buffer (pool, buffer);
g_atomic_int_exchange_and_add (&pool->outstanding, -1);
if (G_UNLIKELY (g_atomic_int_get (&pool->flushing))) {
if (g_atomic_int_dec_and_test (&pool->outstanding)) {
flush_buffers (pool);
}
}
}

View file

@ -97,6 +97,7 @@ struct _GstBufferPool {
/*< private >*/
gboolean active;
gboolean flushing;
gint outstanding;
GstAtomicQueue *queue;
GstPoll *poll;
@ -135,7 +136,7 @@ GstBufferPool * gst_buffer_pool_new (void);
gboolean gst_buffer_pool_set_active (GstBufferPool *pool, gboolean active);
gboolean gst_buffer_pool_set_config (GstBufferPool *pool, GstStructure *config);
const GstStructure * gst_buffer_pool_get_config (GstBufferPool *pool);
GstStructure * gst_buffer_pool_get_config (GstBufferPool *pool);
/* helpers for configuring the config structure */
void gst_buffer_pool_config_set (GstStructure *config, guint size,