GstPoll: add methods to use gstpoll for timeouts

Add a special timer mode in GstPoll that makes it only use the control socket
with a timeout to schedule timeouts. Also add a pair of methods to wakeup the
timeout thread.
API: GstPoll::gst_poll_new_timer()
API: GstPoll::gst_poll_write_control()
API: GstPoll::gst_poll_read_control()
This commit is contained in:
Wim Taymans 2009-02-03 17:49:02 +01:00
parent d807bca61c
commit 5cfb02af4a
3 changed files with 145 additions and 23 deletions

View file

@ -1718,11 +1718,14 @@ gst_poll_fd_ignored
gst_poll_fd_init gst_poll_fd_init
gst_poll_free gst_poll_free
gst_poll_new gst_poll_new
gst_poll_new_timer
gst_poll_remove_fd gst_poll_remove_fd
gst_poll_restart gst_poll_restart
gst_poll_set_controllable gst_poll_set_controllable
gst_poll_set_flushing gst_poll_set_flushing
gst_poll_wait gst_poll_wait
gst_poll_read_control
gst_poll_write_control
</SECTION> </SECTION>
<SECTION> <SECTION>

View file

@ -91,17 +91,22 @@
/* FIXME: Shouldn't we check or return the return value /* FIXME: Shouldn't we check or return the return value
* of write()? * of write()?
*/ */
#define SEND_COMMAND(set, command) \ #define SEND_COMMAND(set, command, result) \
G_STMT_START { \ G_STMT_START { \
unsigned char c = command; \ unsigned char c = command; \
ssize_t res; \ result = write (set->control_write_fd.fd, &c, 1); \
res = write (set->control_write_fd.fd, &c, 1); \ if (result > 0) \
set->control_pending = TRUE; \ set->control_pending++; \
} G_STMT_END } G_STMT_END
#define READ_COMMAND(set, command, res) \ #define READ_COMMAND(set, command, res) \
G_STMT_START { \ G_STMT_START { \
res = read (set->control_read_fd.fd, &command, 1); \ if (set->control_pending > 0) { \
res = read (set->control_read_fd.fd, &command, 1); \
if (res == 1) \
set->control_pending--; \
} else \
res = 0; \
} G_STMT_END } G_STMT_END
#define GST_POLL_CMD_WAKEUP 'W' /* restart the poll/select call */ #define GST_POLL_CMD_WAKEUP 'W' /* restart the poll/select call */
@ -149,9 +154,10 @@ struct _GstPoll
gboolean controllable; gboolean controllable;
gboolean new_controllable; gboolean new_controllable;
gboolean waiting; guint waiting;
guint control_pending;
gboolean flushing; gboolean flushing;
gboolean control_pending; gboolean timer;
}; };
static gint static gint
@ -466,7 +472,6 @@ gst_poll_new (gboolean controllable)
nset = g_slice_new0 (GstPoll); nset = g_slice_new0 (GstPoll);
nset->lock = g_mutex_new (); nset->lock = g_mutex_new ();
nset->control_pending = FALSE;
#ifndef G_OS_WIN32 #ifndef G_OS_WIN32
nset->mode = GST_POLL_MODE_AUTO; nset->mode = GST_POLL_MODE_AUTO;
nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd)); nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
@ -497,6 +502,34 @@ not_controllable:
} }
} }
/**
* gst_poll_new_timer:
*
* Create a new poll object that can be used for scheduling cancelable timeouts.
* A timeout is performed with gst_poll_wait(). Multiple timeouts can be
* performed from different threads.
*
* Returns: a new #GstPoll, or %NULL in case of an error. Free with
* gst_poll_free().
*
* Since: 0.10.23
*/
GstPoll *
gst_poll_new_timer (void)
{
GstPoll *poll;
/* make a new controllable poll set */
if (!(poll = gst_poll_new (TRUE)))
goto done;
/* we are a timer */
poll->timer = TRUE;
done:
return poll;
}
/** /**
* gst_poll_free: * gst_poll_free:
* @set: a file descriptor set. * @set: a file descriptor set.
@ -984,7 +1017,7 @@ gst_poll_check_ctrl_commands (GstPoll * set, gint res, gboolean * restarting)
/* check if the poll/select was aborted due to a command */ /* check if the poll/select was aborted due to a command */
if (set->controllable) { if (set->controllable) {
#ifndef G_OS_WIN32 #ifndef G_OS_WIN32
while (TRUE && set->control_pending) { while (TRUE) {
guchar cmd; guchar cmd;
gint result; gint result;
@ -995,7 +1028,6 @@ gst_poll_check_ctrl_commands (GstPoll * set, gint res, gboolean * restarting)
READ_COMMAND (set, cmd, result); READ_COMMAND (set, cmd, result);
if (result <= 0) { if (result <= 0) {
/* no more commands, quit the loop */ /* no more commands, quit the loop */
set->control_pending = FALSE;
break; break;
} }
@ -1042,15 +1074,16 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
g_mutex_lock (set->lock); g_mutex_lock (set->lock);
/* we cannot wait from multiple threads */ /* we cannot wait from multiple threads unless we are a timer */
if (G_UNLIKELY (set->waiting)) if (G_UNLIKELY (set->waiting > 0 && !set->timer))
goto already_waiting; goto already_waiting;
/* flushing, exit immediatly */ /* flushing, exit immediatly */
if (G_UNLIKELY (set->flushing)) if (G_UNLIKELY (set->flushing))
goto flushing; goto flushing;
set->waiting = TRUE; /* add one more waiter */
set->waiting++;
do { do {
GstPollMode mode; GstPollMode mode;
@ -1213,8 +1246,8 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
g_mutex_lock (set->lock); g_mutex_lock (set->lock);
/* FIXME, can we only do this check when (res > 0)? */ if (!set->timer)
gst_poll_check_ctrl_commands (set, res, &restarting); gst_poll_check_ctrl_commands (set, res, &restarting);
/* update the controllable state if needed */ /* update the controllable state if needed */
set->controllable = set->new_controllable; set->controllable = set->new_controllable;
@ -1227,7 +1260,7 @@ gst_poll_wait (GstPoll * set, GstClockTime timeout)
} }
} while (G_UNLIKELY (restarting)); } while (G_UNLIKELY (restarting));
set->waiting = FALSE; set->waiting--;
g_mutex_unlock (set->lock); g_mutex_unlock (set->lock);
@ -1249,6 +1282,7 @@ flushing:
#ifdef G_OS_WIN32 #ifdef G_OS_WIN32
winsock_error: winsock_error:
{ {
set->waiting--;
g_mutex_unlock (set->lock); g_mutex_unlock (set->lock);
return -1; return -1;
} }
@ -1297,7 +1331,7 @@ gst_poll_set_controllable (GstPoll * set, gboolean controllable)
/* delay the change of the controllable state if we are waiting */ /* delay the change of the controllable state if we are waiting */
set->new_controllable = controllable; set->new_controllable = controllable;
if (!set->waiting) if (set->waiting == 0)
set->controllable = controllable; set->controllable = controllable;
g_mutex_unlock (set->lock); g_mutex_unlock (set->lock);
@ -1332,11 +1366,13 @@ gst_poll_restart (GstPoll * set)
g_mutex_lock (set->lock); g_mutex_lock (set->lock);
if (set->controllable && set->waiting) { if (set->controllable && set->waiting > 0) {
#ifndef G_OS_WIN32 #ifndef G_OS_WIN32
gint result;
/* if we are waiting, we can send the command, else we do not have to /* if we are waiting, we can send the command, else we do not have to
* bother, future calls will automatically pick up the new fdset */ * bother, future calls will automatically pick up the new fdset */
SEND_COMMAND (set, GST_POLL_CMD_WAKEUP); SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
#else #else
SetEvent (set->wakeup_event); SetEvent (set->wakeup_event);
#endif #endif
@ -1367,12 +1403,14 @@ gst_poll_set_flushing (GstPoll * set, gboolean flushing)
/* update the new state first */ /* update the new state first */
set->flushing = flushing; set->flushing = flushing;
if (flushing && set->controllable && set->waiting) { if (flushing && set->controllable && set->waiting > 0) {
/* we are flushing, controllable and waiting, wake up the waiter. When we /* we are flushing, controllable and waiting, wake up the waiter. When we
* stop the flushing operation we don't clear the wakeup fd here, this will * stop the flushing operation we don't clear the wakeup fd here, this will
* happen in the _wait() thread. */ * happen in the _wait() thread. */
#ifndef G_OS_WIN32 #ifndef G_OS_WIN32
SEND_COMMAND (set, GST_POLL_CMD_WAKEUP); gint result;
SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
#else #else
SetEvent (set->wakeup_event); SetEvent (set->wakeup_event);
#endif #endif
@ -1380,3 +1418,80 @@ gst_poll_set_flushing (GstPoll * set, gboolean flushing)
g_mutex_unlock (set->lock); g_mutex_unlock (set->lock);
} }
/**
* gst_poll_write_control:
* @set: a #GstPoll.
*
* Write a byte to the control socket of the controllable @set.
* This function is mostly usefull for timer #GstPoll objects created with
* gst_poll_new_timer().
*
* It will make any current and future gst_poll_wait() function return with
* 1, meaning the control socket is set. After an equal amount of calls to
* gst_poll_read_control() have been performed, calls to gst_poll_wait() will
* block again until their timeout expired.
*
* Returns: %TRUE on success. %FALSE when @set is not controllable or when the
* byte could not be written.
*
* Since: 0.10.23
*/
gboolean
gst_poll_write_control (GstPoll * set)
{
gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_mutex_lock (set->lock);
if (set->controllable) {
#ifndef G_OS_WIN32
gint result;
SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
res = (result > 0);
#else
SetEvent (set->wakeup_event);
#endif
}
g_mutex_unlock (set->lock);
return res;
}
/**
* gst_poll_read_control:
* @set: a #GstPoll.
*
* Read a byte from the control socket of the controllable @set.
* This function is mostly usefull for timer #GstPoll objects created with
* gst_poll_new_timer().
*
* Returns: %TRUE on success. %FALSE when @set is not controllable or when there
* was no byte to read.
*
* Since: 0.10.23
*/
gboolean
gst_poll_read_control (GstPoll * set)
{
gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_mutex_lock (set->lock);
if (set->controllable) {
#ifndef G_OS_WIN32
guchar cmd;
gint result;
READ_COMMAND (set, cmd, result);
res = (result > 0);
#else
ResetEvent (set->wakeup_event);
#endif
}
g_mutex_unlock (set->lock);
return res;
}

View file

@ -64,6 +64,7 @@ typedef struct {
#define GST_POLL_FD_INIT { -1, -1 } #define GST_POLL_FD_INIT { -1, -1 }
GstPoll* gst_poll_new (gboolean controllable); GstPoll* gst_poll_new (gboolean controllable);
GstPoll* gst_poll_new_timer (void);
void gst_poll_free (GstPoll *set); void gst_poll_free (GstPoll *set);
void gst_poll_fd_init (GstPollFD *fd); void gst_poll_fd_init (GstPollFD *fd);
@ -86,6 +87,9 @@ gboolean gst_poll_set_controllable (GstPoll *set, gboolean controllable);
void gst_poll_restart (GstPoll *set); void gst_poll_restart (GstPoll *set);
void gst_poll_set_flushing (GstPoll *set, gboolean flushing); void gst_poll_set_flushing (GstPoll *set, gboolean flushing);
gboolean gst_poll_write_control (GstPoll *set);
gboolean gst_poll_read_control (GstPoll *set);
G_END_DECLS G_END_DECLS
#endif /* __GST_POLL_H__ */ #endif /* __GST_POLL_H__ */