bufferpool: Rework buffer management a little

Add start/stop methods to allow for bulk allocation of buffers.
Free buffers only when all outstanding buffers returned.
Make things more threadsafe wrt flushing and starting/stopping by
keeping track of start and stop method calls.
This commit is contained in:
Wim Taymans 2011-02-21 17:33:38 +01:00
parent 419a01af0f
commit e566910a18
2 changed files with 149 additions and 60 deletions

View file

@ -67,7 +67,8 @@ static void gst_buffer_pool_finalize (GObject * object);
G_DEFINE_TYPE (GstBufferPool, gst_buffer_pool, GST_TYPE_OBJECT);
static gboolean default_set_active (GstBufferPool * pool, gboolean active);
static gboolean default_start (GstBufferPool * pool);
static gboolean default_stop (GstBufferPool * pool);
static gboolean default_set_config (GstBufferPool * pool,
GstStructure * config);
static GstFlowReturn default_alloc_buffer (GstBufferPool * pool,
@ -84,7 +85,8 @@ gst_buffer_pool_class_init (GstBufferPoolClass * klass)
gobject_class->finalize = gst_buffer_pool_finalize;
klass->set_active = default_set_active;
klass->start = default_start;
klass->stop = default_stop;
klass->set_config = default_set_config;
klass->acquire_buffer = default_acquire_buffer;
klass->alloc_buffer = default_alloc_buffer;
@ -101,36 +103,16 @@ gst_buffer_pool_init (GstBufferPool * pool)
pool->poll = gst_poll_new_timer ();
pool->queue = gst_atomic_queue_new (10);
pool->flushing = TRUE;
pool->active = FALSE;
pool->configured = FALSE;
pool->started = FALSE;
pool->config = gst_structure_id_empty_new (GST_QUARK (BUFFER_POOL_CONFIG));
gst_buffer_pool_config_set (pool->config, 0, 0, 0, 0, 0, 1);
default_set_active (pool, FALSE);
GST_DEBUG_OBJECT (pool, "created");
}
static void
default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
{
gst_buffer_unref (buffer);
}
static void
flush_buffers (GstBufferPool * pool)
{
GstBuffer *buffer;
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
/* clear the pool */
while ((buffer = gst_atomic_queue_pop (pool->queue))) {
gst_poll_read_control (pool->poll);
if (G_LIKELY (pclass->free_buffer))
pclass->free_buffer (pool, buffer);
}
}
static void
gst_buffer_pool_finalize (GObject * object)
{
@ -141,7 +123,6 @@ gst_buffer_pool_finalize (GObject * object)
GST_DEBUG_OBJECT (pool, "finalize");
gst_buffer_pool_set_active (pool, FALSE);
flush_buffers (pool);
gst_atomic_queue_unref (pool->queue);
gst_poll_free (pool->poll);
gst_structure_free (pool->config);
@ -168,33 +149,104 @@ gst_buffer_pool_new (void)
return result;
}
/* the default implementation for allocating and freeing the
* buffers when changing the active state */
/* the default implementation for preallocating the buffers
* in the pool */
static gboolean
default_set_active (GstBufferPool * pool, gboolean active)
default_start (GstBufferPool * pool)
{
guint i;
GstBufferPoolPrivate *priv = pool->priv;
GstBufferPoolClass *pclass;
if (active) {
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
/* no alloc function, error */
if (G_UNLIKELY (pclass->alloc_buffer == NULL))
goto no_alloc;
/* we need to prealloc buffers */
for (i = 0; i < priv->min_buffers; i++) {
GstBuffer *buffer;
if (pclass->alloc_buffer (pool, &buffer, NULL) != GST_FLOW_OK)
goto alloc_failed;
/* store in the queue */
gst_atomic_queue_push (pool->queue, buffer);
gst_poll_write_control (pool->poll);
}
return TRUE;
/* ERRORS */
no_alloc:
{
GST_WARNING_OBJECT (pool, "no alloc function");
return FALSE;
}
alloc_failed:
{
GST_WARNING_OBJECT (pool, "alloc function failed");
return FALSE;
}
}
static void
default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
{
gst_buffer_unref (buffer);
}
static gboolean
default_stop (GstBufferPool * pool)
{
GstBuffer *buffer;
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
/* clear the pool */
while ((buffer = gst_atomic_queue_pop (pool->queue))) {
gst_poll_read_control (pool->poll);
if (G_LIKELY (pclass->free_buffer))
pclass->free_buffer (pool, buffer);
}
return TRUE;
}
static gboolean
do_start (GstBufferPool * pool)
{
if (!pool->started) {
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
if (G_UNLIKELY (pclass->alloc_buffer == NULL))
return TRUE;
/* we need to prealloc buffers */
for (i = priv->min_buffers; i > 0; i--) {
GstBuffer *buffer;
if (pclass->alloc_buffer (pool, &buffer, NULL) != GST_FLOW_OK)
/* start the pool, subclasses should allocate buffers and put them
* in the queue */
if (G_LIKELY (pclass->start)) {
if (!pclass->start (pool))
return FALSE;
/* store in the queue */
gst_atomic_queue_push (pool->queue, buffer);
gst_poll_write_control (pool->poll);
}
pool->started = TRUE;
}
return TRUE;
}
static gboolean
do_stop (GstBufferPool * pool)
{
if (pool->started) {
GstBufferPoolClass *pclass;
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
if (G_LIKELY (pclass->stop)) {
if (!pclass->stop (pool))
return FALSE;
}
pool->started = FALSE;
}
return TRUE;
}
@ -214,7 +266,7 @@ gboolean
gst_buffer_pool_set_active (GstBufferPool * pool, gboolean active)
{
GstBufferPoolClass *pclass;
gboolean res;
gboolean res = TRUE;
g_return_val_if_fail (GST_IS_BUFFER_POOL (pool), FALSE);
@ -229,35 +281,51 @@ gst_buffer_pool_set_active (GstBufferPool * pool, gboolean active)
if (!pool->configured)
goto not_configured;
if (!active) {
if (active) {
if (!do_start (pool))
goto start_failed;
/* unset the flushing state now */
gst_poll_read_control (pool->poll);
g_atomic_int_set (&pool->flushing, FALSE);
} else {
/* set to flushing first */
g_atomic_int_set (&pool->flushing, TRUE);
gst_poll_write_control (pool->poll);
}
if (G_LIKELY (pclass->set_active))
res = pclass->set_active (pool, active);
else
res = TRUE;
if (res) {
if (active) {
gst_poll_read_control (pool->poll);
g_atomic_int_set (&pool->flushing, FALSE);
/* when all buffers are in the pool, free them. Else they will be
* freed when they are released */
if (g_atomic_int_get (&pool->outstanding) == 0) {
if (!do_stop (pool))
goto stop_failed;
}
pool->active = active;
}
pool->active = active;
GST_BUFFER_POOL_UNLOCK (pool);
return res;
was_ok:
{
GST_DEBUG_OBJECT (pool, "pool was in the right state");
GST_BUFFER_POOL_UNLOCK (pool);
return TRUE;
}
not_configured:
{
GST_ERROR_OBJECT (pool, "pool was not configured");
GST_BUFFER_POOL_UNLOCK (pool);
return FALSE;
}
start_failed:
{
GST_ERROR_OBJECT (pool, "start failed");
GST_BUFFER_POOL_UNLOCK (pool);
return FALSE;
}
stop_failed:
{
GST_WARNING_OBJECT (pool, "stop failed");
GST_BUFFER_POOL_UNLOCK (pool);
return FALSE;
}
@ -310,7 +378,7 @@ gst_buffer_pool_set_config (GstBufferPool * pool, GstStructure * config)
pclass = GST_BUFFER_POOL_GET_CLASS (pool);
/* free the buffer when we are inactive */
/* set the new config */
if (G_LIKELY (pclass->set_config))
result = pclass->set_config (pool, config);
else
@ -331,11 +399,13 @@ gst_buffer_pool_set_config (GstBufferPool * pool, GstStructure * config)
/* ERRORS */
was_active:
{
GST_WARNING_OBJECT (pool, "can't change config, we are active");
GST_BUFFER_POOL_UNLOCK (pool);
return FALSE;
}
have_outstanding:
{
GST_WARNING_OBJECT (pool, "can't change config, have outstanding buffers");
GST_BUFFER_POOL_UNLOCK (pool);
return FALSE;
}
@ -562,7 +632,23 @@ gst_buffer_pool_release_buffer (GstBufferPool * pool, GstBuffer * buffer)
if (G_UNLIKELY (g_atomic_int_get (&pool->flushing))) {
if (g_atomic_int_dec_and_test (&pool->outstanding)) {
flush_buffers (pool);
/* take the lock so that set_active is not run concurrently */
GST_BUFFER_POOL_LOCK (pool);
/* recheck the flushing state in the lock, the pool could have been
* set to active again */
if (g_atomic_int_get (&pool->flushing)) {
if (!do_stop (pool))
goto stop_failed;
}
GST_BUFFER_POOL_UNLOCK (pool);
}
}
return;
stop_failed:
{
GST_WARNING_OBJECT (pool, "stop failed");
GST_BUFFER_POOL_UNLOCK (pool);
return;
}
}

View file

@ -98,6 +98,7 @@ struct _GstBufferPool {
/*< private >*/
gboolean active;
gboolean flushing;
gboolean started;
gint outstanding;
GstAtomicQueue *queue;
GstPoll *poll;
@ -114,9 +115,11 @@ struct _GstBufferPoolClass {
GstObjectClass object_class;
/* vmethods */
gboolean (*set_active) (GstBufferPool *pool, gboolean active);
gboolean (*set_config) (GstBufferPool *pool, GstStructure *config);
gboolean (*start) (GstBufferPool *pool);
gboolean (*stop) (GstBufferPool *pool);
GstFlowReturn (*acquire_buffer) (GstBufferPool *pool, GstBuffer **buffer,
GstBufferPoolParams *params);
GstFlowReturn (*alloc_buffer) (GstBufferPool *pool, GstBuffer **buffer,