poll: make timer polls lockfree

Make sure we don't take a mutex in the normal code path of the timer
poll.
This commit is contained in:
Wim Taymans 2010-10-21 01:15:44 +02:00
parent e0f15e666a
commit 73ee14302f

View file

@ -147,9 +147,13 @@ struct _GstPoll
GstPollMode mode; GstPollMode mode;
GMutex *lock; GMutex *lock;
/* array of fds, always written to and read from with lock */
GArray *fds; GArray *fds;
/* array of active fds, only written to from the waiting thread with the
* lock and read from with the lock or without the lock from the waiting
* thread */
GArray *active_fds; GArray *active_fds;
#ifndef G_OS_WIN32 #ifndef G_OS_WIN32
GstPollFD control_read_fd; GstPollFD control_read_fd;
GstPollFD control_write_fd; GstPollFD control_write_fd;
@ -163,12 +167,23 @@ struct _GstPoll
gboolean controllable; gboolean controllable;
gboolean new_controllable; gboolean new_controllable;
guint waiting; volatile gint waiting;
guint control_pending; volatile gint control_pending;
gboolean flushing; volatile gint flushing;
gboolean timer; gboolean timer;
volatile gint rebuild;
}; };
#define IS_FLUSHING(s) (g_atomic_int_get(&(s)->flushing))
#define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val)))
#define INC_WAITING(s) (g_atomic_int_exchange_and_add(&(s)->waiting, 1))
#define DEC_WAITING(s) (g_atomic_int_exchange_and_add(&(s)->waiting, -1))
#define GET_WAITING(s) (g_atomic_int_get(&(s)->waiting))
#define TEST_REBUILD(s) (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0))
#define MARK_REBUILD(s) (g_atomic_int_set(&(s)->rebuild, 1))
static gint static gint
find_index (GArray * array, GstPollFD * fd) find_index (GArray * array, GstPollFD * fd)
{ {
@ -217,14 +232,22 @@ selectable_fds (const GstPoll * set)
{ {
guint i; guint i;
g_mutex_lock (set->lock);
for (i = 0; i < set->fds->len; i++) { for (i = 0; i < set->fds->len; i++) {
struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i); struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
if (pfd->fd >= FD_SETSIZE) if (pfd->fd >= FD_SETSIZE)
return FALSE; goto too_many;
} }
g_mutex_unlock (set->lock);
return TRUE; return TRUE;
too_many:
{
g_mutex_unlock (set->lock);
return FALSE;
}
} }
/* check if the timeout will convert to a timeout value used for poll() /* check if the timeout will convert to a timeout value used for poll()
@ -643,6 +666,7 @@ gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
fd->idx = set->fds->len - 1; fd->idx = set->fds->len - 1;
#endif #endif
MARK_REBUILD (set);
} else { } else {
GST_WARNING ("%p: couldn't find fd !", set); GST_WARNING ("%p: couldn't find fd !", set);
} }
@ -718,6 +742,7 @@ gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
/* mark fd as removed by setting the index to -1 */ /* mark fd as removed by setting the index to -1 */
fd->idx = -1; fd->idx = -1;
MARK_REBUILD (set);
} else { } else {
GST_WARNING ("%p: couldn't find fd !", set); GST_WARNING ("%p: couldn't find fd !", set);
} }
@ -769,6 +794,7 @@ gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT, gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
active); active);
#endif #endif
MARK_REBUILD (set);
} else { } else {
GST_WARNING ("%p: couldn't find fd !", set); GST_WARNING ("%p: couldn't find fd !", set);
} }
@ -799,6 +825,7 @@ gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
#else #else
gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active); gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
#endif #endif
MARK_REBUILD (set);
} else { } else {
GST_WARNING ("%p: couldn't find fd !", set); GST_WARNING ("%p: couldn't find fd !", set);
} }
@ -870,6 +897,7 @@ gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx); WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE); wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
MARK_REBUILD (set);
} }
g_mutex_unlock (set->lock); g_mutex_unlock (set->lock);
@ -1132,25 +1160,27 @@ gint
gst_poll_wait (GstPoll * set, GstClockTime timeout) gst_poll_wait (GstPoll * set, GstClockTime timeout)
{ {
gboolean restarting; gboolean restarting;
gboolean is_timer;
int res; int res;
gint old_waiting;
g_return_val_if_fail (set != NULL, -1); g_return_val_if_fail (set != NULL, -1);
g_mutex_lock (set->lock);
GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout)); GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout));
is_timer = set->timer;
/* add one more waiter */
old_waiting = INC_WAITING (set);
/* we cannot wait from multiple threads unless we are a timer */ /* we cannot wait from multiple threads unless we are a timer */
if (G_UNLIKELY (set->waiting > 0 && !set->timer)) if (G_UNLIKELY (old_waiting > 0 && !is_timer))
goto already_waiting; goto already_waiting;
/* flushing, exit immediatly */ /* flushing, exit immediatly */
if (G_UNLIKELY (set->flushing)) if (G_UNLIKELY (IS_FLUSHING (set)))
goto flushing; goto flushing;
/* add one more waiter */
set->waiting++;
do { do {
GstPollMode mode; GstPollMode mode;
@ -1159,6 +1189,8 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
mode = choose_mode (set, timeout); mode = choose_mode (set, timeout);
if (TEST_REBUILD (set)) {
g_mutex_lock (set->lock);
#ifndef G_OS_WIN32 #ifndef G_OS_WIN32
g_array_set_size (set->active_fds, set->fds->len); g_array_set_size (set->active_fds, set->fds->len);
memcpy (set->active_fds->data, set->fds->data, memcpy (set->active_fds->data, set->fds->data,
@ -1167,8 +1199,8 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
if (!gst_poll_prepare_winsock_active_sets (set)) if (!gst_poll_prepare_winsock_active_sets (set))
goto winsock_error; goto winsock_error;
#endif #endif
g_mutex_unlock (set->lock); g_mutex_unlock (set->lock);
}
switch (mode) { switch (mode) {
case GST_POLL_MODE_AUTO: case GST_POLL_MODE_AUTO:
@ -1316,15 +1348,16 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
} }
} }
if (!is_timer) {
g_mutex_lock (set->lock); g_mutex_lock (set->lock);
if (!set->timer)
gst_poll_check_ctrl_commands (set, res, &restarting); gst_poll_check_ctrl_commands (set, res, &restarting);
g_mutex_unlock (set->lock);
}
/* update the controllable state if needed */ /* update the controllable state if needed */
set->controllable = set->new_controllable; set->controllable = set->new_controllable;
if (G_UNLIKELY (set->flushing)) { if (G_UNLIKELY (IS_FLUSHING (set))) {
/* we got woken up and we are flushing, we need to stop */ /* we got woken up and we are flushing, we need to stop */
errno = EBUSY; errno = EBUSY;
res = -1; res = -1;
@ -1332,30 +1365,28 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
} }
} while (G_UNLIKELY (restarting)); } while (G_UNLIKELY (restarting));
set->waiting--; DEC_WAITING (set);
g_mutex_unlock (set->lock);
return res; return res;
/* ERRORS */ /* ERRORS */
already_waiting: already_waiting:
{ {
g_mutex_unlock (set->lock); DEC_WAITING (set);
errno = EPERM; errno = EPERM;
return -1; return -1;
} }
flushing: flushing:
{ {
g_mutex_unlock (set->lock); DEC_WAITING (set);
errno = EBUSY; errno = EBUSY;
return -1; return -1;
} }
#ifdef G_OS_WIN32 #ifdef G_OS_WIN32
winsock_error: winsock_error:
{ {
set->waiting--;
g_mutex_unlock (set->lock); g_mutex_unlock (set->lock);
DEC_WAITING (set);
return -1; return -1;
} }
#endif #endif
@ -1405,7 +1436,7 @@ gst_poll_set_controllable (GstPoll * set, gboolean controllable)
/* delay the change of the controllable state if we are waiting */ /* delay the change of the controllable state if we are waiting */
set->new_controllable = controllable; set->new_controllable = controllable;
if (set->waiting == 0) if (GET_WAITING (set) == 0)
set->controllable = controllable; set->controllable = controllable;
g_mutex_unlock (set->lock); g_mutex_unlock (set->lock);
@ -1441,7 +1472,7 @@ gst_poll_restart (GstPoll * set)
g_mutex_lock (set->lock); g_mutex_lock (set->lock);
if (set->controllable && set->waiting > 0) { if (set->controllable && GET_WAITING (set) > 0) {
#ifndef G_OS_WIN32 #ifndef G_OS_WIN32
gint result; gint result;
@ -1476,9 +1507,9 @@ gst_poll_set_flushing (GstPoll * set, gboolean flushing)
g_mutex_lock (set->lock); g_mutex_lock (set->lock);
/* update the new state first */ /* update the new state first */
set->flushing = flushing; SET_FLUSHING (set, flushing);
if (flushing && set->controllable && set->waiting > 0) { if (flushing && set->controllable && GET_WAITING (set) > 0) {
/* we are flushing, controllable and waiting, wake up the waiter. When we /* we are flushing, controllable and waiting, wake up the waiter. When we
* stop the flushing operation we don't clear the wakeup fd here, this will * stop the flushing operation we don't clear the wakeup fd here, this will
* happen in the _wait() thread. */ * happen in the _wait() thread. */