gst-libs/gst/rtsp/gstrtspconnection.*: Use GstPoll for the rtsp connection.

Original commit message from CVS:
* gst-libs/gst/rtsp/gstrtspconnection.c:
(gst_rtsp_connection_create), (gst_rtsp_connection_connect),
(gst_rtsp_connection_write), (gst_rtsp_connection_read_internal),
(gst_rtsp_connection_receive), (gst_rtsp_connection_close),
(gst_rtsp_connection_free), (gst_rtsp_connection_poll),
(gst_rtsp_connection_flush):
* gst-libs/gst/rtsp/gstrtspconnection.h:
Use GstPoll for the rtsp connection.
This commit is contained in:
Wim Taymans 2008-02-28 09:50:52 +00:00
parent 8ef40f64a7
commit 0667334b2d
3 changed files with 104 additions and 180 deletions

View file

@ -1,3 +1,14 @@
2008-02-28 Wim Taymans <wim.taymans@collabora.co.uk>
* gst-libs/gst/rtsp/gstrtspconnection.c:
(gst_rtsp_connection_create), (gst_rtsp_connection_connect),
(gst_rtsp_connection_write), (gst_rtsp_connection_read_internal),
(gst_rtsp_connection_receive), (gst_rtsp_connection_close),
(gst_rtsp_connection_free), (gst_rtsp_connection_poll),
(gst_rtsp_connection_flush):
* gst-libs/gst/rtsp/gstrtspconnection.h:
Use GstPoll for the rtsp connection.
2008-02-27 Wim Taymans <wim.taymans@collabora.co.uk>
* tests/examples/seek/seek.c: (vis_toggle_cb), (filter_features),

View file

