bufferpool: Switch from GstAtomicQueue to GstVecDeque and a mutex/cond

While the atomic queue itself is lock-free, all of its usage had to be
synchronized externally via a GstPoll and gst_poll_read_control() /
gst_poll_write_control(). Both functions have taken a mutex internally
since cd06aea1, so the implementation was very complicated but not
actually lock-free at all.

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/2714

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6684>
Author: Sebastian Dröge, 2024-04-18 17:08:36 +03:00
Committer: GStreamer Marge Bot
Parent: 3c7ddf902a
Commit: 8e589eec08
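
For orientation before the diff: below is a minimal sketch of the pattern the commit message describes, a GstVecDeque guarded by a GMutex/GCond pair in place of the lock-free GstAtomicQueue plus GstPoll control tokens. It is illustrative only; the ToyPool type and the toy_pool_* functions are invented for this example and are not part of gstbufferpool.c.

#include <gst/gst.h>

/* Illustrative only: a toy pool-like structure, not the GstBufferPool code. */
typedef struct
{
  GstVecDeque *queue;           /* queued buffers, protected by lock */
  GMutex lock;
  GCond cond;
  gboolean flushing;            /* protected by lock in this sketch */
} ToyPool;

static void
toy_pool_init (ToyPool * pool)
{
  pool->queue = gst_vec_deque_new (16);
  g_mutex_init (&pool->lock);
  g_cond_init (&pool->cond);
  pool->flushing = FALSE;
}

/* Consumer: block until a buffer is available or the pool starts flushing. */
static GstBuffer *
toy_pool_acquire (ToyPool * pool)
{
  GstBuffer *buffer = NULL;

  g_mutex_lock (&pool->lock);
  while (gst_vec_deque_get_length (pool->queue) == 0 && !pool->flushing)
    g_cond_wait (&pool->cond, &pool->lock);
  if (!pool->flushing)
    buffer = gst_vec_deque_pop_head (pool->queue);
  g_mutex_unlock (&pool->lock);

  return buffer;                /* NULL means we are flushing */
}

/* Producer: queue a buffer and wake up one waiter. */
static void
toy_pool_release (ToyPool * pool, GstBuffer * buffer)
{
  g_mutex_lock (&pool->lock);
  gst_vec_deque_push_tail (pool->queue, buffer);
  g_cond_signal (&pool->cond);
  g_mutex_unlock (&pool->lock);
}

/* Flushing: wake up all waiters so they can bail out. */
static void
toy_pool_set_flushing (ToyPool * pool)
{
  g_mutex_lock (&pool->lock);
  pool->flushing = TRUE;
  g_cond_broadcast (&pool->cond);
  g_mutex_unlock (&pool->lock);
}

The real acquire path in the diff below additionally lets waiters proceed once cur_buffers drops under max_buffers (so a freed buffer, not only a re-queued one, can end the wait), and it drops the lock around calls into subclass vmethods.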


@@ -70,8 +70,7 @@
 #endif
 #include <sys/types.h>
-#include "gstatomicqueue.h"
-#include "gstpoll.h"
+#include "gstvecdeque.h"
 #include "gstinfo.h"
 #include "gstquark.h"
 #include "gstvalue.h"
@@ -92,8 +91,9 @@ GST_DEBUG_CATEGORY_STATIC (gst_buffer_pool_debug);
 struct _GstBufferPoolPrivate
 {
-  GstAtomicQueue *queue;
-  GstPoll *poll;
+  GMutex queue_lock;
+  GCond queue_cond;
+  GstVecDeque *queue;
   GRecMutex rec_lock;
@@ -159,8 +159,10 @@ gst_buffer_pool_init (GstBufferPool * pool)
   g_rec_mutex_init (&priv->rec_lock);
-  priv->poll = gst_poll_new_timer ();
-  priv->queue = gst_atomic_queue_new (16);
+  priv->queue = gst_vec_deque_new (16);
+  g_mutex_init (&priv->queue_lock);
+  g_cond_init (&priv->queue_cond);
   pool->flushing = 1;
   priv->active = FALSE;
   priv->configured = FALSE;
@ -171,10 +173,6 @@ gst_buffer_pool_init (GstBufferPool * pool)
gst_allocation_params_init (&priv->params);
gst_buffer_pool_config_set_allocator (priv->config, priv->allocator,
&priv->params);
/* 1 control write for flushing - the flush token */
gst_poll_write_control (priv->poll);
/* 1 control write for marking that we are not waiting for poll - the wait token */
gst_poll_write_control (priv->poll);
GST_DEBUG_OBJECT (pool, "created");
}
@@ -207,8 +205,9 @@ gst_buffer_pool_finalize (GObject * object)
   GST_DEBUG_OBJECT (pool, "%p finalize", pool);
-  gst_atomic_queue_unref (priv->queue);
-  gst_poll_free (priv->poll);
+  gst_vec_deque_free (priv->queue);
+  g_mutex_clear (&priv->queue_lock);
+  g_cond_clear (&priv->queue_cond);
   gst_structure_free (priv->config);
   g_rec_mutex_clear (&priv->rec_lock);
@@ -397,6 +396,10 @@ do_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
   if (G_LIKELY (pclass->free_buffer))
     pclass->free_buffer (pool, buffer);
+  g_mutex_lock (&priv->queue_lock);
+  g_cond_signal (&priv->queue_cond);
+  g_mutex_unlock (&priv->queue_lock);
 }
 /* must be called with the lock */
@@ -405,23 +408,18 @@ default_stop (GstBufferPool * pool)
 {
   GstBufferPoolPrivate *priv = pool->priv;
   GstBuffer *buffer;
+  gboolean cleared;
   /* clear the pool */
-  while ((buffer = gst_atomic_queue_pop (priv->queue))) {
-    while (!gst_poll_read_control (priv->poll)) {
-      if (errno == EWOULDBLOCK) {
-        /* We put the buffer into the queue but did not finish writing control
-         * yet, let's wait a bit and retry */
-        g_thread_yield ();
-        continue;
-      } else {
-        /* Critical error but GstPoll already complained */
-        break;
-      }
-    }
+  g_mutex_lock (&priv->queue_lock);
+  while ((buffer = gst_vec_deque_pop_head (priv->queue))) {
+    g_mutex_unlock (&priv->queue_lock);
     do_free_buffer (pool, buffer);
+    g_mutex_lock (&priv->queue_lock);
   }
-  return priv->cur_buffers == 0;
+  cleared = priv->cur_buffers == 0;
+  g_mutex_unlock (&priv->queue_lock);
+  return cleared;
 }
/* must be called with the lock */
@@ -458,9 +456,11 @@ do_set_flushing (GstBufferPool * pool, gboolean flushing)
     return;
   if (flushing) {
+    /* Wake up any waiters */
+    g_mutex_lock (&priv->queue_lock);
     g_atomic_int_set (&pool->flushing, 1);
-    /* Write the flush token to wake up any waiters */
-    gst_poll_write_control (priv->poll);
+    g_cond_broadcast (&priv->queue_cond);
+    g_mutex_unlock (&priv->queue_lock);
     if (pclass->flush_start)
       pclass->flush_start (pool);
@@ -468,19 +468,6 @@ do_set_flushing (GstBufferPool * pool, gboolean flushing)
     if (pclass->flush_stop)
       pclass->flush_stop (pool);
-    while (!gst_poll_read_control (priv->poll)) {
-      if (errno == EWOULDBLOCK) {
-        /* This should not really happen unless flushing and unflushing
-         * happens on different threads. Let's wait a bit to get back flush
-         * token from the thread that was setting it to flushing */
-        g_thread_yield ();
-        continue;
-      } else {
-        /* Critical error but GstPoll already complained */
-        break;
-      }
-    }
     g_atomic_int_set (&pool->flushing, 0);
   }
 }
@@ -1112,24 +1099,16 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
   GstFlowReturn result;
   GstBufferPoolPrivate *priv = pool->priv;
+  g_mutex_lock (&priv->queue_lock);
   while (TRUE) {
     if (G_UNLIKELY (GST_BUFFER_POOL_IS_FLUSHING (pool)))
       goto flushing;
     /* try to get a buffer from the queue */
-    *buffer = gst_atomic_queue_pop (priv->queue);
+    *buffer = gst_vec_deque_pop_head (priv->queue);
+    g_mutex_unlock (&priv->queue_lock);
     if (G_LIKELY (*buffer)) {
-      while (!gst_poll_read_control (priv->poll)) {
-        if (errno == EWOULDBLOCK) {
-          /* We put the buffer into the queue but did not finish writing control
-           * yet, let's wait a bit and retry */
-          g_thread_yield ();
-          continue;
-        } else {
-          /* Critical error but GstPoll already complained */
-          break;
-        }
-      }
       result = GST_FLOW_OK;
       GST_LOG_OBJECT (pool, "acquired buffer %p", *buffer);
       break;
@@ -1152,34 +1131,14 @@
       break;
     }
-    /* now we release the control socket, we wait for a buffer release or
-     * flushing */
-    if (!gst_poll_read_control (pool->priv->poll)) {
-      if (errno == EWOULDBLOCK) {
-        /* This means that we have two threads trying to allocate buffers
-         * already, and the other one already got the wait token. This
-         * means that we only have to wait for the poll now and not write the
-         * token afterwards: we will be woken up once the other thread is
-         * woken up and that one will write the wait token it removed */
-        GST_LOG_OBJECT (pool, "waiting for free buffers or flushing");
-        gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE);
-      } else {
-        /* This is a critical error, GstPoll already gave a warning */
-        result = GST_FLOW_ERROR;
-        break;
-      }
-    } else {
-      /* We're the first thread waiting, we got the wait token and have to
-       * write it again later
-       * OR
-       * We're a second thread and just consumed the flush token and block all
-       * other threads, in which case we must not wait and give it back
-       * immediately */
-      if (!GST_BUFFER_POOL_IS_FLUSHING (pool)) {
-        GST_LOG_OBJECT (pool, "waiting for free buffers or flushing");
-        gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE);
-      }
-      gst_poll_write_control (pool->priv->poll);
-    }
+    /* now we wait for a buffer release or flushing */
+    g_mutex_lock (&priv->queue_lock);
+    while (gst_vec_deque_get_length (priv->queue) == 0
+        && !GST_BUFFER_POOL_IS_FLUSHING (pool)
+        && g_atomic_int_get (&priv->cur_buffers) >= priv->max_buffers) {
+      GST_LOG_OBJECT (pool, "waiting for free buffers or flushing");
+      g_cond_wait (&priv->queue_cond, &priv->queue_lock);
+      GST_LOG_OBJECT (pool, "waited for free buffers or flushing");
+    }
   }
@@ -1188,6 +1147,7 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
   /* ERRORS */
 flushing:
   {
+    g_mutex_unlock (&priv->queue_lock);
     GST_DEBUG_OBJECT (pool, "we are flushing");
     return GST_FLOW_FLUSHING;
   }
@@ -1331,8 +1291,10 @@ default_release_buffer (GstBufferPool * pool, GstBuffer * buffer)
     goto not_writable;
   /* keep it around in our queue */
-  gst_atomic_queue_push (pool->priv->queue, buffer);
-  gst_poll_write_control (pool->priv->poll);
+  g_mutex_lock (&pool->priv->queue_lock);
+  gst_vec_deque_push_tail (pool->priv->queue, buffer);
+  g_cond_signal (&pool->priv->queue_cond);
+  g_mutex_unlock (&pool->priv->queue_lock);
   return;
@@ -1358,7 +1320,10 @@ not_writable:
 discard:
   {
     do_free_buffer (pool, buffer);
-    gst_poll_write_control (pool->priv->poll);
+    g_mutex_lock (&pool->priv->queue_lock);
+    g_cond_signal (&pool->priv->queue_cond);
+    g_mutex_unlock (&pool->priv->queue_lock);
     return;
   }
 }