plugins/elements/: Port to GstPoll. See #505417.

Original commit message from CVS:
Patch by: Peter Kjellerstedt <pkj at axis dot com>
* plugins/elements/gstfdsink.c: (gst_fd_sink_render),
(gst_fd_sink_start), (gst_fd_sink_stop), (gst_fd_sink_unlock),
(gst_fd_sink_unlock_stop), (gst_fd_sink_update_fd):
* plugins/elements/gstfdsink.h:
* plugins/elements/gstfdsrc.c: (gst_fd_src_update_fd),
(gst_fd_src_start), (gst_fd_src_stop), (gst_fd_src_unlock),
(gst_fd_src_unlock_stop), (gst_fd_src_create),
(gst_fd_src_uri_set_uri):
* plugins/elements/gstfdsrc.h:
Port to GstPoll. See #505417.
This commit is contained in:
Peter Kjellerstedt 2008-02-28 10:18:02 +00:00 committed by Wim Taymans
parent 91bf3deeb7
commit a711d9f04f
5 changed files with 95 additions and 131 deletions

View file

@ -1,3 +1,18 @@
2008-02-28 Wim Taymans <wim.taymans@collabora.co.uk>
Patch by: Peter Kjellerstedt <pkj at axis dot com>
* plugins/elements/gstfdsink.c: (gst_fd_sink_render),
(gst_fd_sink_start), (gst_fd_sink_stop), (gst_fd_sink_unlock),
(gst_fd_sink_unlock_stop), (gst_fd_sink_update_fd):
* plugins/elements/gstfdsink.h:
* plugins/elements/gstfdsrc.c: (gst_fd_src_update_fd),
(gst_fd_src_start), (gst_fd_src_stop), (gst_fd_src_unlock),
(gst_fd_src_unlock_stop), (gst_fd_src_create),
(gst_fd_src_uri_set_uri):
* plugins/elements/gstfdsrc.h:
Port to GstPoll. See #505417.
2008-02-27 Jan Schmidt <jan.schmidt@sun.com> 2008-02-27 Jan Schmidt <jan.schmidt@sun.com>
* win32/common/libgstreamer.def: * win32/common/libgstreamer.def:

View file

