v4l2: Record buffer states in pool to fix dequeue race

The `gst_v4l2_buffer_pool_dqbuf` function contains this ominous comment:

    /* get our GstBuffer with that index from the pool, if the buffer was
     * outstanding we have a serious problem.
     */
    outbuf = pool->buffers[group->buffer.index];

Unfortunately it is common for buffers in _output_ buffer pools to be
both queued and outstanding at the same time. This can happen if the
upstream element keeps a reference to the buffer, or inside an encoder
element itself, which keeps a reference to the input buffer of each
frame it is processing.
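
As a hypothetical illustration (not part of this patch), an encoder
built on the `GstVideoEncoder` base class keeps the input buffer of
every frame alive through `GstVideoCodecFrame::input_buffer`, so the
very same buffer can be queued to the device while still being
referenced:

    #include <gst/video/video.h>

    /* Hypothetical helper that hands the buffer to the V4L2 OUTPUT queue. */
    static GstFlowReturn queue_to_v4l2_output (GstVideoEncoder * encoder,
        GstBuffer * buf);

    static GstFlowReturn
    handle_frame (GstVideoEncoder * encoder, GstVideoCodecFrame * frame)
    {
      /* frame->input_buffer keeps a reference for the lifetime of the frame,
       * so from the pool's point of view the buffer stays "outstanding"... */
      GstBuffer *input = frame->input_buffer;

      /* ...while handing it to the driver also marks it "queued". */
      return queue_to_v4l2_output (encoder, input);
    }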

Since the current code doesn't handle this case properly, we can end up
with crashes in other elements, such as:

    (gst-launch-1.0:32559): CRITICAL **: 17:33:35.740: gst_video_frame_map_id: assertion 'GST_IS_BUFFER (buffer)' failed

and:

    (gst-launch-1.0:231): GStreamer-CRITICAL **: 00:16:20.882: write map requested on non-writable buffer

Both of these crashes are caused by a race condition in which the same
buffer is released twice from two different threads. If a buffer is
both queued and outstanding, the following situation is possible:

**Thread 1**
- Calls `gst_buffer_unref`, decrementing the reference count to zero.
- The core `GstBufferPool` object marks the buffer non-outstanding.
- Calls the V4L2 release buffer function.
- If the buffer is _not_ queued:
  - Releases it back to the free pool (containing non-queued buffers).

**Thread 2**
- Dequeues the queued output buffer.
  - Marks the buffer as not queued.
- If the buffer is _not_ outstanding:
  - Calls the V4L2 release buffer function.
  - Releases it back to the free pool (containing non-queued buffers).

If both of these threads run at exactly the same time, there is a small
window in which the buffer is marked both not outstanding and not
queued but has not yet been released by either thread. In this case
both threads release the buffer and it is freed twice, causing the
crashes above.
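
The following minimal sketch (simplified, not verbatim from the old
code; the `Pool` type and helper functions are stand-ins) shows the
unsynchronized check-then-release on both paths that makes the double
release possible:

    #include <gst/gst.h>
    #include <linux/videodev2.h>        /* VIDEO_MAX_FRAME */

    /* Stand-ins for the real pool bookkeeping. */
    static void move_to_free_queue (GstBuffer * buf);
    static gboolean buffer_is_outstanding (GstBuffer * buf);

    typedef struct
    {
      GstBuffer *buffers[VIDEO_MAX_FRAME];      /* non-NULL means "queued" */
    } Pool;

    /* Thread 1: runs after GstBufferPool has marked the buffer
     * non-outstanding. */
    static void
    release_buffer (Pool * pool, GstBuffer * buf, guint index)
    {
      if (pool->buffers[index] == NULL) /* check: not queued?            */
        move_to_free_queue (buf);       /* act: put it on the free queue */
    }

    /* Thread 2: dequeue path. */
    static void
    dqbuf (Pool * pool, guint index)
    {
      GstBuffer *buf = pool->buffers[index];

      pool->buffers[index] = NULL;      /* mark "not queued"       */
      if (!buffer_is_outstanding (buf)) /* check: not outstanding? */
        move_to_free_queue (buf);       /* possible second release */
    }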

Unfortunately the variable recording whether a buffer is outstanding is
part of the core `GstBuffer` object and is managed by `GstBufferPool`,
so it's not as straightforward as adding a mutex. Instead we can fix
this by additionally recording the buffer state in `GstV4l2BufferPool`
and by handling "internal" and "external" buffer releases separately,
so that we can detect exactly when a buffer stops being outstanding.
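
Concretely, the new state is one atomically updated integer per V4L2
buffer index, combining two flag bits; this mirrors the enum and struct
member added further down in the diff:

    /* Buffer state flags, combined per buffer index; a buffer can be
     * OUTSTANDING and QUEUED at the same time. */
    enum _GstV4l2BufferState
    {
      BUFFER_STATE_FREE = 0,            /* on the free queue, or not allocated */
      BUFFER_STATE_OUTSTANDING = 1,     /* held by an external user */
      BUFFER_STATE_QUEUED = 2,          /* on one of the kernel queues */
    };

    struct _GstV4l2BufferPool
    {
      /* ... existing fields ... */
      GstBuffer *buffers[VIDEO_MAX_FRAME];
      volatile gint buffer_state[VIDEO_MAX_FRAME];  /* g_atomic_int_* only */
      /* ... */
    };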

In the new solution:
- The "external" buffer pool release and the "dqbuf" functions
  atomically update the buffer state and determine whether a buffer is
  still queued or outstanding.
- Subsequent code and a new
  `gst_v4l2_buffer_pool_complete_release_buffer` function can then
  release (or not release) the buffer, knowing that they are not racing
  with another thread.
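
Condensed from the diff below (the function names here are only
illustrative, and logging and error handling are omitted), each path
performs a single atomic flag update and uses the returned previous
state to decide which thread completes the release:

    /* "External" release: the last external reference went away. */
    static void
    external_release (GstV4l2BufferPool * pool, GstBufferPool * bpool,
        GstBuffer * buffer, guint index)
    {
      /* Atomically clear OUTSTANDING; the previous value tells us whether
       * the buffer is still sitting on a kernel queue. */
      gint old_state = g_atomic_int_and (&pool->buffer_state[index],
          ~BUFFER_STATE_OUTSTANDING);
      gboolean queued = (old_state & BUFFER_STATE_QUEUED) != 0;

      gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer, queued);
    }

    /* Dequeue path: the kernel has handed the buffer back. */
    static void
    after_dqbuf (GstV4l2BufferPool * pool, GstBufferPool * bpool,
        GstBuffer * buffer, guint index)
    {
      /* Atomically clear QUEUED; the previous value tells us whether an
       * external user still holds the buffer. */
      gint old_state = g_atomic_int_and (&pool->buffer_state[index],
          ~BUFFER_STATE_QUEUED);

      if (!(old_state & BUFFER_STATE_OUTSTANDING))
        /* No other thread can race with us any more: finish the release. */
        gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer, FALSE);
      /* Otherwise the eventual gst_buffer_unref () completes it instead. */
    }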

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1010>

commit 1e27ea63af (parent ffcf697c2d)
Author: James Cowgill, 2021-09-27 16:52:22 +01:00
Committed by: GStreamer Marge Bot
2 changed files with 126 additions and 32 deletions

gstv4l2bufferpool.c

@@ -66,8 +66,22 @@ enum _GstV4l2BufferPoolAcquireFlags
GST_V4L2_BUFFER_POOL_ACQUIRE_FLAG_LAST
};
static void gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool,
GstBuffer * buffer);
/* Buffer state flags */
enum _GstV4l2BufferState
{
/* Buffer is free (either on the GstBufferPool free queue, or no GstBuffer has
* been allocated yet) */
BUFFER_STATE_FREE = 0,
/* Buffer had outstanding external users */
BUFFER_STATE_OUTSTANDING = 1,
/* Buffer is on one of the kernel queues */
BUFFER_STATE_QUEUED = 2,
};
static void gst_v4l2_buffer_pool_complete_release_buffer (GstBufferPool * bpool,
GstBuffer * buffer, gboolean queued);
static gboolean
gst_v4l2_is_buffer_valid (GstBuffer * buffer, GstV4l2MemoryGroup ** out_group)
@@ -458,6 +472,11 @@ gst_v4l2_buffer_pool_alloc_buffer (GstBufferPool * bpool, GstBuffer ** buffer,
for (i = 0; i < group->n_mem; i++)
gst_buffer_append_memory (newbuf, group->mem[i]);
if (g_atomic_int_get (&pool->buffer_state[group->buffer.index])) {
GST_WARNING_OBJECT (pool, "newly allocated buffer %u is not free",
group->buffer.index);
}
} else if (newbuf == NULL) {
goto allocation_failed;
}
@@ -723,16 +742,21 @@ gst_v4l2_buffer_pool_streamoff (GstV4l2BufferPool * pool)
}
for (i = 0; i < VIDEO_MAX_FRAME; i++) {
if (pool->buffers[i]) {
gint old_buffer_state =
g_atomic_int_and (&pool->buffer_state[i], ~BUFFER_STATE_QUEUED);
if (old_buffer_state & BUFFER_STATE_QUEUED) {
GstBuffer *buffer = pool->buffers[i];
GstBufferPool *bpool = GST_BUFFER_POOL (pool);
pool->buffers[i] = NULL;
if (V4L2_TYPE_IS_OUTPUT (pool->obj->type))
gst_v4l2_buffer_pool_release_buffer (bpool, buffer);
else /* Don't re-enqueue capture buffer on stop */
pclass->release_buffer (bpool, buffer);
if (!(old_buffer_state & BUFFER_STATE_OUTSTANDING)) {
if (V4L2_TYPE_IS_OUTPUT (pool->obj->type))
gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer, FALSE);
else /* Don't re-enqueue capture buffer on stop */
pclass->release_buffer (bpool, buffer);
}
g_atomic_int_add (&pool->num_queued, -1);
}
@@ -1175,14 +1199,18 @@ gst_v4l2_buffer_pool_qbuf (GstV4l2BufferPool * pool, GstBuffer * buf,
GstV4l2MemoryGroup * group, guint32 * frame_number)
{
const GstV4l2Object *obj = pool->obj;
gint old_buffer_state;
gint index;
index = group->buffer.index;
if (pool->buffers[index] != NULL)
old_buffer_state =
g_atomic_int_or (&pool->buffer_state[index], BUFFER_STATE_QUEUED);
if (old_buffer_state & BUFFER_STATE_QUEUED)
goto already_queued;
GST_LOG_OBJECT (pool, "queuing buffer %i", index);
GST_LOG_OBJECT (pool, "queuing buffer %i, previous-state = %i", index,
old_buffer_state);
if (V4L2_TYPE_IS_OUTPUT (obj->type)) {
enum v4l2_field field;
@@ -1238,6 +1266,7 @@ was_orphaned:
{
GST_DEBUG_OBJECT (pool, "pool was orphaned, not queuing back buffer.");
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_TAG_MEMORY);
g_atomic_int_and (&pool->buffer_state[index], ~BUFFER_STATE_QUEUED);
GST_OBJECT_UNLOCK (pool);
return GST_FLOW_FLUSHING;
}
@@ -1248,6 +1277,7 @@ queue_failed:
GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_TAG_MEMORY);
g_atomic_int_add (&pool->num_queued, -1);
pool->buffers[index] = NULL;
g_atomic_int_and (&pool->buffer_state[index], ~BUFFER_STATE_QUEUED);
GST_OBJECT_UNLOCK (pool);
return GST_FLOW_ERROR;
}
@@ -1255,7 +1285,7 @@ queue_failed:
static GstFlowReturn
gst_v4l2_buffer_pool_dqbuf (GstV4l2BufferPool * pool, GstBuffer ** buffer,
gboolean wait)
gboolean * outstanding, gboolean wait)
{
GstFlowReturn res;
GstBuffer *outbuf = NULL;
@@ -1265,6 +1295,7 @@ gst_v4l2_buffer_pool_dqbuf (GstV4l2BufferPool * pool, GstBuffer ** buffer,
GstVideoMeta *vmeta;
gsize size;
gint i;
gint old_buffer_state;
if ((res = gst_v4l2_buffer_pool_poll (pool, wait)) < GST_FLOW_OK)
goto poll_failed;
@@ -1287,14 +1318,23 @@ gst_v4l2_buffer_pool_dqbuf (GstV4l2BufferPool * pool, GstBuffer ** buffer,
if (res != GST_FLOW_OK)
goto dqbuf_failed;
/* get our GstBuffer with that index from the pool, if the buffer was
* outstanding we have a serious problem.
*/
old_buffer_state =
g_atomic_int_and (&pool->buffer_state[group->buffer.index],
~BUFFER_STATE_QUEUED);
if (!(old_buffer_state & BUFFER_STATE_QUEUED))
goto no_buffer;
if (outstanding) {
*outstanding = (old_buffer_state & BUFFER_STATE_OUTSTANDING) != 0;
} else if (old_buffer_state & BUFFER_STATE_OUTSTANDING) {
GST_WARNING_OBJECT (pool, "unexpected outstanding buffer %u",
group->buffer.index);
}
outbuf = pool->buffers[group->buffer.index];
if (outbuf == NULL)
goto no_buffer;
/* mark the buffer outstanding */
pool->buffers[group->buffer.index] = NULL;
if (g_atomic_int_dec_and_test (&pool->num_queued)) {
GST_OBJECT_LOCK (pool);
@@ -1309,10 +1349,10 @@ gst_v4l2_buffer_pool_dqbuf (GstV4l2BufferPool * pool, GstBuffer ** buffer,
for (i = 0; i < group->n_mem; i++) {
GST_LOG_OBJECT (pool,
"dequeued buffer %p seq:%d (ix=%d), mem %p used %d, plane=%d, flags %08x, ts %"
GST_TIME_FORMAT ", pool-queued=%d, buffer=%p", outbuf,
group->buffer.sequence, group->buffer.index, group->mem[i],
GST_TIME_FORMAT ", pool-queued=%d, buffer=%p, previous-state=%i",
outbuf, group->buffer.sequence, group->buffer.index, group->mem[i],
group->planes[i].bytesused, i, group->buffer.flags,
GST_TIME_ARGS (timestamp), pool->num_queued, outbuf);
GST_TIME_ARGS (timestamp), pool->num_queued, outbuf, old_buffer_state);
if (vmeta) {
vmeta->offset[i] = size;
@@ -1473,7 +1513,7 @@ gst_v4l2_buffer_pool_acquire_buffer (GstBufferPool * bpool, GstBuffer ** buffer,
/* just dequeue a buffer, we basically use the queue of v4l2 as the
* storage for our buffers. This function does poll first so we can
* interrupt it fine. */
ret = gst_v4l2_buffer_pool_dqbuf (pool, buffer, TRUE);
ret = gst_v4l2_buffer_pool_dqbuf (pool, buffer, NULL, TRUE);
break;
}
default:
@@ -1514,23 +1554,48 @@ gst_v4l2_buffer_pool_acquire_buffer (GstBufferPool * bpool, GstBuffer ** buffer,
break;
}
done:
/* Mark buffer as outstanding */
if (ret == GST_FLOW_OK) {
GstV4l2MemoryGroup *group;
if (gst_v4l2_is_buffer_valid (*buffer, &group)) {
GST_LOG_OBJECT (pool, "mark buffer %u outstanding", group->buffer.index);
g_atomic_int_or (&pool->buffer_state[group->buffer.index],
BUFFER_STATE_OUTSTANDING);
}
}
return ret;
}
/*
* Completes a release buffer operation
*
* Before calling this function:
* - The buffer state (if applicable) must have already been updated.
* - The buffer must not be outstanding.
* - The "queued" argument contains whether the buffer is currently queued.
*/
static void
gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer)
gst_v4l2_buffer_pool_complete_release_buffer (GstBufferPool * bpool,
GstBuffer * buffer, gboolean queued)
{
GstV4l2BufferPool *pool = GST_V4L2_BUFFER_POOL (bpool);
GstBufferPoolClass *pclass = GST_BUFFER_POOL_CLASS (parent_class);
GstV4l2Object *obj = pool->obj;
GST_DEBUG_OBJECT (pool, "release buffer %p", buffer);
GST_DEBUG_OBJECT (pool, "complete release buffer %p (queued = %s)", buffer,
queued ? "yes" : "no");
switch (obj->type) {
case V4L2_BUF_TYPE_VIDEO_CAPTURE:
case V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE:
/* capture, put the buffer back in the queue so that we can refill it
* later. */
if (queued) {
GST_WARNING_OBJECT (pool,
"capture buffer %p was released while still queued", buffer);
}
switch (obj->mode) {
case GST_V4L2_IO_RW:
/* release back in the pool */
@@ -1594,7 +1659,7 @@ gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer)
index = group->buffer.index;
if (pool->buffers[index] == NULL) {
if (!queued) {
GST_LOG_OBJECT (pool, "buffer %u not queued, putting on free list",
index);
@@ -1628,6 +1693,25 @@ gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer)
}
}
static void
gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer)
{
GstV4l2BufferPool *pool = GST_V4L2_BUFFER_POOL (bpool);
GstV4l2MemoryGroup *group;
gboolean queued = FALSE;
if (gst_v4l2_is_buffer_valid (buffer, &group)) {
gint old_buffer_state =
g_atomic_int_and (&pool->buffer_state[group->buffer.index],
~BUFFER_STATE_OUTSTANDING);
queued = (old_buffer_state & BUFFER_STATE_QUEUED) != 0;
GST_LOG_OBJECT (pool, "mark buffer %u not outstanding",
group->buffer.index);
}
gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer, queued);
}
static void
gst_v4l2_buffer_pool_dispose (GObject * object)
{
@@ -1678,6 +1762,8 @@ gst_v4l2_buffer_pool_init (GstV4l2BufferPool * pool)
g_cond_init (&pool->empty_cond);
pool->empty = TRUE;
pool->orphaned = FALSE;
for (gint i = 0; i < VIDEO_MAX_FRAME; i++)
g_atomic_int_set (&pool->buffer_state[i], BUFFER_STATE_FREE);
}
static void
@@ -1931,13 +2017,13 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf,
}
/* buffer not from our pool, grab a frame and copy it into the target */
if ((ret = gst_v4l2_buffer_pool_dqbuf (pool, &tmp, TRUE))
if ((ret = gst_v4l2_buffer_pool_dqbuf (pool, &tmp, NULL, TRUE))
!= GST_FLOW_OK)
goto done;
/* An empty buffer on capture indicates the end of stream */
if (gst_buffer_get_size (tmp) == 0) {
gst_v4l2_buffer_pool_release_buffer (bpool, tmp);
gst_v4l2_buffer_pool_complete_release_buffer (bpool, tmp, FALSE);
/* Legacy M2M devices return empty buffer when drained */
if (GST_V4L2_IS_M2M (obj->device_caps))
@@ -1947,7 +2033,7 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf,
ret = gst_v4l2_buffer_pool_copy_buffer (pool, *buf, tmp);
/* and queue the buffer again after the copy */
gst_v4l2_buffer_pool_release_buffer (bpool, tmp);
gst_v4l2_buffer_pool_complete_release_buffer (bpool, tmp, FALSE);
if (ret != GST_FLOW_OK)
goto copy_failed;
@@ -2014,6 +2100,7 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf,
GstBuffer *buffer;
GstV4l2MemoryGroup *group;
gint index;
gboolean outstanding;
if ((*buf)->pool != bpool)
goto copying;
@@ -2025,7 +2112,8 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf,
GST_LOG_OBJECT (pool, "processing buffer %i from our pool", index);
if (pool->buffers[index] != NULL) {
if (g_atomic_int_get (&pool->buffer_state[index]) &
BUFFER_STATE_QUEUED) {
GST_LOG_OBJECT (pool, "buffer %i already queued, copying", index);
goto copying;
}
@@ -2076,6 +2164,8 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf,
gst_v4l2_allocator_flush (pool->vallocator);
pool->buffers[group->buffer.index] = NULL;
g_atomic_int_and (&pool->buffer_state[group->buffer.index],
~BUFFER_STATE_QUEUED);
gst_mini_object_set_qdata (GST_MINI_OBJECT (to_queue),
GST_V4L2_IMPORT_QUARK, NULL, NULL);
@@ -2089,20 +2179,23 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf,
gst_buffer_unref (to_queue);
/* release as many buffer as possible */
while (gst_v4l2_buffer_pool_dqbuf (pool, &buffer, FALSE) ==
GST_FLOW_OK) {
if (buffer->pool == NULL)
gst_v4l2_buffer_pool_release_buffer (bpool, buffer);
while (gst_v4l2_buffer_pool_dqbuf (pool, &buffer, &outstanding,
FALSE) == GST_FLOW_OK) {
if (!outstanding)
gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer,
FALSE);
}
if (g_atomic_int_get (&pool->num_queued) >= pool->min_latency) {
/* all buffers are queued, try to dequeue one and release it back
* into the pool so that _acquire can get to it again. */
ret = gst_v4l2_buffer_pool_dqbuf (pool, &buffer, TRUE);
if (ret == GST_FLOW_OK && buffer->pool == NULL)
ret =
gst_v4l2_buffer_pool_dqbuf (pool, &buffer, &outstanding, TRUE);
if (ret == GST_FLOW_OK && !outstanding)
/* release the rendered buffer back into the pool. This wakes up any
* thread waiting for a buffer in _acquire(). */
gst_v4l2_buffer_pool_release_buffer (bpool, buffer);
gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer,
FALSE);
}
break;
}

gstv4l2bufferpool.h

@@ -93,6 +93,7 @@ struct _GstV4l2BufferPool
gboolean flushing;
GstBuffer *buffers[VIDEO_MAX_FRAME];
volatile gint buffer_state[VIDEO_MAX_FRAME];
/* signal handlers */
gulong group_released_handler;