From cd06aea10387f1593fc2f6340e63e7326906c42f Mon Sep 17 00:00:00 2001 From: Matthew Gruenke Date: Wed, 29 Jun 2016 18:57:42 +0200 Subject: [PATCH] poll: Fix various race conditions with read_control() and write_control() This addresses slightly different race conditions on Linux and Windows, and fixes gst_poll_read_control() when control_pending == 0. On Linux, the socketpair() used for control should not be made O_NONBLOCK. If there's any propagation delay between set->control_write_fd.fd and set->control_read_fd.fd, even the mutex now held will not be sufficient to prevent a race condition. There's no benefit to using O_NONBLOCK, here. Only liabilities. For Windows, it's necessary to fix the race condition between testing set->control_pending and performing WAKE_EVENT()/RELEASE_EVENT(). This is accomplished by acquiring and holding set->lock, for both of these operations. We could optimize the Linux version by making this Windows-specific. For consistency with the Linux implementation, Windows' RELEASE_EVENT() has also been made to block, although it should never happen. Also, changed release_wakeup() to return TRUE and decrement control_pending only when > 0. Furthermore, RELEASE_EVENT() is called only when control_pending == 1. Finally, changed control_pending to use normal, non-atomic arithmetic operations, since it's now protected by set->lock. Note: even though the underlying signaling mechanisms are blocking, release_wakeup() is effectively non-blocking, as it will only attempt to read from control_read_fd.fd after a byte has been written to control_write_fd.fd or WaitForSingleObject() after it's been signaled. https://bugzilla.gnome.org/show_bug.cgi?id=750397 --- gst/gstpoll.c | 171 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 142 insertions(+), 29 deletions(-) diff --git a/gst/gstpoll.c b/gst/gstpoll.c index 04165a5ffa..1d711845bf 100644 --- a/gst/gstpoll.c +++ b/gst/gstpoll.c @@ -133,7 +133,6 @@ struct _GstPoll GArray *active_fds; #ifndef G_OS_WIN32 - gchar buf[1]; GstPollFD control_read_fd; GstPollFD control_write_fd; #else @@ -167,11 +166,104 @@ static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd); #define MARK_REBUILD(s) (g_atomic_int_set(&(s)->rebuild, 1)) #ifndef G_OS_WIN32 -#define WAKE_EVENT(s) (write ((s)->control_write_fd.fd, "W", 1) == 1) -#define RELEASE_EVENT(s) (read ((s)->control_read_fd.fd, (s)->buf, 1) == 1) + +static gboolean +wake_event (GstPoll * set) +{ + ssize_t num_written; + while ((num_written = write (set->control_write_fd.fd, "W", 1)) != 1) { + if (num_written == -1 && errno != EAGAIN && errno != EINTR) { + g_critical ("%p: failed to wake event: %s", set, strerror (errno)); + return FALSE; + } + } + return TRUE; +} + +static gboolean +release_event (GstPoll * set) +{ + gchar buf[1] = { '\0' }; + ssize_t num_read; + while ((num_read = read (set->control_read_fd.fd, buf, 1)) != 1) { + if (num_read == -1 && errno != EAGAIN && errno != EINTR) { + g_critical ("%p: failed to release event: %s", set, strerror (errno)); + return FALSE; + } + } + return TRUE; +} + #else -#define WAKE_EVENT(s) (SetLastError (0), SetEvent ((s)->wakeup_event), errno = GetLastError () == NO_ERROR ? 0 : EACCES, errno == 0 ? 1 : 0) -#define RELEASE_EVENT(s) (ResetEvent ((s)->wakeup_event)) + +static void +format_last_error (gchar * buf, size_t buf_len) +{ + DWORD flags = FORMAT_MESSAGE_FROM_SYSTEM; + LPCVOID src = NULL; + DWORD lang = 0; + DWORD id; + id = GetLastError (); + FormatMessage (flags, src, id, lang, buf, (DWORD) buf_len, NULL); + SetLastError (id); +} + +static gboolean +wake_event (GstPoll * set) +{ + SetLastError (0); + errno = 0; + if (!SetEvent (set->wakeup_event)) { + gchar msg[1024] = ""; + format_last_error (msg, sizeof (msg)); + g_critical ("%p: failed to set wakup_event: %s", set, msg); + errno = EBADF; + return FALSE; + } + + return TRUE; +} + +static gboolean +release_event (GstPoll * set) +{ + DWORD status; + SetLastError (0); + errno = 0; + if (status = WaitForSingleObject (set->wakeup_event, INFINITE)) { + const gchar *reason = "unknown"; + gchar msg[1024] = ""; + switch (status) { + case WAIT_ABANDONED: + reason = "WAIT_ABANDONED"; + break; + case WAIT_TIMEOUT: + reason = "WAIT_TIMEOUT"; + break; + case WAIT_FAILED: + format_last_error (msg, sizeof (msg)); + reason = msg; + break; + default: + reason = "other"; + break; + } + g_critical ("%p: failed to block on wakup_event: %s", set, reason); + errno = EBADF; + return FALSE; + } + + if (!ResetEvent (set->wakeup_event)) { + gchar msg[1024] = ""; + format_last_error (msg, sizeof (msg)); + g_critical ("%p: failed to reset wakup_event: %s", set, msg); + errno = EBADF; + return FALSE; + } + + return TRUE; +} + #endif /* the poll/select call is also performed on a control socket, that way @@ -181,26 +273,50 @@ raise_wakeup (GstPoll * set) { gboolean result = TRUE; - if (g_atomic_int_add (&set->control_pending, 1) == 0) { + /* makes testing control_pending and WAKE_EVENT() atomic. */ + g_mutex_lock (&set->lock); + + if (set->control_pending == 0) { /* raise when nothing pending */ GST_LOG ("%p: raise", set); - result = WAKE_EVENT (set); + result = wake_event (set); } + + if (result) { + set->control_pending++; + } + + g_mutex_unlock (&set->lock); + return result; } -/* note how bad things can happen when the 2 threads both raise and release the - * wakeup. This is however not a problem because you must always pair a raise - * with a release */ static inline gboolean release_wakeup (GstPoll * set) { - gboolean result = TRUE; + gboolean result = FALSE; - if (g_atomic_int_dec_and_test (&set->control_pending)) { - GST_LOG ("%p: release", set); - result = RELEASE_EVENT (set); + /* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */ + g_mutex_lock (&set->lock); + + if (set->control_pending > 0) { + /* release, only if this was the last pending. */ + if (set->control_pending == 1) { + GST_LOG ("%p: release", set); + result = release_event (set); + } else { + result = TRUE; + } + + if (result) { + set->control_pending--; + } + } else { + errno = EWOULDBLOCK; } + + g_mutex_unlock (&set->lock); + return result; } @@ -209,21 +325,20 @@ release_all_wakeup (GstPoll * set) { gint old; - while (TRUE) { - if (!(old = g_atomic_int_get (&set->control_pending))) - /* nothing pending, just exit */ - break; + /* makes testing control_pending and RELEASE_EVENT() atomic. */ + g_mutex_lock (&set->lock); - /* try to remove all pending control messages */ - if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) { - /* we managed to remove all messages, read the control socket */ - if (RELEASE_EVENT (set)) - break; - else - /* retry again until we read it successfully */ - g_atomic_int_add (&set->control_pending, 1); + if ((old = set->control_pending) > 0) { + GST_LOG ("%p: releasing %d", set, old); + if (release_event (set)) { + set->control_pending = 0; + } else { + old = 0; } } + + g_mutex_unlock (&set->lock); + return old; } @@ -568,9 +683,6 @@ gst_poll_new (gboolean controllable) if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0) goto no_socket_pair; - fcntl (control_sock[0], F_SETFL, O_NONBLOCK); - fcntl (control_sock[1], F_SETFL, O_NONBLOCK); - nset->control_read_fd.fd = control_sock[0]; nset->control_write_fd.fd = control_sock[1]; @@ -592,6 +704,7 @@ gst_poll_new (gboolean controllable) MARK_REBUILD (nset); nset->controllable = controllable; + nset->control_pending = 0; return nset;