poll: Fix various race conditions with read_control() and write_control()

This addresses slightly different race conditions on Linux and Windows, and
fixes gst_poll_read_control() when control_pending == 0.

On Linux, the socketpair() used for control should not be made O_NONBLOCK.
If there's any propagation delay between set->control_write_fd.fd and
set->control_read_fd.fd, even the mutex now held will not be sufficient to
prevent a race condition.  There's no benefit to using O_NONBLOCK here, only
liabilities.

For Windows, it's necessary to fix the race condition between testing
set->control_pending and performing WAKE_EVENT()/RELEASE_EVENT().  This is
accomplished by acquiring and holding set->lock, for both of these operations.
We could optimize the Linux version by making this Windows-specific.

For consistency with the Linux implementation, Windows' RELEASE_EVENT()
has also been made to block, although it should never actually have to wait.

Also, changed release_wakeup() to return TRUE and decrement control_pending
only when control_pending > 0.  Furthermore, RELEASE_EVENT() is called only
when control_pending == 1.

Finally, changed control_pending to use normal, non-atomic arithmetic
operations, since it's now protected by set->lock.

Note: even though the underlying signaling mechanisms are blocking,
release_wakeup() is effectively non-blocking, as it will only attempt to read
from control_read_fd.fd after a byte has been written to control_write_fd.fd,
or to call WaitForSingleObject() after wakeup_event has been signaled.

https://bugzilla.gnome.org/show_bug.cgi?id=750397
This commit is contained in:
Matthew Gruenke 2016-06-29 18:57:42 +02:00 committed by Sebastian Dröge
parent e11539c30b
commit cd06aea103

View file

@ -133,7 +133,6 @@ struct _GstPoll
GArray *active_fds;
#ifndef G_OS_WIN32
gchar buf[1];
GstPollFD control_read_fd;
GstPollFD control_write_fd;
#else
@ -167,11 +166,104 @@ static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
#define MARK_REBUILD(s) (g_atomic_int_set(&(s)->rebuild, 1))
#ifndef G_OS_WIN32
#define WAKE_EVENT(s) (write ((s)->control_write_fd.fd, "W", 1) == 1)
#define RELEASE_EVENT(s) (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1)
/* Signals the control channel of @set by writing a single byte to the write
 * end of the control socket pair.
 *
 * A write that fails with EINTR or EAGAIN, or that does not report exactly
 * one byte written, is retried.
 *
 * Returns: TRUE once the byte has been written; FALSE on any other write()
 * error, in which case errno still holds the error reported by write(). */
static gboolean
wake_event (GstPoll * set)
{
  ssize_t num_written;

  while ((num_written = write (set->control_write_fd.fd, "W", 1)) != 1) {
    if (num_written == -1 && errno != EAGAIN && errno != EINTR) {
      /* Logging may run arbitrary handlers that overwrite errno, so keep the
       * original value and hand it back to the caller afterwards. */
      const int saved_errno = errno;

      /* strerror() may return a pointer into a static buffer shared between
       * threads; g_strerror() returns a string that stays valid. */
      g_critical ("%p: failed to wake event: %s", set,
          g_strerror (saved_errno));
      errno = saved_errno;
      return FALSE;
    }
  }
  return TRUE;
}
/* Consumes the wake-up byte from the read end of the control socket pair of
 * @set.
 *
 * A read that fails with EINTR or EAGAIN is retried.
 *
 * Returns: TRUE once one byte has been read.  FALSE if read() reports any
 * other error (errno still holds that error), or if read() reports
 * end-of-file (errno is set to EBADF, the same code the Windows
 * release_event() uses for its failures). */
static gboolean
release_event (GstPoll * set)
{
  gchar buf[1] = { '\0' };
  ssize_t num_read;

  while ((num_read = read (set->control_read_fd.fd, buf, 1)) != 1) {
    if (num_read == 0) {
      /* End-of-file: no byte will ever arrive, and read() would keep
       * returning 0 immediately.  Fail instead of spinning forever. */
      g_critical ("%p: failed to release event: %s", set,
          "unexpected end-of-file on control socket");
      errno = EBADF;
      return FALSE;
    }
    if (num_read == -1 && errno != EAGAIN && errno != EINTR) {
      /* Logging may run arbitrary handlers that overwrite errno, so keep the
       * original value and hand it back to the caller afterwards. */
      const int saved_errno = errno;

      /* strerror() may return a pointer into a static buffer shared between
       * threads; g_strerror() returns a string that stays valid. */
      g_critical ("%p: failed to release event: %s", set,
          g_strerror (saved_errno));
      errno = saved_errno;
      return FALSE;
    }
  }
  return TRUE;
}
#else
#define WAKE_EVENT(s) (SetLastError (0), SetEvent ((s)->wakeup_event), errno = GetLastError () == NO_ERROR ? 0 : EACCES, errno == 0 ? 1 : 0)
#define RELEASE_EVENT(s) (ResetEvent ((s)->wakeup_event))
/* Formats the system message for the calling thread's last-error code into
 * @buf, whose capacity is @buf_len characters.
 *
 * The last-error code is captured before formatting and restored afterwards,
 * so the caller observes the same GetLastError() value as on entry.  The
 * result of the formatting call is not checked; the callers in this file
 * pre-fill @buf with a fallback string before calling. */
