From 1e27ea63afbee24e47301f9212d2ec1ddf254c41 Mon Sep 17 00:00:00 2001 From: James Cowgill Date: Mon, 27 Sep 2021 16:52:22 +0100 Subject: [PATCH] v4l2: Record buffer states in pool to fix dequeue race The `gst_v4l2_buffer_pool_dqbuf` function contains this ominous comment: /* get our GstBuffer with that index from the pool, if the buffer was * outstanding we have a serious problem. */ outbuf = pool->buffers[group->buffer.index]; Unfortunately it is common for buffers in _output_ buffer pools to be both queued and outstanding at the same time. This can happen if the upstream element keeps a reference to the buffer, or in an encoder element itself when it keeps a reference to the input buffer for each frame. Since the current code doesn't handle this case properly we can end up with crashes in other elements such as: (gst-launch-1.0:32559): CRITICAL **: 17:33:35.740: gst_video_frame_map_id: assertion 'GST_IS_BUFFER (buffer)' failed and: (gst-launch-1.0:231): GStreamer-CRITICAL **: 00:16:20.882: write map requested on non-writable buffer Both these crashes are caused by a race condition related to releasing the same buffer twice from two different threads. If a buffer is queued and outstanding this situation is possible: **Thread 1** - Calls `gst_buffer_unref` decrementing the reference count to zero. - The core GstBufferPool object marks the buffer non-outstanding. - Calls the V4L2 release buffer function. - If the buffer is _not_ queued: - Release it back to the free pool (containing non-queued buffers). **Thread 2** - Dequeues the queued output buffer. - Marks the buffer as not queued. - If the buffer is _not_ outstanding: - Calls the V4L2 release buffer function. - Release it back to the free pool (containing non-queued buffers). If both of these threads run at exactly the same time there is a small window where the buffer is marked both not outstanding and not queued but before it has been released. In this case the buffer will be freed twice causing the above crashes. Unfortunately the variable recording whether a buffer is outstanding is part of the core `GstBuffer` object and is managed by `GstBufferPool` so it's not as straightforward as adding a mutex. Instead we can fix this by additionally recording the buffer state in `GstV4l2BufferPool`, and handle "internal" and "external" buffer release separately so we can detect when a buffer becomes not outstanding. In the new solution: - The "external" buffer pool release and the "dqbuf" functions atomically update the buffer state and determine if a buffer is still queued or outstanding. - Subsequent code and a new `gst_v4l2_buffer_pool_complete_release_buffer` function can proceed to release (or not) a buffer knowing that it's not racing with another thread. Part-of: --- .../sys/v4l2/gstv4l2bufferpool.c | 157 ++++++++++++++---- .../sys/v4l2/gstv4l2bufferpool.h | 1 + 2 files changed, 126 insertions(+), 32 deletions(-) diff --git a/subprojects/gst-plugins-good/sys/v4l2/gstv4l2bufferpool.c b/subprojects/gst-plugins-good/sys/v4l2/gstv4l2bufferpool.c index 45c672d114..58667de434 100644 --- a/subprojects/gst-plugins-good/sys/v4l2/gstv4l2bufferpool.c +++ b/subprojects/gst-plugins-good/sys/v4l2/gstv4l2bufferpool.c @@ -66,8 +66,22 @@ enum _GstV4l2BufferPoolAcquireFlags GST_V4L2_BUFFER_POOL_ACQUIRE_FLAG_LAST }; -static void gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, - GstBuffer * buffer); +/* Buffer state flags */ +enum _GstV4l2BufferState +{ + /* Buffer is free (either on the GstBufferPool free queue, or no GstBuffer has + * been allocated yet) */ + BUFFER_STATE_FREE = 0, + + /* Buffer had outstanding external users */ + BUFFER_STATE_OUTSTANDING = 1, + + /* Buffer is on one of the kernel queues */ + BUFFER_STATE_QUEUED = 2, +}; + +static void gst_v4l2_buffer_pool_complete_release_buffer (GstBufferPool * bpool, + GstBuffer * buffer, gboolean queued); static gboolean gst_v4l2_is_buffer_valid (GstBuffer * buffer, GstV4l2MemoryGroup ** out_group) @@ -458,6 +472,11 @@ gst_v4l2_buffer_pool_alloc_buffer (GstBufferPool * bpool, GstBuffer ** buffer, for (i = 0; i < group->n_mem; i++) gst_buffer_append_memory (newbuf, group->mem[i]); + + if (g_atomic_int_get (&pool->buffer_state[group->buffer.index])) { + GST_WARNING_OBJECT (pool, "newly allocated buffer %u is not free", + group->buffer.index); + } } else if (newbuf == NULL) { goto allocation_failed; } @@ -723,16 +742,21 @@ gst_v4l2_buffer_pool_streamoff (GstV4l2BufferPool * pool) } for (i = 0; i < VIDEO_MAX_FRAME; i++) { - if (pool->buffers[i]) { + gint old_buffer_state = + g_atomic_int_and (&pool->buffer_state[i], ~BUFFER_STATE_QUEUED); + if (old_buffer_state & BUFFER_STATE_QUEUED) { GstBuffer *buffer = pool->buffers[i]; GstBufferPool *bpool = GST_BUFFER_POOL (pool); pool->buffers[i] = NULL; - if (V4L2_TYPE_IS_OUTPUT (pool->obj->type)) - gst_v4l2_buffer_pool_release_buffer (bpool, buffer); - else /* Don't re-enqueue capture buffer on stop */ - pclass->release_buffer (bpool, buffer); + if (!(old_buffer_state & BUFFER_STATE_OUTSTANDING)) { + if (V4L2_TYPE_IS_OUTPUT (pool->obj->type)) + gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer, FALSE); + + else /* Don't re-enqueue capture buffer on stop */ + pclass->release_buffer (bpool, buffer); + } g_atomic_int_add (&pool->num_queued, -1); } @@ -1175,14 +1199,18 @@ gst_v4l2_buffer_pool_qbuf (GstV4l2BufferPool * pool, GstBuffer * buf, GstV4l2MemoryGroup * group, guint32 * frame_number) { const GstV4l2Object *obj = pool->obj; + gint old_buffer_state; gint index; index = group->buffer.index; - if (pool->buffers[index] != NULL) + old_buffer_state = + g_atomic_int_or (&pool->buffer_state[index], BUFFER_STATE_QUEUED); + if (old_buffer_state & BUFFER_STATE_QUEUED) goto already_queued; - GST_LOG_OBJECT (pool, "queuing buffer %i", index); + GST_LOG_OBJECT (pool, "queuing buffer %i, previous-state = %i", index, + old_buffer_state); if (V4L2_TYPE_IS_OUTPUT (obj->type)) { enum v4l2_field field; @@ -1238,6 +1266,7 @@ was_orphaned: { GST_DEBUG_OBJECT (pool, "pool was orphaned, not queuing back buffer."); GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_TAG_MEMORY); + g_atomic_int_and (&pool->buffer_state[index], ~BUFFER_STATE_QUEUED); GST_OBJECT_UNLOCK (pool); return GST_FLOW_FLUSHING; } @@ -1248,6 +1277,7 @@ queue_failed: GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_TAG_MEMORY); g_atomic_int_add (&pool->num_queued, -1); pool->buffers[index] = NULL; + g_atomic_int_and (&pool->buffer_state[index], ~BUFFER_STATE_QUEUED); GST_OBJECT_UNLOCK (pool); return GST_FLOW_ERROR; } @@ -1255,7 +1285,7 @@ queue_failed: static GstFlowReturn gst_v4l2_buffer_pool_dqbuf (GstV4l2BufferPool * pool, GstBuffer ** buffer, - gboolean wait) + gboolean * outstanding, gboolean wait) { GstFlowReturn res; GstBuffer *outbuf = NULL; @@ -1265,6 +1295,7 @@ gst_v4l2_buffer_pool_dqbuf (GstV4l2BufferPool * pool, GstBuffer ** buffer, GstVideoMeta *vmeta; gsize size; gint i; + gint old_buffer_state; if ((res = gst_v4l2_buffer_pool_poll (pool, wait)) < GST_FLOW_OK) goto poll_failed; @@ -1287,14 +1318,23 @@ gst_v4l2_buffer_pool_dqbuf (GstV4l2BufferPool * pool, GstBuffer ** buffer, if (res != GST_FLOW_OK) goto dqbuf_failed; - /* get our GstBuffer with that index from the pool, if the buffer was - * outstanding we have a serious problem. - */ + old_buffer_state = + g_atomic_int_and (&pool->buffer_state[group->buffer.index], + ~BUFFER_STATE_QUEUED); + if (!(old_buffer_state & BUFFER_STATE_QUEUED)) + goto no_buffer; + + if (outstanding) { + *outstanding = (old_buffer_state & BUFFER_STATE_OUTSTANDING) != 0; + } else if (old_buffer_state & BUFFER_STATE_OUTSTANDING) { + GST_WARNING_OBJECT (pool, "unexpected outstanding buffer %u", + group->buffer.index); + } + outbuf = pool->buffers[group->buffer.index]; if (outbuf == NULL) goto no_buffer; - /* mark the buffer outstanding */ pool->buffers[group->buffer.index] = NULL; if (g_atomic_int_dec_and_test (&pool->num_queued)) { GST_OBJECT_LOCK (pool); @@ -1309,10 +1349,10 @@ gst_v4l2_buffer_pool_dqbuf (GstV4l2BufferPool * pool, GstBuffer ** buffer, for (i = 0; i < group->n_mem; i++) { GST_LOG_OBJECT (pool, "dequeued buffer %p seq:%d (ix=%d), mem %p used %d, plane=%d, flags %08x, ts %" - GST_TIME_FORMAT ", pool-queued=%d, buffer=%p", outbuf, - group->buffer.sequence, group->buffer.index, group->mem[i], + GST_TIME_FORMAT ", pool-queued=%d, buffer=%p, previous-state=%i", + outbuf, group->buffer.sequence, group->buffer.index, group->mem[i], group->planes[i].bytesused, i, group->buffer.flags, - GST_TIME_ARGS (timestamp), pool->num_queued, outbuf); + GST_TIME_ARGS (timestamp), pool->num_queued, outbuf, old_buffer_state); if (vmeta) { vmeta->offset[i] = size; @@ -1473,7 +1513,7 @@ gst_v4l2_buffer_pool_acquire_buffer (GstBufferPool * bpool, GstBuffer ** buffer, /* just dequeue a buffer, we basically use the queue of v4l2 as the * storage for our buffers. This function does poll first so we can * interrupt it fine. */ - ret = gst_v4l2_buffer_pool_dqbuf (pool, buffer, TRUE); + ret = gst_v4l2_buffer_pool_dqbuf (pool, buffer, NULL, TRUE); break; } default: @@ -1514,23 +1554,48 @@ gst_v4l2_buffer_pool_acquire_buffer (GstBufferPool * bpool, GstBuffer ** buffer, break; } done: + /* Mark buffer as outstanding */ + if (ret == GST_FLOW_OK) { + GstV4l2MemoryGroup *group; + if (gst_v4l2_is_buffer_valid (*buffer, &group)) { + GST_LOG_OBJECT (pool, "mark buffer %u outstanding", group->buffer.index); + g_atomic_int_or (&pool->buffer_state[group->buffer.index], + BUFFER_STATE_OUTSTANDING); + } + } + return ret; } +/* + * Completes a release buffer operation + * + * Before calling this function: + * - The buffer state (if applicable) must have already been updated. + * - The buffer must not be outstanding. + * - The "queued" argument contains whether the buffer is currently queued. + */ static void -gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer) +gst_v4l2_buffer_pool_complete_release_buffer (GstBufferPool * bpool, + GstBuffer * buffer, gboolean queued) { GstV4l2BufferPool *pool = GST_V4L2_BUFFER_POOL (bpool); GstBufferPoolClass *pclass = GST_BUFFER_POOL_CLASS (parent_class); GstV4l2Object *obj = pool->obj; - GST_DEBUG_OBJECT (pool, "release buffer %p", buffer); + GST_DEBUG_OBJECT (pool, "complete release buffer %p (queued = %s)", buffer, + queued ? "yes" : "no"); switch (obj->type) { case V4L2_BUF_TYPE_VIDEO_CAPTURE: case V4L2_BUF_TYPE_VIDEO_CAPTURE_MPLANE: /* capture, put the buffer back in the queue so that we can refill it * later. */ + if (queued) { + GST_WARNING_OBJECT (pool, + "capture buffer %p was release while still queued", buffer); + } + switch (obj->mode) { case GST_V4L2_IO_RW: /* release back in the pool */ @@ -1594,7 +1659,7 @@ gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer) index = group->buffer.index; - if (pool->buffers[index] == NULL) { + if (!queued) { GST_LOG_OBJECT (pool, "buffer %u not queued, putting on free list", index); @@ -1628,6 +1693,25 @@ gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer) } } +static void +gst_v4l2_buffer_pool_release_buffer (GstBufferPool * bpool, GstBuffer * buffer) +{ + GstV4l2BufferPool *pool = GST_V4L2_BUFFER_POOL (bpool); + GstV4l2MemoryGroup *group; + gboolean queued = FALSE; + + if (gst_v4l2_is_buffer_valid (buffer, &group)) { + gint old_buffer_state = + g_atomic_int_and (&pool->buffer_state[group->buffer.index], + ~BUFFER_STATE_OUTSTANDING); + queued = (old_buffer_state & BUFFER_STATE_QUEUED) != 0; + GST_LOG_OBJECT (pool, "mark buffer %u not outstanding", + group->buffer.index); + } + + gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer, queued); +} + static void gst_v4l2_buffer_pool_dispose (GObject * object) { @@ -1678,6 +1762,8 @@ gst_v4l2_buffer_pool_init (GstV4l2BufferPool * pool) g_cond_init (&pool->empty_cond); pool->empty = TRUE; pool->orphaned = FALSE; + for (gint i = 0; i < VIDEO_MAX_FRAME; i++) + g_atomic_int_set (&pool->buffer_state[i], BUFFER_STATE_FREE); } static void @@ -1931,13 +2017,13 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf, } /* buffer not from our pool, grab a frame and copy it into the target */ - if ((ret = gst_v4l2_buffer_pool_dqbuf (pool, &tmp, TRUE)) + if ((ret = gst_v4l2_buffer_pool_dqbuf (pool, &tmp, NULL, TRUE)) != GST_FLOW_OK) goto done; /* An empty buffer on capture indicates the end of stream */ if (gst_buffer_get_size (tmp) == 0) { - gst_v4l2_buffer_pool_release_buffer (bpool, tmp); + gst_v4l2_buffer_pool_complete_release_buffer (bpool, tmp, FALSE); /* Legacy M2M devices return empty buffer when drained */ if (GST_V4L2_IS_M2M (obj->device_caps)) @@ -1947,7 +2033,7 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf, ret = gst_v4l2_buffer_pool_copy_buffer (pool, *buf, tmp); /* an queue the buffer again after the copy */ - gst_v4l2_buffer_pool_release_buffer (bpool, tmp); + gst_v4l2_buffer_pool_complete_release_buffer (bpool, tmp, FALSE); if (ret != GST_FLOW_OK) goto copy_failed; @@ -2014,6 +2100,7 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf, GstBuffer *buffer; GstV4l2MemoryGroup *group; gint index; + gboolean outstanding; if ((*buf)->pool != bpool) goto copying; @@ -2025,7 +2112,8 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf, GST_LOG_OBJECT (pool, "processing buffer %i from our pool", index); - if (pool->buffers[index] != NULL) { + if (g_atomic_int_get (&pool->buffer_state[index]) & + BUFFER_STATE_QUEUED) { GST_LOG_OBJECT (pool, "buffer %i already queued, copying", index); goto copying; } @@ -2076,6 +2164,8 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf, gst_v4l2_allocator_flush (pool->vallocator); pool->buffers[group->buffer.index] = NULL; + g_atomic_int_and (&pool->buffer_state[group->buffer.index], + ~BUFFER_STATE_QUEUED); gst_mini_object_set_qdata (GST_MINI_OBJECT (to_queue), GST_V4L2_IMPORT_QUARK, NULL, NULL); @@ -2089,20 +2179,23 @@ gst_v4l2_buffer_pool_process (GstV4l2BufferPool * pool, GstBuffer ** buf, gst_buffer_unref (to_queue); /* release as many buffer as possible */ - while (gst_v4l2_buffer_pool_dqbuf (pool, &buffer, FALSE) == - GST_FLOW_OK) { - if (buffer->pool == NULL) - gst_v4l2_buffer_pool_release_buffer (bpool, buffer); + while (gst_v4l2_buffer_pool_dqbuf (pool, &buffer, &outstanding, + FALSE) == GST_FLOW_OK) { + if (!outstanding) + gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer, + FALSE); } if (g_atomic_int_get (&pool->num_queued) >= pool->min_latency) { /* all buffers are queued, try to dequeue one and release it back * into the pool so that _acquire can get to it again. */ - ret = gst_v4l2_buffer_pool_dqbuf (pool, &buffer, TRUE); - if (ret == GST_FLOW_OK && buffer->pool == NULL) + ret = + gst_v4l2_buffer_pool_dqbuf (pool, &buffer, &outstanding, TRUE); + if (ret == GST_FLOW_OK && !outstanding) /* release the rendered buffer back into the pool. This wakes up any * thread waiting for a buffer in _acquire(). */ - gst_v4l2_buffer_pool_release_buffer (bpool, buffer); + gst_v4l2_buffer_pool_complete_release_buffer (bpool, buffer, + FALSE); } break; } diff --git a/subprojects/gst-plugins-good/sys/v4l2/gstv4l2bufferpool.h b/subprojects/gst-plugins-good/sys/v4l2/gstv4l2bufferpool.h index f4fdf92b4e..10742634f0 100644 --- a/subprojects/gst-plugins-good/sys/v4l2/gstv4l2bufferpool.h +++ b/subprojects/gst-plugins-good/sys/v4l2/gstv4l2bufferpool.h @@ -93,6 +93,7 @@ struct _GstV4l2BufferPool gboolean flushing; GstBuffer *buffers[VIDEO_MAX_FRAME]; + volatile gint buffer_state[VIDEO_MAX_FRAME]; /* signal handlers */ gulong group_released_handler;