Add new function gst_poll_fd_ignored() for improved Windows compatibility.

Original commit message from CVS:
Patch by: Ole André Vadla Ravnås
<ole dot andre dot ravnas at tandberg dot com>
* docs/gst/gstreamer-sections.txt:
* gst/gstpoll.c: (gst_poll_winsock_error_to_errno),
(gst_poll_update_winsock_event_mask),
(gst_poll_prepare_winsock_active_sets),
(gst_poll_collect_winsock_events), (gst_poll_new), (gst_poll_free),
(gst_poll_add_fd_unlocked), (gst_poll_fd_ctl_write),
(gst_poll_fd_ctl_read_unlocked), (gst_poll_fd_ignored),
(gst_poll_fd_has_error), (gst_poll_fd_can_read_unlocked),
(gst_poll_check_ctrl_commands), (gst_poll_wait):
* gst/gstpoll.h:
* win32/common/libgstreamer.def:
Add new function gst_poll_fd_ignored() for improved Windows
compatibility.
Various minor fixes and cleanups. See #520808.
This commit is contained in:
Ole André Vadla Ravnås 2008-03-18 10:54:52 +00:00 committed by Wim Taymans
parent ede8ee12bb
commit bf20f01fba
5 changed files with 250 additions and 98 deletions

View file

@ -1,3 +1,23 @@
2008-03-18 Wim Taymans <wim.taymans@collabora.co.uk>
Patch by: Ole André Vadla Ravnås
<ole dot andre dot ravnas at tandberg dot com>
* docs/gst/gstreamer-sections.txt:
* gst/gstpoll.c: (gst_poll_winsock_error_to_errno),
(gst_poll_update_winsock_event_mask),
(gst_poll_prepare_winsock_active_sets),
(gst_poll_collect_winsock_events), (gst_poll_new), (gst_poll_free),
(gst_poll_add_fd_unlocked), (gst_poll_fd_ctl_write),
(gst_poll_fd_ctl_read_unlocked), (gst_poll_fd_ignored),
(gst_poll_fd_has_error), (gst_poll_fd_can_read_unlocked),
(gst_poll_check_ctrl_commands), (gst_poll_wait):
* gst/gstpoll.h:
* win32/common/libgstreamer.def:
Add new function gst_poll_fd_ignored() for improved Windows
compatibility.
Various minor fixes and cleanups. See #520808.
2008-03-17 Tim-Philipp Müller <tim at centricular dot net> 2008-03-17 Tim-Philipp Müller <tim at centricular dot net>
* gst/gstindex.c: (gst_index_entry_free): * gst/gstindex.c: (gst_index_entry_free):

View file

@ -1617,6 +1617,7 @@ gst_poll_fd_ctl_read
gst_poll_fd_ctl_write gst_poll_fd_ctl_write
gst_poll_fd_has_closed gst_poll_fd_has_closed
gst_poll_fd_has_error gst_poll_fd_has_error
gst_poll_fd_ignored
gst_poll_fd_init gst_poll_fd_init
gst_poll_free gst_poll_free
gst_poll_new gst_poll_new

View file

