From a711d9f04fa1cf7eb1a3c2f6b59d7e85b4b700f5 Mon Sep 17 00:00:00 2001 From: Peter Kjellerstedt Date: Thu, 28 Feb 2008 10:18:02 +0000 Subject: [PATCH] plugins/elements/: Port to GstPoll. See #505417. Original commit message from CVS: Patch by: Peter Kjellerstedt * plugins/elements/gstfdsink.c: (gst_fd_sink_render), (gst_fd_sink_start), (gst_fd_sink_stop), (gst_fd_sink_unlock), (gst_fd_sink_unlock_stop), (gst_fd_sink_update_fd): * plugins/elements/gstfdsink.h: * plugins/elements/gstfdsrc.c: (gst_fd_src_update_fd), (gst_fd_src_start), (gst_fd_src_stop), (gst_fd_src_unlock), (gst_fd_src_unlock_stop), (gst_fd_src_create), (gst_fd_src_uri_set_uri): * plugins/elements/gstfdsrc.h: Port to GstPoll. See #505417. --- ChangeLog | 15 +++++ plugins/elements/gstfdsink.c | 109 +++++++++++++---------------------- plugins/elements/gstfdsink.h | 2 +- plugins/elements/gstfdsrc.c | 98 ++++++++++++------------------- plugins/elements/gstfdsrc.h | 2 +- 5 files changed, 95 insertions(+), 131 deletions(-) diff --git a/ChangeLog b/ChangeLog index f63c81cba4..9699287362 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,18 @@ +2008-02-28 Wim Taymans + + Patch by: Peter Kjellerstedt + + * plugins/elements/gstfdsink.c: (gst_fd_sink_render), + (gst_fd_sink_start), (gst_fd_sink_stop), (gst_fd_sink_unlock), + (gst_fd_sink_unlock_stop), (gst_fd_sink_update_fd): + * plugins/elements/gstfdsink.h: + * plugins/elements/gstfdsrc.c: (gst_fd_src_update_fd), + (gst_fd_src_start), (gst_fd_src_stop), (gst_fd_src_unlock), + (gst_fd_src_unlock_stop), (gst_fd_src_create), + (gst_fd_src_uri_set_uri): + * plugins/elements/gstfdsrc.h: + Port to GstPoll. See #505417. + 2008-02-27 Jan Schmidt * win32/common/libgstreamer.def: diff --git a/plugins/elements/gstfdsink.c b/plugins/elements/gstfdsink.c index 628750183f..a6970b1648 100644 --- a/plugins/elements/gstfdsink.c +++ b/plugins/elements/gstfdsink.c @@ -56,30 +56,6 @@ #include "gstfdsink.h" -/* We add a control socket as in fdsrc to make it shutdown quickly when it's blocking on the fd. - * Select is used to determine when the fd is ready for use. When the element state is changed, - * it happens from another thread while fdsink is select'ing on the fd. The state-change thread - * sends a control message, so fdsink wakes up and changes state immediately otherwise - * it would stay blocked until it receives some data. */ - -/* the select call is also performed on the control sockets, that way - * we can send special commands to unblock the select call */ -#define CONTROL_STOP 'S' /* stop the select call */ -#define CONTROL_SOCKETS(sink) sink->control_sock -#define WRITE_SOCKET(sink) sink->control_sock[1] -#define READ_SOCKET(sink) sink->control_sock[0] - -#define SEND_COMMAND(sink, command) \ -G_STMT_START { \ - unsigned char c; c = command; \ - write (WRITE_SOCKET(sink), &c, 1); \ -} G_STMT_END - -#define READ_COMMAND(sink, command, res) \ -G_STMT_START { \ - res = read(READ_SOCKET(sink), &command, 1); \ -} G_STMT_END - static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, @@ -236,16 +212,14 @@ static GstFlowReturn gst_fd_sink_render (GstBaseSink * sink, GstBuffer * buffer) { GstFdSink *fdsink; - -#ifndef HAVE_WIN32 - fd_set readfds; - fd_set writefds; - gint retval; -#endif guint8 *data; guint size; gint written; +#ifndef HAVE_WIN32 + gint retval; +#endif + fdsink = GST_FD_SINK (sink); g_return_val_if_fail (fdsink->fd >= 0, GST_FLOW_ERROR); @@ -255,24 +229,18 @@ gst_fd_sink_render (GstBaseSink * sink, GstBuffer * buffer) again: #ifndef HAVE_WIN32 - - FD_ZERO (&readfds); - FD_SET (READ_SOCKET (fdsink), &readfds); - - FD_ZERO (&writefds); - FD_SET (fdsink->fd, &writefds); - do { GST_DEBUG_OBJECT (fdsink, "going into select, have %d bytes to write", size); - retval = select (FD_SETSIZE, &readfds, &writefds, NULL, NULL); - } while ((retval == -1 && errno == EINTR)); + retval = gst_poll_wait (fdsink->fdset, GST_CLOCK_TIME_NONE); + } while (retval == -1 && errno == EINTR); - if (retval == -1) - goto select_error; - - if (FD_ISSET (READ_SOCKET (fdsink), &readfds)) - goto stopped; + if (retval == -1) { + if (errno == EBUSY) + goto stopped; + else + goto select_error; + } #endif GST_DEBUG_OBJECT (fdsink, "writing %d bytes to file descriptor %d", size, @@ -381,20 +349,18 @@ static gboolean gst_fd_sink_start (GstBaseSink * basesink) { GstFdSink *fdsink; - gint control_sock[2]; + GstPollFD fd; fdsink = GST_FD_SINK (basesink); if (!gst_fd_sink_check_fd (fdsink, fdsink->fd)) return FALSE; - if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0) + if ((fdsink->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL) goto socket_pair; - READ_SOCKET (fdsink) = control_sock[0]; - WRITE_SOCKET (fdsink) = control_sock[1]; - - fcntl (READ_SOCKET (fdsink), F_SETFL, O_NONBLOCK); - fcntl (WRITE_SOCKET (fdsink), F_SETFL, O_NONBLOCK); + fd.fd = fdsink->fd; + gst_poll_add_fd (fdsink->fdset, &fd); + gst_poll_fd_ctl_write (fdsink->fdset, &fd, TRUE); return TRUE; @@ -412,8 +378,10 @@ gst_fd_sink_stop (GstBaseSink * basesink) { GstFdSink *fdsink = GST_FD_SINK (basesink); - close (READ_SOCKET (fdsink)); - close (WRITE_SOCKET (fdsink)); + if (fdsink->fdset) { + gst_poll_free (fdsink->fdset); + fdsink->fdset = NULL; + } return TRUE; } @@ -423,8 +391,10 @@ gst_fd_sink_unlock (GstBaseSink * basesink) { GstFdSink *fdsink = GST_FD_SINK (basesink); - GST_LOG_OBJECT (fdsink, "Sending unlock command to queue"); - SEND_COMMAND (fdsink, CONTROL_STOP); + GST_LOG_OBJECT (fdsink, "Flushing"); + GST_OBJECT_LOCK (fdsink); + gst_poll_set_flushing (fdsink->fdset, TRUE); + GST_OBJECT_UNLOCK (fdsink); return TRUE; } @@ -434,20 +404,10 @@ gst_fd_sink_unlock_stop (GstBaseSink * basesink) { GstFdSink *fdsink = GST_FD_SINK (basesink); - /* read all stop commands */ - GST_LOG_OBJECT (fdsink, "Clearing unlock command queue"); - - while (TRUE) { - gchar command; - int res; - - READ_COMMAND (fdsink, command, res); - if (res < 0) { - GST_LOG_OBJECT (fdsink, "no more commands"); - /* no more commands */ - break; - } - } + GST_LOG_OBJECT (fdsink, "No longer flushing"); + GST_OBJECT_LOCK (fdsink); + gst_poll_set_flushing (fdsink->fdset, FALSE); + GST_OBJECT_UNLOCK (fdsink); return TRUE; } @@ -463,9 +423,20 @@ gst_fd_sink_update_fd (GstFdSink * fdsink, int new_fd) /* assign the fd */ GST_OBJECT_LOCK (fdsink); + if (fdsink->fdset) { + GstPollFD fd = { 0 }; + + fd.fd = fdsink->fd; + gst_poll_remove_fd (fdsink->fdset, &fd); + + fd.fd = new_fd; + gst_poll_add_fd (fdsink->fdset, &fd); + gst_poll_fd_ctl_write (fdsink->fdset, &fd, TRUE); + } fdsink->fd = new_fd; g_free (fdsink->uri); fdsink->uri = g_strdup_printf ("fd://%d", fdsink->fd); + GST_OBJECT_UNLOCK (fdsink); return TRUE; diff --git a/plugins/elements/gstfdsink.h b/plugins/elements/gstfdsink.h index a2b3cc760f..f92360eff5 100644 --- a/plugins/elements/gstfdsink.h +++ b/plugins/elements/gstfdsink.h @@ -54,7 +54,7 @@ struct _GstFdSink { gchar *uri; - gint control_sock[2]; + GstPoll *fdset; int fd; guint64 bytes_written; diff --git a/plugins/elements/gstfdsrc.c b/plugins/elements/gstfdsrc.c index d986eb1b84..97ada88a82 100644 --- a/plugins/elements/gstfdsrc.c +++ b/plugins/elements/gstfdsrc.c @@ -50,24 +50,6 @@ #include "gstfdsrc.h" -/* the select call is also performed on the control sockets, that way - * we can send special commands to unblock the select call */ -#define CONTROL_STOP 'S' /* stop the select call */ -#define CONTROL_SOCKETS(src) src->control_sock -#define WRITE_SOCKET(src) src->control_sock[1] -#define READ_SOCKET(src) src->control_sock[0] - -#define SEND_COMMAND(src, command) \ -G_STMT_START { \ - unsigned char c; c = command; \ - write (WRITE_SOCKET(src), &c, 1); \ -} G_STMT_END - -#define READ_COMMAND(src, command, res) \ -G_STMT_START { \ - res = read(READ_SOCKET(src), &command, 1); \ -} G_STMT_END - #define DEFAULT_BLOCKSIZE 4096 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", @@ -193,6 +175,21 @@ gst_fd_src_update_fd (GstFdSrc * src) { struct stat stat_results; + /* we need to always update the fdset since it may not have existed when + * gst_fd_src_update_fd() was called earlier */ + if (src->fdset != NULL) { + GstPollFD fd; + + if (src->fd >= 0) { + fd.fd = src->fd; + gst_poll_remove_fd (src->fdset, &fd); + } + + fd.fd = src->new_fd; + gst_poll_add_fd (src->fdset, &fd); + gst_poll_fd_ctl_read (src->fdset, &fd, TRUE); + } + if (src->fd != src->new_fd) { GST_INFO_OBJECT (src, "Updating to fd %d", src->new_fd); @@ -227,20 +224,13 @@ static gboolean gst_fd_src_start (GstBaseSrc * bsrc) { GstFdSrc *src = GST_FD_SRC (bsrc); - gint control_sock[2]; src->curoffset = 0; - gst_fd_src_update_fd (src); - - if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0) + if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL) goto socket_pair; - READ_SOCKET (src) = control_sock[0]; - WRITE_SOCKET (src) = control_sock[1]; - - fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK); - fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK); + gst_fd_src_update_fd (src); return TRUE; @@ -258,8 +248,10 @@ gst_fd_src_stop (GstBaseSrc * bsrc) { GstFdSrc *src = GST_FD_SRC (bsrc); - close (READ_SOCKET (src)); - close (WRITE_SOCKET (src)); + if (src->fdset) { + gst_poll_free (src->fdset); + src->fdset = NULL; + } return TRUE; } @@ -269,8 +261,10 @@ gst_fd_src_unlock (GstBaseSrc * bsrc) { GstFdSrc *src = GST_FD_SRC (bsrc); - GST_LOG_OBJECT (src, "sending unlock command"); - SEND_COMMAND (src, CONTROL_STOP); + GST_LOG_OBJECT (src, "Flushing"); + GST_OBJECT_LOCK (src); + gst_poll_set_flushing (src->fdset, TRUE); + GST_OBJECT_UNLOCK (src); return TRUE; } @@ -280,22 +274,10 @@ gst_fd_src_unlock_stop (GstBaseSrc * bsrc) { GstFdSrc *src = GST_FD_SRC (bsrc); - GST_LOG_OBJECT (src, "clearing unlock command queue"); - - /* read all stop commands */ - while (TRUE) { - gchar command; - int res; - - GST_LOG_OBJECT (src, "reading command"); - - READ_COMMAND (src, command, res); - if (res < 0) { - GST_LOG_OBJECT (src, "no more commands"); - /* no more commands */ - break; - } - } + GST_LOG_OBJECT (src, "No longer flushing"); + GST_OBJECT_LOCK (src); + gst_poll_set_flushing (src->fdset, FALSE); + GST_OBJECT_UNLOCK (src); return TRUE; } @@ -352,26 +334,22 @@ gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) guint blocksize; #ifndef HAVE_WIN32 - fd_set readfds; gint retval; #endif src = GST_FD_SRC (psrc); #ifndef HAVE_WIN32 - FD_ZERO (&readfds); - FD_SET (src->fd, &readfds); - FD_SET (READ_SOCKET (src), &readfds); - do { - retval = select (FD_SETSIZE, &readfds, NULL, NULL, NULL); - } while ((retval == -1 && errno == EINTR)); + retval = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE); + } while (retval == -1 && errno == EINTR); - if (retval == -1) - goto select_error; - - if (FD_ISSET (READ_SOCKET (src), &readfds)) - goto stopped; + if (retval == -1) { + if (errno == EBUSY) + goto stopped; + else + goto select_error; + } #endif blocksize = GST_BASE_SRC (src)->blocksize; @@ -530,7 +508,7 @@ gst_fd_src_uri_set_uri (GstURIHandler * handler, const gchar * uri) } g_free (protocol); - if (sscanf (uri, "fd://%d", &fd) != 1) + if (sscanf (uri, "fd://%d", &fd) != 1 || fd < 0) return FALSE; src->new_fd = fd; diff --git a/plugins/elements/gstfdsrc.h b/plugins/elements/gstfdsrc.h index 9c3a322a5c..2f8b419c87 100644 --- a/plugins/elements/gstfdsrc.h +++ b/plugins/elements/gstfdsrc.h @@ -64,7 +64,7 @@ struct _GstFdSrc { gchar *uri; - gint control_sock[2]; + GstPoll *fdset; gulong curoffset; /* current offset in file */ };