bufferpool: Fix handling of the GstPoll

Especially if multiple threads are waiting for buffers to be available again,
the current code was wrong. Fix this and document clearly how the GstPoll is
supposed to be used.

Also fix some potential races with reading from the GstPoll before writing
actually happened.

https://bugzilla.gnome.org/show_bug.cgi?id=767979
This commit is contained in:
Sebastian Dröge 2016-06-29 14:05:18 +02:00
parent 4ff2721810
commit 60a2087cb1

View file

@ -170,9 +170,9 @@ gst_buffer_pool_init (GstBufferPool * pool)
gst_allocation_params_init (&priv->params);
gst_buffer_pool_config_set_allocator (priv->config, priv->allocator,
&priv->params);
/* 1 control write for flushing */
/* 1 control write for flushing - the flush token */
gst_poll_write_control (priv->poll);
/* 1 control write for marking that we are not waiting for poll */
/* 1 control write for marking that we are not waiting for poll - the wait token */
gst_poll_write_control (priv->poll);
GST_DEBUG_OBJECT (pool, "created");
@ -390,7 +390,17 @@ default_stop (GstBufferPool * pool)
/* clear the pool */
while ((buffer = gst_atomic_queue_pop (priv->queue))) {
gst_poll_read_control (priv->poll);
while (!gst_poll_read_control (priv->poll)) {
if (errno == EWOULDBLOCK) {
/* We put the buffer into the queue but did not finish writing control
* yet, let's wait a bit and retry */
g_thread_yield ();
continue;
} else {
/* Critical error but GstPoll already complained */
break;
}
}
do_free_buffer (pool, buffer);
}
return priv->cur_buffers == 0;
@ -431,6 +441,7 @@ do_set_flushing (GstBufferPool * pool, gboolean flushing)
if (flushing) {
g_atomic_int_set (&pool->flushing, 1);
/* Write the flush token to wake up any waiters */
gst_poll_write_control (priv->poll);
if (pclass->flush_start)
@ -439,7 +450,19 @@ do_set_flushing (GstBufferPool * pool, gboolean flushing)
if (pclass->flush_stop)
pclass->flush_stop (pool);
gst_poll_read_control (priv->poll);
while (!gst_poll_read_control (priv->poll)) {
if (errno == EWOULDBLOCK) {
/* This should not really happen unless flushing and unflushing
* happens on different threads. Let's wait a bit to get back flush
* token from the thread that was setting it to flushing */
g_thread_yield ();
continue;
} else {
/* Critical error but GstPoll already complained */
break;
}
}
g_atomic_int_set (&pool->flushing, 0);
}
}
@ -1080,7 +1103,17 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
/* try to get a buffer from the queue */
*buffer = gst_atomic_queue_pop (priv->queue);
if (G_LIKELY (*buffer)) {
gst_poll_read_control (priv->poll);
while (!gst_poll_read_control (priv->poll)) {
if (errno == EWOULDBLOCK) {
/* We put the buffer into the queue but did not finish writing control
* yet, let's wait a bit and retry */
g_thread_yield ();
continue;
} else {
/* Critical error but GstPoll already complained */
break;
}
}
result = GST_FLOW_OK;
GST_LOG_OBJECT (pool, "acquired buffer %p", *buffer);
break;
@ -1105,10 +1138,33 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
/* now we release the control socket, we wait for a buffer release or
* flushing */
gst_poll_read_control (pool->priv->poll);
GST_LOG_OBJECT (pool, "waiting for free buffers or flushing");
gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE);
gst_poll_write_control (pool->priv->poll);
if (!gst_poll_read_control (pool->priv->poll)) {
if (errno == EWOULDBLOCK) {
/* This means that we have two threads trying to allocate buffers
* already, and the other one already got the wait token. This
* means that we only have to wait for the poll now and not write the
* token afterwards: we will be woken up once the other thread is
* woken up and that one will write the wait token it removed */
GST_LOG_OBJECT (pool, "waiting for free buffers or flushing");
gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE);
} else {
/* This is a critical error, GstPoll already gave a warning */
result = GST_FLOW_ERROR;
break;
}
} else {
/* We're the first thread waiting, we got the wait token and have to
* write it again later
* OR
* We're a second thread and just consumed the flush token and block all
* other threads, in which case we must not wait and give it back
* immediately */
if (!GST_BUFFER_POOL_IS_FLUSHING (pool)) {
GST_LOG_OBJECT (pool, "waiting for free buffers or flushing");
gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE);
}
gst_poll_write_control (pool->priv->poll);
}
}
return result;