@ -112,6 +112,7 @@ struct _WinsockFd
gint fd; gint fd;
glong event_mask; glong event_mask;
WSANETWORKEVENTS events; WSANETWORKEVENTS events;
glong ignored_event_mask;
}; };
#endif #endif
@ -137,6 +138,8 @@ struct _GstPoll
GstPollFD control_read_fd; GstPollFD control_read_fd;
GstPollFD control_write_fd; GstPollFD control_write_fd;
#else #else
GArray *active_fds_ignored;
GArray *events; GArray *events;
GArray *active_events; GArray *active_events;
@ -304,6 +307,32 @@ fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds)
g_mutex_unlock (set->lock); g_mutex_unlock (set->lock);
} }
#else /* G_OS_WIN32 */ #else /* G_OS_WIN32 */
/*
* Translate errors thrown by the Winsock API used by GstPoll:
* WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
*/
static gint
gst_poll_winsock_error_to_errno (DWORD last_error)
{
switch (last_error) {
case WSA_INVALID_HANDLE:
case WSAEINVAL:
case WSAENOTSOCK:
return EBADF;
case WSA_NOT_ENOUGH_MEMORY:
return ENOMEM;
/*
* Anything else, including:
* WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
* WSANOTINITIALISED
*/
default:
return EINVAL;
}
}
static void static void
gst_poll_free_winsock_event (GstPoll * set, gint idx) gst_poll_free_winsock_event (GstPoll * set, gint idx)
{ {
@ -314,30 +343,104 @@ gst_poll_free_winsock_event (GstPoll * set, gint idx)
CloseHandle (event); CloseHandle (event);
} }
static gboolean static void
gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags, gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
gboolean active) gboolean active)
{ {
WinsockFd *wfd; WinsockFd *wfd;
HANDLE event;
glong new_mask;
gint ret;
wfd = &g_array_index (set->fds, WinsockFd, idx); wfd = &g_array_index (set->fds, WinsockFd, idx);
event = g_array_index (set->events, HANDLE, idx);
if (active) if (active)
new_mask = wfd->event_mask | flags; wfd->event_mask |= flags;
else else
new_mask = wfd->event_mask & ~flags; wfd->event_mask &= ~flags;
ret = WSAEventSelect (wfd->fd, event, new_mask); /* reset ignored state if the new mask doesn't overlap at all */
if (G_LIKELY (ret == 0)) if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
wfd->event_mask = new_mask; wfd->ignored_event_mask = 0;
else }
GST_ERROR ("WSAEventSelect failed: 0x%08x", GetLastError ());
return ret == 0; static gboolean
gst_poll_prepare_winsock_active_sets (GstPoll * set)
{
guint i;
g_array_set_size (set->active_fds, 0);
g_array_set_size (set->active_fds_ignored, 0);
g_array_set_size (set->active_events, 0);
g_array_append_val (set->active_events, set->wakeup_event);
for (i = 0; i < set->fds->len; i++) {
WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
HANDLE event = g_array_index (set->events, HANDLE, i);
if (wfd->ignored_event_mask == 0) {
gint ret;
g_array_append_val (set->active_fds, *wfd);
g_array_append_val (set->active_events, event);
ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
if (G_UNLIKELY (ret != 0)) {
errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
return FALSE;
}
} else {
g_array_append_val (set->active_fds_ignored, wfd);
}
}
return TRUE;
}
static gint
gst_poll_collect_winsock_events (GstPoll * set)
{
gint res, i;
/*
* We need to check which events are signaled, and call
* WSAEnumNetworkEvents for those that are, which resets
* the event and clears the internal network event records.
*/
res = 0;
for (i = 0; i < set->active_fds->len; i++) {
WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
DWORD wait_ret;
wait_ret = WaitForSingleObject (event, 0);
if (wait_ret == WAIT_OBJECT_0) {
gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
if (G_UNLIKELY (enum_ret != 0)) {
res = -1;
errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
break;
}
res++;
} else {
/* clear any previously stored result */
memset (&wfd->events, 0, sizeof (wfd->events));
}
}
/* If all went well we also need to reset the ignored fds. */
if (res >= 0) {
res += set->active_fds_ignored->len;
for (i = 0; i < set->active_fds_ignored->len; i++) {
WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
wfd->ignored_event_mask = 0;
}
g_array_set_size (set->active_fds_ignored, 0);
}
return res;
} }
#endif #endif
@ -371,6 +474,7 @@ gst_poll_new (gboolean controllable)
nset->mode = GST_POLL_MODE_WINDOWS; nset->mode = GST_POLL_MODE_WINDOWS;
nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd)); nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd)); nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE)); nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE)); nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
@ -420,6 +524,7 @@ gst_poll_free (GstPoll * set)
g_array_free (set->active_events, TRUE); g_array_free (set->active_events, TRUE);
g_array_free (set->events, TRUE); g_array_free (set->events, TRUE);
g_array_free (set->active_fds_ignored, TRUE);
#endif #endif
g_array_free (set->active_fds, TRUE); g_array_free (set->active_fds, TRUE);
@ -468,16 +573,15 @@ gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
HANDLE event; HANDLE event;
wfd.fd = fd->fd; wfd.fd = fd->fd;
wfd.event_mask = 0; wfd.event_mask = FD_CLOSE;
memset (&wfd.events, 0, sizeof (wfd.events)); memset (&wfd.events, 0, sizeof (wfd.events));
wfd.ignored_event_mask = 0;
event = WSACreateEvent (); event = WSACreateEvent ();
g_array_append_val (set->fds, wfd); g_array_append_val (set->fds, wfd);
g_array_append_val (set->events, event); g_array_append_val (set->events, event);
fd->idx = set->fds->len - 1; fd->idx = set->fds->len - 1;
gst_poll_update_winsock_event_mask (set, fd->idx, FD_CLOSE, TRUE);
#endif #endif
} }
@ -590,8 +694,7 @@ gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
else else
pfd->events &= ~POLLOUT; pfd->events &= ~POLLOUT;
#else #else
if (!gst_poll_update_winsock_event_mask (set, idx, FD_WRITE, active)) gst_poll_update_winsock_event_mask (set, idx, FD_WRITE, active);
idx = -1;
#endif #endif
} }
@ -616,8 +719,7 @@ gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
else else
pfd->events &= ~(POLLIN | POLLPRI); pfd->events &= ~(POLLIN | POLLPRI);
#else #else
if (!gst_poll_update_winsock_event_mask (set, idx, FD_READ, active)) gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
idx = -1;
#endif #endif
} }
@ -655,6 +757,45 @@ gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
return ret; return ret;
} }
/**
* gst_poll_fd_ignored:
* @set: a file descriptor set.
* @fd: a file descriptor.
*
* Mark @fd as ignored so that the next call to gst_poll_wait() will yield
* the same result for @fd as last time. This function must be called if no
* operation (read/write/recv/send/etc.) will be performed on @fd before
* the next call to gst_poll_wait().
*
* The reason why this is needed is because the underlying implementation
* might not allow querying the fd more than once between calls to one of
* the re-enabling operations.
*
* Since: 0.10.18
*/
void
gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
{
#ifdef G_OS_WIN32
gint idx;
g_return_if_fail (set != NULL);
g_return_if_fail (fd != NULL);
g_return_if_fail (fd->fd >= 0);
g_mutex_lock (set->lock);
idx = find_index (set->fds, fd);
if (idx >= 0) {
WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
}
g_mutex_unlock (set->lock);
#endif
}
/** /**
* gst_poll_fd_has_closed: * gst_poll_fd_has_closed:
* @set: a file descriptor set. * @set: a file descriptor set.
@ -730,7 +871,8 @@ gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) || res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
(wfd->events.iErrorCode[FD_READ_BIT] != 0) || (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
(wfd->events.iErrorCode[FD_WRITE_BIT] != 0); (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
(wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0);
#endif #endif
} }
@ -754,7 +896,7 @@ gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
#else #else
WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx); WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
res = (wfd->events.lNetworkEvents & FD_READ) != 0; res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
#endif #endif
} }
@ -831,6 +973,42 @@ gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
return res; return res;
} }
static void
gst_poll_check_ctrl_commands (GstPoll * set, gint res, gboolean * restarting)
{
/* check if the poll/select was aborted due to a command */
if (set->controllable) {
#ifndef G_OS_WIN32
while (TRUE) {
guchar cmd;
gint result;
/* we do not check the read status of the control socket here because
* there may have been a write to the socket between the time the
* poll/select finished and before we got the mutex back, and we need
* to clear out the control socket before leaving */
READ_COMMAND (set, cmd, result);
if (result <= 0) {
/* no more commands, quit the loop */
break;
}
/* if the control socket is the only socket with activity when we get
* here, we restart the _wait operation, else we allow the caller to
* process the other file descriptors */
if (res == 1 &&
gst_poll_fd_can_read_unlocked (set, &set->control_read_fd))
*restarting = TRUE;
}
#else
if (WaitForSingleObject (set->wakeup_event, 0) == WAIT_OBJECT_0) {
ResetEvent (set->wakeup_event);
*restarting = TRUE;
}
#endif
}
}
/** /**
* gst_poll_wait: * gst_poll_wait:
* @set: a #GstPoll. * @set: a #GstPoll.
@ -876,19 +1054,15 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
mode = choose_mode (set, timeout); mode = choose_mode (set, timeout);
g_array_set_size (set->active_fds, set->fds->len);
#ifndef G_OS_WIN32 #ifndef G_OS_WIN32
g_array_set_size (set->active_fds, set->fds->len);
memcpy (set->active_fds->data, set->fds->data, memcpy (set->active_fds->data, set->fds->data,
set->fds->len * sizeof (struct pollfd)); set->fds->len * sizeof (struct pollfd));
#else #else
memcpy (set->active_fds->data, set->fds->data, if (!gst_poll_prepare_winsock_active_sets (set))
set->fds->len * sizeof (WinsockFd)); goto winsock_error;
g_array_set_size (set->active_events, set->events->len + 1);
*((HANDLE *) set->active_events->data) = set->wakeup_event;
memcpy (set->active_events->data + sizeof (HANDLE), set->events->data,
set->events->len * sizeof (HANDLE));
#endif #endif
g_mutex_unlock (set->lock); g_mutex_unlock (set->lock);
switch (mode) { switch (mode) {
@ -994,52 +1168,31 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
case GST_POLL_MODE_WINDOWS: case GST_POLL_MODE_WINDOWS:
{ {
#ifdef G_OS_WIN32 #ifdef G_OS_WIN32
gint ignore_count = set->active_fds_ignored->len;
DWORD t, wait_ret; DWORD t, wait_ret;
if (G_LIKELY (ignore_count == 0)) {
if (timeout != GST_CLOCK_TIME_NONE) if (timeout != GST_CLOCK_TIME_NONE)
t = GST_TIME_AS_MSECONDS (timeout); t = GST_TIME_AS_MSECONDS (timeout);
else else
t = INFINITE; t = INFINITE;
} else {
/* already one or more ignored fds, so we quickly sweep the others */
t = 0;
}
wait_ret = WSAWaitForMultipleEvents (set->active_events->len, wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
(HANDLE *) set->active_events->data, FALSE, t, FALSE); (HANDLE *) set->active_events->data, FALSE, t, FALSE);
if (wait_ret == WSA_WAIT_TIMEOUT) if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
res = 0; res = 0;
else if (wait_ret == WSA_WAIT_FAILED) } else if (wait_ret == WSA_WAIT_FAILED) {
res = -1; res = -1;
else { errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
} else {
/* the first entry is the wakeup event */ /* the first entry is the wakeup event */
if (wait_ret - WSA_WAIT_EVENT_0 >= 1) { if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
gint i, enum_ret; res = gst_poll_collect_winsock_events (set);
/*
* We need to check which events are signaled, and call
* WSAEnumNetworkEvents for those that are, which resets
* the event and clears the internal network event records.
*/
res = 0;
for (i = 0; i < set->active_fds->len; i++) {
WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
wait_ret = WaitForSingleObject (event, 0);
if (wait_ret == WAIT_OBJECT_0) {
enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
if (G_UNLIKELY (enum_ret != 0)) {
res = -1;
break;
}
res++;
} else {
/* clear any previously stored result */
memset (&wfd->events, 0, sizeof (wfd->events));
}
}
g_assert (res > 0);
} else { } else {
res = 1; /* wakeup event */ res = 1; /* wakeup event */
} }
@ -1054,38 +1207,7 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
g_mutex_lock (set->lock); g_mutex_lock (set->lock);
/* check if the poll/select was aborted due to a command, FIXME, If we have an gst_poll_check_ctrl_commands (set, res, &restarting);
* ERROR, we might not clear the control socket. */
if (res > 0 && set->controllable) {
#ifndef G_OS_WIN32
while (TRUE) {
guchar cmd;
gint result;
/* we do not check the read status of the control socket here because
* there may have been a write to the socket between the time the
* poll/select finished and before we got the mutex back, and we need
* to clear out the control socket before leaving */
READ_COMMAND (set, cmd, result);
if (result <= 0) {
/* no more commands, quit the loop */
break;
}
/* if the control socket is the only socket with activity when we get
* here, we restart the _wait operation, else we allow the caller to
* process the other file descriptors */
if (res == 1 &&
gst_poll_fd_can_read_unlocked (set, &set->control_read_fd))
restarting = TRUE;
}
#else
if (WaitForSingleObject (set->wakeup_event, 0) == WAIT_OBJECT_0) {
ResetEvent (set->wakeup_event);
restarting = TRUE;
}
#endif
}
/* update the controllable state if needed */ /* update the controllable state if needed */
set->controllable = set->new_controllable; set->controllable = set->new_controllable;
@ -1117,6 +1239,13 @@ flushing:
errno = EBUSY; errno = EBUSY;
return -1; return -1;
} }
#ifdef G_OS_WIN32
winsock_error:
{
g_mutex_unlock (set->lock);
return -1;
}
#endif
} }
/** /**

View file

@ -73,6 +73,7 @@ gboolean gst_poll_remove_fd (GstPoll *set, GstPollFD *fd);
gboolean gst_poll_fd_ctl_write (GstPoll *set, GstPollFD *fd, gboolean active); gboolean gst_poll_fd_ctl_write (GstPoll *set, GstPollFD *fd, gboolean active);
gboolean gst_poll_fd_ctl_read (GstPoll *set, GstPollFD *fd, gboolean active); gboolean gst_poll_fd_ctl_read (GstPoll *set, GstPollFD *fd, gboolean active);
void gst_poll_fd_ignored (GstPoll *set, GstPollFD *fd);
gboolean gst_poll_fd_has_closed (const GstPoll *set, GstPollFD *fd); gboolean gst_poll_fd_has_closed (const GstPoll *set, GstPollFD *fd);
gboolean gst_poll_fd_has_error (const GstPoll *set, GstPollFD *fd); gboolean gst_poll_fd_has_error (const GstPoll *set, GstPollFD *fd);

View file

@ -655,6 +655,7 @@ EXPORTS
gst_poll_fd_ctl_write gst_poll_fd_ctl_write
gst_poll_fd_has_closed gst_poll_fd_has_closed
gst_poll_fd_has_error gst_poll_fd_has_error
gst_poll_fd_ignored
gst_poll_fd_init gst_poll_fd_init
gst_poll_free gst_poll_free
gst_poll_new gst_poll_new