static void
format_last_error (gchar * buf, size_t buf_len)
{
  /* No insert arguments are supplied (the Arguments parameter is NULL), so
   * insert sequences in a system message must be ignored rather than
   * expanded: hence FORMAT_MESSAGE_IGNORE_INSERTS. */
  const DWORD flags =
      FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS;
  const DWORD id = GetLastError ();

  /* @buf is a gchar buffer, so name the ANSI entry point explicitly; the
   * generic FormatMessage macro selects the wide-character variant when
   * UNICODE is defined. */
  FormatMessageA (flags, NULL, id, 0, buf, (DWORD) buf_len, NULL);
  SetLastError (id);
}
/* Signals the wake-up event object of @set.
 *
 * Both the thread's last-error code and errno are cleared before the call.
 *
 * Returns: TRUE if SetEvent() succeeded; otherwise logs the system error
 * text, sets errno to EBADF and returns FALSE. */
static gboolean
wake_event (GstPoll * set)
{
  /* Fallback text, kept if the system message is not written into it. */
  gchar detail[1024] = "<unknown>";

  SetLastError (0);
  errno = 0;

  if (SetEvent (set->wakeup_event)) {
    return TRUE;
  }

  format_last_error (detail, sizeof (detail));
  g_critical ("%p: failed to set wakup_event: %s", set, detail);
  errno = EBADF;
  return FALSE;
}
/* Waits (without timeout) for the wake-up event object of @set to be
 * signalled and then resets it.
 *
 * Both the thread's last-error code and errno are cleared before waiting.
 *
 * Returns: TRUE if the wait completed with WAIT_OBJECT_0 and the event was
 * reset.  On any other wait result, or if ResetEvent() fails, the reason is
 * logged, errno is set to EBADF and FALSE is returned. */
static gboolean
release_event (GstPoll * set)
{
  /* Fallback text, kept if the system message is not written into it. */
  gchar detail[1024] = "<unknown>";
  const gchar *why;
  DWORD wait_result;

  SetLastError (0);
  errno = 0;

  wait_result = WaitForSingleObject (set->wakeup_event, INFINITE);
  if (wait_result != WAIT_OBJECT_0) {
    if (wait_result == WAIT_ABANDONED) {
      why = "WAIT_ABANDONED";
    } else if (wait_result == WAIT_TIMEOUT) {
      why = "WAIT_TIMEOUT";
    } else if (wait_result == WAIT_FAILED) {
      /* Only WAIT_FAILED has a last-error code worth describing. */
      format_last_error (detail, sizeof (detail));
      why = detail;
    } else {
      why = "other";
    }
    g_critical ("%p: failed to block on wakup_event: %s", set, why);
    errno = EBADF;
    return FALSE;
  }

  if (ResetEvent (set->wakeup_event)) {
    return TRUE;
  }

  format_last_error (detail, sizeof (detail));
  g_critical ("%p: failed to reset wakup_event: %s", set, detail);
  errno = EBADF;
  return FALSE;
}
#endif
/* the poll/select call is also performed on a control socket, that way
@ -181,26 +273,50 @@ raise_wakeup (GstPoll * set)
{
gboolean result = TRUE;
if (g_atomic_int_add (&set->control_pending, 1) == 0) {
/* makes testing control_pending and WAKE_EVENT() atomic. */
g_mutex_lock (&set->lock);
if (set->control_pending == 0) {
/* raise when nothing pending */
GST_LOG ("%p: raise", set);
result = WAKE_EVENT (set);
result = wake_event (set);
}
if (result) {
set->control_pending++;
}
g_mutex_unlock (&set->lock);
return result;
}
/* note how bad things can happen when the 2 threads both raise and release the
* wakeup. This is however not a problem because you must always pair a raise
* with a release */
static inline gboolean
release_wakeup (GstPoll * set)
{
gboolean result = TRUE;
gboolean result = FALSE;
if (g_atomic_int_dec_and_test (&set->control_pending)) {
GST_LOG ("%p: release", set);
result = RELEASE_EVENT (set);
/* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */
g_mutex_lock (&set->lock);
if (set->control_pending > 0) {
/* release, only if this was the last pending. */
if (set->control_pending == 1) {
GST_LOG ("%p: release", set);
result = release_event (set);
} else {
result = TRUE;
}
if (result) {
set->control_pending--;
}
} else {
errno = EWOULDBLOCK;
}
g_mutex_unlock (&set->lock);
return result;
}
@ -209,21 +325,20 @@ release_all_wakeup (GstPoll * set)
{
gint old;
while (TRUE) {
if (!(old = g_atomic_int_get (&set->control_pending)))
/* nothing pending, just exit */
break;
/* makes testing control_pending and RELEASE_EVENT() atomic. */
g_mutex_lock (&set->lock);
/* try to remove all pending control messages */
if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
/* we managed to remove all messages, read the control socket */
if (RELEASE_EVENT (set))
break;
else
/* retry again until we read it successfully */
g_atomic_int_add (&set->control_pending, 1);
if ((old = set->control_pending) > 0) {
GST_LOG ("%p: releasing %d", set, old);
if (release_event (set)) {
set->control_pending = 0;
} else {
old = 0;
}
}
g_mutex_unlock (&set->lock);
return old;
}
@ -568,9 +683,6 @@ gst_poll_new (gboolean controllable)
if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
goto no_socket_pair;
fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
nset->control_read_fd.fd = control_sock[0];
nset->control_write_fd.fd = control_sock[1];
@ -592,6 +704,7 @@ gst_poll_new (gboolean controllable)
MARK_REBUILD (nset);
nset->controllable = controllable;
nset->control_pending = 0;
return nset;