From 966ced220858a2f15f1efeb2a739a0db977110d5 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 18 Jun 2010 15:08:21 +0200 Subject: [PATCH] rtspsrc: factor out the connections Keep a global connection for aggregate control but also keep stream connections for non-aggregate control. Add some helper methods to connect/close/flush the connections. --- gst/rtsp/gstrtspsrc.c | 591 +++++++++++++++++++++++++++--------------- gst/rtsp/gstrtspsrc.h | 22 +- 2 files changed, 398 insertions(+), 215 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index cbfb87eb3c..f6da483487 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -431,8 +431,7 @@ gst_rtspsrc_init (GstRTSPSrc * src, GstRTSPSrcClass * g_class) } #endif - src->location = g_strdup (DEFAULT_LOCATION); - src->url = NULL; + src->conninfo.location = g_strdup (DEFAULT_LOCATION); src->protocols = DEFAULT_PROTOCOLS; src->debug = DEFAULT_DEBUG; src->retry = DEFAULT_RETRY; @@ -479,9 +478,8 @@ gst_rtspsrc_finalize (GObject * object) rtspsrc = GST_RTSPSRC (object); gst_rtsp_ext_list_free (rtspsrc->extensions); - g_free (rtspsrc->location); - g_free (rtspsrc->req_location); - gst_rtsp_url_free (rtspsrc->url); + g_free (rtspsrc->conninfo.location); + gst_rtsp_url_free (rtspsrc->conninfo.url); g_free (rtspsrc->user_id); g_free (rtspsrc->user_pw); @@ -638,7 +636,7 @@ gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value, switch (prop_id) { case PROP_LOCATION: - g_value_set_string (value, rtspsrc->location); + g_value_set_string (value, rtspsrc->conninfo.location); break; case PROP_PROTOCOLS: g_value_set_flags (value, rtspsrc->protocols); @@ -748,7 +746,7 @@ static gint find_stream_by_setup (GstRTSPStream * stream, gconstpointer a) { /* check qualified setup_url */ - if (!strcmp (stream->setup_url, (gchar *) a)) + if (!strcmp (stream->conninfo.location, (gchar *) a)) return 0; /* check original control_url */ if (!strcmp (stream->control_url, (gchar *) a)) @@ -952,7 +950,7 @@ gst_rtspsrc_create_stream (GstRTSPSrc * src, GstSDPMessage * sdp, gint idx) * likely build a URL that the server will fail to understand, this is ok, * we will fail then. */ if (g_str_has_prefix (control_url, "rtsp://")) - stream->setup_url = g_strdup (control_url); + stream->conninfo.location = g_strdup (control_url); else { const gchar *base; gboolean has_slash; @@ -961,8 +959,8 @@ gst_rtspsrc_create_stream (GstRTSPSrc * src, GstSDPMessage * sdp, gint idx) base = src->control; else if (src->content_base) base = src->content_base; - else if (src->req_location) - base = src->req_location; + else if (src->conninfo.url_str) + base = src->conninfo.url_str; else base = "/"; @@ -971,11 +969,12 @@ gst_rtspsrc_create_stream (GstRTSPSrc * src, GstSDPMessage * sdp, gint idx) has_slash = has_slash || g_str_has_suffix (base, "/"); /* concatenate the two strings, insert / when not present */ - stream->setup_url = + stream->conninfo.location = g_strdup_printf ("%s%s%s", base, has_slash ? "" : "/", control_url); } } - GST_DEBUG_OBJECT (src, " setup: %s", GST_STR_NULL (stream->setup_url)); + GST_DEBUG_OBJECT (src, " setup: %s", + GST_STR_NULL (stream->conninfo.location)); /* we keep track of all streams */ src->streams = g_list_append (src->streams, stream); @@ -997,7 +996,7 @@ gst_rtspsrc_stream_free (GstRTSPSrc * src, GstRTSPStream * stream) g_free (stream->destination); g_free (stream->control_url); - g_free (stream->setup_url); + g_free (stream->conninfo.location); for (i = 0; i < 2; i++) { if (stream->udpsrc[i]) { @@ -1075,6 +1074,11 @@ gst_rtspsrc_cleanup (GstRTSPSrc * src) if (src->range) gst_rtsp_range_free (src->range); src->range = NULL; + + if (src->sdp) { + gst_sdp_message_free (src->sdp); + src->sdp = NULL; + } } #define PARSE_INT(p, del, res) \ @@ -1565,14 +1569,14 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush) } static GstRTSPResult -gst_rtspsrc_connection_send (GstRTSPSrc * src, GstRTSPMessage * message, - GTimeVal * timeout) +gst_rtspsrc_connection_send (GstRTSPSrc * src, GstRTSPConnection * conn, + GstRTSPMessage * message, GTimeVal * timeout) { GstRTSPResult ret; GST_RTSP_CONN_LOCK (src); - if (src->connection) - ret = gst_rtsp_connection_send (src->connection, message, timeout); + if (conn) + ret = gst_rtsp_connection_send (conn, message, timeout); else ret = GST_RTSP_ERROR; GST_RTSP_CONN_UNLOCK (src); @@ -1581,14 +1585,14 @@ gst_rtspsrc_connection_send (GstRTSPSrc * src, GstRTSPMessage * message, } static GstRTSPResult -gst_rtspsrc_connection_receive (GstRTSPSrc * src, GstRTSPMessage * message, - GTimeVal * timeout) +gst_rtspsrc_connection_receive (GstRTSPSrc * src, GstRTSPConnection * conn, + GstRTSPMessage * message, GTimeVal * timeout) { GstRTSPResult ret; GST_RTSP_CONN_LOCK (src); - if (src->connection) - ret = gst_rtsp_connection_receive (src->connection, message, timeout); + if (conn) + ret = gst_rtsp_connection_receive (conn, message, timeout); else ret = GST_RTSP_ERROR; GST_RTSP_CONN_UNLOCK (src); @@ -1955,6 +1959,7 @@ gst_rtspsrc_sink_chain (GstPad * pad, GstBuffer * buffer) guint size; GstRTSPResult ret; GstRTSPMessage message = { 0 }; + GstRTSPConnection *conn; stream = (GstRTSPStream *) gst_pad_get_element_private (pad); src = stream->parent; @@ -1967,8 +1972,13 @@ gst_rtspsrc_sink_chain (GstPad * pad, GstBuffer * buffer) /* lend the body data to the message */ gst_rtsp_message_take_body (&message, data, size); + if (stream->conninfo.connection) + conn = stream->conninfo.connection; + else + conn = src->conninfo.connection; + GST_DEBUG_OBJECT (src, "sending %u bytes RTCP", size); - ret = gst_rtspsrc_connection_send (src, &message, NULL); + ret = gst_rtspsrc_connection_send (src, conn, &message, NULL); GST_DEBUG_OBJECT (src, "sent RTCP, %d", ret); /* and steal it away again because we will free it when unreffing the @@ -2448,8 +2458,13 @@ gst_rtspsrc_get_transport_info (GstRTSPSrc * src, GstRTSPStream * stream, if (destination) { /* first take the source, then the endpoint to figure out where to send * the RTCP. */ - if (!(*destination = transport->source)) - *destination = gst_rtsp_connection_get_ip (src->connection); + if (!(*destination = transport->source)) { + if (src->conninfo.connection) + *destination = gst_rtsp_connection_get_ip (src->conninfo.connection); + else if (stream->conninfo.connection) + *destination = + gst_rtsp_connection_get_ip (stream->conninfo.connection); + } } if (min && max) { /* for unicast we only expect the ports here */ @@ -3055,9 +3070,126 @@ gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event) gst_event_unref (event); } +static GstRTSPResult +gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info) +{ + GstRTSPResult res; + + if (info->connection == NULL) { + if (info->url == NULL) { + GST_DEBUG_OBJECT (src, "parsing uri (%s)...", info->location); + if ((res = gst_rtsp_url_parse (info->location, &info->url)) < 0) + goto parse_error; + } + + /* create connection */ + GST_DEBUG_OBJECT (src, "creating connection (%s)...", info->location); + if ((res = gst_rtsp_connection_create (info->url, &info->connection)) < 0) + goto could_not_create; + + if (info->url_str) + g_free (info->url_str); + info->url_str = gst_rtsp_url_get_request_uri (info->url); + + GST_DEBUG_OBJECT (src, "sanitized uri %s", info->url_str); + + if (info->url->transports & GST_RTSP_LOWER_TRANS_HTTP) + gst_rtsp_connection_set_tunneled (info->connection, TRUE); + + if (src->proxy_host) { + GST_DEBUG_OBJECT (src, "setting proxy %s:%d", src->proxy_host, + src->proxy_port); + gst_rtsp_connection_set_proxy (info->connection, src->proxy_host, + src->proxy_port); + } + } + + if (!info->connected) { + /* connect */ + GST_DEBUG_OBJECT (src, "connecting (%s)...", info->location); + if ((res = + gst_rtsp_connection_connect (info->connection, + src->ptcp_timeout)) < 0) + goto could_not_connect; + + info->connected = TRUE; + } + return GST_RTSP_OK; + + /* ERRORS */ +parse_error: + { + GST_ERROR_OBJECT (src, "No valid RTSP URL was provided"); + return res; + } +could_not_create: + { + gchar *str = gst_rtsp_strresult (res); + GST_ERROR_OBJECT (src, "Could not create connection. (%s)", str); + g_free (str); + return res; + } +could_not_connect: + { + gchar *str = gst_rtsp_strresult (res); + GST_ERROR_OBJECT (src, "Could not connect to server. (%s)", str); + g_free (str); + return res; + } +} + +static GstRTSPResult +gst_rtsp_conninfo_close (GstRTSPSrc * src, GstRTSPConnInfo * info, + gboolean free) +{ + if (info->connected) { + GST_DEBUG_OBJECT (src, "closing connection..."); + gst_rtsp_connection_close (info->connection); + info->connected = FALSE; + } + if (free && info->connection) { + /* free connection */ + GST_DEBUG_OBJECT (src, "freeing connection..."); + gst_rtsp_connection_free (info->connection); + info->connection = NULL; + } + return GST_RTSP_OK; +} + +static GstRTSPResult +gst_rtsp_conninfo_reconnect (GstRTSPSrc * src, GstRTSPConnInfo * info) +{ + GstRTSPResult res; + + GST_DEBUG_OBJECT (src, "reconnecting connection..."); + gst_rtsp_conninfo_close (src, info, FALSE); + res = gst_rtsp_conninfo_connect (src, info); + + return res; +} + +static void +gst_rtspsrc_connection_flush (GstRTSPSrc * src, gboolean flush) +{ + GList *walk; + + GST_DEBUG_OBJECT (src, "set flushing %d", flush); + if (src->conninfo.connection) { + GST_DEBUG_OBJECT (src, "connection flush"); + gst_rtsp_connection_flush (src->conninfo.connection, flush); + } + for (walk = src->streams; walk; walk = g_list_next (walk)) { + GstRTSPStream *stream = (GstRTSPStream *) walk->data; + GST_DEBUG_OBJECT (src, "stream %p flush", stream); + if (stream->conninfo.connection) + gst_rtsp_connection_flush (stream->conninfo.connection, flush); + } +} + /* FIXME, handle server request, reply with OK, for now */ static GstRTSPResult -gst_rtspsrc_handle_request (GstRTSPSrc * src, GstRTSPMessage * request) +gst_rtspsrc_handle_request (GstRTSPSrc * src, GstRTSPConnection * conn, + GstRTSPMessage * request) { GstRTSPMessage response = { 0 }; GstRTSPResult res; @@ -3082,7 +3214,7 @@ gst_rtspsrc_handle_request (GstRTSPSrc * src, GstRTSPMessage * request) if (src->debug) gst_rtsp_message_dump (&response); - res = gst_rtspsrc_connection_send (src, &response, NULL); + res = gst_rtspsrc_connection_send (src, conn, &response, NULL); if (res < 0) goto send_error; } else if (res == GST_RTSP_EEOF) @@ -3117,7 +3249,7 @@ gst_rtspsrc_send_keep_alive (GstRTSPSrc * src) if (src->control) control = src->control; else - control = src->req_location; + control = src->conninfo.url_str; if (control == NULL) goto no_control; @@ -3129,11 +3261,13 @@ gst_rtspsrc_send_keep_alive (GstRTSPSrc * src) if (src->debug) gst_rtsp_message_dump (&request); - res = gst_rtspsrc_connection_send (src, &request, NULL); + res = + gst_rtspsrc_connection_send (src, src->conninfo.connection, &request, + NULL); if (res < 0) goto send_error; - gst_rtsp_connection_reset_timeout (src->connection); + gst_rtsp_connection_reset_timeout (src->conninfo.connection); gst_rtsp_message_unset (&request); return GST_RTSP_OK; @@ -3176,7 +3310,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) GTimeVal tv_timeout; /* get the next timeout interval */ - gst_rtsp_connection_next_timeout (src->connection, &tv_timeout); + gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout); /* see if the timeout period expired */ if ((tv_timeout.tv_sec | tv_timeout.tv_usec) == 0) { @@ -3184,7 +3318,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) /* send keep-alive, ignore the result, a warning will be posted. */ gst_rtspsrc_send_keep_alive (src); /* get new timeout */ - gst_rtsp_connection_next_timeout (src->connection, &tv_timeout); + gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout); } GST_DEBUG_OBJECT (src, "doing receive with timeout %ld seconds, %ld usec", @@ -3192,7 +3326,9 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) /* protect the connection with the connection lock so that we can see when * we are finished doing server communication */ - res = gst_rtspsrc_connection_receive (src, &message, src->ptcp_timeout); + res = + gst_rtspsrc_connection_receive (src, src->conninfo.connection, &message, + src->ptcp_timeout); switch (res) { case GST_RTSP_OK: @@ -3216,7 +3352,9 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) switch (message.type) { case GST_RTSP_MESSAGE_REQUEST: /* server sends us a request message, handle it */ - res = gst_rtspsrc_handle_request (src, &message); + res = + gst_rtspsrc_handle_request (src, src->conninfo.connection, + &message); if (res == GST_RTSP_EEOF) goto server_eof; else if (res < 0) @@ -3360,7 +3498,7 @@ server_eof: GST_DEBUG_OBJECT (src, "we got an eof from the server"); GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), ("The server closed the connection.")); - src->connected = FALSE; + src->conninfo.connected = FALSE; gst_rtsp_message_unset (&message); return GST_FLOW_UNEXPECTED; } @@ -3369,7 +3507,7 @@ interrupt: gst_rtsp_message_unset (&message); GST_DEBUG_OBJECT (src, "got interrupted: stop connection flush"); /* unset flushing so we can do something else */ - gst_rtsp_connection_flush (src->connection, FALSE); + gst_rtspsrc_connection_flush (src, FALSE); return GST_FLOW_WRONG_STATE; } receive_error: @@ -3420,7 +3558,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) GTimeVal tv_timeout; /* get the next timeout interval */ - gst_rtsp_connection_next_timeout (src->connection, &tv_timeout); + gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout); GST_DEBUG_OBJECT (src, "doing receive with timeout %d seconds", (gint) tv_timeout.tv_sec); @@ -3428,7 +3566,9 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) /* we should continue reading the TCP socket because the server might * send us requests. When the session timeout expires, we need to send a * keep-alive request to keep the session open. */ - res = gst_rtspsrc_connection_receive (src, &message, &tv_timeout); + res = + gst_rtspsrc_connection_receive (src, src->conninfo.connection, + &message, &tv_timeout); switch (res) { case GST_RTSP_OK: @@ -3438,7 +3578,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) /* we got interrupted, see what we have to do */ GST_DEBUG_OBJECT (src, "got interrupted: stop connection flush"); /* unset flushing so we can do something else */ - gst_rtsp_connection_flush (src->connection, FALSE); + gst_rtspsrc_connection_flush (src, FALSE); goto interrupt; case GST_RTSP_ETIMEOUT: /* send keep-alive, ignore the result, a warning will be posted. */ @@ -3450,12 +3590,9 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) * see what happens. */ GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), ("The server closed the connection.")); - gst_rtsp_connection_close (src->connection); - res = - gst_rtsp_connection_connect (src->connection, src->ptcp_timeout); - if (res < 0) + if ((res = gst_rtsp_conninfo_reconnect (src, &src->conninfo)) < 0) goto connect_error; - src->connected = TRUE; + continue; default: goto receive_error; @@ -3464,7 +3601,9 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) switch (message.type) { case GST_RTSP_MESSAGE_REQUEST: /* server sends us a request message, handle it */ - res = gst_rtspsrc_handle_request (src, &message); + res = + gst_rtspsrc_handle_request (src, src->conninfo.connection, + &message); if (res == GST_RTSP_EEOF) goto server_eof; else if (res < 0) @@ -3574,7 +3713,7 @@ connect_error: { gchar *str = gst_rtsp_strresult (res); - src->connected = FALSE; + src->conninfo.connected = FALSE; GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), ("Could not connect to server. (%s)", str)); g_free (str); @@ -3603,7 +3742,7 @@ server_eof: GST_DEBUG_OBJECT (src, "we got an eof from the server"); GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), ("The server closed the connection.")); - src->connected = FALSE; + src->conninfo.connected = FALSE; gst_rtsp_message_unset (&message); return GST_FLOW_UNEXPECTED; } @@ -3616,12 +3755,10 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush) src->loop_cmd = cmd; if (flush) { GST_DEBUG_OBJECT (src, "start connection flush"); - if (src->connection) - gst_rtsp_connection_flush (src->connection, TRUE); + gst_rtspsrc_connection_flush (src, TRUE); } else { GST_DEBUG_OBJECT (src, "stop connection flush"); - if (src->connection) - gst_rtsp_connection_flush (src->connection, FALSE); + gst_rtspsrc_connection_flush (src, FALSE); } GST_OBJECT_UNLOCK (src); } @@ -3869,12 +4006,15 @@ gst_rtspsrc_setup_auth (GstRTSPSrc * src, GstRTSPMessage * response) GstRTSPAuthMethod method; GstRTSPResult auth_result; GstRTSPUrl *url; + GstRTSPConnection *conn; gchar *hdr; + conn = src->conninfo.connection; + /* Identify the available auth methods and see if any are supported */ if (gst_rtsp_message_get_header (response, GST_RTSP_HDR_WWW_AUTHENTICATE, &hdr, 0) == GST_RTSP_OK) { - gst_rtspsrc_parse_auth_hdr (hdr, &avail_methods, src->connection); + gst_rtspsrc_parse_auth_hdr (hdr, &avail_methods, conn); } if (avail_methods == GST_RTSP_AUTH_NONE) @@ -3884,7 +4024,7 @@ gst_rtspsrc_setup_auth (GstRTSPSrc * src, GstRTSPMessage * response) * data are stale, we just update them in the connection object and * return TRUE to retry the request */ - url = gst_rtsp_connection_get_url (src->connection); + url = gst_rtsp_connection_get_url (conn); /* Do we have username and password available? */ if (url != NULL && !src->tried_url_auth && url->user != NULL @@ -3917,8 +4057,7 @@ gst_rtspsrc_setup_auth (GstRTSPSrc * src, GstRTSPMessage * response) continue; /* Pass the credentials to the connection to try on the next request */ - auth_result = - gst_rtsp_connection_set_auth (src->connection, method, user, pass); + auth_result = gst_rtsp_connection_set_auth (conn, method, user, pass); /* INVAL indicates an invalid username/passwd were supplied, so we'll just * ignore it and end up retrying later */ if (auth_result == GST_RTSP_OK || auth_result == GST_RTSP_EINVAL) { @@ -3950,8 +4089,9 @@ no_user_pass: } static GstRTSPResult -gst_rtspsrc_try_send (GstRTSPSrc * src, GstRTSPMessage * request, - GstRTSPMessage * response, GstRTSPStatusCode * code) +gst_rtspsrc_try_send (GstRTSPSrc * src, GstRTSPConnection * conn, + GstRTSPMessage * request, GstRTSPMessage * response, + GstRTSPStatusCode * code) { GstRTSPResult res; GstRTSPStatusCode thecode; @@ -3966,14 +4106,14 @@ again: if (src->debug) gst_rtsp_message_dump (request); - res = gst_rtspsrc_connection_send (src, request, src->ptcp_timeout); + res = gst_rtspsrc_connection_send (src, conn, request, src->ptcp_timeout); if (res < 0) goto send_error; - gst_rtsp_connection_reset_timeout (src->connection); + gst_rtsp_connection_reset_timeout (conn); next: - res = gst_rtspsrc_connection_receive (src, response, src->ptcp_timeout); + res = gst_rtspsrc_connection_receive (src, conn, response, src->ptcp_timeout); if (res < 0) goto receive_error; @@ -3982,7 +4122,7 @@ next: switch (response->type) { case GST_RTSP_MESSAGE_REQUEST: - res = gst_rtspsrc_handle_request (src, response); + res = gst_rtspsrc_handle_request (src, conn, response); if (res == GST_RTSP_EEOF) goto server_eof; else if (res < 0) @@ -4039,15 +4179,10 @@ receive_error: case GST_RTSP_EEOF: GST_WARNING_OBJECT (src, "server closed connection, doing reconnect"); if (try == 0) { - gst_rtsp_connection_close (src->connection); try++; /* if reconnect succeeds, try again */ - if ((res = - gst_rtsp_connection_connect (src->connection, - src->ptcp_timeout)) == 0) + if ((res = gst_rtsp_conninfo_reconnect (src, &src->conninfo)) == 0) goto again; - - src->connected = FALSE; } /* only try once after reconnect, then fallthrough and error out */ default: @@ -4081,6 +4216,7 @@ server_eof: /** * gst_rtspsrc_send: * @src: the rtsp source + * @conn: the connection to send on * @request: must point to a valid request * @response: must point to an empty #GstRTSPMessage * @code: an optional code result @@ -4089,7 +4225,7 @@ server_eof: * non-NULL in which case it will contain the status code of the response. * * If This function returns #GST_RTSP_OK, @response will contain a valid response - * message that should be cleaned with gst_rtsp_message_unset() after usage. + * message that should be cleaned with gst_rtsp_message_unset() after usage. * * If @code is NULL, this function will return #GST_RTSP_ERROR (with an invalid * @response message) if the response code was not 200 (OK). @@ -4101,8 +4237,9 @@ server_eof: * Returns: #GST_RTSP_OK if the processing was successful. */ static GstRTSPResult -gst_rtspsrc_send (GstRTSPSrc * src, GstRTSPMessage * request, - GstRTSPMessage * response, GstRTSPStatusCode * code) +gst_rtspsrc_send (GstRTSPSrc * src, GstRTSPConnection * conn, + GstRTSPMessage * request, GstRTSPMessage * response, + GstRTSPStatusCode * code) { GstRTSPStatusCode int_code = GST_RTSP_STS_OK; GstRTSPResult res = GST_RTSP_ERROR; @@ -4121,7 +4258,8 @@ gst_rtspsrc_send (GstRTSPSrc * src, GstRTSPMessage * request, /* save method so we can disable it when the server complains */ method = request->type_data.request.method; - if ((res = gst_rtspsrc_try_send (src, request, response, &int_code)) < 0) + if ((res = + gst_rtspsrc_try_send (src, conn, request, response, &int_code)) < 0) goto error; switch (int_code) { @@ -4179,16 +4317,16 @@ error_response: GST_DEBUG_OBJECT (src, "redirection to %s", new_location); /* save current transports */ - if (src->url) - transports = src->url->transports; + if (src->conninfo.url) + transports = src->conninfo.url->transports; else transports = GST_RTSP_LOWER_TRANS_UNKNOWN; gst_rtspsrc_uri_set_uri (GST_URI_HANDLER (src), new_location); /* set old transports */ - if (src->url && transports != GST_RTSP_LOWER_TRANS_UNKNOWN) - src->url->transports = transports; + if (src->conninfo.url && transports != GST_RTSP_LOWER_TRANS_UNKNOWN) + src->conninfo.url->transports = transports; src->need_redirect = TRUE; src->state = GST_RTSP_STATE_INIT; @@ -4221,7 +4359,8 @@ static GstRTSPResult gst_rtspsrc_send_cb (GstRTSPExtension * ext, GstRTSPMessage * request, GstRTSPMessage * response, GstRTSPSrc * src) { - return gst_rtspsrc_send (src, request, response, NULL); + return gst_rtspsrc_send (src, src->conninfo.connection, request, response, + NULL); } @@ -4478,7 +4617,7 @@ gst_rtspsrc_stream_is_real_media (GstRTSPStream * stream) return res; } -/* Perform the SETUP request for all the streams. +/* Perform the SETUP request for all the streams. * * We ask the server for a specific transport, which initially includes all the * ones we can support (UDP/TCP/MULTICAST). For the UDP transport we allocate @@ -4505,11 +4644,15 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src) GstRTSPUrl *url; gchar *hval; - url = gst_rtsp_connection_get_url (src->connection); - - /* we initially allow all configured lower transports. based on the URL - * transports and the replies from the server we narrow them down. */ - protocols = url->transports & src->cur_protocols; + if (src->conninfo.connection) { + url = gst_rtsp_connection_get_url (src->conninfo.connection); + /* we initially allow all configured lower transports. based on the URL + * transports and the replies from the server we narrow them down. */ + protocols = url->transports & src->cur_protocols; + } else { + url = NULL; + protocols = src->cur_protocols; + } if (protocols == 0) goto no_protocols; @@ -4521,6 +4664,7 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src) rtpport = rtcpport = 0; for (walk = src->streams; walk; walk = g_list_next (walk)) { + GstRTSPConnection *conn; gchar *transports; gint retry = 0; guint mask = 0; @@ -4556,13 +4700,22 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src) } /* skip setup if we have no URL for it */ - if (stream->setup_url == NULL) { + if (stream->conninfo.location == NULL) { GST_DEBUG_OBJECT (src, "skipping stream %p, no setup", stream); continue; } + if (src->conninfo.connection == NULL) { + if (!gst_rtsp_conninfo_connect (src, &stream->conninfo)) { + GST_DEBUG_OBJECT (src, "skipping stream %p, failed to connect", stream); + continue; + } + conn = stream->conninfo.connection; + } else { + conn = src->conninfo.connection; + } GST_DEBUG_OBJECT (src, "doing setup of stream %p with %s", stream, - stream->setup_url); + stream->conninfo.location); next_protocol: /* first selectable protocol */ @@ -4604,7 +4757,7 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src) /* create SETUP request */ res = gst_rtsp_message_init_request (&request, GST_RTSP_SETUP, - stream->setup_url); + stream->conninfo.location); if (res < 0) { g_free (transports); goto create_request_failed; @@ -4623,7 +4776,7 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src) } /* handle the code ourselves */ - if ((res = gst_rtspsrc_send (src, &request, &response, &code) < 0)) + if ((res = gst_rtspsrc_send (src, conn, &request, &response, &code) < 0)) goto send_error; switch (code) { @@ -4884,27 +5037,29 @@ gst_rtspsrc_parse_range (GstRTSPSrc * src, const gchar * range, return TRUE; } +/* must be called with the RTSP state lock */ static gboolean -gst_rtspsrc_from_sdp (GstRTSPSrc * src, guint8 * data, guint size) +gst_rtspsrc_open_from_sdp (GstRTSPSrc * src, GstSDPMessage * sdp) { - GstSDPMessage sdp = { 0 }; gint i, n_streams; - GST_DEBUG_OBJECT (src, "parse SDP..."); - gst_sdp_message_init (&sdp); - gst_sdp_message_parse_buffer (data, size, &sdp); + /* prepare global stream caps properties */ + if (src->props) + gst_structure_remove_all_fields (src->props); + else + src->props = gst_structure_empty_new ("RTSPProperties"); if (src->debug) - gst_sdp_message_dump (&sdp); + gst_sdp_message_dump (sdp); - gst_rtsp_ext_list_parse_sdp (src->extensions, &sdp, src->props); + gst_rtsp_ext_list_parse_sdp (src->extensions, sdp, src->props); /* parse range for duration reporting. */ { const gchar *range; for (i = 0;; i++) { - range = gst_sdp_message_get_attribute_val_n (&sdp, "range", i); + range = gst_sdp_message_get_attribute_val_n (sdp, "range", i); if (range == NULL) break; @@ -4913,12 +5068,14 @@ gst_rtspsrc_from_sdp (GstRTSPSrc * src, guint8 * data, guint size) break; } } - /* try to find a global control attribute */ + /* try to find a global control attribute. Note that a '*' means that we should + * do aggregate control with the current url (so we don't do anything and + * leave the current connection as is) */ { const gchar *control; for (i = 0;; i++) { - control = gst_sdp_message_get_attribute_val_n (&sdp, "control", i); + control = gst_sdp_message_get_attribute_val_n (sdp, "control", i); if (control == NULL) break; @@ -4926,14 +5083,25 @@ gst_rtspsrc_from_sdp (GstRTSPSrc * src, guint8 * data, guint size) if (g_str_has_prefix (control, "rtsp://")) break; } + if (control) { + g_free (src->conninfo.location); + src->conninfo.location = g_strdup (control); + /* make a connection for this, if there was a connection already, nothing + * happens. */ + if (gst_rtsp_conninfo_connect (src, &src->conninfo) < 0) { + GST_ERROR_OBJECT (src, "could not connect"); + } + } + /* we need to keep the control url separate from the connection url because + * the rules for constructing the media control url need it */ g_free (src->control); src->control = g_strdup (control); } /* create streams */ - n_streams = gst_sdp_message_medias_len (&sdp); + n_streams = gst_sdp_message_medias_len (sdp); for (i = 0; i < n_streams; i++) { - gst_rtspsrc_create_stream (src, &sdp, i); + gst_rtspsrc_create_stream (src, sdp, i); } src->state = GST_RTSP_STATE_INIT; @@ -4942,21 +5110,25 @@ gst_rtspsrc_from_sdp (GstRTSPSrc * src, guint8 * data, guint size) if (!gst_rtspsrc_setup_streams (src)) goto setup_failed; - src->state = GST_RTSP_STATE_READY; + /* reset our state */ + gst_segment_init (&src->segment, GST_FORMAT_TIME); + src->need_range = TRUE; + src->skip = FALSE; - gst_sdp_message_uninit (&sdp); + src->state = GST_RTSP_STATE_READY; return TRUE; + /* ERRORS */ setup_failed: { - gst_sdp_message_uninit (&sdp); + GST_ERROR_OBJECT (src, "setup failed"); return FALSE; } } static gboolean -gst_rtspsrc_open (GstRTSPSrc * src) +gst_rtspsrc_retrieve_sdp (GstRTSPSrc * src, GstSDPMessage ** sdp) { GstRTSPResult res; GstRTSPMessage request = { 0 }; @@ -4964,58 +5136,32 @@ gst_rtspsrc_open (GstRTSPSrc * src) guint8 *data; guint size; gchar *respcont = NULL; - GstRTSPUrl *url; GST_RTSP_STATE_LOCK (src); restart: - /* reset our state */ - gst_segment_init (&src->segment, GST_FORMAT_TIME); - src->need_range = TRUE; src->need_redirect = FALSE; - src->skip = FALSE; /* can't continue without a valid url */ - if (G_UNLIKELY (src->url == NULL)) + if (G_UNLIKELY (src->conninfo.url == NULL)) goto no_url; src->tried_url_auth = FALSE; - /* create connection */ - GST_DEBUG_OBJECT (src, "creating connection (%s)...", src->req_location); - if ((res = gst_rtsp_connection_create (src->url, &src->connection)) < 0) - goto could_not_create; - - url = gst_rtsp_connection_get_url (src->connection); - - if (url->transports & GST_RTSP_LOWER_TRANS_HTTP) - gst_rtsp_connection_set_tunneled (src->connection, TRUE); - - if (src->proxy_host) { - GST_DEBUG_OBJECT (src, "setting proxy %s:%d", src->proxy_host, - src->proxy_port); - gst_rtsp_connection_set_proxy (src->connection, src->proxy_host, - src->proxy_port); - } - - /* connect */ - GST_DEBUG_OBJECT (src, "connecting (%s)...", src->req_location); - if ((res = - gst_rtsp_connection_connect (src->connection, src->ptcp_timeout)) < 0) - goto could_not_connect; - - src->connected = TRUE; + if ((res = gst_rtsp_conninfo_connect (src, &src->conninfo)) < 0) + goto connect_failed; /* create OPTIONS */ GST_DEBUG_OBJECT (src, "create options..."); res = gst_rtsp_message_init_request (&request, GST_RTSP_OPTIONS, - src->req_location); + src->conninfo.url_str); if (res < 0) goto create_request_failed; /* send OPTIONS */ GST_DEBUG_OBJECT (src, "send options..."); - if (gst_rtspsrc_send (src, &request, &response, NULL) < 0) + if (gst_rtspsrc_send (src, src->conninfo.connection, &request, &response, + NULL) < 0) goto send_error; /* parse OPTIONS */ @@ -5026,7 +5172,7 @@ restart: GST_DEBUG_OBJECT (src, "create describe..."); res = gst_rtsp_message_init_request (&request, GST_RTSP_DESCRIBE, - src->req_location); + src->conninfo.url_str); if (res < 0) goto create_request_failed; @@ -5034,25 +5180,17 @@ restart: gst_rtsp_message_add_header (&request, GST_RTSP_HDR_ACCEPT, "application/sdp"); - /* prepare global stream caps properties */ - if (src->props) - gst_structure_remove_all_fields (src->props); - else - src->props = gst_structure_empty_new ("RTSPProperties"); - /* send DESCRIBE */ GST_DEBUG_OBJECT (src, "send describe..."); - if (gst_rtspsrc_send (src, &request, &response, NULL) < 0) + if (gst_rtspsrc_send (src, src->conninfo.connection, &request, &response, + NULL) < 0) goto send_error; /* we only perform redirect for the describe, currently */ if (src->need_redirect) { /* close connection, we don't have to send a TEARDOWN yet, ignore the * result. */ - gst_rtsp_connection_close (src->connection); - gst_rtsp_connection_free (src->connection); - src->connection = NULL; - src->connected = FALSE; + gst_rtsp_conninfo_close (src, &src->conninfo, TRUE); gst_rtsp_message_unset (&request); gst_rtsp_message_unset (&response); @@ -5077,12 +5215,12 @@ restart: /* get message body and parse as SDP */ gst_rtsp_message_get_body (&response, &data, &size); - - if (data == NULL) + if (data == NULL || size == 0) goto no_describe; - if (!gst_rtspsrc_from_sdp (src, data, size)) - goto sdp_failed; + GST_DEBUG_OBJECT (src, "parse SDP..."); + gst_sdp_message_new (sdp); + gst_sdp_message_parse_buffer (data, size, *sdp); /* clean up any messages */ gst_rtsp_message_unset (&request); @@ -5097,21 +5235,12 @@ no_url: ("No valid RTSP URL was provided")); goto cleanup_error; } -could_not_create: +connect_failed: { gchar *str = gst_rtsp_strresult (res); GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), - ("Could not create connection. (%s)", str)); - g_free (str); - goto cleanup_error; - } -could_not_connect: - { - gchar *str = gst_rtsp_strresult (res); - - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), - ("Could not connect to server. (%s)", str)); + ("Failed to connect. (%s)", str)); g_free (str); goto cleanup_error; } @@ -5147,19 +5276,11 @@ no_describe: ("Server can not provide an SDP.")); goto cleanup_error; } -sdp_failed: - { - gst_rtspsrc_close (src); - /* error was posted */ - goto cleanup_error; - } cleanup_error: { - if (src->connection) { + if (src->conninfo.connection) { GST_DEBUG_OBJECT (src, "free connection"); - gst_rtsp_connection_free (src->connection); - src->connection = NULL; - src->connected = FALSE; + gst_rtsp_conninfo_close (src, &src->conninfo, TRUE); } GST_RTSP_STATE_UNLOCK (src); gst_rtsp_message_unset (&request); @@ -5168,6 +5289,37 @@ cleanup_error: } } +static gboolean +gst_rtspsrc_open (GstRTSPSrc * src) +{ + gboolean res; + + src->methods = + GST_RTSP_SETUP | GST_RTSP_PLAY | GST_RTSP_PAUSE | GST_RTSP_TEARDOWN; + + if (src->sdp == NULL) { + if (!(res = gst_rtspsrc_retrieve_sdp (src, &src->sdp))) + goto no_sdp; + } + + if (!(res = gst_rtspsrc_open_from_sdp (src, src->sdp))) + goto open_failed; + + return res; + + /* ERRORS */ +no_sdp: + { + GST_ERROR_OBJECT (src, "can't get sdp"); + return FALSE; + } +open_failed: + { + GST_ERROR_OBJECT (src, "can't setup streaming from sdp"); + return FALSE; + } +} + #if 0 static gboolean gst_rtspsrc_async_open (GstRTSPSrc * src) @@ -5185,6 +5337,7 @@ gst_rtspsrc_async_open (GstRTSPSrc * src) } #endif + static gboolean gst_rtspsrc_close (GstRTSPSrc * src) { @@ -5222,16 +5375,9 @@ gst_rtspsrc_close (GstRTSPSrc * src) GST_RTSP_STATE_LOCK (src); } - if (!src->connection) - goto done; + /* make sure we're not flushing anymore */ + gst_rtspsrc_connection_flush (src, FALSE); - GST_DEBUG_OBJECT (src, "stop connection flush"); - gst_rtsp_connection_flush (src->connection, FALSE); - - if (!src->connected) { - GST_DEBUG_OBJECT (src, "not connected, doing cleanup"); - goto close; - } if (src->state < GST_RTSP_STATE_READY) { GST_DEBUG_OBJECT (src, "not ready, doing cleanup"); goto close; @@ -5241,7 +5387,7 @@ gst_rtspsrc_close (GstRTSPSrc * src) if (src->control) control = src->control; else - control = src->req_location; + control = src->conninfo.url_str; if (!(src->methods & (GST_RTSP_PLAY | GST_RTSP_TEARDOWN))) goto not_supported; @@ -5249,41 +5395,52 @@ gst_rtspsrc_close (GstRTSPSrc * src) for (walk = src->streams; walk; walk = g_list_next (walk)) { GstRTSPStream *stream = (GstRTSPStream *) walk->data; gchar *setup_url; + GstRTSPConnInfo *info; /* try aggregate control first but do non-aggregate control otherwise */ if (control) setup_url = control; - else if ((setup_url = stream->setup_url) == NULL) + else if ((setup_url = stream->conninfo.location) == NULL) continue; + if (src->conninfo.connection) { + info = &src->conninfo; + } else if (stream->conninfo.connection) { + info = &stream->conninfo; + } else { + continue; + } + if (!info->connected) + goto next; + /* do TEARDOWN */ res = gst_rtsp_message_init_request (&request, GST_RTSP_TEARDOWN, setup_url); if (res < 0) goto create_request_failed; - if (gst_rtspsrc_send (src, &request, &response, NULL) < 0) + if (gst_rtspsrc_send (src, info->connection, &request, &response, NULL) < 0) goto send_error; /* FIXME, parse result? */ gst_rtsp_message_unset (&request); gst_rtsp_message_unset (&response); + next: /* early exit when we did aggregate control */ if (control) break; } close: - /* close connection */ + /* close connections */ GST_DEBUG_OBJECT (src, "closing connection..."); - gst_rtsp_connection_close (src->connection); - /* free connection */ - gst_rtsp_connection_free (src->connection); - src->connection = NULL; - src->connected = FALSE; + gst_rtsp_conninfo_close (src, &src->conninfo, TRUE); + for (walk = src->streams; walk; walk = g_list_next (walk)) { + GstRTSPStream *stream = (GstRTSPStream *) walk->data; + gst_rtsp_conninfo_close (src, &stream->conninfo, TRUE); + } -done: /* cleanup */ gst_rtspsrc_cleanup (src); @@ -5455,7 +5612,7 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment) if (src->state == GST_RTSP_STATE_PLAYING) goto was_playing; - if (!src->connection || !src->connected) + if (!src->conninfo.connection || !src->conninfo.connected) goto done; /* waiting for connection idle, we were flushing so any attempt at doing data @@ -5466,24 +5623,33 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment) GST_RTSP_CONN_UNLOCK (src); GST_DEBUG_OBJECT (src, "stop connection flush"); - gst_rtsp_connection_flush (src->connection, FALSE); + gst_rtspsrc_connection_flush (src, FALSE); /* construct a control url */ if (src->control) control = src->control; else - control = src->req_location; + control = src->conninfo.url_str; for (walk = src->streams; walk; walk = g_list_next (walk)) { GstRTSPStream *stream = (GstRTSPStream *) walk->data; gchar *setup_url; + GstRTSPConnection *conn; /* try aggregate control first but do non-aggregate control otherwise */ if (control) setup_url = control; - else if ((setup_url = stream->setup_url) == NULL) + else if ((setup_url = stream->conninfo.location) == NULL) continue; + if (src->conninfo.connection) { + conn = src->conninfo.connection; + } else if (stream->conninfo.connection) { + conn = stream->conninfo.connection; + } else { + continue; + } + /* do play */ res = gst_rtsp_message_init_request (&request, GST_RTSP_PLAY, setup_url); if (res < 0) @@ -5506,7 +5672,7 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment) g_free (hval); } - if (gst_rtspsrc_send (src, &request, &response, NULL) < 0) + if (gst_rtspsrc_send (src, conn, &request, &response, NULL) < 0) goto send_error; gst_rtsp_message_unset (&request); @@ -5621,34 +5787,43 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle) GST_DEBUG_OBJECT (src, "connection is idle now"); GST_RTSP_CONN_UNLOCK (src); - if (!src->connection || !src->connected) + if (!src->conninfo.connection || !src->conninfo.connected) goto no_connection; GST_DEBUG_OBJECT (src, "stop connection flush"); - gst_rtsp_connection_flush (src->connection, FALSE); + gst_rtspsrc_connection_flush (src, FALSE); /* construct a control url */ if (src->control) control = src->control; else - control = src->req_location; + control = src->conninfo.url_str; /* loop over the streams. We might exit the loop early when we could do an * aggregate control */ for (walk = src->streams; walk; walk = g_list_next (walk)) { GstRTSPStream *stream = (GstRTSPStream *) walk->data; + GstRTSPConnection *conn; gchar *setup_url; /* try aggregate control first but do non-aggregate control otherwise */ if (control) setup_url = control; - else if ((setup_url = stream->setup_url) == NULL) + else if ((setup_url = stream->conninfo.location) == NULL) continue; + if (src->conninfo.connection) { + conn = src->conninfo.connection; + } else if (stream->conninfo.connection) { + conn = stream->conninfo.connection; + } else { + continue; + } + if (gst_rtsp_message_init_request (&request, GST_RTSP_PAUSE, setup_url) < 0) goto create_request_failed; - if (gst_rtspsrc_send (src, &request, &response, NULL) < 0) + if (gst_rtspsrc_send (src, conn, &request, &response, NULL) < 0) goto send_error; gst_rtsp_message_unset (&request); @@ -5874,7 +6049,7 @@ gst_rtspsrc_uri_get_uri (GstURIHandler * handler) GstRTSPSrc *src = GST_RTSPSRC (handler); /* should not dup */ - return src->location; + return src->conninfo.location; } static gboolean @@ -5887,7 +6062,7 @@ gst_rtspsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri) src = GST_RTSPSRC (handler); /* same URI, we're fine */ - if (src->location && uri && !strcmp (uri, src->location)) + if (src->conninfo.location && uri && !strcmp (uri, src->conninfo.location)) goto was_ok; /* try to parse */ @@ -5896,16 +6071,16 @@ gst_rtspsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri) /* if worked, free previous and store new url object along with the original * location. */ - gst_rtsp_url_free (src->url); - src->url = newurl; - g_free (src->location); - g_free (src->req_location); - src->location = g_strdup (uri); - src->req_location = gst_rtsp_url_get_request_uri (src->url); + gst_rtsp_url_free (src->conninfo.url); + src->conninfo.url = newurl; + g_free (src->conninfo.location); + src->conninfo.location = g_strdup (uri); + g_free (src->conninfo.url_str); + src->conninfo.url_str = gst_rtsp_url_get_request_uri (src->conninfo.url); GST_DEBUG_OBJECT (src, "set uri: %s", GST_STR_NULL (uri)); GST_DEBUG_OBJECT (src, "request uri is: %s", - GST_STR_NULL (src->req_location)); + GST_STR_NULL (src->conninfo.url_str)); return TRUE; diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index a0fdf34170..be8b28e37e 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -83,6 +83,16 @@ typedef struct _GstRTSPSrcClass GstRTSPSrcClass; #define GST_RTSP_CONN_LOCK(rtsp) (g_static_rec_mutex_lock (GST_RTSP_CONN_GET_LOCK(rtsp))) #define GST_RTSP_CONN_UNLOCK(rtsp) (g_static_rec_mutex_unlock (GST_RTSP_CONN_GET_LOCK(rtsp))) +typedef struct _GstRTSPConnInfo GstRTSPConnInfo; + +struct _GstRTSPConnInfo { + gchar *location; + GstRTSPUrl *url; + gchar *url_str; + GstRTSPConnection *connection; + gboolean connected; +}; + typedef struct _GstRTSPStream GstRTSPStream; struct _GstRTSPStream { @@ -121,12 +131,13 @@ struct _GstRTSPStream { gboolean container; /* original control url */ gchar *control_url; - /* fully qualified control url */ - gchar *setup_url; guint32 ssrc; guint32 seqbase; guint64 timebase; + /* per stream connection */ + GstRTSPConnInfo conninfo; + /* bandwidth */ guint as_bandwidth; guint rs_bandwidth; @@ -176,15 +187,13 @@ struct _GstRTSPSrc { /* mutex for protecting the connection */ GStaticRecMutex *conn_rec_lock; + GstSDPMessage *sdp; gint numstreams; GList *streams; GstStructure *props; gboolean need_activate; /* properties */ - gchar *location; - gchar *req_location; /* Sanitised URL to use in network requests */ - GstRTSPUrl *url; GstRTSPLowerTrans protocols; gboolean debug; guint retry; @@ -222,8 +231,7 @@ struct _GstRTSPSrc { gulong session_sig_id; gulong session_ptmap_id; - GstRTSPConnection *connection; - gboolean connected; + GstRTSPConnInfo conninfo; /* a list of RTSP extensions as GstElement */ GstRTSPExtensionList *extensions;