@ -56,30 +56,6 @@
#include "gstfdsink.h" #include "gstfdsink.h"
/* We add a control socket as in fdsrc to make it shutdown quickly when it's blocking on the fd.
* Select is used to determine when the fd is ready for use. When the element state is changed,
* it happens from another thread while fdsink is select'ing on the fd. The state-change thread
* sends a control message, so fdsink wakes up and changes state immediately otherwise
* it would stay blocked until it receives some data. */
/* the select call is also performed on the control sockets, that way
* we can send special commands to unblock the select call */
#define CONTROL_STOP 'S' /* stop the select call */
#define CONTROL_SOCKETS(sink) sink->control_sock
#define WRITE_SOCKET(sink) sink->control_sock[1]
#define READ_SOCKET(sink) sink->control_sock[0]
#define SEND_COMMAND(sink, command) \
G_STMT_START { \
unsigned char c; c = command; \
write (WRITE_SOCKET(sink), &c, 1); \
} G_STMT_END
#define READ_COMMAND(sink, command, res) \
G_STMT_START { \
res = read(READ_SOCKET(sink), &command, 1); \
} G_STMT_END
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK, GST_PAD_SINK,
GST_PAD_ALWAYS, GST_PAD_ALWAYS,
@ -236,16 +212,14 @@ static GstFlowReturn
gst_fd_sink_render (GstBaseSink * sink, GstBuffer * buffer) gst_fd_sink_render (GstBaseSink * sink, GstBuffer * buffer)
{ {
GstFdSink *fdsink; GstFdSink *fdsink;
#ifndef HAVE_WIN32
fd_set readfds;
fd_set writefds;
gint retval;
#endif
guint8 *data; guint8 *data;
guint size; guint size;
gint written; gint written;
#ifndef HAVE_WIN32
gint retval;
#endif
fdsink = GST_FD_SINK (sink); fdsink = GST_FD_SINK (sink);
g_return_val_if_fail (fdsink->fd >= 0, GST_FLOW_ERROR); g_return_val_if_fail (fdsink->fd >= 0, GST_FLOW_ERROR);
@ -255,24 +229,18 @@ gst_fd_sink_render (GstBaseSink * sink, GstBuffer * buffer)
again: again:
#ifndef HAVE_WIN32 #ifndef HAVE_WIN32
FD_ZERO (&readfds);
FD_SET (READ_SOCKET (fdsink), &readfds);
FD_ZERO (&writefds);
FD_SET (fdsink->fd, &writefds);
do { do {
GST_DEBUG_OBJECT (fdsink, "going into select, have %d bytes to write", GST_DEBUG_OBJECT (fdsink, "going into select, have %d bytes to write",
size); size);
retval = select (FD_SETSIZE, &readfds, &writefds, NULL, NULL); retval = gst_poll_wait (fdsink->fdset, GST_CLOCK_TIME_NONE);
} while ((retval == -1 && errno == EINTR)); } while (retval == -1 && errno == EINTR);
if (retval == -1) if (retval == -1) {
goto select_error; if (errno == EBUSY)
goto stopped;
if (FD_ISSET (READ_SOCKET (fdsink), &readfds)) else
goto stopped; goto select_error;
}
#endif #endif
GST_DEBUG_OBJECT (fdsink, "writing %d bytes to file descriptor %d", size, GST_DEBUG_OBJECT (fdsink, "writing %d bytes to file descriptor %d", size,
@ -381,20 +349,18 @@ static gboolean
gst_fd_sink_start (GstBaseSink * basesink) gst_fd_sink_start (GstBaseSink * basesink)
{ {
GstFdSink *fdsink; GstFdSink *fdsink;
gint control_sock[2]; GstPollFD fd;
fdsink = GST_FD_SINK (basesink); fdsink = GST_FD_SINK (basesink);
if (!gst_fd_sink_check_fd (fdsink, fdsink->fd)) if (!gst_fd_sink_check_fd (fdsink, fdsink->fd))
return FALSE; return FALSE;
if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0) if ((fdsink->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
goto socket_pair; goto socket_pair;
READ_SOCKET (fdsink) = control_sock[0]; fd.fd = fdsink->fd;
WRITE_SOCKET (fdsink) = control_sock[1]; gst_poll_add_fd (fdsink->fdset, &fd);
gst_poll_fd_ctl_write (fdsink->fdset, &fd, TRUE);
fcntl (READ_SOCKET (fdsink), F_SETFL, O_NONBLOCK);
fcntl (WRITE_SOCKET (fdsink), F_SETFL, O_NONBLOCK);
return TRUE; return TRUE;
@ -412,8 +378,10 @@ gst_fd_sink_stop (GstBaseSink * basesink)
{ {
GstFdSink *fdsink = GST_FD_SINK (basesink); GstFdSink *fdsink = GST_FD_SINK (basesink);
close (READ_SOCKET (fdsink)); if (fdsink->fdset) {
close (WRITE_SOCKET (fdsink)); gst_poll_free (fdsink->fdset);
fdsink->fdset = NULL;
}
return TRUE; return TRUE;
} }
@ -423,8 +391,10 @@ gst_fd_sink_unlock (GstBaseSink * basesink)
{ {
GstFdSink *fdsink = GST_FD_SINK (basesink); GstFdSink *fdsink = GST_FD_SINK (basesink);
GST_LOG_OBJECT (fdsink, "Sending unlock command to queue"); GST_LOG_OBJECT (fdsink, "Flushing");
SEND_COMMAND (fdsink, CONTROL_STOP); GST_OBJECT_LOCK (fdsink);
gst_poll_set_flushing (fdsink->fdset, TRUE);
GST_OBJECT_UNLOCK (fdsink);
return TRUE; return TRUE;
} }
@ -434,20 +404,10 @@ gst_fd_sink_unlock_stop (GstBaseSink * basesink)
{ {
GstFdSink *fdsink = GST_FD_SINK (basesink); GstFdSink *fdsink = GST_FD_SINK (basesink);
/* read all stop commands */ GST_LOG_OBJECT (fdsink, "No longer flushing");
GST_LOG_OBJECT (fdsink, "Clearing unlock command queue"); GST_OBJECT_LOCK (fdsink);
gst_poll_set_flushing (fdsink->fdset, FALSE);
while (TRUE) { GST_OBJECT_UNLOCK (fdsink);
gchar command;
int res;
READ_COMMAND (fdsink, command, res);
if (res < 0) {
GST_LOG_OBJECT (fdsink, "no more commands");
/* no more commands */
break;
}
}
return TRUE; return TRUE;
} }
@ -463,9 +423,20 @@ gst_fd_sink_update_fd (GstFdSink * fdsink, int new_fd)
/* assign the fd */ /* assign the fd */
GST_OBJECT_LOCK (fdsink); GST_OBJECT_LOCK (fdsink);
if (fdsink->fdset) {
GstPollFD fd = { 0 };
fd.fd = fdsink->fd;
gst_poll_remove_fd (fdsink->fdset, &fd);
fd.fd = new_fd;
gst_poll_add_fd (fdsink->fdset, &fd);
gst_poll_fd_ctl_write (fdsink->fdset, &fd, TRUE);
}
fdsink->fd = new_fd; fdsink->fd = new_fd;
g_free (fdsink->uri); g_free (fdsink->uri);
fdsink->uri = g_strdup_printf ("fd://%d", fdsink->fd); fdsink->uri = g_strdup_printf ("fd://%d", fdsink->fd);
GST_OBJECT_UNLOCK (fdsink); GST_OBJECT_UNLOCK (fdsink);
return TRUE; return TRUE;

View file

@ -54,7 +54,7 @@ struct _GstFdSink {
gchar *uri; gchar *uri;
gint control_sock[2]; GstPoll *fdset;
int fd; int fd;
guint64 bytes_written; guint64 bytes_written;

View file

@ -50,24 +50,6 @@
#include "gstfdsrc.h" #include "gstfdsrc.h"
/* the select call is also performed on the control sockets, that way
* we can send special commands to unblock the select call */
#define CONTROL_STOP 'S' /* stop the select call */
#define CONTROL_SOCKETS(src) src->control_sock
#define WRITE_SOCKET(src) src->control_sock[1]
#define READ_SOCKET(src) src->control_sock[0]
#define SEND_COMMAND(src, command) \
G_STMT_START { \
unsigned char c; c = command; \
write (WRITE_SOCKET(src), &c, 1); \
} G_STMT_END
#define READ_COMMAND(src, command, res) \
G_STMT_START { \
res = read(READ_SOCKET(src), &command, 1); \
} G_STMT_END
#define DEFAULT_BLOCKSIZE 4096 #define DEFAULT_BLOCKSIZE 4096
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
@ -193,6 +175,21 @@ gst_fd_src_update_fd (GstFdSrc * src)
{ {
struct stat stat_results; struct stat stat_results;
/* we need to always update the fdset since it may not have existed when
* gst_fd_src_update_fd() was called earlier */
if (src->fdset != NULL) {
GstPollFD fd;
if (src->fd >= 0) {
fd.fd = src->fd;
gst_poll_remove_fd (src->fdset, &fd);
}
fd.fd = src->new_fd;
gst_poll_add_fd (src->fdset, &fd);
gst_poll_fd_ctl_read (src->fdset, &fd, TRUE);
}
if (src->fd != src->new_fd) { if (src->fd != src->new_fd) {
GST_INFO_OBJECT (src, "Updating to fd %d", src->new_fd); GST_INFO_OBJECT (src, "Updating to fd %d", src->new_fd);
@ -227,20 +224,13 @@ static gboolean
gst_fd_src_start (GstBaseSrc * bsrc) gst_fd_src_start (GstBaseSrc * bsrc)
{ {
GstFdSrc *src = GST_FD_SRC (bsrc); GstFdSrc *src = GST_FD_SRC (bsrc);
gint control_sock[2];
src->curoffset = 0; src->curoffset = 0;
gst_fd_src_update_fd (src); if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
goto socket_pair; goto socket_pair;
READ_SOCKET (src) = control_sock[0]; gst_fd_src_update_fd (src);
WRITE_SOCKET (src) = control_sock[1];
fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
return TRUE; return TRUE;
@ -258,8 +248,10 @@ gst_fd_src_stop (GstBaseSrc * bsrc)
{ {
GstFdSrc *src = GST_FD_SRC (bsrc); GstFdSrc *src = GST_FD_SRC (bsrc);
close (READ_SOCKET (src)); if (src->fdset) {
close (WRITE_SOCKET (src)); gst_poll_free (src->fdset);
src->fdset = NULL;
}
return TRUE; return TRUE;
} }
@ -269,8 +261,10 @@ gst_fd_src_unlock (GstBaseSrc * bsrc)
{ {
GstFdSrc *src = GST_FD_SRC (bsrc); GstFdSrc *src = GST_FD_SRC (bsrc);
GST_LOG_OBJECT (src, "sending unlock command"); GST_LOG_OBJECT (src, "Flushing");
SEND_COMMAND (src, CONTROL_STOP); GST_OBJECT_LOCK (src);
gst_poll_set_flushing (src->fdset, TRUE);
GST_OBJECT_UNLOCK (src);
return TRUE; return TRUE;
} }
@ -280,22 +274,10 @@ gst_fd_src_unlock_stop (GstBaseSrc * bsrc)
{ {
GstFdSrc *src = GST_FD_SRC (bsrc); GstFdSrc *src = GST_FD_SRC (bsrc);
GST_LOG_OBJECT (src, "clearing unlock command queue"); GST_LOG_OBJECT (src, "No longer flushing");
GST_OBJECT_LOCK (src);
/* read all stop commands */ gst_poll_set_flushing (src->fdset, FALSE);
while (TRUE) { GST_OBJECT_UNLOCK (src);
gchar command;
int res;
GST_LOG_OBJECT (src, "reading command");
READ_COMMAND (src, command, res);
if (res < 0) {
GST_LOG_OBJECT (src, "no more commands");
/* no more commands */
break;
}
}
return TRUE; return TRUE;
} }
@ -352,26 +334,22 @@ gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
guint blocksize; guint blocksize;
#ifndef HAVE_WIN32 #ifndef HAVE_WIN32
fd_set readfds;
gint retval; gint retval;
#endif #endif
src = GST_FD_SRC (psrc); src = GST_FD_SRC (psrc);
#ifndef HAVE_WIN32 #ifndef HAVE_WIN32
FD_ZERO (&readfds);
FD_SET (src->fd, &readfds);
FD_SET (READ_SOCKET (src), &readfds);
do { do {
retval = select (FD_SETSIZE, &readfds, NULL, NULL, NULL); retval = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE);
} while ((retval == -1 && errno == EINTR)); } while (retval == -1 && errno == EINTR);
if (retval == -1) if (retval == -1) {
goto select_error; if (errno == EBUSY)
goto stopped;
if (FD_ISSET (READ_SOCKET (src), &readfds)) else
goto stopped; goto select_error;
}
#endif #endif
blocksize = GST_BASE_SRC (src)->blocksize; blocksize = GST_BASE_SRC (src)->blocksize;
@ -530,7 +508,7 @@ gst_fd_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
} }
g_free (protocol); g_free (protocol);
if (sscanf (uri, "fd://%d", &fd) != 1) if (sscanf (uri, "fd://%d", &fd) != 1 || fd < 0)
return FALSE; return FALSE;
src->new_fd = fd; src->new_fd = fd;

View file

@ -64,7 +64,7 @@ struct _GstFdSrc {
gchar *uri; gchar *uri;
gint control_sock[2]; GstPoll *fdset;
gulong curoffset; /* current offset in file */ gulong curoffset; /* current offset in file */
}; };