diff --git a/gst/gstpoll.c b/gst/gstpoll.c index ab80ac35af..55588c318b 100644 --- a/gst/gstpoll.c +++ b/gst/gstpoll.c @@ -147,9 +147,13 @@ struct _GstPoll GstPollMode mode; GMutex *lock; - + /* array of fds, always written to and read from with lock */ GArray *fds; + /* array of active fds, only written to from the waiting thread with the + * lock and read from with the lock or without the lock from the waiting + * thread */ GArray *active_fds; + #ifndef G_OS_WIN32 GstPollFD control_read_fd; GstPollFD control_write_fd; @@ -163,12 +167,23 @@ struct _GstPoll gboolean controllable; gboolean new_controllable; - guint waiting; - guint control_pending; - gboolean flushing; + volatile gint waiting; + volatile gint control_pending; + volatile gint flushing; gboolean timer; + volatile gint rebuild; }; +#define IS_FLUSHING(s) (g_atomic_int_get(&(s)->flushing)) +#define SET_FLUSHING(s,val) (g_atomic_int_set(&(s)->flushing, (val))) + +#define INC_WAITING(s) (g_atomic_int_exchange_and_add(&(s)->waiting, 1)) +#define DEC_WAITING(s) (g_atomic_int_exchange_and_add(&(s)->waiting, -1)) +#define GET_WAITING(s) (g_atomic_int_get(&(s)->waiting)) + +#define TEST_REBUILD(s) (g_atomic_int_compare_and_exchange(&(s)->rebuild, 1, 0)) +#define MARK_REBUILD(s) (g_atomic_int_set(&(s)->rebuild, 1)) + static gint find_index (GArray * array, GstPollFD * fd) { @@ -217,14 +232,22 @@ selectable_fds (const GstPoll * set) { guint i; + g_mutex_lock (set->lock); for (i = 0; i < set->fds->len; i++) { struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i); if (pfd->fd >= FD_SETSIZE) - return FALSE; + goto too_many; } + g_mutex_unlock (set->lock); return TRUE; + +too_many: + { + g_mutex_unlock (set->lock); + return FALSE; + } } /* check if the timeout will convert to a timeout value used for poll() @@ -643,6 +666,7 @@ gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd) fd->idx = set->fds->len - 1; #endif + MARK_REBUILD (set); } else { GST_WARNING ("%p: couldn't find fd !", set); } @@ -718,6 +742,7 @@ gst_poll_remove_fd (GstPoll * set, GstPollFD * fd) /* mark fd as removed by setting the index to -1 */ fd->idx = -1; + MARK_REBUILD (set); } else { GST_WARNING ("%p: couldn't find fd !", set); } @@ -769,6 +794,7 @@ gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active) gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT, active); #endif + MARK_REBUILD (set); } else { GST_WARNING ("%p: couldn't find fd !", set); } @@ -799,6 +825,7 @@ gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active) #else gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active); #endif + MARK_REBUILD (set); } else { GST_WARNING ("%p: couldn't find fd !", set); } @@ -870,6 +897,7 @@ gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd) WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx); wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE); + MARK_REBUILD (set); } g_mutex_unlock (set->lock); @@ -1132,25 +1160,27 @@ gint gst_poll_wait (GstPoll * set, GstClockTime timeout) { gboolean restarting; + gboolean is_timer; int res; + gint old_waiting; g_return_val_if_fail (set != NULL, -1); - g_mutex_lock (set->lock); - GST_DEBUG ("timeout :%" GST_TIME_FORMAT, GST_TIME_ARGS (timeout)); + is_timer = set->timer; + + /* add one more waiter */ + old_waiting = INC_WAITING (set); + /* we cannot wait from multiple threads unless we are a timer */ - if (G_UNLIKELY (set->waiting > 0 && !set->timer)) + if (G_UNLIKELY (old_waiting > 0 && !is_timer)) goto already_waiting; /* flushing, exit immediatly */ - if (G_UNLIKELY (set->flushing)) + if (G_UNLIKELY (IS_FLUSHING (set))) goto flushing; - /* add one more waiter */ - set->waiting++; - do { GstPollMode mode; @@ -1159,16 +1189,18 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout) mode = choose_mode (set, timeout); + if (TEST_REBUILD (set)) { + g_mutex_lock (set->lock); #ifndef G_OS_WIN32 - g_array_set_size (set->active_fds, set->fds->len); - memcpy (set->active_fds->data, set->fds->data, - set->fds->len * sizeof (struct pollfd)); + g_array_set_size (set->active_fds, set->fds->len); + memcpy (set->active_fds->data, set->fds->data, + set->fds->len * sizeof (struct pollfd)); #else - if (!gst_poll_prepare_winsock_active_sets (set)) - goto winsock_error; + if (!gst_poll_prepare_winsock_active_sets (set)) + goto winsock_error; #endif - - g_mutex_unlock (set->lock); + g_mutex_unlock (set->lock); + } switch (mode) { case GST_POLL_MODE_AUTO: @@ -1316,15 +1348,16 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout) } } - g_mutex_lock (set->lock); - - if (!set->timer) + if (!is_timer) { + g_mutex_lock (set->lock); gst_poll_check_ctrl_commands (set, res, &restarting); + g_mutex_unlock (set->lock); + } /* update the controllable state if needed */ set->controllable = set->new_controllable; - if (G_UNLIKELY (set->flushing)) { + if (G_UNLIKELY (IS_FLUSHING (set))) { /* we got woken up and we are flushing, we need to stop */ errno = EBUSY; res = -1; @@ -1332,30 +1365,28 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout) } } while (G_UNLIKELY (restarting)); - set->waiting--; - - g_mutex_unlock (set->lock); + DEC_WAITING (set); return res; /* ERRORS */ already_waiting: { - g_mutex_unlock (set->lock); + DEC_WAITING (set); errno = EPERM; return -1; } flushing: { - g_mutex_unlock (set->lock); + DEC_WAITING (set); errno = EBUSY; return -1; } #ifdef G_OS_WIN32 winsock_error: { - set->waiting--; g_mutex_unlock (set->lock); + DEC_WAITING (set); return -1; } #endif @@ -1405,7 +1436,7 @@ gst_poll_set_controllable (GstPoll * set, gboolean controllable) /* delay the change of the controllable state if we are waiting */ set->new_controllable = controllable; - if (set->waiting == 0) + if (GET_WAITING (set) == 0) set->controllable = controllable; g_mutex_unlock (set->lock); @@ -1441,7 +1472,7 @@ gst_poll_restart (GstPoll * set) g_mutex_lock (set->lock); - if (set->controllable && set->waiting > 0) { + if (set->controllable && GET_WAITING (set) > 0) { #ifndef G_OS_WIN32 gint result; @@ -1476,9 +1507,9 @@ gst_poll_set_flushing (GstPoll * set, gboolean flushing) g_mutex_lock (set->lock); /* update the new state first */ - set->flushing = flushing; + SET_FLUSHING (set, flushing); - if (flushing && set->controllable && set->waiting > 0) { + if (flushing && set->controllable && GET_WAITING (set) > 0) { /* we are flushing, controllable and waiting, wake up the waiter. When we * stop the flushing operation we don't clear the wakeup fd here, this will * happen in the _wait() thread. */