@ -90,25 +90,6 @@
#include "gstrtspconnection.h"
#include "gstrtspbase64.h"
/* the select call is also performed on the control sockets, that way
* we can send special commands to unlock or restart the select call */
#define CONTROL_RESTART 'R' /* restart the select call */
#define CONTROL_STOP 'S' /* stop the select call */
#define CONTROL_SOCKETS(conn) conn->control_sock
#define WRITE_SOCKET(conn) conn->control_sock[1]
#define READ_SOCKET(conn) conn->control_sock[0]
#define SEND_COMMAND(conn, command) \
G_STMT_START { \
unsigned char c; c = command; \
write (WRITE_SOCKET(conn), &c, 1); \
} G_STMT_END
#define READ_COMMAND(conn, command, res) \
G_STMT_START { \
res = read(READ_SOCKET(conn), &command, 1); \
} G_STMT_END
#ifdef G_OS_WIN32
#define FIONREAD_TYPE gulong
#define IOCTL_SOCKET ioctlsocket
@ -149,36 +130,17 @@ inet_aton (const char *c, struct in_addr *paddr)
GstRTSPResult
gst_rtsp_connection_create (GstRTSPUrl * url, GstRTSPConnection ** conn)
{
gint ret;
GstRTSPConnection *newconn;
#ifdef G_OS_WIN32
unsigned long flags;
#endif /* G_OS_WIN32 */
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
newconn = g_new0 (GstRTSPConnection, 1);
#ifdef G_OS_WIN32
/* This should work on UNIX too. PF_UNIX sockets replaced with pipe */
/* pipe( CONTROL_SOCKETS(newconn) ) */
if ((ret = _pipe (CONTROL_SOCKETS (newconn), 4096, _O_BINARY)) < 0)
goto no_socket_pair;
ioctlsocket (READ_SOCKET (newconn), FIONBIO, &flags);
ioctlsocket (WRITE_SOCKET (newconn), FIONBIO, &flags);
#else
if ((ret =
socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (newconn))) < 0)
goto no_socket_pair;
fcntl (READ_SOCKET (newconn), F_SETFL, O_NONBLOCK);
fcntl (WRITE_SOCKET (newconn), F_SETFL, O_NONBLOCK);
#endif
if ((newconn->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
goto no_fdset;
newconn->url = url;
newconn->fd = -1;
newconn->fd.fd = -1;
newconn->timer = g_timer_new ();
newconn->auth_method = GST_RTSP_AUTH_NONE;
@ -190,7 +152,7 @@ gst_rtsp_connection_create (GstRTSPUrl * url, GstRTSPConnection ** conn)
return GST_RTSP_OK;
/* ERRORS */
no_socket_pair:
no_fdset:
{
g_free (newconn);
return GST_RTSP_ESYS;
@ -207,6 +169,8 @@ no_socket_pair:
* forever. If @timeout contains a valid timeout, this function will return
* #GST_RTSP_ETIMEOUT after the timeout expired.
*
* This function can be cancelled with gst_rtsp_connection_flush().
*
* Returns: #GST_RTSP_OK when a connection could be made.
*/
GstRTSPResult
@ -220,10 +184,8 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
gint ret;
guint16 port;
GstRTSPUrl *url;
fd_set writefds;
fd_set readfds;
struct timeval tv, *tvp;
gint max_fd, retval;
GstClockTime to;
gint retval;
#ifdef G_OS_WIN32
unsigned long flags;
@ -235,7 +197,7 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (conn->fd < 0, GST_RTSP_EINVAL);
g_return_val_if_fail (conn->fd.fd < 0, GST_RTSP_EINVAL);
url = conn->url;
@ -279,6 +241,10 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
ioctlsocket (fd, FIONBIO, &flags);
#endif /* G_OS_WIN32 */
/* add the socket to our fdset */
conn->fd.fd = fd;
gst_poll_add_fd (conn->fdset, &conn->fd);
/* we are going to connect ASYNC now */
ret = connect (fd, (struct sockaddr *) &sa_in, sizeof (sa_in));
if (ret == 0)
@ -288,24 +254,13 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
/* wait for connect to complete up to the specified timeout or until we got
* interrupted. */
FD_ZERO (&writefds);
FD_SET (fd, &writefds);
FD_ZERO (&readfds);
FD_SET (READ_SOCKET (conn), &readfds);
gst_poll_fd_ctl_write (conn->fdset, &conn->fd, TRUE);
if (timeout) {
tv.tv_sec = timeout->tv_sec;
tv.tv_usec = timeout->tv_usec;
tvp = &tv;
} else {
tvp = NULL;
}
max_fd = MAX (fd, READ_SOCKET (conn));
to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
do {
retval = select (max_fd + 1, &readfds, &writefds, NULL, tvp);
} while ((retval == -1 && errno == EINTR));
retval = gst_poll_wait (conn->fdset, to);
} while (retval == -1 && errno == EINTR);
if (retval == 0)
goto timeout;
@ -313,14 +268,17 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
goto sys_error;
done:
conn->fd = fd;
conn->ip = g_strdup (ip);
return GST_RTSP_OK;
sys_error:
{
if (fd != -1)
if (conn->fd.fd >= 0) {
gst_poll_remove_fd (conn->fdset, &conn->fd);
conn->fd.fd = -1;
}
if (fd >= 0)
CLOSE_SOCKET (fd);
return GST_RTSP_ESYS;
}
@ -334,6 +292,12 @@ not_ip:
}
timeout:
{
if (conn->fd.fd >= 0) {
gst_poll_remove_fd (conn->fdset, &conn->fd);
conn->fd.fd = -1;
}
if (fd >= 0)
CLOSE_SOCKET (fd);
return GST_RTSP_ETIMEOUT;
}
}
@ -399,7 +363,7 @@ add_date_header (GstRTSPMessage * message)
* the specified @timeout. @timeout can be #NULL, in which case this function
* might block forever.
*
* This function can be canceled with gst_rtsp_connection_flush().
* This function can be cancelled with gst_rtsp_connection_flush().
*
* Returns: #GST_RTSP_OK on success.
*/
@ -408,52 +372,39 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
guint size, GTimeVal * timeout)
{
guint towrite;
fd_set writefds;
fd_set readfds;
int max_fd;
gint retval;
struct timeval tv, *tvp;
GstClockTime to;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
FD_ZERO (&writefds);
FD_SET (conn->fd, &writefds);
FD_ZERO (&readfds);
gst_poll_set_controllable (conn->fdset, TRUE);
gst_poll_fd_ctl_write (conn->fdset, &conn->fd, TRUE);
gst_poll_fd_ctl_read (conn->fdset, &conn->fd, FALSE);
max_fd = MAX (conn->fd, READ_SOCKET (conn));
if (timeout) {
tv.tv_sec = timeout->tv_sec;
tv.tv_usec = timeout->tv_usec;
tvp = &tv;
} else {
tvp = NULL;
}
to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
towrite = size;
while (towrite > 0) {
gint written;
/* set bit inside the loop for when we loop to read the rest of the data */
FD_SET (READ_SOCKET (conn), &readfds);
do {
retval = select (max_fd + 1, &readfds, &writefds, NULL, tvp);
} while ((retval == -1 && errno == EINTR));
retval = gst_poll_wait (conn->fdset, to);
} while (retval == -1 && errno == EINTR);
if (retval == 0)
goto timeout;
if (retval == -1)
goto select_error;
if (FD_ISSET (READ_SOCKET (conn), &readfds))
goto stopped;
if (retval == -1) {
if (errno == EBUSY)
goto stopped;
else
goto select_error;
}
/* now we can write */
written = write (conn->fd, data, towrite);
written = write (conn->fd.fd, data, towrite);
if (written < 0) {
if (errno != EAGAIN && errno != EINTR)
goto write_error;
@ -493,7 +444,7 @@ write_error:
* the specified @timeout. @timeout can be #NULL, in which case this function
* might block forever.
*
* This function can be canceled with gst_rtsp_connection_flush().
* This function can be cancelled with gst_rtsp_connection_flush().
*
* Returns: #GST_RTSP_OK on success.
*/
@ -810,7 +761,7 @@ no_column:
* the specified @timeout. @timeout can be #NULL, in which case this function
* might block forever.
*
* This function can be canceled with gst_rtsp_connection_flush() only if the
* This function can be cancelled with gst_rtsp_connection_flush() only if
* @allow_interrupt is set.
*
* Returns: #GST_RTSP_OK on success.
@ -819,10 +770,9 @@ static GstRTSPResult
gst_rtsp_connection_read_internal (GstRTSPConnection * conn, guint8 * data,
guint size, GTimeVal * timeout, gboolean allow_interrupt)
{
fd_set readfds;
guint toread;
gint retval;
struct timeval tv_timeout, *ptv_timeout;
GstClockTime to;
FIONREAD_TYPE avail;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
@ -834,49 +784,41 @@ gst_rtsp_connection_read_internal (GstRTSPConnection * conn, guint8 * data,
toread = size;
/* configure timeout if any */
if (timeout != NULL) {
tv_timeout.tv_sec = timeout->tv_sec;
tv_timeout.tv_usec = timeout->tv_usec;
ptv_timeout = &tv_timeout;
} else
ptv_timeout = NULL;
to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
/* if the call fails, just go in the select.. it should not fail. Else if
* there is enough data to read, skip the select call al together.*/
if (IOCTL_SOCKET (conn->fd, FIONREAD, &avail) < 0)
if (IOCTL_SOCKET (conn->fd.fd, FIONREAD, &avail) < 0)
avail = 0;
else if (avail >= toread)
goto do_read;
FD_ZERO (&readfds);
FD_SET (conn->fd, &readfds);
gst_poll_set_controllable (conn->fdset, allow_interrupt);
gst_poll_fd_ctl_write (conn->fdset, &conn->fd, FALSE);
gst_poll_fd_ctl_read (conn->fdset, &conn->fd, TRUE);
while (toread > 0) {
gint bytes;
/* set inside the loop so that when we did not read enough and we have to
* continue, we still have the cancel socket bit set */
if (allow_interrupt)
FD_SET (READ_SOCKET (conn), &readfds);
do {
retval = select (FD_SETSIZE, &readfds, NULL, NULL, ptv_timeout);
} while ((retval == -1 && errno == EINTR));
retval = gst_poll_wait (conn->fdset, to);
} while (retval == -1 && errno == EINTR);
if (retval == -1)
goto select_error;
if (retval == -1) {
if (errno == EBUSY)
goto stopped;
else
goto select_error;
}
/* check for timeout */
if (retval == 0)
goto select_timeout;
if (FD_ISSET (READ_SOCKET (conn), &readfds))
goto stopped;
do_read:
/* if we get here there is activity on the real fd since the select
* completed and the control socket was not readable. */
bytes = read (conn->fd, data, toread);
bytes = read (conn->fd.fd, data, toread);
if (bytes == 0) {
goto eof;
@ -924,7 +866,7 @@ read_error:
* the specified @timeout. @timeout can be #NULL, in which case this function
* might block forever.
*
* This function can be canceled with gst_rtsp_connection_flush().
* This function can be cancelled with gst_rtsp_connection_flush().
*
* Returns: #GST_RTSP_OK on success.
*/
@ -980,7 +922,7 @@ read_error:
* the specified @timeout. @timeout can be #NULL, in which case this function
* might block forever.
*
* This function can be canceled with gst_rtsp_connection_flush().
* This function can be cancelled with gst_rtsp_connection_flush().
*
* Returns: #GST_RTSP_OK on success.
*/
@ -1047,7 +989,7 @@ gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
break;
/* read lines */
GST_RTSP_CHECK (read_line (conn->fd, buffer + offset,
GST_RTSP_CHECK (read_line (conn->fd.fd, buffer + offset,
sizeof (buffer) - offset), read_error);
if (buffer[0] == '\0')
@ -1142,12 +1084,13 @@ gst_rtsp_connection_close (GstRTSPConnection * conn)
g_free (conn->ip);
conn->ip = NULL;
if (conn->fd != -1) {
res = CLOSE_SOCKET (conn->fd);
if (conn->fd.fd != -1) {
gst_poll_remove_fd (conn->fdset, &conn->fd);
res = CLOSE_SOCKET (conn->fd.fd);
conn->fd.fd = -1;
#ifdef G_OS_WIN32
WSACleanup ();
#endif
conn->fd = -1;
if (res != 0)
goto sys_error;
}
@ -1175,14 +1118,11 @@ gst_rtsp_connection_free (GstRTSPConnection * conn)
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
if (WRITE_SOCKET (conn) >= 0)
CLOSE_SOCKET (WRITE_SOCKET (conn));
if (READ_SOCKET (conn) >= 0)
CLOSE_SOCKET (READ_SOCKET (conn));
res = gst_rtsp_connection_close (conn);
gst_poll_free (conn->fdset);
#ifdef G_OS_WIN32
WSACleanup ();
#endif
res = gst_rtsp_connection_close (conn);
g_timer_destroy (conn->timer);
g_free (conn->username);
g_free (conn->passwd);
@ -1205,7 +1145,7 @@ gst_rtsp_connection_free (GstRTSPConnection * conn)
*
* @timeout can be #NULL, in which case this function might block forever.
*
* This function can be canceled with gst_rtsp_connection_flush().
* This function can be cancelled with gst_rtsp_connection_flush().
*
* Returns: #GST_RTSP_OK on success.
*
@ -1215,60 +1155,45 @@ GstRTSPResult
gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
GstRTSPEvent * revents, GTimeVal * timeout)
{
fd_set writefds, *pwritefds;
fd_set readfds;
int max_fd;
GstClockTime to;
gint retval;
struct timeval tv, *tvp;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
if (events & GST_RTSP_EV_WRITE) {
/* add fd to writer set when asked to */
FD_ZERO (&writefds);
FD_SET (conn->fd, &writefds);
pwritefds = &writefds;
} else
pwritefds = NULL;
gst_poll_set_controllable (conn->fdset, TRUE);
/* always add cancel socket to readfds */
FD_ZERO (&readfds);
FD_SET (READ_SOCKET (conn), &readfds);
if (events & GST_RTSP_EV_READ) {
/* add fd to reader set when asked to */
FD_SET (conn->fd, &readfds);
}
max_fd = MAX (conn->fd, READ_SOCKET (conn));
/* add fd to writer set when asked to */
gst_poll_fd_ctl_write (conn->fdset, &conn->fd, events & GST_RTSP_EV_WRITE);
if (timeout) {
tv.tv_sec = timeout->tv_sec;
tv.tv_usec = timeout->tv_usec;
tvp = &tv;
} else
tvp = NULL;
/* add fd to reader set when asked to */
gst_poll_fd_ctl_read (conn->fdset, &conn->fd, events & GST_RTSP_EV_READ);
/* configure timeout if any */
to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
do {
retval = select (max_fd + 1, &readfds, pwritefds, NULL, tvp);
} while ((retval == -1 && errno == EINTR));
retval = gst_poll_wait (conn->fdset, to);
} while (retval == -1 && errno == EINTR);
if (retval == 0)
goto select_timeout;
if (retval == -1)
goto select_error;
if (FD_ISSET (READ_SOCKET (conn), &readfds))
goto stopped;
if (retval == -1) {
if (errno == EBUSY)
goto stopped;
else
goto select_error;
}
*revents = 0;
if (events & GST_RTSP_EV_READ) {
if (FD_ISSET (conn->fd, &readfds))
if (gst_poll_fd_can_read (conn->fdset, &conn->fd))
*revents |= GST_RTSP_EV_READ;
}
if (events & GST_RTSP_EV_WRITE) {
if (FD_ISSET (conn->fd, &writefds))
if (gst_poll_fd_can_write (conn->fdset, &conn->fd))
*revents |= GST_RTSP_EV_WRITE;
}
return GST_RTSP_OK;
@ -1355,20 +1280,8 @@ gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
{
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
if (flush) {
SEND_COMMAND (conn, CONTROL_STOP);
} else {
while (TRUE) {
gchar command;
int res;
gst_poll_set_flushing (conn->fdset, flush);
READ_COMMAND (conn, command, res);
if (res <= 0) {
/* no more commands */
break;
}
}
}
return GST_RTSP_OK;
}

View file

@ -65,9 +65,9 @@ struct _GstRTSPConnection
GstRTSPUrl *url;
/* connection state */
gint fd;
gint control_sock[2];
gchar *ip;
GstPollFD fd;
GstPoll *fdset;
gchar *ip;
/* Session state */
gint cseq; /* sequence number */