From e04f7a828f2580402215ab3bb3ad20d842ec36e2 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 18 May 2007 11:39:12 +0000 Subject: [PATCH] gst/rtsp/gstrtspsrc.*: Add TCP timeout property and use it for all TCP connection. Original commit message from CVS: * gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_class_init), (gst_rtspsrc_set_property), (gst_rtspsrc_get_property), (gst_rtspsrc_stream_configure_udp), (gst_rtspsrc_loop_interleaved), (gst_rtspsrc_loop_udp), (gst_rtspsrc_try_send), (gst_rtspsrc_send), (gst_rtspsrc_setup_streams), (gst_rtspsrc_open): * gst/rtsp/gstrtspsrc.h: Add TCP timeout property and use it for all TCP connection. * gst/rtsp/rtspconnection.c: (rtsp_connection_connect), (rtsp_connection_write), (rtsp_connection_next_timeout), (rtsp_connection_reset_timeout): Make connect and writes cancelable and make them use the timeout. --- ChangeLog | 15 ++++++ gst/rtsp/gstrtspsrc.c | 57 ++++++++++++++++++---- gst/rtsp/gstrtspsrc.h | 3 +- gst/rtsp/rtspconnection.c | 100 +++++++++++++++++++++++++++++++++++++- 4 files changed, 162 insertions(+), 13 deletions(-) diff --git a/ChangeLog b/ChangeLog index 09924a5977..46a356d438 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,18 @@ +2007-05-18 Wim Taymans + + * gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_class_init), + (gst_rtspsrc_set_property), (gst_rtspsrc_get_property), + (gst_rtspsrc_stream_configure_udp), (gst_rtspsrc_loop_interleaved), + (gst_rtspsrc_loop_udp), (gst_rtspsrc_try_send), (gst_rtspsrc_send), + (gst_rtspsrc_setup_streams), (gst_rtspsrc_open): + * gst/rtsp/gstrtspsrc.h: + Add TCP timeout property and use it for all TCP connection. + + * gst/rtsp/rtspconnection.c: (rtsp_connection_connect), + (rtsp_connection_write), (rtsp_connection_next_timeout), + (rtsp_connection_reset_timeout): + Make connect and writes cancelable and make them use the timeout. + 2007-05-18 Wim Taymans * gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_send_keep_alive), diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index d7743a9899..8ce75ecaaa 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -145,6 +145,7 @@ enum #define DEFAULT_DEBUG FALSE #define DEFAULT_RETRY 20 #define DEFAULT_TIMEOUT 5000000 +#define DEFAULT_TCP_TIMEOUT 20000000 #define DEFAULT_LATENCY_MS 3000 enum @@ -155,6 +156,7 @@ enum PROP_DEBUG, PROP_RETRY, PROP_TIMEOUT, + PROP_TCP_TIMEOUT, PROP_LATENCY, }; @@ -279,10 +281,16 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass) g_object_class_install_property (gobject_class, PROP_TIMEOUT, g_param_spec_uint64 ("timeout", "Timeout", - "Retry TCP transport after timeout microseconds (0 = disabled)", + "Retry TCP transport after UDP timeout microseconds (0 = disabled)", 0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + g_object_class_install_property (gobject_class, PROP_TCP_TIMEOUT, + g_param_spec_uint64 ("tcp-timeout", "TCP Timeout", + "Fail after timeout microseconds on TCP connections (0 = disabled)", + 0, G_MAXUINT64, DEFAULT_TCP_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + g_object_class_install_property (gobject_class, PROP_LATENCY, g_param_spec_uint ("latency", "Buffer latency in ms", "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS, @@ -365,8 +373,16 @@ gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value, rtspsrc->retry = g_value_get_uint (value); break; case PROP_TIMEOUT: - rtspsrc->timeout = g_value_get_uint64 (value); + rtspsrc->udp_timeout = g_value_get_uint64 (value); break; + case PROP_TCP_TIMEOUT: + { + guint64 timeout = g_value_get_uint64 (value); + + rtspsrc->tcp_timeout.tv_sec = timeout / G_USEC_PER_SEC; + rtspsrc->tcp_timeout.tv_usec = timeout % G_USEC_PER_SEC; + break; + } case PROP_LATENCY: rtspsrc->latency = g_value_get_uint (value); break; @@ -398,8 +414,17 @@ gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value, g_value_set_uint (value, rtspsrc->retry); break; case PROP_TIMEOUT: - g_value_set_uint64 (value, rtspsrc->timeout); + g_value_set_uint64 (value, rtspsrc->udp_timeout); break; + case PROP_TCP_TIMEOUT: + { + guint64 timeout; + + timeout = rtspsrc->tcp_timeout.tv_sec * G_USEC_PER_SEC + + rtspsrc->tcp_timeout.tv_usec; + g_value_set_uint64 (value, timeout); + break; + } case PROP_LATENCY: g_value_set_uint (value, rtspsrc->latency); break; @@ -1688,7 +1713,8 @@ gst_rtspsrc_stream_configure_udp (GstRTSPSrc * src, GstRTSPStream * stream, /* configure a timeout on the UDP port. When the timeout message is * posted, we assume UDP transport is not possible. We reconnect using TCP * if we can. */ - g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", src->timeout, NULL); + g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", src->udp_timeout, + NULL); /* get output pad of the UDP source. */ *outpad = gst_element_get_pad (stream->udpsrc[0], "src"); @@ -2137,7 +2163,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) have_data = FALSE; do { - GTimeVal tv_timeout; + GTimeVal tv_timeout, *tv; /* get the next timeout interval */ rtsp_connection_next_timeout (src->connection, &tv_timeout); @@ -2149,9 +2175,14 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) res = gst_rtspsrc_send_keep_alive (src); } + if ((src->tcp_timeout.tv_sec | src->tcp_timeout.tv_usec)) + tv = &src->tcp_timeout; + else + tv = NULL; + GST_DEBUG_OBJECT (src, "doing receive"); - res = rtsp_connection_receive (src->connection, &message, NULL); + res = rtsp_connection_receive (src->connection, &message, tv); switch (res) { case RTSP_OK: @@ -2425,7 +2456,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), ("Could not receive any UDP packets for %.4f seconds, maybe your " "firewall is blocking it. Retrying using a TCP connection.", - gst_guint64_to_gdouble (src->timeout / 1000000))); + gst_guint64_to_gdouble (src->udp_timeout / 1000000))); /* we can try only TCP now */ src->cur_protocols = RTSP_LOWER_TRANS_TCP; @@ -2685,6 +2716,7 @@ gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request, RTSPResult res; RTSPStatusCode thecode; gchar *content_base = NULL; + GTimeVal *tv; if (src->extension && src->extension->before_send) src->extension->before_send (src->extension, request); @@ -2694,13 +2726,18 @@ gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request, if (src->debug) rtsp_message_dump (request); - if ((res = rtsp_connection_send (src->connection, request, NULL)) < 0) + if ((src->tcp_timeout.tv_sec | src->tcp_timeout.tv_usec)) + tv = &src->tcp_timeout; + else + tv = NULL; + + if ((res = rtsp_connection_send (src->connection, request, tv)) < 0) goto send_error; rtsp_connection_reset_timeout (src->connection); next: - if ((res = rtsp_connection_receive (src->connection, response, NULL)) < 0) + if ((res = rtsp_connection_receive (src->connection, response, tv)) < 0) goto receive_error; if (src->debug) @@ -3338,7 +3375,7 @@ gst_rtspsrc_open (GstRTSPSrc * src) /* connect */ GST_DEBUG_OBJECT (src, "connecting (%s)...", src->req_location); - if ((res = rtsp_connection_connect (src->connection, NULL)) < 0) + if ((res = rtsp_connection_connect (src->connection, &src->tcp_timeout)) < 0) goto could_not_connect; /* create OPTIONS */ diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index 3bada0663b..caf9efe980 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -147,7 +147,8 @@ struct _GstRTSPSrc { RTSPLowerTrans protocols; gboolean debug; guint retry; - guint64 timeout; + guint64 udp_timeout; + GTimeVal tcp_timeout; guint latency; /* state */ diff --git a/gst/rtsp/rtspconnection.c b/gst/rtsp/rtspconnection.c index cb81c72c97..086dddcb79 100644 --- a/gst/rtsp/rtspconnection.c +++ b/gst/rtsp/rtspconnection.c @@ -172,6 +172,10 @@ rtsp_connection_connect (RTSPConnection * conn, GTimeVal * timeout) gint ret; guint16 port; RTSPUrl *url; + fd_set writefds; + fd_set readfds; + struct timeval tv, *tvp; + gint max_fd, retval; g_return_val_if_fail (conn != NULL, RTSP_EINVAL); g_return_val_if_fail (conn->url != NULL, RTSP_EINVAL); @@ -207,12 +211,42 @@ rtsp_connection_connect (RTSPConnection * conn, GTimeVal * timeout) goto sys_error; /* set to non-blocking mode so that we can cancel the connect */ - //fcntl (fd, F_SETFL, O_NONBLOCK); + fcntl (fd, F_SETFL, O_NONBLOCK); + /* we are going to connect ASYNC now */ ret = connect (fd, (struct sockaddr *) &sin, sizeof (sin)); - if (ret != 0) + if (ret == 0) + goto done; + if (errno != EINPROGRESS) goto sys_error; + /* wait for connect to complete up to the specified timeout or until we got + * interrupted. */ + FD_ZERO (&writefds); + FD_SET (fd, &writefds); + FD_ZERO (&readfds); + FD_SET (READ_SOCKET (conn), &readfds); + + if (timeout->tv_sec != 0 || timeout->tv_usec != 0) { + tv.tv_sec = timeout->tv_sec; + tv.tv_usec = timeout->tv_usec; + tvp = &tv; + } else { + tvp = NULL; + } + + max_fd = MAX (fd, READ_SOCKET (conn)); + + do { + retval = select (max_fd + 1, &readfds, &writefds, NULL, tvp); + } while ((retval == -1 && errno == EINTR)); + + if (retval == 0) + goto timeout; + else if (retval == -1) + goto sys_error; + +done: conn->fd = fd; conn->ip = ip; @@ -232,6 +266,10 @@ not_ip: { return RTSP_ENOTIP; } +timeout: + { + return RTSP_ETIMEOUT; + } } static void @@ -270,15 +308,61 @@ rtsp_connection_write (RTSPConnection * conn, const guint8 * data, guint size, GTimeVal * timeout) { guint towrite; + fd_set writefds; + fd_set readfds; + int max_fd; + gint retval; + struct timeval tv, *tvp; g_return_val_if_fail (conn != NULL, RTSP_EINVAL); g_return_val_if_fail (data != NULL || size == 0, RTSP_EINVAL); + FD_ZERO (&writefds); + FD_SET (conn->fd, &writefds); + FD_ZERO (&readfds); + FD_SET (READ_SOCKET (conn), &readfds); + + max_fd = MAX (conn->fd, READ_SOCKET (conn)); + + if (timeout) { + tv.tv_sec = timeout->tv_sec; + tv.tv_usec = timeout->tv_usec; + tvp = &tv; + } else { + tvp = NULL; + } + towrite = size; while (towrite > 0) { gint written; + do { + retval = select (max_fd + 1, &readfds, &writefds, NULL, tvp); + } while ((retval == -1 && errno == EINTR)); + + if (retval == 0) + goto timeout; + + if (retval == -1) + goto select_error; + + if (FD_ISSET (READ_SOCKET (conn), &readfds)) { + /* read all stop commands */ + while (TRUE) { + gchar command; + int res; + + READ_COMMAND (conn, command, res); + if (res <= 0) { + /* no more commands */ + break; + } + } + goto stopped; + } + + /* now we can write */ written = write (conn->fd, data, towrite); if (written < 0) { if (errno != EAGAIN && errno != EINTR) @@ -291,6 +375,18 @@ rtsp_connection_write (RTSPConnection * conn, const guint8 * data, guint size, return RTSP_OK; /* ERRORS */ +timeout: + { + return RTSP_ETIMEOUT; + } +select_error: + { + return RTSP_ESYS; + } +stopped: + { + return RTSP_EINTR; + } write_error: { return RTSP_ESYS;