v4l2bufferpool: Acquire cannot return a buffer from another pool

Return a buffer from an otherpool has unwanted side effects that lead to leaks and
prevents deactivating the pool. Instead, we change the _process() API so it can
replace the internal buffer with the buffer from the downstream pool. This implied
moving from _fill() to _create() method in the src.
This commit is contained in:
Nicolas Dufresne 2014-04-29 13:05:41 -04:00
parent 97d0ca853e
commit 815c9b7d35
5 changed files with 58 additions and 63 deletions

View file

@ -1160,48 +1160,16 @@ gst_v4l2_buffer_pool_acquire_buffer (GstBufferPool * bpool, GstBuffer ** buffer,
GST_LOG_OBJECT (pool, "copy buffer %p->%p", *buffer, copy);
/* and requeue so that we can continue capturing */
ret = gst_v4l2_buffer_pool_qbuf (pool, *buffer);
gst_v4l2_buffer_pool_release_buffer (bpool, *buffer);
*buffer = copy;
}
break;
}
case GST_V4L2_IO_USERPTR:
{
struct UserPtrData *data;
/* dequeue filled userptr */
ret = gst_v4l2_buffer_pool_dqbuf (pool, buffer);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto done;
data = gst_mini_object_steal_qdata (GST_MINI_OBJECT (*buffer),
GST_V4L2_IMPORT_QUARK);
/* and requeue so that we can continue capturing */
gst_v4l2_buffer_pool_prepare_buffer (pool, *buffer, NULL);
ret = gst_v4l2_buffer_pool_qbuf (pool, *buffer);
*buffer = data->buffer;
data->buffer = NULL;
_unmap_userptr_frame (data);
break;
}
case GST_V4L2_IO_DMABUF_IMPORT:
{
GstBuffer *tmp;
/* dequeue filled dmabuf */
/* dequeue filled buffer */
ret = gst_v4l2_buffer_pool_dqbuf (pool, buffer);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto done;
tmp = gst_mini_object_steal_qdata (GST_MINI_OBJECT (*buffer),
GST_V4L2_IMPORT_QUARK);
/* and requeue so that we can continue capturing */
gst_v4l2_buffer_pool_prepare_buffer (pool, *buffer, NULL);
ret = gst_v4l2_buffer_pool_qbuf (pool, *buffer);
*buffer = tmp;
break;
}
default:
@ -1342,10 +1310,8 @@ gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer)
GST_BUFFER_POOL_CLASS (parent_class)->release_buffer (bpool,
buffer);
} else {
/* the buffer is queued in the device but maybe not played yet. We just
* leave it there and not make it available for future calls to acquire
* for now. The buffer will be dequeued and reused later. */
GST_LOG_OBJECT (pool, "buffer %u is queued", index);
/* We keep a ref on queued buffer, so this should never happen */
g_assert_not_reached ();
}
break;
}
@ -1533,7 +1499,7 @@ cleanup:
/**
* gst_v4l2_buffer_pool_process:
* @bpool: a #GstBufferPool
* @buf: a #GstBuffer
* @buf: a #GstBuffer, maybe be replaced
*
* Process @buf in @bpool. For capture devices, this functions fills @buf with
* data from the device. For output devices, this functions send the contents of
@ -1542,7 +1508,7 @@ cleanup:
* Returns: %GST_FLOW_OK on success.
*/
GstFlowReturn
gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer * buf)
gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf)
{
GstFlowReturn ret = GST_FLOW_OK;
GstBufferPool *bpool = GST_BUFFER_POOL_CAST (pool);
@ -1559,7 +1525,7 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer * buf)
switch (obj->mode) {
case GST_V4L2_IO_RW:
/* capture into the buffer */
ret = gst_v4l2_do_read (pool, buf);
ret = gst_v4l2_do_read (pool, *buf);
break;
case GST_V4L2_IO_MMAP:
@ -1567,8 +1533,8 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer * buf)
{
GstBuffer *tmp;
if (buf->pool == bpool) {
if (gst_buffer_get_size (buf) == 0)
if ((*buf)->pool == bpool) {
if (gst_buffer_get_size (*buf) == 0)
goto eos;
else
/* nothing, data was inside the buffer when we did _acquire() */
@ -1596,9 +1562,26 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer * buf)
}
case GST_V4L2_IO_USERPTR:
{
struct UserPtrData *data;
/* Replace our buffer with downstream allocated buffer */
data = gst_mini_object_steal_qdata (GST_MINI_OBJECT (*buf),
GST_V4L2_IMPORT_QUARK);
gst_buffer_replace (buf, data->buffer);
_unmap_userptr_frame (data);
break;
}
case GST_V4L2_IO_DMABUF_IMPORT:
{
/* Nothing to do buffer is from the other pool */
GstBuffer *tmp;
/* Replace our buffer with downstream allocated buffer */
tmp = gst_mini_object_steal_qdata (GST_MINI_OBJECT (*buf),
GST_V4L2_IMPORT_QUARK);
gst_buffer_replace (buf, tmp);
gst_buffer_unref (tmp);
break;
}
@ -1624,9 +1607,9 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer * buf)
{
GstBuffer *to_queue;
if (buf->pool == bpool) {
if ((*buf)->pool == bpool) {
/* nothing, we can queue directly */
to_queue = gst_buffer_ref (buf);
to_queue = gst_buffer_ref (*buf);
GST_LOG_OBJECT (pool, "processing buffer from our pool");
} else {
GstBufferPoolAcquireParams params = { 0 };
@ -1641,7 +1624,7 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer * buf)
if (ret != GST_FLOW_OK)
goto acquire_failed;
ret = gst_v4l2_buffer_pool_prepare_buffer (pool, to_queue, buf);
ret = gst_v4l2_buffer_pool_prepare_buffer (pool, to_queue, *buf);
if (ret != GST_FLOW_OK) {
gst_buffer_unref (to_queue);
goto prepare_failed;

View file

@ -84,7 +84,7 @@ GType gst_v4l2_buffer_pool_get_type (void);
GstBufferPool * gst_v4l2_buffer_pool_new (GstV4l2Object *obj, GstCaps *caps);
GstFlowReturn gst_v4l2_buffer_pool_process (GstV4l2BufferPool * bpool, GstBuffer * buf);
GstFlowReturn gst_v4l2_buffer_pool_process (GstV4l2BufferPool * bpool, GstBuffer ** buf);
gboolean gst_v4l2_buffer_pool_flush (GstV4l2BufferPool * pool);

View file

@ -568,8 +568,8 @@ gst_v4l2sink_show_frame (GstBaseSink * bsink, GstBuffer * buf)
goto activate_failed;
}
ret =
gst_v4l2_buffer_pool_process (GST_V4L2_BUFFER_POOL_CAST (obj->pool), buf);
ret = gst_v4l2_buffer_pool_process (GST_V4L2_BUFFER_POOL_CAST (obj->pool),
&buf);
return ret;

View file

@ -112,7 +112,7 @@ static GstCaps *gst_v4l2src_get_caps (GstBaseSrc * src, GstCaps * filter);
static gboolean gst_v4l2src_query (GstBaseSrc * bsrc, GstQuery * query);
static gboolean gst_v4l2src_decide_allocation (GstBaseSrc * src,
GstQuery * query);
static GstFlowReturn gst_v4l2src_fill (GstPushSrc * src, GstBuffer * out);
static GstFlowReturn gst_v4l2src_create (GstPushSrc * src, GstBuffer ** out);
static GstCaps *gst_v4l2src_fixate (GstBaseSrc * basesrc, GstCaps * caps);
static gboolean gst_v4l2src_negotiate (GstBaseSrc * basesrc);
@ -185,7 +185,7 @@ gst_v4l2src_class_init (GstV4l2SrcClass * klass)
basesrc_class->decide_allocation =
GST_DEBUG_FUNCPTR (gst_v4l2src_decide_allocation);
pushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_v4l2src_fill);
pushsrc_class->create = GST_DEBUG_FUNCPTR (gst_v4l2src_create);
klass->v4l2_class_devices = NULL;
@ -612,7 +612,7 @@ gst_v4l2src_change_state (GstElement * element, GstStateChange transition)
}
static GstFlowReturn
gst_v4l2src_fill (GstPushSrc * src, GstBuffer * buf)
gst_v4l2src_create (GstPushSrc * src, GstBuffer ** buf)
{
GstV4l2Src *v4l2src = GST_V4L2SRC (src);
GstV4l2Object *obj = v4l2src->v4l2object;
@ -621,14 +621,19 @@ gst_v4l2src_fill (GstPushSrc * src, GstBuffer * buf)
GstClockTime abs_time, base_time, timestamp, duration;
GstClockTime delay;
ret = GST_BASE_SRC_CLASS (parent_class)->alloc (GST_BASE_SRC (src), 0,
obj->sizeimage, buf);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto alloc_failed;
ret =
gst_v4l2_buffer_pool_process (GST_V4L2_BUFFER_POOL_CAST (obj->pool), buf);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto error;
timestamp = GST_BUFFER_TIMESTAMP (buf);
timestamp = GST_BUFFER_TIMESTAMP (*buf);
duration = obj->duration;
/* timestamps, LOCK to get clock and base time. */
@ -688,8 +693,8 @@ gst_v4l2src_fill (GstPushSrc * src, GstBuffer * buf)
}
/* set buffer metadata */
GST_BUFFER_OFFSET (buf) = v4l2src->offset++;
GST_BUFFER_OFFSET_END (buf) = v4l2src->offset;
GST_BUFFER_OFFSET (*buf) = v4l2src->offset++;
GST_BUFFER_OFFSET_END (*buf) = v4l2src->offset;
if (G_LIKELY (abs_time != GST_CLOCK_TIME_NONE)) {
/* the time now is the time of the clock minus the base time */
@ -718,12 +723,19 @@ gst_v4l2src_fill (GstPushSrc * src, GstBuffer * buf)
GST_INFO_OBJECT (src, "sync to %" GST_TIME_FORMAT " out ts %" GST_TIME_FORMAT,
GST_TIME_ARGS (v4l2src->ctrl_time), GST_TIME_ARGS (timestamp));
GST_BUFFER_TIMESTAMP (buf) = timestamp;
GST_BUFFER_DURATION (buf) = duration;
GST_BUFFER_TIMESTAMP (*buf) = timestamp;
GST_BUFFER_DURATION (*buf) = duration;
return ret;
/* ERROR */
alloc_failed:
{
if (ret != GST_FLOW_FLUSHING)
GST_ELEMENT_ERROR (src, RESOURCE, NO_SPACE_LEFT,
("Failed to allocate a buffer"), (NULL));
return ret;
}
error:
{
GST_DEBUG_OBJECT (src, "error processing buffer %d (%s)", ret,

View file

@ -313,7 +313,7 @@ gst_v4l2_video_dec_finish (GstVideoDecoder * decoder)
buffer = gst_buffer_new ();
ret =
gst_v4l2_buffer_pool_process (GST_V4L2_BUFFER_POOL (self->
v4l2output->pool), buffer);
v4l2output->pool), &buffer);
gst_buffer_unref (buffer);
}
GST_VIDEO_DECODER_STREAM_LOCK (decoder);
@ -396,7 +396,7 @@ gst_v4l2_video_dec_loop (GstVideoDecoder * decoder)
GST_LOG_OBJECT (decoder, "Process output buffer");
ret =
gst_v4l2_buffer_pool_process (GST_V4L2_BUFFER_POOL (self->
v4l2capture->pool), buffer);
v4l2capture->pool), &buffer);
if (ret != GST_FLOW_OK)
goto beach;
@ -485,7 +485,7 @@ gst_v4l2_video_dec_handle_frame (GstVideoDecoder * decoder,
gst_v4l2_object_unlock_stop (self->v4l2output);
ret =
gst_v4l2_buffer_pool_process (GST_V4L2_BUFFER_POOL (self->
v4l2output->pool), codec_data);
v4l2output->pool), &codec_data);
gst_v4l2_object_unlock (self->v4l2output);
GST_VIDEO_DECODER_STREAM_LOCK (decoder);
@ -538,7 +538,7 @@ gst_v4l2_video_dec_handle_frame (GstVideoDecoder * decoder,
GST_VIDEO_DECODER_STREAM_UNLOCK (decoder);
ret =
gst_v4l2_buffer_pool_process (GST_V4L2_BUFFER_POOL (self->v4l2output->
pool), frame->input_buffer);
pool), &frame->input_buffer);
GST_VIDEO_DECODER_STREAM_LOCK (decoder);
if (ret == GST_FLOW_FLUSHING) {