poll: Refactor and make more lockfree

Refactor the wakeup of the poll thread.
Always make a control socket to make things easier.
Make more methods lockfree.
This commit is contained in:
Wim Taymans 2010-11-03 17:56:24 +01:00
parent e266d4d397
commit 22fa4470e2

View file

@ -93,34 +93,7 @@
#define GST_CAT_DEFAULT GST_CAT_POLL
#ifndef G_OS_WIN32
/* the poll/select call is also performed on a control socket, that way
* we can send special commands to control it
*/
/* FIXME: Shouldn't we check or return the return value
* of write()?
*/
#define SEND_COMMAND(set, command, result) \
G_STMT_START { \
unsigned char c = command; \
result = write (set->control_write_fd.fd, &c, 1); \
if (result > 0) \
set->control_pending++; \
} G_STMT_END
#define READ_COMMAND(set, command, res) \
G_STMT_START { \
if (set->control_pending > 0) { \
res = read (set->control_read_fd.fd, &command, 1); \
if (res == 1) \
set->control_pending--; \
} else \
res = 0; \
} G_STMT_END
#define GST_POLL_CMD_WAKEUP 'W' /* restart the poll/select call */
#else /* G_OS_WIN32 */
#ifdef G_OS_WIN32
typedef struct _WinsockFd WinsockFd;
struct _WinsockFd
@ -166,7 +139,6 @@ struct _GstPoll
#endif
gboolean controllable;
gboolean new_controllable;
volatile gint waiting;
volatile gint control_pending;
volatile gint flushing;
@ -174,6 +146,10 @@ struct _GstPoll
volatile gint rebuild;
};
static gboolean gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd,
gboolean active);
static gboolean gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd);
#define IS_FLUSHING(s) (g_atomic_int_get(&(s)->flushing))
#define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
@ -184,6 +160,65 @@ struct _GstPoll
#define TEST_REBUILD(s) (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
#define MARK_REBUILD(s) (g_atomic_int_set(&(s)->rebuild, 1))
/* the poll/select call is also performed on a control socket, that way
* we can send special commands to control it */
static inline gboolean
raise_wakeup (GstPoll * set)
{
gboolean result = TRUE;
if (g_atomic_int_exchange_and_add (&set->control_pending, 1) == 0) {
#ifndef G_OS_WIN32
result = (write (set->control_write_fd.fd, "W", 1) == 1);
#else
result = SetEvent (set->wakeup_event);
#endif
}
return result;
}
static inline gboolean
release_wakeup (GstPoll * set)
{
gboolean result = TRUE;
if (g_atomic_int_dec_and_test (&set->control_pending)) {
#ifndef G_OS_WIN32
gchar buf[1];
result = (read (set->control_read_fd.fd, buf, 1) == 1);
#else
result = ResetEvent (set->wakeup_event);
#endif
}
return result;
}
static inline gint
release_all_wakeup (GstPoll * set)
{
gboolean result;
gint old;
while (TRUE) {
if (!(old = g_atomic_int_get (&set->control_pending)))
/* nothing pending, just exit */
break;
/* try to remove all pending control messages */
if (g_atomic_int_compare_and_exchange (&set->control_pending, old, 0)) {
/* we managed to remove all messages, read the control socket */
#ifndef G_OS_WIN32
gchar buf[1];
result = (read (set->control_read_fd.fd, buf, 1) == 1);
#else
result = ResetEvent (set->wakeup_event);
#endif
break;
}
}
return old;
}
static gint
find_index (GArray * array, GstPollFD * fd)
{
@ -519,6 +554,21 @@ gst_poll_new (gboolean controllable)
nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
nset->control_read_fd.fd = -1;
nset->control_write_fd.fd = -1;
{
gint control_sock[2];
if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
goto no_socket_pair;
fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
nset->control_read_fd.fd = control_sock[0];
nset->control_write_fd.fd = control_sock[1];
gst_poll_add_fd_unlocked (nset, &nset->control_read_fd);
gst_poll_fd_ctl_read_unlocked (nset, &nset->control_read_fd, TRUE);
}
#else
nset->mode = GST_POLL_MODE_WINDOWS;
nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
@ -530,17 +580,19 @@ gst_poll_new (gboolean controllable)
nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
#endif
if (!gst_poll_set_controllable (nset, controllable))
goto not_controllable;
nset->controllable = controllable;
return nset;
/* ERRORS */
not_controllable:
#ifndef G_OS_WIN32
no_socket_pair:
{
GST_WARNING ("%p: can't create socket pair !", nset);
gst_poll_free (nset);
return NULL;
}
#endif
}
/**
@ -1098,44 +1150,6 @@ gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
return res;
}
static void
gst_poll_check_ctrl_commands (GstPoll * set, gint res, gboolean * restarting)
{
g_mutex_lock (set->lock);
/* check if the poll/select was aborted due to a command */
if (set->controllable) {
#ifndef G_OS_WIN32
while (TRUE) {
guchar cmd;
gint result;
/* we do not check the read status of the control socket here because
* there may have been a write to the socket between the time the
* poll/select finished and before we got the mutex back, and we need
* to clear out the control socket before leaving */
READ_COMMAND (set, cmd, result);
if (result <= 0) {
/* no more commands, quit the loop */
break;
}
/* if the control socket is the only socket with activity when we get
* here, we restart the _wait operation, else we allow the caller to
* process the other file descriptors */
if (res == 1 &&
gst_poll_fd_can_read_unlocked (set, &set->control_read_fd))
*restarting = TRUE;
}
#else
if (WaitForSingleObject (set->wakeup_event, 0) == WAIT_OBJECT_0) {
ResetEvent (set->wakeup_event);
*restarting = TRUE;
}
#endif
}
g_mutex_unlock (set->lock);
}
/**
* gst_poll_wait:
* @set: a #GstPoll.
@ -1351,12 +1365,15 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
}
if (!is_timer) {
gst_poll_check_ctrl_commands (set, res, &restarting);
/* Applications needs to clear the control socket themselves for timer
* polls.
* For other polls, we need to clear the control socket. If there was only
* one socket with activity and it was the control socket, we need to
* restart */
if (res == 1 && release_all_wakeup (set) > 0)
restarting = TRUE;
}
/* update the controllable state if needed */
set->controllable = set->new_controllable;
if (G_UNLIKELY (IS_FLUSHING (set))) {
/* we got woken up and we are flushing, we need to stop */
errno = EBUSY;
@ -1412,46 +1429,9 @@ gst_poll_set_controllable (GstPoll * set, gboolean controllable)
GST_LOG ("%p: controllable : %d", set, controllable);
g_mutex_lock (set->lock);
#ifndef G_OS_WIN32
if (controllable && set->control_read_fd.fd < 0) {
gint control_sock[2];
if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
goto no_socket_pair;
fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
set->control_read_fd.fd = control_sock[0];
set->control_write_fd.fd = control_sock[1];
gst_poll_add_fd_unlocked (set, &set->control_read_fd);
}
if (set->control_read_fd.fd >= 0)
gst_poll_fd_ctl_read_unlocked (set, &set->control_read_fd, controllable);
#endif
/* delay the change of the controllable state if we are waiting */
set->new_controllable = controllable;
if (GET_WAITING (set) == 0)
set->controllable = controllable;
g_mutex_unlock (set->lock);
set->controllable = controllable;
return TRUE;
/* ERRORS */
#ifndef G_OS_WIN32
no_socket_pair:
{
GST_WARNING ("%p: can't create socket pair !", set);
g_mutex_unlock (set->lock);
return FALSE;
}
#endif
}
/**
@ -1470,21 +1450,11 @@ gst_poll_restart (GstPoll * set)
{
g_return_if_fail (set != NULL);
g_mutex_lock (set->lock);
if (set->controllable && GET_WAITING (set) > 0) {
#ifndef G_OS_WIN32
gint result;
/* if we are waiting, we can send the command, else we do not have to
* bother, future calls will automatically pick up the new fdset */
SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
#else
SetEvent (set->wakeup_event);
#endif
/* we are controllable and waiting, wake up the waiter. The socket will be
* cleared by the _wait() thread and the poll will be restarted */
raise_wakeup (set);
}
g_mutex_unlock (set->lock);
}
/**
@ -1504,8 +1474,6 @@ gst_poll_set_flushing (GstPoll * set, gboolean flushing)
{
g_return_if_fail (set != NULL);
g_mutex_lock (set->lock);
/* update the new state first */
SET_FLUSHING (set, flushing);
@ -1513,16 +1481,8 @@ gst_poll_set_flushing (GstPoll * set, gboolean flushing)
/* we are flushing, controllable and waiting, wake up the waiter. When we
* stop the flushing operation we don't clear the wakeup fd here, this will
* happen in the _wait() thread. */
#ifndef G_OS_WIN32
gint result;
SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
#else
SetEvent (set->wakeup_event);
#endif
raise_wakeup (set);
}
g_mutex_unlock (set->lock);
}
/**
@ -1546,22 +1506,12 @@ gst_poll_set_flushing (GstPoll * set, gboolean flushing)
gboolean
gst_poll_write_control (GstPoll * set)
{
gboolean res = FALSE;
gboolean res;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (set->timer, FALSE);
g_mutex_lock (set->lock);
if (set->controllable) {
#ifndef G_OS_WIN32
gint result;
SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
res = (result > 0);
#else
res = SetEvent (set->wakeup_event);
#endif
}
g_mutex_unlock (set->lock);
res = raise_wakeup (set);
return res;
}
@ -1582,22 +1532,12 @@ gst_poll_write_control (GstPoll * set)
gboolean
gst_poll_read_control (GstPoll * set)
{
gboolean res = FALSE;
gboolean res;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (set->timer, FALSE);
g_mutex_lock (set->lock);
if (set->controllable) {
#ifndef G_OS_WIN32
guchar cmd;
gint result;
READ_COMMAND (set, cmd, result);
res = (result > 0);
#else
res = ResetEvent (set->wakeup_event);
#endif
}
g_mutex_unlock (set->lock);
res = release_wakeup (set);
return res;
}