/* GStreamer * Copyright (C) <2005-2009> Wim Taymans * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ /* * Unless otherwise indicated, Source Code is licensed under MIT license. * See further explanation attached in License Statement (distributed in the file * LICENSE). * * Permission is hereby granted, free of charge, to any person obtaining a copy of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies * of the Software, and to permit persons to whom the Software is furnished to do * so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

/**
 * SECTION:gstrtspconnection
 * @title: GstRTSPConnection
 * @short_description: manage RTSP connections
 * @see_also: gstrtspurl
 *
 * This object manages the RTSP connection to the server. It provides functions
 * to receive and send bytes and messages.
 */

/* NOTE(review): the header names of the #include directives below are missing
 * in this copy of the file (they look like they were lost during extraction).
 * Restore them from the project's reference source before building. */
#ifdef HAVE_CONFIG_H
# include
#endif

#include
#include
#include
#include
#include

/* we include this here to get the G_OS_* defines */
#include
#include
#include

/* necessary for IP_TOS define */
#include

#include "gstrtspconnection.h"

#ifdef IP_TOS
/* Overlay of the socket address flavours; only declared when IP_TOS is
 * available. */
union gst_sockaddr
{
  struct sockaddr sa;
  struct sockaddr_in sa_in;
  struct sockaddr_in6 sa_in6;
  struct sockaddr_storage sa_stor;
};
#endif

/* Incremental decoder state embedded in the connection (see the ctx/ctxp
 * fields of struct _GstRTSPConnection).
 * NOTE(review): presumably the base64 decoding state used for tunneled
 * connections - confirm against the code that uses it. */
typedef struct
{
  gint state;
  guint save;
  guchar out[3];                /* the size must be evenly divisible by 3 */
  guint cout;
  guint coutl;
} DecodeCtx;

/* A message in its on-the-wire form: a header (or full message) in @data,
 * optionally followed by a payload held either in @body_data or in
 * @body_buffer. */
typedef struct
{
  /* If %TRUE we only own data and none of the
   * other fields
   */
  gboolean borrowed;

  /* Header or full message */
  guint8 *data;
  guint data_size;
  gboolean data_is_data_header;

  /* Payload following data, if any */
  guint8 *body_data;
  guint body_data_size;
  /* or */
  GstBuffer *body_buffer;

  /* DATA packet header statically allocated for above */
  guint8 data_header[4];

  /* all below only for async writing */

  guint data_offset;            /* == data_size when done */
  guint body_offset;            /* into body_data or the buffer */

  /* ID of the message for notification */
  guint id;
} GstRTSPSerializedMessage;

/* Release the memory referenced by @msg. The body (raw data and buffer) is
 * only released when the message is not borrowed; @data is always freed.
 * The pointer fields other than body_buffer are not reset. */
static void
gst_rtsp_serialized_message_clear (GstRTSPSerializedMessage * msg)
{
  if (!msg->borrowed) {
    g_free (msg->body_data);
    gst_buffer_replace (&msg->body_buffer, NULL);
  }
  g_free (msg->data);
}

/* Flags for send(): use MSG_NOSIGNAL where the platform defines it. */
#ifdef MSG_NOSIGNAL
#define SEND_FLAGS MSG_NOSIGNAL
#else
#define SEND_FLAGS 0
#endif

/* Progress of the HTTP tunnel setup. */
typedef enum
{
  TUNNEL_STATE_NONE,
  TUNNEL_STATE_GET,
  TUNNEL_STATE_POST,
  TUNNEL_STATE_COMPLETE
} GstRTSPTunnelState;

/* Size of the tunnel id buffer, including the terminating NUL. */
#define TUNNELID_LEN   24

struct _GstRTSPConnection
{
  /*< private > */
  /* URL for the remote connection */
  GstRTSPUrl *url;
  GstRTSPVersion version;

  gboolean server;
  GSocketClient *client;
  GIOStream *stream0;
  GIOStream *stream1;

  GInputStream *input_stream;
  GOutputStream *output_stream;
  /* this is a read source we add on the write socket in tunneled mode to be
   * able to detect when client disconnects the GET channel */
  GInputStream *control_stream;

  /* connection state */
  GSocket *read_socket;
  GSocket *write_socket;
  GSocket *socket0, *socket1;
  gboolean manual_http;
  gboolean may_cancel;
  GCancellable *cancellable;

  gchar tunnelid[TUNNELID_LEN];
  gboolean tunneled;
  GstRTSPTunnelState tstate;

  /* the remote and local ip */
  gchar *remote_ip;
  gchar *local_ip;

  gint read_ahead;

  /* data handed over at creation time that is consumed before the socket is
   * read, and the read position inside it */
  gchar *initial_buffer;
  gsize initial_buffer_offset;

  gboolean remember_session_id; /* remember the session id or not */

  /* Session state */
  gint cseq;                    /* sequence number */
  gchar session_id[512];        /* session id */
  gint timeout;                 /* session timeout in seconds */
  GTimer *timer;                /* timeout timer */

  /* Authentication */
  GstRTSPAuthMethod auth_method;
  gchar *username;
  gchar *passwd;
  GHashTable *auth_params;

  /* TLS */
  GTlsDatabase *tls_database;
  GTlsInteraction *tls_interaction;

  GstRTSPConnectionAcceptCertificateFunc accept_certificate_func;
  GDestroyNotify accept_certificate_destroy_notify;
  gpointer accept_certificate_user_data;

  DecodeCtx ctx;
  DecodeCtx *ctxp;

  gchar *proxy_host;
  guint proxy_port;
};

/* States of the message builder (GstRTSPBuilder.state). */
enum
{
  STATE_START = 0,
  STATE_DATA_HEADER,
  STATE_DATA_BODY,
  STATE_READ_LINES,
  STATE_END,
  STATE_LAST
};

/* Negative markers stored in GstRTSPConnection.read_ahead. */
enum
{
  READ_AHEAD_EOH = -1,          /* end of headers */
  READ_AHEAD_CRLF = -2,
  READ_AHEAD_CRLFCR = -3
};

/* a structure for constructing RTSPMessages */
typedef struct
{
  gint state;
  GstRTSPResult status;
  guint8 buffer[4096];
  guint offset;

  guint line;
  guint8 *body_data;
  glong body_len;
} GstRTSPBuilder;

/* function prototypes */
static void add_auth_header (GstRTSPConnection * conn,
    GstRTSPMessage * message);

/* Free the body data held by @builder and zero the whole structure, which
 * also puts it back in STATE_START (0). */
static void
build_reset (GstRTSPBuilder * builder)
{
  g_free (builder->body_data);
  memset (builder, 0, sizeof (GstRTSPBuilder));
}

/* "accept-certificate" handler connected in socket_client_event().
 *
 * When a TLS database is set on @rtspconn, the peer certificate chain is
 * verified against it and accepted if none of the resulting errors is
 * selected by the connection's validation flags. If the certificate is still
 * not accepted and an accept-certificate function is installed, that function
 * gets the final say.
 *
 * Returns: TRUE to accept the certificate, FALSE to reject it (including when
 * the database verification itself reported an error). */
static gboolean
tls_accept_certificate (GTlsConnection * conn, GTlsCertificate * peer_cert,
    GTlsCertificateFlags errors, GstRTSPConnection * rtspconn)
{
  GError *error = NULL;
  gboolean accept = FALSE;

  if (rtspconn->tls_database) {
    GSocketConnectable *peer_identity;
    GTlsCertificateFlags validation_flags;

    GST_DEBUG ("TLS peer certificate not accepted, checking user database...");

    peer_identity =
        g_tls_client_connection_get_server_identity (G_TLS_CLIENT_CONNECTION
        (conn));

    /* from here on @errors holds the result of the database verification */
    errors =
        g_tls_database_verify_chain (rtspconn->tls_database, peer_cert,
        G_TLS_DATABASE_PURPOSE_AUTHENTICATE_SERVER, peer_identity,
        g_tls_connection_get_interaction (conn), G_TLS_DATABASE_VERIFY_NONE,
        NULL, &error);

    if (error)
      goto verify_error;

    validation_flags = gst_rtsp_connection_get_tls_validation_flags (rtspconn);

    /* only the errors selected by the validation flags are fatal */
    accept = ((errors & validation_flags) == 0);
    if (accept)
      GST_DEBUG ("Peer certificate accepted");
    else
      GST_DEBUG ("Peer certificate not accepted (errors: 0x%08X)", errors);
  }

  if (!accept && rtspconn->accept_certificate_func) {
    accept =
        rtspconn->accept_certificate_func (conn, peer_cert, errors,
        rtspconn->accept_certificate_user_data);
    GST_DEBUG ("Peer certificate %saccepted by accept-certificate function",
        accept ? "" : "not ");
  }

  return accept;

/* ERRORS */
verify_error:
  {
    GST_ERROR ("An error occurred while verifying the peer certificate: %s",
        error->message);
    g_clear_error (&error);
    return FALSE;
  }
}

/* "event" handler of the GSocketClient: right before the TLS handshake it
 * hooks tls_accept_certificate() up to the TLS connection and installs the
 * connection's GTlsInteraction on it. All other events are ignored. */
static void
socket_client_event (GSocketClient * client, GSocketClientEvent event,
    GSocketConnectable * connectable, GTlsConnection * connection,
    GstRTSPConnection * rtspconn)
{
  if (event == G_SOCKET_CLIENT_TLS_HANDSHAKING) {
    GST_DEBUG ("TLS handshaking about to start...");

    g_signal_connect (connection, "accept-certificate",
        (GCallback) tls_accept_certificate, rtspconn);

    g_tls_connection_set_interaction (connection, rtspconn->tls_interaction);
  }
}

/**
 * gst_rtsp_connection_create:
 * @url: a #GstRTSPUrl
 * @conn: (out) (transfer full): storage for a #GstRTSPConnection
 *
 * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
 * The connection will not yet attempt to connect to @url, use
 * gst_rtsp_connection_connect().
 *
 * A copy of @url will be made.
 *
 * Returns: #GST_RTSP_OK when @conn contains a valid connection.
*/ GstRTSPResult gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn) { GstRTSPConnection *newconn; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (url != NULL, GST_RTSP_EINVAL); newconn = g_new0 (GstRTSPConnection, 1); newconn->may_cancel = TRUE; newconn->cancellable = g_cancellable_new (); newconn->client = g_socket_client_new (); if (url->transports & GST_RTSP_LOWER_TRANS_TLS) g_socket_client_set_tls (newconn->client, TRUE); g_signal_connect (newconn->client, "event", (GCallback) socket_client_event, newconn); newconn->url = gst_rtsp_url_copy (url); newconn->timer = g_timer_new (); newconn->timeout = 60; newconn->cseq = 1; newconn->remember_session_id = TRUE; newconn->auth_method = GST_RTSP_AUTH_NONE; newconn->username = NULL; newconn->passwd = NULL; newconn->auth_params = NULL; newconn->version = 0; *conn = newconn; return GST_RTSP_OK; } static gboolean collect_addresses (GSocket * socket, gchar ** ip, guint16 * port, gboolean remote, GError ** error) { GSocketAddress *addr; if (remote) addr = g_socket_get_remote_address (socket, error); else addr = g_socket_get_local_address (socket, error); if (!addr) return FALSE; if (ip) *ip = g_inet_address_to_string (g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (addr))); if (port) *port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr)); g_object_unref (addr); return TRUE; } /** * gst_rtsp_connection_create_from_socket: * @socket: a #GSocket * @ip: the IP address of the other end * @port: the port used by the other end * @initial_buffer: data already read from @fd * @conn: (out) (transfer full): storage for a #GstRTSPConnection * * Create a new #GstRTSPConnection for handling communication on the existing * socket @socket. The @initial_buffer contains zero terminated data already * read from @socket which should be used before starting to read new data. * * Returns: #GST_RTSP_OK when @conn contains a valid connection. 
*/
/* FIXME 2.0 We don't need the ip and port since they can be got from the
 * GSocket */
GstRTSPResult
gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip,
    guint16 port, const gchar * initial_buffer, GstRTSPConnection ** conn)
{
  GstRTSPConnection *created = NULL;
  GstRTSPUrl *peer_url;
  GstRTSPResult res;
  GError *gerr = NULL;
  gchar *own_ip;
  GIOStream *ios;

  g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
  g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);

  /* our side of the socket */
  if (!collect_addresses (socket, &own_ip, NULL, FALSE, &gerr)) {
    GST_ERROR ("failed to get local address: %s", gerr->message);
    g_clear_error (&gerr);
    return GST_RTSP_ERROR;
  }

  /* a throw-away URL that describes the peer; the connection copies it */
  peer_url = g_new0 (GstRTSPUrl, 1);
  peer_url->host = g_strdup (ip);
  peer_url->port = port;

  /* GST_RTSP_CHECK stores the result in res and jumps on failure */
  GST_RTSP_CHECK (gst_rtsp_connection_create (peer_url, &created),
      newconn_failed);
  gst_rtsp_url_free (peer_url);

  ios = G_IO_STREAM (g_socket_connection_factory_create_connection (socket));

  /* we are the accepting side */
  created->server = TRUE;

  /* one socket and one stream serve both directions initially */
  created->socket0 = socket;
  created->stream0 = ios;
  created->read_socket = created->socket0;
  created->write_socket = created->read_socket;
  created->input_stream = g_io_stream_get_input_stream (ios);
  created->output_stream = g_io_stream_get_output_stream (ios);
  created->control_stream = NULL;

  created->remote_ip = g_strdup (ip);
  created->local_ip = own_ip;   /* ownership moves to the connection */
  created->initial_buffer = g_strdup (initial_buffer);

  *conn = created;

  return GST_RTSP_OK;

  /* ERRORS */
newconn_failed:
  {
    GST_ERROR ("failed to make connection");
    g_free (own_ip);
    gst_rtsp_url_free (peer_url);
    return res;
  }
}

/**
 * gst_rtsp_connection_accept:
 * @socket: a socket
 * @conn: (out) (transfer full): storage for a #GstRTSPConnection
 * @cancellable: a #GCancellable to cancel the operation
 *
 * Accept a new connection on @socket and create a new #GstRTSPConnection for
 * handling
communication on new socket.
 *
 * Returns: #GST_RTSP_OK when @conn contains a valid connection.
 */
GstRTSPResult
gst_rtsp_connection_accept (GSocket * socket, GstRTSPConnection ** conn,
    GCancellable * cancellable)
{
  GError *gerr = NULL;
  gchar *peer_ip;
  guint16 peer_port;
  GSocket *accepted;
  GstRTSPResult result;

  g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);

  accepted = g_socket_accept (socket, cancellable, &gerr);
  if (accepted == NULL) {
    GST_DEBUG ("Accepting client failed: %s", gerr->message);
    g_clear_error (&gerr);
    return GST_RTSP_ESYS;
  }

  /* get the remote ip address and port */
  if (!collect_addresses (accepted, &peer_ip, &peer_port, TRUE, &gerr)) {
    GST_DEBUG ("getnameinfo failed: %s", gerr->message);
    g_clear_error (&gerr);
    /* without a peer address the socket is of no use to us: shut it down */
    if (!g_socket_close (accepted, &gerr)) {
      GST_DEBUG ("Closing socket failed: %s", gerr->message);
      g_clear_error (&gerr);
    }
    g_object_unref (accepted);
    return GST_RTSP_ERROR;
  }

  result = gst_rtsp_connection_create_from_socket (accepted, peer_ip,
      peer_port, NULL, conn);

  /* drop our own reference to the socket and the temporary address string */
  g_object_unref (accepted);
  g_free (peer_ip);

  return result;
}

/**
 * gst_rtsp_connection_get_tls:
 * @conn: a #GstRTSPConnection
 * @error: #GError for error reporting, or NULL to ignore.
 *
 * Get the TLS connection of @conn.
 *
 * For client side this will return the #GTlsClientConnection when connected
 * over TLS.
 *
 * For server side connections, this function will create a GTlsServerConnection
 * when called the first time and will return that same connection on subsequent
 * calls. The server is then responsible for configuring the TLS connection.
 *
 * Returns: (transfer none): the TLS connection for @conn.
* * Since: 1.2 */ GTlsConnection * gst_rtsp_connection_get_tls (GstRTSPConnection * conn, GError ** error) { GTlsConnection *result; if (G_IS_TLS_CONNECTION (conn->stream0)) { /* we already had one, return it */ result = G_TLS_CONNECTION (conn->stream0); } else if (conn->server) { /* no TLS connection but we are server, make one */ result = (GTlsConnection *) g_tls_server_connection_new (conn->stream0, NULL, error); if (result) { g_object_unref (conn->stream0); conn->stream0 = G_IO_STREAM (result); conn->input_stream = g_io_stream_get_input_stream (conn->stream0); conn->output_stream = g_io_stream_get_output_stream (conn->stream0); } } else { /* client */ result = NULL; g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_FAILED, "client not connected with TLS"); } return result; } /** * gst_rtsp_connection_set_tls_validation_flags: * @conn: a #GstRTSPConnection * @flags: the validation flags. * * Sets the TLS validation flags to be used to verify the peer * certificate when a TLS connection is established. * * Returns: TRUE if the validation flags are set correctly, or FALSE if * @conn is NULL or is not a TLS connection. * * Since: 1.2.1 */ gboolean gst_rtsp_connection_set_tls_validation_flags (GstRTSPConnection * conn, GTlsCertificateFlags flags) { gboolean res = FALSE; g_return_val_if_fail (conn != NULL, FALSE); res = g_socket_client_get_tls (conn->client); if (res) g_socket_client_set_tls_validation_flags (conn->client, flags); return res; } /** * gst_rtsp_connection_get_tls_validation_flags: * @conn: a #GstRTSPConnection * * Gets the TLS validation flags used to verify the peer certificate * when a TLS connection is established. * * Returns: the validationg flags. 
* * Since: 1.2.1 */ GTlsCertificateFlags gst_rtsp_connection_get_tls_validation_flags (GstRTSPConnection * conn) { g_return_val_if_fail (conn != NULL, 0); return g_socket_client_get_tls_validation_flags (conn->client); } /** * gst_rtsp_connection_set_tls_database: * @conn: a #GstRTSPConnection * @database: a #GTlsDatabase * * Sets the anchor certificate authorities database. This certificate * database will be used to verify the server's certificate in case it * can't be verified with the default certificate database first. * * Since: 1.4 */ void gst_rtsp_connection_set_tls_database (GstRTSPConnection * conn, GTlsDatabase * database) { GTlsDatabase *old_db; g_return_if_fail (conn != NULL); if (database) g_object_ref (database); old_db = conn->tls_database; conn->tls_database = database; if (old_db) g_object_unref (old_db); } /** * gst_rtsp_connection_get_tls_database: * @conn: a #GstRTSPConnection * * Gets the anchor certificate authorities database that will be used * after a server certificate can't be verified with the default * certificate database. * * Returns: (transfer full): the anchor certificate authorities database, or NULL if no * database has been previously set. Use g_object_unref() to release the * certificate database. * * Since: 1.4 */ GTlsDatabase * gst_rtsp_connection_get_tls_database (GstRTSPConnection * conn) { GTlsDatabase *result; g_return_val_if_fail (conn != NULL, NULL); if ((result = conn->tls_database)) g_object_ref (result); return result; } /** * gst_rtsp_connection_set_tls_interaction: * @conn: a #GstRTSPConnection * @interaction: a #GTlsInteraction * * Sets a #GTlsInteraction object to be used when the connection or certificate * database need to interact with the user. This will be used to prompt the * user for passwords where necessary. 
*
 * Since: 1.6
 */
void
gst_rtsp_connection_set_tls_interaction (GstRTSPConnection * conn,
    GTlsInteraction * interaction)
{
  GTlsInteraction *previous;

  g_return_if_fail (conn != NULL);

  /* take the new reference before dropping the old one so that setting the
   * same interaction again is safe */
  if (interaction != NULL)
    g_object_ref (interaction);

  previous = conn->tls_interaction;
  conn->tls_interaction = interaction;

  if (previous != NULL)
    g_object_unref (previous);
}

/**
 * gst_rtsp_connection_get_tls_interaction:
 * @conn: a #GstRTSPConnection
 *
 * Gets a #GTlsInteraction object to be used when the connection or certificate
 * database need to interact with the user. This will be used to prompt the
 * user for passwords where necessary.
 *
 * Returns: (transfer full): a reference on the #GTlsInteraction. Use
 * g_object_unref() to release.
 *
 * Since: 1.6
 */
GTlsInteraction *
gst_rtsp_connection_get_tls_interaction (GstRTSPConnection * conn)
{
  GTlsInteraction *interaction;

  g_return_val_if_fail (conn != NULL, NULL);

  interaction = conn->tls_interaction;
  if (interaction == NULL)
    return NULL;

  /* the caller receives its own reference */
  g_object_ref (interaction);

  return interaction;
}

/**
 * gst_rtsp_connection_set_accept_certificate_func:
 * @conn: a #GstRTSPConnection
 * @func: a #GstRTSPConnectionAcceptCertificateFunc to check certificates
 * @user_data: User data passed to @func
 * @destroy_notify: #GDestroyNotify for @user_data
 *
 * Sets a custom accept-certificate function for checking certificates for
 * validity. This will directly map to #GTlsConnection 's "accept-certificate"
 * signal and be performed after the default checks of #GstRTSPConnection
 * (checking against the #GTlsDatabase with the given #GTlsCertificateFlags)
 * have failed. If no #GTlsDatabase is set on this connection, only @func will
 * be called.
* * Since: 1.14 */ void gst_rtsp_connection_set_accept_certificate_func (GstRTSPConnection * conn, GstRTSPConnectionAcceptCertificateFunc func, gpointer user_data, GDestroyNotify destroy_notify) { if (conn->accept_certificate_destroy_notify) conn-> accept_certificate_destroy_notify (conn->accept_certificate_user_data); conn->accept_certificate_func = func; conn->accept_certificate_user_data = user_data; conn->accept_certificate_destroy_notify = destroy_notify; } static GstRTSPResult setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout, gchar * uri, GstRTSPMessage * response) { gint i; GstRTSPResult res; gchar *value; guint16 url_port; GstRTSPMessage *msg; gboolean old_http; GstRTSPUrl *url; GError *error = NULL; GSocketConnection *connection; GSocket *socket; gchar *connection_uri = NULL; gchar *request_uri = NULL; gchar *host = NULL; url = conn->url; gst_rtsp_url_get_port (url, &url_port); host = g_strdup_printf ("%s:%d", url->host, url_port); /* create a random sessionid */ for (i = 0; i < TUNNELID_LEN; i++) conn->tunnelid[i] = g_random_int_range ('a', 'z'); conn->tunnelid[TUNNELID_LEN - 1] = '\0'; /* create the GET request for the read connection */ GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_GET, uri), no_message); msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST; gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE, conn->tunnelid); gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT, "application/x-rtsp-tunnelled"); gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache"); gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache"); gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host); /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP * request from being base64 encoded */ conn->tunneled = FALSE; GST_RTSP_CHECK (gst_rtsp_connection_send (conn, msg, timeout), write_failed); gst_rtsp_message_free (msg); conn->tunneled = TRUE; /* receive the response to the GET request */ /* we 
need to temporarily set manual_http to TRUE since * gst_rtsp_connection_receive() will treat the HTTP response as a parsing * failure otherwise */ old_http = conn->manual_http; conn->manual_http = TRUE; GST_RTSP_CHECK (gst_rtsp_connection_receive (conn, response, timeout), read_failed); conn->manual_http = old_http; if (response->type != GST_RTSP_MESSAGE_HTTP_RESPONSE || response->type_data.response.code != GST_RTSP_STS_OK) goto wrong_result; if (gst_rtsp_message_get_header (response, GST_RTSP_HDR_X_SERVER_IP_ADDRESS, &value, 0) == GST_RTSP_OK) { g_free (url->host); url->host = g_strdup (value); g_free (conn->remote_ip); conn->remote_ip = g_strdup (value); } connection_uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port, url->abspath, url->query ? "?" : "", url->query ? url->query : ""); /* connect to the host/port */ if (conn->proxy_host) { connection = g_socket_client_connect_to_host (conn->client, conn->proxy_host, conn->proxy_port, conn->cancellable, &error); request_uri = g_strdup (connection_uri); } else { connection = g_socket_client_connect_to_uri (conn->client, connection_uri, 0, conn->cancellable, &error); request_uri = g_strdup_printf ("%s%s%s", url->abspath, url->query ? "?" : "", url->query ? 
url->query : ""); } if (connection == NULL) goto connect_failed; socket = g_socket_connection_get_socket (connection); /* get remote address */ g_free (conn->remote_ip); conn->remote_ip = NULL; if (!collect_addresses (socket, &conn->remote_ip, NULL, TRUE, &error)) goto remote_address_failed; /* this is now our writing socket */ conn->stream1 = G_IO_STREAM (connection); conn->socket1 = socket; conn->write_socket = conn->socket1; conn->output_stream = g_io_stream_get_output_stream (conn->stream1); conn->control_stream = NULL; /* create the POST request for the write connection */ GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST, request_uri), no_message); msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST; gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE, conn->tunnelid); gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT, "application/x-rtsp-tunnelled"); gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE, "application/x-rtsp-tunnelled"); gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache"); gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache"); gst_rtsp_message_add_header (msg, GST_RTSP_HDR_EXPIRES, "Sun, 9 Jan 1972 00:00:00 GMT"); gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_LENGTH, "32767"); gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, host); /* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP * request from being base64 encoded */ conn->tunneled = FALSE; GST_RTSP_CHECK (gst_rtsp_connection_send (conn, msg, timeout), write_failed); gst_rtsp_message_free (msg); conn->tunneled = TRUE; exit: g_free (connection_uri); g_free (request_uri); g_free (host); return res; /* ERRORS */ no_message: { GST_ERROR ("failed to create request (%d)", res); goto exit; } write_failed: { GST_ERROR ("write failed (%d)", res); gst_rtsp_message_free (msg); conn->tunneled = TRUE; goto exit; } read_failed: { GST_ERROR ("read failed (%d)", res); conn->manual_http = FALSE; goto exit; } 
wrong_result: { GST_ERROR ("got failure response %d %s", response->type_data.response.code, response->type_data.response.reason); res = GST_RTSP_ERROR; goto exit; } connect_failed: { GST_ERROR ("failed to connect: %s", error->message); res = GST_RTSP_ERROR; g_clear_error (&error); goto exit; } remote_address_failed: { GST_ERROR ("failed to resolve address: %s", error->message); g_object_unref (connection); g_clear_error (&error); return GST_RTSP_ERROR; } } /** * gst_rtsp_connection_connect_with_response: * @conn: a #GstRTSPConnection * @timeout: a #GTimeVal timeout * @response: a #GstRTSPMessage * * Attempt to connect to the url of @conn made with * gst_rtsp_connection_create(). If @timeout is %NULL this function can block * forever. If @timeout contains a valid timeout, this function will return * #GST_RTSP_ETIMEOUT after the timeout expired. If @conn is set to tunneled, * @response will contain a response to the tunneling request messages. * * This function can be cancelled with gst_rtsp_connection_flush(). * * Returns: #GST_RTSP_OK when a connection could be made. * * Since 1.8 */ GstRTSPResult gst_rtsp_connection_connect_with_response (GstRTSPConnection * conn, GTimeVal * timeout, GstRTSPMessage * response) { GstRTSPResult res; GSocketConnection *connection; GSocket *socket; GError *error = NULL; gchar *connection_uri, *request_uri, *remote_ip; GstClockTime to; guint16 url_port; GstRTSPUrl *url; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn->stream0 == NULL, GST_RTSP_EINVAL); to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0; g_socket_client_set_timeout (conn->client, (to + GST_SECOND - 1) / GST_SECOND); url = conn->url; gst_rtsp_url_get_port (url, &url_port); if (conn->tunneled) { connection_uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port, url->abspath, url->query ? "?" : "", url->query ? 
url->query : ""); } else { connection_uri = gst_rtsp_url_get_request_uri (url); } if (conn->proxy_host) { connection = g_socket_client_connect_to_host (conn->client, conn->proxy_host, conn->proxy_port, conn->cancellable, &error); request_uri = g_strdup (connection_uri); } else { connection = g_socket_client_connect_to_uri (conn->client, connection_uri, url_port, conn->cancellable, &error); /* use the relative component of the uri for non-proxy connections */ request_uri = g_strdup_printf ("%s%s%s", url->abspath, url->query ? "?" : "", url->query ? url->query : ""); } if (connection == NULL) goto connect_failed; /* get remote address */ socket = g_socket_connection_get_socket (connection); if (!collect_addresses (socket, &remote_ip, NULL, TRUE, &error)) goto remote_address_failed; g_free (conn->remote_ip); conn->remote_ip = remote_ip; conn->stream0 = G_IO_STREAM (connection); conn->socket0 = socket; /* this is our read socket */ conn->read_socket = conn->socket0; conn->write_socket = conn->socket0; conn->input_stream = g_io_stream_get_input_stream (conn->stream0); conn->output_stream = g_io_stream_get_output_stream (conn->stream0); conn->control_stream = NULL; if (conn->tunneled) { res = setup_tunneling (conn, timeout, request_uri, response); if (res != GST_RTSP_OK) goto tunneling_failed; } g_free (connection_uri); g_free (request_uri); return GST_RTSP_OK; /* ERRORS */ connect_failed: { GST_ERROR ("failed to connect: %s", error->message); g_clear_error (&error); g_free (connection_uri); g_free (request_uri); return GST_RTSP_ERROR; } remote_address_failed: { GST_ERROR ("failed to connect: %s", error->message); g_object_unref (connection); g_clear_error (&error); g_free (connection_uri); g_free (request_uri); return GST_RTSP_ERROR; } tunneling_failed: { GST_ERROR ("failed to setup tunneling"); g_free (connection_uri); g_free (request_uri); return res; } } static void add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message) { switch (conn->auth_method) { 
case GST_RTSP_AUTH_BASIC:{ gchar *user_pass; gchar *user_pass64; gchar *auth_string; if (conn->username == NULL || conn->passwd == NULL) break; user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd); user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass)); auth_string = g_strdup_printf ("Basic %s", user_pass64); gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION, auth_string); g_free (user_pass); g_free (user_pass64); break; } case GST_RTSP_AUTH_DIGEST:{ gchar *response; gchar *auth_string, *auth_string2; gchar *realm; gchar *nonce; gchar *opaque; const gchar *uri; const gchar *method; /* we need to have some params set */ if (conn->auth_params == NULL || conn->username == NULL || conn->passwd == NULL) break; /* we need the realm and nonce */ realm = (gchar *) g_hash_table_lookup (conn->auth_params, "realm"); nonce = (gchar *) g_hash_table_lookup (conn->auth_params, "nonce"); if (realm == NULL || nonce == NULL) break; method = gst_rtsp_method_as_text (message->type_data.request.method); uri = message->type_data.request.uri; response = gst_rtsp_generate_digest_auth_response (NULL, method, realm, conn->username, conn->passwd, uri, nonce); auth_string = g_strdup_printf ("Digest username=\"%s\", " "realm=\"%s\", nonce=\"%s\", uri=\"%s\", response=\"%s\"", conn->username, realm, nonce, uri, response); g_free (response); opaque = (gchar *) g_hash_table_lookup (conn->auth_params, "opaque"); if (opaque) { auth_string2 = g_strdup_printf ("%s, opaque=\"%s\"", auth_string, opaque); g_free (auth_string); auth_string = auth_string2; } /* Do not keep any old Authorization headers */ gst_rtsp_message_remove_header (message, GST_RTSP_HDR_AUTHORIZATION, -1); gst_rtsp_message_take_header (message, GST_RTSP_HDR_AUTHORIZATION, auth_string); break; } default: /* Nothing to do */ break; } } /** * gst_rtsp_connection_connect: * @conn: a #GstRTSPConnection * @timeout: a #GTimeVal timeout * * Attempt to connect to the url of @conn made with * 
gst_rtsp_connection_create(). If @timeout is %NULL this function can block * forever. If @timeout contains a valid timeout, this function will return * #GST_RTSP_ETIMEOUT after the timeout expired. * * This function can be cancelled with gst_rtsp_connection_flush(). * * Returns: #GST_RTSP_OK when a connection could be made. */ GstRTSPResult gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout) { GstRTSPResult result; GstRTSPMessage response; memset (&response, 0, sizeof (response)); gst_rtsp_message_init (&response); result = gst_rtsp_connection_connect_with_response (conn, timeout, &response); gst_rtsp_message_unset (&response); return result; } static void gen_date_string (gchar * date_string, guint len) { static const char wkdays[7][4] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" }; static const char months[12][4] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; struct tm tm; time_t t; time (&t); #ifdef HAVE_GMTIME_R gmtime_r (&t, &tm); #else tm = *gmtime (&t); #endif g_snprintf (date_string, len, "%s, %02d %s %04d %02d:%02d:%02d GMT", wkdays[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], tm.tm_year + 1900, tm.tm_hour, tm.tm_min, tm.tm_sec); } static GstRTSPResult write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx, guint size, gboolean block, GCancellable * cancellable) { guint left; gssize r; GError *err = NULL; if (G_UNLIKELY (*idx > size)) return GST_RTSP_ERROR; left = size - *idx; while (left) { if (block) r = g_output_stream_write (stream, (gchar *) & buffer[*idx], left, cancellable, &err); else r = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (stream), (gchar *) & buffer[*idx], left, cancellable, &err); if (G_UNLIKELY (r < 0)) goto error; left -= r; *idx += r; } return GST_RTSP_OK; /* ERRORS */ error: { if (G_UNLIKELY (r == 0)) return GST_RTSP_EEOF; if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) GST_WARNING ("%s", err->message); 
else GST_DEBUG ("%s", err->message); if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { g_clear_error (&err); return GST_RTSP_EINTR; } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { g_clear_error (&err); return GST_RTSP_EINTR; } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { g_clear_error (&err); return GST_RTSP_ETIMEOUT; } g_clear_error (&err); return GST_RTSP_ESYS; } } /* NOTE: This changes the values of vectors if multiple iterations are needed! */ #if GLIB_CHECK_VERSION(2, 59, 1) static GstRTSPResult writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors, gsize * bytes_written, gboolean block, GCancellable * cancellable) { gsize _bytes_written = 0; gsize written; GError *err = NULL; GPollableReturn res = G_POLLABLE_RETURN_OK; while (n_vectors > 0) { if (block) { if (G_UNLIKELY (!g_output_stream_writev (stream, vectors, n_vectors, &written, cancellable, &err))) { /* This will never return G_IO_ERROR_WOULD_BLOCK */ res = G_POLLABLE_RETURN_FAILED; goto error; } } else { res = g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM (stream), vectors, n_vectors, &written, cancellable, &err); if (res != G_POLLABLE_RETURN_OK) { g_assert (written == 0); goto error; } } _bytes_written += written; /* skip vectors that have been written in full */ while (written > 0 && written >= vectors[0].size) { written -= vectors[0].size; ++vectors; --n_vectors; } /* skip partially written vector data */ if (written > 0) { vectors[0].size -= written; vectors[0].buffer = ((guint8 *) vectors[0].buffer) + written; } } *bytes_written = _bytes_written; return GST_RTSP_OK; /* ERRORS */ error: { *bytes_written = _bytes_written; if (err) GST_WARNING ("%s", err->message); if (res == G_POLLABLE_RETURN_WOULD_BLOCK) { g_assert (!err); return GST_RTSP_EINTR; } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { g_clear_error (&err); return GST_RTSP_EINTR; } else if (g_error_matches (err, 
G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { g_clear_error (&err); return GST_RTSP_ETIMEOUT; } else if (G_UNLIKELY (written == 0)) { g_clear_error (&err); return GST_RTSP_EEOF; } g_clear_error (&err); return GST_RTSP_ESYS; } } #else static GstRTSPResult writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors, gsize * bytes_written, gboolean block, GCancellable * cancellable) { gsize _bytes_written = 0; guint written; gint i; GstRTSPResult res = GST_RTSP_OK; for (i = 0; i < n_vectors; i++) { written = 0; res = write_bytes (stream, vectors[i].buffer, &written, vectors[i].size, block, cancellable); _bytes_written += written; if (G_UNLIKELY (res != GST_RTSP_OK)) break; } *bytes_written = _bytes_written; return res; } #endif static gint fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, gboolean block, GError ** err) { gint out = 0; if (G_UNLIKELY (conn->initial_buffer != NULL)) { gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]); out = MIN (left, size); memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out); if (left == (gsize) out) { g_free (conn->initial_buffer); conn->initial_buffer = NULL; conn->initial_buffer_offset = 0; } else conn->initial_buffer_offset += out; } if (G_LIKELY (size > (guint) out)) { gssize r; gsize count = size - out; if (block) r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out], count, conn->may_cancel ? conn->cancellable : NULL, err); else r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (conn->input_stream), (gchar *) & buffer[out], count, conn->may_cancel ? 
conn->cancellable : NULL, err); if (G_UNLIKELY (r < 0)) { if (out == 0) { /* propagate the error */ out = r; } else { /* we have some data ignore error */ g_clear_error (err); } } else out += r; } return out; } static gint fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, gboolean block, GError ** err) { DecodeCtx *ctx = conn->ctxp; gint out = 0; if (ctx) { while (size > 0) { guint8 in[sizeof (ctx->out) * 4 / 3]; gint r; while (size > 0 && ctx->cout < ctx->coutl) { /* we have some leftover bytes */ *buffer++ = ctx->out[ctx->cout++]; size--; out++; } /* got what we needed? */ if (size == 0) break; /* try to read more bytes */ r = fill_raw_bytes (conn, in, sizeof (in), block, err); if (r <= 0) { if (out == 0) { out = r; } else { /* we have some data ignore error */ g_clear_error (err); } break; } ctx->cout = 0; ctx->coutl = g_base64_decode_step ((gchar *) in, r, ctx->out, &ctx->state, &ctx->save); } } else { out = fill_raw_bytes (conn, buffer, size, block, err); } return out; } static GstRTSPResult read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size, gboolean block) { guint left; gint r; GError *err = NULL; if (G_UNLIKELY (*idx > size)) return GST_RTSP_ERROR; left = size - *idx; while (left) { r = fill_bytes (conn, &buffer[*idx], left, block, &err); if (G_UNLIKELY (r <= 0)) goto error; left -= r; *idx += r; } return GST_RTSP_OK; /* ERRORS */ error: { if (G_UNLIKELY (r == 0)) return GST_RTSP_EEOF; GST_DEBUG ("%s", err->message); if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { g_clear_error (&err); return GST_RTSP_EINTR; } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { g_clear_error (&err); return GST_RTSP_EINTR; } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { g_clear_error (&err); return GST_RTSP_ETIMEOUT; } g_clear_error (&err); return GST_RTSP_ESYS; } } /* The code below tries to handle clients using \r, \n or \r\n to indicate the * end of a line. 
It even does its best to handle clients which mix them (even * though this is a really stupid idea (tm).) It also handles Line White Space * (LWS), where a line end followed by whitespace is considered LWS. This is * the method used in RTSP (and HTTP) to break long lines. */ static GstRTSPResult read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size, gboolean block) { GstRTSPResult res; while (TRUE) { guint8 c; guint i; if (conn->read_ahead == READ_AHEAD_EOH) { /* the last call to read_line() already determined that we have reached * the end of the headers, so convey that information now */ conn->read_ahead = 0; break; } else if (conn->read_ahead == READ_AHEAD_CRLF) { /* the last call to read_line() left off after having read \r\n */ c = '\n'; } else if (conn->read_ahead == READ_AHEAD_CRLFCR) { /* the last call to read_line() left off after having read \r\n\r */ c = '\r'; } else if (conn->read_ahead != 0) { /* the last call to read_line() left us with a character to start with */ c = (guint8) conn->read_ahead; conn->read_ahead = 0; } else { /* read the next character */ i = 0; res = read_bytes (conn, &c, &i, 1, block); if (G_UNLIKELY (res != GST_RTSP_OK)) return res; } /* special treatment of line endings */ if (c == '\r' || c == '\n') { guint8 read_ahead; retry: /* need to read ahead one more character to know what to do... 
*/ i = 0; res = read_bytes (conn, &read_ahead, &i, 1, block); if (G_UNLIKELY (res != GST_RTSP_OK)) return res; if (read_ahead == ' ' || read_ahead == '\t') { if (conn->read_ahead == READ_AHEAD_CRLFCR) { /* got \r\n\r followed by whitespace, treat it as a normal line * followed by one starting with LWS */ conn->read_ahead = read_ahead; break; } else { /* got LWS, change the line ending to a space and continue */ c = ' '; conn->read_ahead = read_ahead; } } else if (conn->read_ahead == READ_AHEAD_CRLFCR) { if (read_ahead == '\r' || read_ahead == '\n') { /* got \r\n\r\r or \r\n\r\n, treat it as the end of the headers */ conn->read_ahead = READ_AHEAD_EOH; break; } else { /* got \r\n\r followed by something else, this is not really * supported since we have probably just eaten the first character * of the body or the next message, so just ignore the second \r * and live with it... */ conn->read_ahead = read_ahead; break; } } else if (conn->read_ahead == READ_AHEAD_CRLF) { if (read_ahead == '\r') { /* got \r\n\r so far, need one more character... */ conn->read_ahead = READ_AHEAD_CRLFCR; goto retry; } else if (read_ahead == '\n') { /* got \r\n\n, treat it as the end of the headers */ conn->read_ahead = READ_AHEAD_EOH; break; } else { /* found the end of a line, keep read_ahead for the next line */ conn->read_ahead = read_ahead; break; } } else if (c == read_ahead) { /* got double \r or \n, treat it as the end of the headers */ conn->read_ahead = READ_AHEAD_EOH; break; } else if (c == '\r' && read_ahead == '\n') { /* got \r\n so far, still need more to know what to do... 
*/ conn->read_ahead = READ_AHEAD_CRLF; goto retry; } else { /* found the end of a line, keep read_ahead for the next line */ conn->read_ahead = read_ahead; break; } } if (G_LIKELY (*idx < size - 1)) buffer[(*idx)++] = c; } buffer[*idx] = '\0'; return GST_RTSP_OK; } /** * gst_rtsp_connection_write: * @conn: a #GstRTSPConnection * @data: the data to write * @size: the size of @data * @timeout: a timeout value or %NULL * * Attempt to write @size bytes of @data to the connected @conn, blocking up to * the specified @timeout. @timeout can be %NULL, in which case this function * might block forever. * * This function can be cancelled with gst_rtsp_connection_flush(). * * Returns: #GST_RTSP_OK on success. */ /* FIXME 2.0: This should've been static! */ GstRTSPResult gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, guint size, GTimeVal * timeout) { guint offset; GstClockTime to; GstRTSPResult res; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL); g_return_val_if_fail (conn->output_stream != NULL, GST_RTSP_EINVAL); offset = 0; to = timeout ? 
GST_TIMEVAL_TO_TIME (*timeout) : 0; g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND); res = write_bytes (conn->output_stream, data, &offset, size, TRUE, conn->cancellable); g_socket_set_timeout (conn->write_socket, 0); return res; } static gboolean serialize_message (GstRTSPConnection * conn, GstRTSPMessage * message, GstRTSPSerializedMessage * serialized_message) { GString *str = NULL; memset (serialized_message, 0, sizeof (*serialized_message)); /* Initially we borrow the body_data / body_buffer fields from * the message */ serialized_message->borrowed = TRUE; switch (message->type) { case GST_RTSP_MESSAGE_REQUEST: str = g_string_new (""); /* create request string, add CSeq */ g_string_append_printf (str, "%s %s RTSP/%s\r\n" "CSeq: %d\r\n", gst_rtsp_method_as_text (message->type_data.request.method), message->type_data.request.uri, gst_rtsp_version_as_text (message->type_data.request.version), conn->cseq++); /* add session id if we have one */ if (conn->session_id[0] != '\0') { gst_rtsp_message_remove_header (message, GST_RTSP_HDR_SESSION, -1); gst_rtsp_message_add_header (message, GST_RTSP_HDR_SESSION, conn->session_id); } /* add any authentication headers */ add_auth_header (conn, message); break; case GST_RTSP_MESSAGE_RESPONSE: str = g_string_new (""); /* create response string */ g_string_append_printf (str, "RTSP/%s %d %s\r\n", gst_rtsp_version_as_text (message->type_data.response.version), message->type_data.response.code, message->type_data.response.reason); break; case GST_RTSP_MESSAGE_HTTP_REQUEST: str = g_string_new (""); /* create request string */ g_string_append_printf (str, "%s %s HTTP/%s\r\n", gst_rtsp_method_as_text (message->type_data.request.method), message->type_data.request.uri, gst_rtsp_version_as_text (message->type_data.request.version)); /* add any authentication headers */ add_auth_header (conn, message); break; case GST_RTSP_MESSAGE_HTTP_RESPONSE: str = g_string_new (""); /* create response string */ 
g_string_append_printf (str, "HTTP/%s %d %s\r\n", gst_rtsp_version_as_text (message->type_data.request.version), message->type_data.response.code, message->type_data.response.reason); break; case GST_RTSP_MESSAGE_DATA: { guint8 *data_header = serialized_message->data_header; /* prepare data header */ data_header[0] = '$'; data_header[1] = message->type_data.data.channel; data_header[2] = (message->body_size >> 8) & 0xff; data_header[3] = message->body_size & 0xff; /* create serialized message with header and data */ serialized_message->data_is_data_header = TRUE; serialized_message->data_size = 4; if (message->body) { serialized_message->body_data = message->body; serialized_message->body_data_size = message->body_size; } else { g_assert (message->body_buffer != NULL); serialized_message->body_buffer = message->body_buffer; } break; } default: g_string_free (str, TRUE); g_return_val_if_reached (FALSE); break; } /* append headers and body */ if (message->type != GST_RTSP_MESSAGE_DATA) { gchar date_string[100]; g_assert (str != NULL); gen_date_string (date_string, sizeof (date_string)); /* add date header */ gst_rtsp_message_remove_header (message, GST_RTSP_HDR_DATE, -1); gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string); /* append headers */ gst_rtsp_message_append_headers (message, str); /* append Content-Length and body if needed */ if (message->body_size > 0) { gchar *len; len = g_strdup_printf ("%d", message->body_size); g_string_append_printf (str, "%s: %s\r\n", gst_rtsp_header_as_text (GST_RTSP_HDR_CONTENT_LENGTH), len); g_free (len); /* header ends here */ g_string_append (str, "\r\n"); if (message->body) { serialized_message->body_data = message->body; serialized_message->body_data_size = message->body_size; } else { g_assert (message->body_buffer != NULL); serialized_message->body_buffer = message->body_buffer; } } else { /* just end headers */ g_string_append (str, "\r\n"); } serialized_message->data_size = str->len; 
serialized_message->data = (guint8 *) g_string_free (str, FALSE); } return TRUE; } /** * gst_rtsp_connection_send: * @conn: a #GstRTSPConnection * @message: the message to send * @timeout: a timeout value or %NULL * * Attempt to send @message to the connected @conn, blocking up to * the specified @timeout. @timeout can be %NULL, in which case this function * might block forever. * * This function can be cancelled with gst_rtsp_connection_flush(). * * Returns: #GST_RTSP_OK on success. */ GstRTSPResult gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message, GTimeVal * timeout) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); return gst_rtsp_connection_send_messages (conn, message, 1, timeout); } /** * gst_rtsp_connection_send_messages: * @conn: a #GstRTSPConnection * @messages: (array length=n_messages): the messages to send * @n_messages: the number of messages to send * @timeout: a timeout value or %NULL * * Attempt to send @messages to the connected @conn, blocking up to * the specified @timeout. @timeout can be %NULL, in which case this function * might block forever. * * This function can be cancelled with gst_rtsp_connection_flush(). * * Returns: #GST_RTSP_OK on success. 
* * Since: 1.16 */ GstRTSPResult gst_rtsp_connection_send_messages (GstRTSPConnection * conn, GstRTSPMessage * messages, guint n_messages, GTimeVal * timeout) { GstClockTime to; GstRTSPResult res; GstRTSPSerializedMessage *serialized_messages; GOutputVector *vectors; GstMapInfo *map_infos; guint n_vectors, n_memories; gint i, j, k; gsize bytes_to_write, bytes_written; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL); serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages); memset (serialized_messages, 0, sizeof (GstRTSPSerializedMessage) * n_messages); for (i = 0, n_vectors = 0, n_memories = 0, bytes_to_write = 0; i < n_messages; i++) { if (G_UNLIKELY (!serialize_message (conn, &messages[i], &serialized_messages[i]))) goto no_message; if (conn->tunneled) { gint state = 0, save = 0; gchar *base64_buffer, *out_buffer; gsize written = 0; gsize in_length; in_length = serialized_messages[i].data_size; if (serialized_messages[i].body_data) in_length += serialized_messages[i].body_data_size; else if (serialized_messages[i].body_buffer) in_length += gst_buffer_get_size (serialized_messages[i].body_buffer); in_length = (in_length / 3 + 1) * 4 + 4 + 1; base64_buffer = out_buffer = g_malloc0 (in_length); written = g_base64_encode_step (serialized_messages[i].data_is_data_header ? 
serialized_messages[i].data_header : serialized_messages[i].data, serialized_messages[i].data_size, FALSE, out_buffer, &state, &save); out_buffer += written; if (serialized_messages[i].body_data) { written = g_base64_encode_step (serialized_messages[i].body_data, serialized_messages[i].body_data_size, FALSE, out_buffer, &state, &save); out_buffer += written; } else if (serialized_messages[i].body_buffer) { guint j, n = gst_buffer_n_memory (serialized_messages[i].body_buffer); for (j = 0; j < n; j++) { GstMemory *mem = gst_buffer_peek_memory (serialized_messages[i].body_buffer, j); GstMapInfo map; gst_memory_map (mem, &map, GST_MAP_READ); written = g_base64_encode_step (map.data, map.size, FALSE, out_buffer, &state, &save); out_buffer += written; gst_memory_unmap (mem, &map); } } written = g_base64_encode_close (FALSE, out_buffer, &state, &save); out_buffer += written; gst_rtsp_serialized_message_clear (&serialized_messages[i]); memset (&serialized_messages[i], 0, sizeof (serialized_messages[i])); serialized_messages[i].data = (guint8 *) base64_buffer; serialized_messages[i].data_size = (out_buffer - base64_buffer) + 1; n_vectors++; } else { n_vectors++; if (serialized_messages[i].body_data) { n_vectors++; } else if (serialized_messages[i].body_buffer) { n_vectors += gst_buffer_n_memory (serialized_messages[i].body_buffer); n_memories += gst_buffer_n_memory (serialized_messages[i].body_buffer); } } } vectors = g_newa (GOutputVector, n_vectors); map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL; for (i = 0, j = 0, k = 0; i < n_messages; i++) { vectors[j].buffer = serialized_messages[i].data_is_data_header ? 
serialized_messages[i].data_header : serialized_messages[i].data; vectors[j].size = serialized_messages[i].data_size; bytes_to_write += vectors[j].size; j++; if (serialized_messages[i].body_data) { vectors[j].buffer = serialized_messages[i].body_data; vectors[j].size = serialized_messages[i].body_data_size; bytes_to_write += vectors[j].size; j++; } else if (serialized_messages[i].body_buffer) { gint l, n; n = gst_buffer_n_memory (serialized_messages[i].body_buffer); for (l = 0; l < n; l++) { GstMemory *mem = gst_buffer_peek_memory (serialized_messages[i].body_buffer, l); gst_memory_map (mem, &map_infos[k], GST_MAP_READ); vectors[j].buffer = map_infos[k].data; vectors[j].size = map_infos[k].size; bytes_to_write += vectors[j].size; k++; j++; } } } /* write request: this is synchronous */ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0; g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND); res = writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written, TRUE, conn->cancellable); g_socket_set_timeout (conn->write_socket, 0); g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK); /* free everything */ for (i = 0, k = 0; i < n_messages; i++) { if (serialized_messages[i].body_buffer) { gint l, n; n = gst_buffer_n_memory (serialized_messages[i].body_buffer); for (l = 0; l < n; l++) { GstMemory *mem = gst_buffer_peek_memory (serialized_messages[i].body_buffer, l); gst_memory_unmap (mem, &map_infos[k]); k++; } } g_free (serialized_messages[i].data); } return res; no_message: { for (i = 0; i < n_messages; i++) { gst_rtsp_serialized_message_clear (&serialized_messages[i]); } g_warning ("Wrong message"); return GST_RTSP_EINVAL; } } static GstRTSPResult parse_string (gchar * dest, gint size, gchar ** src) { GstRTSPResult res = GST_RTSP_OK; gint idx; idx = 0; /* skip spaces */ while (g_ascii_isspace (**src)) (*src)++; while (!g_ascii_isspace (**src) && **src != '\0') { if (idx < size - 1) dest[idx++] = **src; else res = 
GST_RTSP_EPARSE; (*src)++; } if (size > 0) dest[idx] = '\0'; return res; } static GstRTSPResult parse_protocol_version (gchar * protocol, GstRTSPMsgType * type, GstRTSPVersion * version) { GstRTSPVersion rversion; GstRTSPResult res = GST_RTSP_OK; gchar *ver; if (G_LIKELY ((ver = strchr (protocol, '/')) != NULL)) { guint major; guint minor; gchar dummychar; *ver++ = '\0'; /* the version number must be formatted as X.Y with nothing following */ if (sscanf (ver, "%u.%u%c", &major, &minor, &dummychar) != 2) res = GST_RTSP_EPARSE; rversion = major * 0x10 + minor; if (g_ascii_strcasecmp (protocol, "RTSP") == 0) { if (rversion != GST_RTSP_VERSION_1_0 && rversion != GST_RTSP_VERSION_2_0) { *version = GST_RTSP_VERSION_INVALID; res = GST_RTSP_ERROR; } } else if (g_ascii_strcasecmp (protocol, "HTTP") == 0) { if (*type == GST_RTSP_MESSAGE_REQUEST) *type = GST_RTSP_MESSAGE_HTTP_REQUEST; else if (*type == GST_RTSP_MESSAGE_RESPONSE) *type = GST_RTSP_MESSAGE_HTTP_RESPONSE; if (rversion != GST_RTSP_VERSION_1_0 && rversion != GST_RTSP_VERSION_1_1 && rversion != GST_RTSP_VERSION_2_0) res = GST_RTSP_ERROR; } else res = GST_RTSP_EPARSE; } else res = GST_RTSP_EPARSE; if (res == GST_RTSP_OK) *version = rversion; return res; } static GstRTSPResult parse_response_status (guint8 * buffer, GstRTSPMessage * msg) { GstRTSPResult res = GST_RTSP_OK; GstRTSPResult res2; gchar versionstr[20]; gchar codestr[4]; gint code; gchar *bptr; bptr = (gchar *) buffer; if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK) res = GST_RTSP_EPARSE; if (parse_string (codestr, sizeof (codestr), &bptr) != GST_RTSP_OK) res = GST_RTSP_EPARSE; code = atoi (codestr); if (G_UNLIKELY (*codestr == '\0' || code < 0 || code >= 600)) res = GST_RTSP_EPARSE; while (g_ascii_isspace (*bptr)) bptr++; if (G_UNLIKELY (gst_rtsp_message_init_response (msg, code, bptr, NULL) != GST_RTSP_OK)) res = GST_RTSP_EPARSE; res2 = parse_protocol_version (versionstr, &msg->type, &msg->type_data.response.version); if (G_LIKELY 
(res == GST_RTSP_OK)) res = res2; return res; } static GstRTSPResult parse_request_line (guint8 * buffer, GstRTSPMessage * msg) { GstRTSPResult res = GST_RTSP_OK; GstRTSPResult res2; gchar versionstr[20]; gchar methodstr[20]; gchar urlstr[4096]; gchar *bptr; GstRTSPMethod method; bptr = (gchar *) buffer; if (parse_string (methodstr, sizeof (methodstr), &bptr) != GST_RTSP_OK) res = GST_RTSP_EPARSE; method = gst_rtsp_find_method (methodstr); if (parse_string (urlstr, sizeof (urlstr), &bptr) != GST_RTSP_OK) res = GST_RTSP_EPARSE; if (G_UNLIKELY (*urlstr == '\0')) res = GST_RTSP_EPARSE; if (parse_string (versionstr, sizeof (versionstr), &bptr) != GST_RTSP_OK) res = GST_RTSP_EPARSE; if (G_UNLIKELY (*bptr != '\0')) res = GST_RTSP_EPARSE; if (G_UNLIKELY (gst_rtsp_message_init_request (msg, method, urlstr) != GST_RTSP_OK)) res = GST_RTSP_EPARSE; res2 = parse_protocol_version (versionstr, &msg->type, &msg->type_data.request.version); if (G_LIKELY (res == GST_RTSP_OK)) res = res2; if (G_LIKELY (msg->type == GST_RTSP_MESSAGE_REQUEST)) { /* GET and POST are not allowed as RTSP methods */ if (msg->type_data.request.method == GST_RTSP_GET || msg->type_data.request.method == GST_RTSP_POST) { msg->type_data.request.method = GST_RTSP_INVALID; if (res == GST_RTSP_OK) res = GST_RTSP_ERROR; } } else if (msg->type == GST_RTSP_MESSAGE_HTTP_REQUEST) { /* only GET and POST are allowed as HTTP methods */ if (msg->type_data.request.method != GST_RTSP_GET && msg->type_data.request.method != GST_RTSP_POST) { msg->type_data.request.method = GST_RTSP_INVALID; if (res == GST_RTSP_OK) res = GST_RTSP_ERROR; } } return res; } /* parsing lines means reading a Key: Value pair */ static GstRTSPResult parse_line (guint8 * buffer, GstRTSPMessage * msg) { GstRTSPHeaderField field; gchar *line = (gchar *) buffer; gchar *field_name = NULL; gchar *value; if ((value = strchr (line, ':')) == NULL || value == line) goto parse_error; /* trim space before the colon */ if (value[-1] == ' ') value[-1] = '\0'; /* 
replace the colon with a NUL */ *value++ = '\0'; /* find the header */ field = gst_rtsp_find_header_field (line); /* custom header not present in the list of pre-defined headers */ if (field == GST_RTSP_HDR_INVALID) field_name = line; /* split up the value in multiple key:value pairs if it contains comma(s) */ while (*value != '\0') { gchar *next_value; gchar *comma = NULL; gboolean quoted = FALSE; guint comment = 0; /* trim leading space */ if (*value == ' ') value++; /* for headers which may not appear multiple times, and thus may not * contain multiple values on the same line, we can short-circuit the loop * below and the entire value results in just one key:value pair*/ if (!gst_rtsp_header_allow_multiple (field)) next_value = value + strlen (value); else next_value = value; /* find the next value, taking special care of quotes and comments */ while (*next_value != '\0') { if ((quoted || comment != 0) && *next_value == '\\' && next_value[1] != '\0') next_value++; else if (comment == 0 && *next_value == '"') quoted = !quoted; else if (!quoted && *next_value == '(') comment++; else if (comment != 0 && *next_value == ')') comment--; else if (!quoted && comment == 0) { /* To quote RFC 2068: "User agents MUST take special care in parsing * the WWW-Authenticate field value if it contains more than one * challenge, or if more than one WWW-Authenticate header field is * provided, since the contents of a challenge may itself contain a * comma-separated list of authentication parameters." * * What this means is that we cannot just look for an unquoted comma * when looking for multiple values in Proxy-Authenticate and * WWW-Authenticate headers. Instead we need to look for the sequence * "comma [space] token space token" before we can split after the * comma... 
*/ if (field == GST_RTSP_HDR_PROXY_AUTHENTICATE || field == GST_RTSP_HDR_WWW_AUTHENTICATE) { if (*next_value == ',') { if (next_value[1] == ' ') { /* skip any space following the comma so we do not mistake it for * separating between two tokens */ next_value++; } comma = next_value; } else if (*next_value == ' ' && next_value[1] != ',' && next_value[1] != '=' && comma != NULL) { next_value = comma; comma = NULL; break; } } else if (*next_value == ',') break; } next_value++; } if (msg->type == GST_RTSP_MESSAGE_REQUEST && field == GST_RTSP_HDR_SESSION) { /* The timeout parameter is only allowed in a session response header * but some clients send it as part of the session request header. * Ignore everything from the semicolon to the end of the line. */ next_value = value; while (*next_value != '\0') { if (*next_value == ';') { break; } next_value++; } } /* trim space */ if (value != next_value && next_value[-1] == ' ') next_value[-1] = '\0'; if (*next_value != '\0') *next_value++ = '\0'; /* add the key:value pair */ if (*value != '\0') { if (field != GST_RTSP_HDR_INVALID) gst_rtsp_message_add_header (msg, field, value); else gst_rtsp_message_add_header_by_name (msg, field_name, value); } value = next_value; } return GST_RTSP_OK; /* ERRORS */ parse_error: { return GST_RTSP_EPARSE; } } /* convert all consecutive whitespace to a single space */ static void normalize_line (guint8 * buffer) { while (*buffer) { if (g_ascii_isspace (*buffer)) { guint8 *tmp; *buffer++ = ' '; for (tmp = buffer; g_ascii_isspace (*tmp); tmp++) { } if (buffer != tmp) memmove (buffer, tmp, strlen ((gchar *) tmp) + 1); } else { buffer++; } } } /* returns: * GST_RTSP_OK when a complete message was read. * GST_RTSP_EEOF: when the read socket is closed * GST_RTSP_EINTR: when more data is needed. * GST_RTSP_..: some other error occured. 
*/ static GstRTSPResult build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, GstRTSPConnection * conn, gboolean block) { GstRTSPResult res; while (TRUE) { switch (builder->state) { case STATE_START: { guint8 c; builder->offset = 0; res = read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1, block); if (res != GST_RTSP_OK) goto done; c = builder->buffer[0]; /* we have 1 bytes now and we can see if this is a data message or * not */ if (c == '$') { /* data message, prepare for the header */ builder->state = STATE_DATA_HEADER; conn->may_cancel = FALSE; } else if (c == '\n' || c == '\r') { /* skip \n and \r */ builder->offset = 0; } else { builder->line = 0; builder->state = STATE_READ_LINES; conn->may_cancel = FALSE; } break; } case STATE_DATA_HEADER: { res = read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4, block); if (res != GST_RTSP_OK) goto done; gst_rtsp_message_init_data (message, builder->buffer[1]); builder->body_len = (builder->buffer[2] << 8) | builder->buffer[3]; builder->body_data = g_malloc (builder->body_len + 1); builder->body_data[builder->body_len] = '\0'; builder->offset = 0; builder->state = STATE_DATA_BODY; break; } case STATE_DATA_BODY: { res = read_bytes (conn, builder->body_data, &builder->offset, builder->body_len, block); if (res != GST_RTSP_OK) goto done; /* we have the complete body now, store in the message adjusting the * length to include the trailing '\0' */ gst_rtsp_message_take_body (message, (guint8 *) builder->body_data, builder->body_len + 1); builder->body_data = NULL; builder->body_len = 0; builder->state = STATE_END; break; } case STATE_READ_LINES: { res = read_line (conn, builder->buffer, &builder->offset, sizeof (builder->buffer), block); if (res != GST_RTSP_OK) goto done; /* we have a regular response */ if (builder->buffer[0] == '\0') { gchar *hdrval; /* empty line, end of message header */ /* see if there is a Content-Length header, but ignore it if this * is a POST request with an 
x-sessioncookie header */ if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_CONTENT_LENGTH, &hdrval, 0) == GST_RTSP_OK && (message->type != GST_RTSP_MESSAGE_HTTP_REQUEST || message->type_data.request.method != GST_RTSP_POST || gst_rtsp_message_get_header (message, GST_RTSP_HDR_X_SESSIONCOOKIE, NULL, 0) != GST_RTSP_OK)) { /* there is, prepare to read the body */ builder->body_len = atol (hdrval); builder->body_data = g_try_malloc (builder->body_len + 1); /* we can't do much here, we need the length to know how many bytes * we need to read next and when allocation fails, something is * probably wrong with the length. */ if (builder->body_data == NULL) goto invalid_body_len; builder->body_data[builder->body_len] = '\0'; builder->offset = 0; builder->state = STATE_DATA_BODY; } else { builder->state = STATE_END; } break; } /* we have a line */ normalize_line (builder->buffer); if (builder->line == 0) { /* first line, check for response status */ if (memcmp (builder->buffer, "RTSP", 4) == 0 || memcmp (builder->buffer, "HTTP", 4) == 0) { builder->status = parse_response_status (builder->buffer, message); } else { builder->status = parse_request_line (builder->buffer, message); } } else { /* else just parse the line */ res = parse_line (builder->buffer, message); if (res != GST_RTSP_OK) builder->status = res; } builder->line++; builder->offset = 0; break; } case STATE_END: { gchar *session_cookie; gchar *session_id; conn->may_cancel = TRUE; if (message->type == GST_RTSP_MESSAGE_DATA) { /* data messages don't have headers */ res = GST_RTSP_OK; goto done; } /* save the tunnel session in the connection */ if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST && !conn->manual_http && conn->tstate == TUNNEL_STATE_NONE && gst_rtsp_message_get_header (message, GST_RTSP_HDR_X_SESSIONCOOKIE, &session_cookie, 0) == GST_RTSP_OK) { strncpy (conn->tunnelid, session_cookie, TUNNELID_LEN); conn->tunnelid[TUNNELID_LEN - 1] = '\0'; conn->tunneled = TRUE; } /* save session id in the 
connection for further use */
        if (message->type == GST_RTSP_MESSAGE_RESPONSE &&
            gst_rtsp_message_get_header (message, GST_RTSP_HDR_SESSION,
                &session_id, 0) == GST_RTSP_OK) {
          gint maxlen, i;

          maxlen = sizeof (conn->session_id) - 1;
          /* the sessionid can have attributes marked with ;
           * Make sure we strip them */
          for (i = 0; i < maxlen && session_id[i] != '\0'; i++) {
            if (session_id[i] == ';') {
              maxlen = i;
              /* parse timeout */
              do {
                i++;
              } while (g_ascii_isspace (session_id[i]));
              if (g_str_has_prefix (&session_id[i], "timeout=")) {
                gint to;

                /* if we parsed something valid, configure */
                if ((to = atoi (&session_id[i + 8])) > 0)
                  conn->timeout = to;
              }
              break;
            }
          }

          /* make sure to not overflow */
          if (conn->remember_session_id) {
            strncpy (conn->session_id, session_id, maxlen);
            conn->session_id[maxlen] = '\0';
          }
        }
        res = builder->status;
        goto done;
      }
      default:
        res = GST_RTSP_ERROR;
        goto done;
    }
  }
done:
  conn->may_cancel = TRUE;
  return res;

  /* ERRORS */
invalid_body_len:
  {
    conn->may_cancel = TRUE;
    GST_DEBUG ("could not allocate body");
    return GST_RTSP_ERROR;
  }
}

/**
 * gst_rtsp_connection_read:
 * @conn: a #GstRTSPConnection
 * @data: the buffer to read into
 * @size: the number of bytes to read into @data
 * @timeout: a timeout value or %NULL
 *
 * Attempt to read @size bytes into @data from the connected @conn, blocking up to
 * the specified @timeout. @timeout can be %NULL, in which case this function
 * might block forever.
 *
 * The timeout is applied to the read socket with a granularity of one second
 * (rounded up) and is removed again before returning. A @size of 0 returns
 * #GST_RTSP_OK immediately without touching the socket.
 *
 * This function can be cancelled with gst_rtsp_connection_flush().
 *
 * Returns: #GST_RTSP_OK on success, #GST_RTSP_EINVAL when @conn or @data is
 * %NULL or @conn has no read socket.
 */
GstRTSPResult
gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
    GTimeVal * timeout)
{
  guint offset;
  GstClockTime to;
  GstRTSPResult res;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);

  /* nothing to read, nothing to wait for */
  if (G_UNLIKELY (size == 0))
    return GST_RTSP_OK;

  offset = 0;

  /* configure timeout if any; the socket timeout is expressed in whole
   * seconds so round the requested time up, 0 disables the timeout */
  to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;

  g_socket_set_timeout (conn->read_socket,
      (to + GST_SECOND - 1) / GST_SECOND);
  res = read_bytes (conn, data, &offset, size, TRUE);
  /* always restore the socket to "no timeout" */
  g_socket_set_timeout (conn->read_socket, 0);

  return res;
}

/* Build the HTTP reply for a tunnel request.
 *
 * @conn: connection the request arrived on, its local_ip (when set) is
 *        advertised to the client on success
 * @code: status code to reply with; a code that has no status text is
 *        replaced by GST_RTSP_STS_INTERNAL_SERVER_ERROR
 * @request: the request that is being answered
 *
 * Returns a newly created response message, or NULL when the response could
 * not be created. */
static GstRTSPMessage *
gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code,
    const GstRTSPMessage * request)
{
  GstRTSPMessage *msg;
  GstRTSPResult res;

  /* never reply with a status code we cannot describe */
  if (gst_rtsp_status_as_text (code) == NULL)
    code = GST_RTSP_STS_INTERNAL_SERVER_ERROR;

  GST_RTSP_CHECK (gst_rtsp_message_new_response (&msg, code, NULL, request),
      no_message);

  gst_rtsp_message_add_header (msg, GST_RTSP_HDR_SERVER,
      "GStreamer RTSP Server");
  gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONNECTION, "close");
  gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-store");
  gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");

  if (code == GST_RTSP_STS_OK) {
    /* add the local ip address to the tunnel reply, this is where the client
     * should send the POST request to */
    if (conn->local_ip)
      gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
          conn->local_ip);
    gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
        "application/x-rtsp-tunnelled");
  }

  return msg;

  /* ERRORS */
no_message:
  {
    return NULL;
  }
}

/**
 * gst_rtsp_connection_receive:
 * @conn: a #GstRTSPConnection
 * @message: the message to read
 * @timeout: a timeout value or %NULL
 *
 * Attempt to read into @message from the connected @conn, blocking up to
 * the specified @timeout. @timeout can be %NULL, in which case this function
 * might block forever.
 *
 * This function can be cancelled with gst_rtsp_connection_flush().
 *
 * Returns: #GST_RTSP_OK on success.
*/
GstRTSPResult
gst_rtsp_connection_receive (GstRTSPConnection * conn,
    GstRTSPMessage * message, GTimeVal * timeout)
{
  GstRTSPBuilder builder;
  GstRTSPResult res;
  GstClockTime timeout_ns;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);

  /* the socket timeout has second granularity: round the request up, a
   * value of 0 means "no timeout" */
  if (timeout)
    timeout_ns = GST_TIMEVAL_TO_TIME (*timeout);
  else
    timeout_ns = 0;

  g_socket_set_timeout (conn->read_socket,
      (timeout_ns + GST_SECOND - 1) / GST_SECOND);

  memset (&builder, 0, sizeof (GstRTSPBuilder));
  res = build_next (&builder, message, conn, TRUE);

  /* the timeout only applies to this call */
  g_socket_set_timeout (conn->read_socket, 0);

  if (G_UNLIKELY (res != GST_RTSP_OK))
    goto failed;

  /* in manual HTTP mode every parsed message goes straight to the caller */
  if (conn->manual_http)
    goto have_message;

  if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
    /* HTTP responses are never expected here */
    res = GST_RTSP_EPARSE;
    goto failed;
  }

  if (message->type != GST_RTSP_MESSAGE_HTTP_REQUEST)
    goto have_message;

  /* HTTP requests are only valid as the first step of a tunnel setup */
  if (conn->tstate != TUNNEL_STATE_NONE) {
    res = GST_RTSP_EPARSE;
  } else if (message->type_data.request.method == GST_RTSP_GET) {
    GstRTSPMessage *reply;

    conn->tstate = TUNNEL_STATE_GET;

    /* tunnel GET request, we can reply now */
    reply = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message);
    res = gst_rtsp_connection_send (conn, reply, timeout);
    gst_rtsp_message_free (reply);

    /* a delivered reply is reported to the caller as "tunnel GET" */
    if (res == GST_RTSP_OK)
      res = GST_RTSP_ETGET;
  } else if (message->type_data.request.method == GST_RTSP_POST) {
    conn->tstate = TUNNEL_STATE_POST;

    /* tunnel POST request, the caller now has to link the two
     * connections. */
    res = GST_RTSP_ETPOST;
  } else {
    res = GST_RTSP_EPARSE;
  }
  goto failed;

have_message:
  /* we have a message here */
  build_reset (&builder);
  return GST_RTSP_OK;

  /* ERRORS and tunnel notifications: @message is not handed out */
failed:
  {
    build_reset (&builder);
    gst_rtsp_message_unset (message);
    return res;
  }
}

/**
 * gst_rtsp_connection_close:
 * @conn: a #GstRTSPConnection
 *
 * Close the connected @conn. After this call, the connection is in the same
 * state as when it was first created.
*
 * Returns: #GST_RTSP_OK on success.
 */
GstRTSPResult
gst_rtsp_connection_close (GstRTSPConnection * conn)
{
  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);

  /* Dropping the last reference is what closes a stream. There is no
   * explicit close because the sockets may have been handed to us at
   * construction time. */
  if (conn->stream0 != NULL) {
    g_clear_object (&conn->stream0);
    conn->socket0 = NULL;
  }
  if (conn->stream1 != NULL) {
    g_clear_object (&conn->stream1);
    conn->socket1 = NULL;
  }

  /* borrowed from the streams released above */
  conn->input_stream = NULL;
  conn->output_stream = NULL;
  conn->control_stream = NULL;
  conn->read_socket = NULL;
  conn->write_socket = NULL;

  /* addresses and buffered data */
  g_clear_pointer (&conn->remote_ip, g_free);
  g_clear_pointer (&conn->local_ip, g_free);
  g_clear_pointer (&conn->initial_buffer, g_free);
  conn->initial_buffer_offset = 0;
  conn->read_ahead = 0;

  /* tunnel state */
  conn->tunneled = FALSE;
  conn->tstate = TUNNEL_STATE_NONE;
  conn->ctxp = NULL;

  /* credentials */
  g_clear_pointer (&conn->username, g_free);
  g_clear_pointer (&conn->passwd, g_free);
  gst_rtsp_connection_clear_auth_params (conn);

  /* session defaults */
  conn->timeout = 60;
  conn->cseq = 0;
  conn->session_id[0] = '\0';

  return GST_RTSP_OK;
}

/**
 * gst_rtsp_connection_free:
 * @conn: a #GstRTSPConnection
 *
 * Close and free @conn.
 *
 * Returns: #GST_RTSP_OK on success.
*/
GstRTSPResult
gst_rtsp_connection_free (GstRTSPConnection * conn)
{
  GstRTSPResult close_result;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);

  /* release everything tied to the current connection first */
  close_result = gst_rtsp_connection_close (conn);

  /* then the objects that outlive a close */
  g_clear_object (&conn->cancellable);
  g_clear_object (&conn->client);
  g_clear_object (&conn->tls_database);
  g_clear_object (&conn->tls_interaction);

  /* let the owner of the accept-certificate user data release it */
  if (conn->accept_certificate_destroy_notify != NULL)
    conn->accept_certificate_destroy_notify (conn->
        accept_certificate_user_data);

  g_timer_destroy (conn->timer);
  gst_rtsp_url_free (conn->url);
  g_free (conn->proxy_host);
  g_free (conn);

  /* the result of closing is what the caller gets to see */
  return close_result;
}

/**
 * gst_rtsp_connection_poll:
 * @conn: a #GstRTSPConnection
 * @events: a bitmask of #GstRTSPEvent flags to check
 * @revents: location for result flags
 * @timeout: a timeout
 *
 * Wait up to the specified @timeout for the connection to become available for
 * at least one of the operations specified in @events. When the function returns
 * with #GST_RTSP_OK, @revents will contain a bitmask of available operations on
 * @conn.
 *
 * @timeout can be %NULL, in which case this function might block forever.
 *
 * This function can be cancelled with gst_rtsp_connection_flush().
 *
 * Returns: #GST_RTSP_OK on success.
*/
GstRTSPResult
gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
    GstRTSPEvent * revents, GTimeVal * timeout)
{
  GMainContext *poll_ctx;
  GSource *src;
  GIOCondition ready;
  gboolean dispatched;

  g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
  g_return_val_if_fail (revents != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
  g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);

  /* a private context, so that only the sources added below can wake us */
  poll_ctx = g_main_context_new ();

  /* optional timeout source, expressed in milliseconds */
  if (timeout != NULL) {
    src = g_timeout_source_new (GST_TIMEVAL_TO_TIME (*timeout) / GST_MSECOND);
    g_source_set_dummy_callback (src);
    g_source_attach (src, poll_ctx);
    g_source_unref (src);
  }

  /* readability of the read socket */
  if (events & GST_RTSP_EV_READ) {
    src = g_socket_create_source (conn->read_socket, G_IO_IN | G_IO_PRI,
        conn->cancellable);
    g_source_set_dummy_callback (src);
    g_source_attach (src, poll_ctx);
    g_source_unref (src);
  }

  /* writability of the write socket */
  if (events & GST_RTSP_EV_WRITE) {
    src = g_socket_create_source (conn->write_socket, G_IO_OUT,
        conn->cancellable);
    g_source_set_dummy_callback (src);
    g_source_attach (src, poll_ctx);
    g_source_unref (src);
  }

  /* block until one iteration actually dispatched something */
  do {
    dispatched = g_main_context_iteration (poll_ctx, TRUE);
  } while (!dispatched);

  g_main_context_unref (poll_ctx);

  /* the sources only woke us up, query the sockets for what is possible */
  *revents = 0;

  if (events & GST_RTSP_EV_READ) {
    ready = g_socket_condition_check (conn->read_socket, G_IO_IN | G_IO_PRI);
    if (ready & (G_IO_IN | G_IO_PRI))
      *revents |= GST_RTSP_EV_READ;
  }

  if (events & GST_RTSP_EV_WRITE) {
    ready = g_socket_condition_check (conn->write_socket, G_IO_OUT);
    if (ready & G_IO_OUT)
      *revents |= GST_RTSP_EV_WRITE;
  }

  /* nothing became available: report a timeout */
  return (*revents == 0) ? GST_RTSP_ETIMEOUT : GST_RTSP_OK;
}

/**
 * gst_rtsp_connection_next_timeout:
 * @conn: a #GstRTSPConnection
 * @timeout: a timeout
 *
 * Calculate the next timeout for @conn, storing the result in @timeout.
 *
 * Returns: #GST_RTSP_OK.
*/ GstRTSPResult gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout) { gdouble elapsed; glong sec; gulong usec; gint ctimeout; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL); ctimeout = conn->timeout; if (ctimeout >= 20) { /* Because we should act before the timeout we timeout 5 * seconds in advance. */ ctimeout -= 5; } else if (ctimeout >= 5) { /* else timeout 20% earlier */ ctimeout -= ctimeout / 5; } else if (ctimeout >= 1) { /* else timeout 1 second earlier */ ctimeout -= 1; } elapsed = g_timer_elapsed (conn->timer, &usec); if (elapsed >= ctimeout) { sec = 0; usec = 0; } else { sec = ctimeout - elapsed; if (usec <= G_USEC_PER_SEC) usec = G_USEC_PER_SEC - usec; else usec = 0; } timeout->tv_sec = sec; timeout->tv_usec = usec; return GST_RTSP_OK; } /** * gst_rtsp_connection_reset_timeout: * @conn: a #GstRTSPConnection * * Reset the timeout of @conn. * * Returns: #GST_RTSP_OK. */ GstRTSPResult gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_timer_start (conn->timer); return GST_RTSP_OK; } /** * gst_rtsp_connection_flush: * @conn: a #GstRTSPConnection * @flush: start or stop the flush * * Start or stop the flushing action on @conn. When flushing, all current * and future actions on @conn will return #GST_RTSP_EINTR until the connection * is set to non-flushing mode again. * * Returns: #GST_RTSP_OK. */ GstRTSPResult gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); if (flush) { g_cancellable_cancel (conn->cancellable); } else { g_object_unref (conn->cancellable); conn->cancellable = g_cancellable_new (); } return GST_RTSP_OK; } /** * gst_rtsp_connection_set_proxy: * @conn: a #GstRTSPConnection * @host: the proxy host * @port: the proxy port * * Set the proxy host and port. * * Returns: #GST_RTSP_OK. 
*/ GstRTSPResult gst_rtsp_connection_set_proxy (GstRTSPConnection * conn, const gchar * host, guint port) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_free (conn->proxy_host); conn->proxy_host = g_strdup (host); conn->proxy_port = port; return GST_RTSP_OK; } /** * gst_rtsp_connection_set_auth: * @conn: a #GstRTSPConnection * @method: authentication method * @user: the user * @pass: the password * * Configure @conn for authentication mode @method with @user and @pass as the * user and password respectively. * * Returns: #GST_RTSP_OK. */ GstRTSPResult gst_rtsp_connection_set_auth (GstRTSPConnection * conn, GstRTSPAuthMethod method, const gchar * user, const gchar * pass) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); if (method == GST_RTSP_AUTH_DIGEST && ((user == NULL || pass == NULL) || g_strrstr (user, ":") != NULL)) return GST_RTSP_EINVAL; /* Make sure the username and passwd are being set for authentication */ if (method == GST_RTSP_AUTH_NONE && (user == NULL || pass == NULL)) return GST_RTSP_EINVAL; /* ":" chars are not allowed in usernames for basic auth */ if (method == GST_RTSP_AUTH_BASIC && g_strrstr (user, ":") != NULL) return GST_RTSP_EINVAL; g_free (conn->username); g_free (conn->passwd); conn->auth_method = method; conn->username = g_strdup (user); conn->passwd = g_strdup (pass); return GST_RTSP_OK; } /** * str_case_hash: * @key: ASCII string to hash * * Hashes @key in a case-insensitive manner. * * Returns: the hash code. 
**/ static guint str_case_hash (gconstpointer key) { const char *p = key; guint h = g_ascii_toupper (*p); if (h) for (p += 1; *p != '\0'; p++) h = (h << 5) - h + g_ascii_toupper (*p); return h; } /** * str_case_equal: * @v1: an ASCII string * @v2: another ASCII string * * Compares @v1 and @v2 in a case-insensitive manner * * Returns: %TRUE if they are equal (modulo case) **/ static gboolean str_case_equal (gconstpointer v1, gconstpointer v2) { const char *string1 = v1; const char *string2 = v2; return g_ascii_strcasecmp (string1, string2) == 0; } /** * gst_rtsp_connection_set_auth_param: * @conn: a #GstRTSPConnection * @param: authentication directive * @value: value * * Setup @conn with authentication directives. This is not necesary for * methods #GST_RTSP_AUTH_NONE and #GST_RTSP_AUTH_BASIC. For * #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge * in the WWW-Authenticate response header and can include realm, domain, * nonce, opaque, stale, algorithm, qop as per RFC2617. */ void gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn, const gchar * param, const gchar * value) { g_return_if_fail (conn != NULL); g_return_if_fail (param != NULL); if (conn->auth_params == NULL) { conn->auth_params = g_hash_table_new_full (str_case_hash, str_case_equal, g_free, g_free); } g_hash_table_insert (conn->auth_params, g_strdup (param), g_strdup (value)); } /** * gst_rtsp_connection_clear_auth_params: * @conn: a #GstRTSPConnection * * Clear the list of authentication directives stored in @conn. 
*/ void gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn) { g_return_if_fail (conn != NULL); if (conn->auth_params != NULL) { g_hash_table_destroy (conn->auth_params); conn->auth_params = NULL; } } static GstRTSPResult set_qos_dscp (GSocket * socket, guint qos_dscp) { #ifndef IP_TOS GST_FIXME ("IP_TOS socket option is not defined, not setting dscp"); return GST_RTSP_OK; #else gint fd; union gst_sockaddr sa; socklen_t slen = sizeof (sa); gint af; gint tos; if (!socket) return GST_RTSP_OK; fd = g_socket_get_fd (socket); if (getsockname (fd, &sa.sa, &slen) < 0) goto no_getsockname; af = sa.sa.sa_family; /* if this is an IPv4-mapped address then do IPv4 QoS */ if (af == AF_INET6) { if (IN6_IS_ADDR_V4MAPPED (&sa.sa_in6.sin6_addr)) af = AF_INET; } /* extract and shift 6 bits of the DSCP */ tos = (qos_dscp & 0x3f) << 2; #ifdef G_OS_WIN32 # define SETSOCKOPT_ARG4_TYPE const char * #else # define SETSOCKOPT_ARG4_TYPE const void * #endif switch (af) { case AF_INET: if (setsockopt (fd, IPPROTO_IP, IP_TOS, (SETSOCKOPT_ARG4_TYPE) & tos, sizeof (tos)) < 0) goto no_setsockopt; break; case AF_INET6: #ifdef IPV6_TCLASS if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, (SETSOCKOPT_ARG4_TYPE) & tos, sizeof (tos)) < 0) goto no_setsockopt; break; #endif default: goto wrong_family; } return GST_RTSP_OK; /* ERRORS */ no_getsockname: no_setsockopt: { return GST_RTSP_ESYS; } wrong_family: { return GST_RTSP_ERROR; } #endif } /** * gst_rtsp_connection_set_qos_dscp: * @conn: a #GstRTSPConnection * @qos_dscp: DSCP value * * Configure @conn to use the specified DSCP value. * * Returns: #GST_RTSP_OK on success. 
*/ GstRTSPResult gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp) { GstRTSPResult res; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL); res = set_qos_dscp (conn->socket0, qos_dscp); if (res == GST_RTSP_OK) res = set_qos_dscp (conn->socket1, qos_dscp); return res; } /** * gst_rtsp_connection_get_url: * @conn: a #GstRTSPConnection * * Retrieve the URL of the other end of @conn. * * Returns: The URL. This value remains valid until the * connection is freed. */ GstRTSPUrl * gst_rtsp_connection_get_url (const GstRTSPConnection * conn) { g_return_val_if_fail (conn != NULL, NULL); return conn->url; } /** * gst_rtsp_connection_get_ip: * @conn: a #GstRTSPConnection * * Retrieve the IP address of the other end of @conn. * * Returns: The IP address as a string. this value remains valid until the * connection is closed. */ const gchar * gst_rtsp_connection_get_ip (const GstRTSPConnection * conn) { g_return_val_if_fail (conn != NULL, NULL); return conn->remote_ip; } /** * gst_rtsp_connection_set_ip: * @conn: a #GstRTSPConnection * @ip: an ip address * * Set the IP address of the server. */ void gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip) { g_return_if_fail (conn != NULL); g_free (conn->remote_ip); conn->remote_ip = g_strdup (ip); } /** * gst_rtsp_connection_get_read_socket: * @conn: a #GstRTSPConnection * * Get the file descriptor for reading. * * Returns: (transfer none): the file descriptor used for reading or %NULL on * error. The file descriptor remains valid until the connection is closed. 
*/ GSocket * gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn) { g_return_val_if_fail (conn != NULL, NULL); g_return_val_if_fail (conn->read_socket != NULL, NULL); return conn->read_socket; } /** * gst_rtsp_connection_get_write_socket: * @conn: a #GstRTSPConnection * * Get the file descriptor for writing. * * Returns: (transfer none): the file descriptor used for writing or NULL on * error. The file descriptor remains valid until the connection is closed. */ GSocket * gst_rtsp_connection_get_write_socket (const GstRTSPConnection * conn) { g_return_val_if_fail (conn != NULL, NULL); g_return_val_if_fail (conn->write_socket != NULL, NULL); return conn->write_socket; } /** * gst_rtsp_connection_set_http_mode: * @conn: a #GstRTSPConnection * @enable: %TRUE to enable manual HTTP mode * * By setting the HTTP mode to %TRUE the message parsing will support HTTP * messages in addition to the RTSP messages. It will also disable the * automatic handling of setting up an HTTP tunnel. */ void gst_rtsp_connection_set_http_mode (GstRTSPConnection * conn, gboolean enable) { g_return_if_fail (conn != NULL); conn->manual_http = enable; } /** * gst_rtsp_connection_set_tunneled: * @conn: a #GstRTSPConnection * @tunneled: the new state * * Set the HTTP tunneling state of the connection. This must be configured before * the @conn is connected. */ void gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled) { g_return_if_fail (conn != NULL); g_return_if_fail (conn->read_socket == NULL); g_return_if_fail (conn->write_socket == NULL); conn->tunneled = tunneled; } /** * gst_rtsp_connection_is_tunneled: * @conn: a #GstRTSPConnection * * Get the tunneling state of the connection. * * Returns: if @conn is using HTTP tunneling. 
*/ gboolean gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn) { g_return_val_if_fail (conn != NULL, FALSE); return conn->tunneled; } /** * gst_rtsp_connection_get_tunnelid: * @conn: a #GstRTSPConnection * * Get the tunnel session id the connection. * * Returns: returns a non-empty string if @conn is being tunneled over HTTP. */ const gchar * gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn) { g_return_val_if_fail (conn != NULL, NULL); if (!conn->tunneled) return NULL; return conn->tunnelid; } /** * gst_rtsp_connection_do_tunnel: * @conn: a #GstRTSPConnection * @conn2: a #GstRTSPConnection or %NULL * * If @conn received the first tunnel connection and @conn2 received * the second tunnel connection, link the two connections together so that * @conn manages the tunneled connection. * * After this call, @conn2 cannot be used anymore and must be freed with * gst_rtsp_connection_free(). * * If @conn2 is %NULL then only the base64 decoding context will be setup for * @conn. * * Returns: return GST_RTSP_OK on success. */ GstRTSPResult gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn, GstRTSPConnection * conn2) { g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); if (conn2 != NULL) { GstRTSPTunnelState ts1 = conn->tstate; GstRTSPTunnelState ts2 = conn2->tstate; g_return_val_if_fail ((ts1 == TUNNEL_STATE_GET && ts2 == TUNNEL_STATE_POST) || (ts1 == TUNNEL_STATE_POST && ts2 == TUNNEL_STATE_GET), GST_RTSP_EINVAL); g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, TUNNELID_LEN), GST_RTSP_EINVAL); /* both connections have socket0 as the read/write socket */ if (ts1 == TUNNEL_STATE_GET) { /* conn2 is the HTTP POST channel. take its socket and set it as read * socket in conn */ conn->socket1 = conn2->socket0; conn->stream1 = conn2->stream0; conn->input_stream = conn2->input_stream; conn->control_stream = g_io_stream_get_input_stream (conn->stream0); conn2->output_stream = NULL; } else { /* conn2 is the HTTP GET channel. 
take its socket and set it as write * socket in conn */ conn->socket1 = conn->socket0; conn->stream1 = conn->stream0; conn->socket0 = conn2->socket0; conn->stream0 = conn2->stream0; conn->output_stream = conn2->output_stream; conn->control_stream = g_io_stream_get_input_stream (conn->stream0); } /* clean up some of the state of conn2 */ g_cancellable_cancel (conn2->cancellable); conn2->write_socket = conn2->read_socket = NULL; conn2->socket0 = NULL; conn2->stream0 = NULL; conn2->socket1 = NULL; conn2->stream1 = NULL; conn2->input_stream = NULL; conn2->control_stream = NULL; g_object_unref (conn2->cancellable); conn2->cancellable = NULL; /* We make socket0 the write socket and socket1 the read socket. */ conn->write_socket = conn->socket0; conn->read_socket = conn->socket1; conn->tstate = TUNNEL_STATE_COMPLETE; g_free (conn->initial_buffer); conn->initial_buffer = conn2->initial_buffer; conn2->initial_buffer = NULL; conn->initial_buffer_offset = conn2->initial_buffer_offset; } /* we need base64 decoding for the readfd */ conn->ctx.state = 0; conn->ctx.save = 0; conn->ctx.cout = 0; conn->ctx.coutl = 0; conn->ctxp = &conn->ctx; return GST_RTSP_OK; } /** * gst_rtsp_connection_set_remember_session_id: * @conn: a #GstRTSPConnection * @remember: %TRUE if the connection should remember the session id * * Sets if the #GstRTSPConnection should remember the session id from the last * response received and force it onto any further requests. * * The default value is %TRUE */ void gst_rtsp_connection_set_remember_session_id (GstRTSPConnection * conn, gboolean remember) { conn->remember_session_id = remember; if (!remember) conn->session_id[0] = '\0'; } /** * gst_rtsp_connection_get_remember_session_id: * @conn: a #GstRTSPConnection * * Returns: %TRUE if the #GstRTSPConnection remembers the session id in the * last response to set it on any further request. 
*/ gboolean gst_rtsp_connection_get_remember_session_id (GstRTSPConnection * conn) { return conn->remember_session_id; } #define READ_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL) #define READ_COND (G_IO_IN | READ_ERR) #define WRITE_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL) #define WRITE_COND (G_IO_OUT | WRITE_ERR) /* async functions */ struct _GstRTSPWatch { GSource source; GstRTSPConnection *conn; GstRTSPBuilder builder; GstRTSPMessage message; GSource *readsrc; GSource *writesrc; GSource *controlsrc; gboolean keep_running; /* queued message for transmission */ guint id; GMutex mutex; GstQueueArray *messages; gsize messages_bytes; guint messages_count; gsize max_bytes; guint max_messages; GCond queue_not_full; gboolean flushing; GstRTSPWatchFuncs funcs; gpointer user_data; GDestroyNotify notify; }; #define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \ ((w)->max_messages != 0 && (w)->messages_count >= (w)->max_messages)) static gboolean gst_rtsp_source_prepare (GSource * source, gint * timeout) { GstRTSPWatch *watch = (GstRTSPWatch *) source; if (watch->conn->initial_buffer != NULL) return TRUE; *timeout = (watch->conn->timeout * 1000); return FALSE; } static gboolean gst_rtsp_source_check (GSource * source) { return FALSE; } static gboolean gst_rtsp_source_dispatch_read_get_channel (GPollableInputStream * stream, GstRTSPWatch * watch) { gssize count; guint8 buffer[1024]; GError *error = NULL; /* try to read in order to be able to detect errors, we read 1k in case some * client actually decides to send data on the GET channel */ count = g_pollable_input_stream_read_nonblocking (stream, buffer, 1024, NULL, &error); if (count == 0) { /* other end closed the socket */ goto eof; } if (count < 0) { GST_DEBUG ("%s", error->message); if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { g_clear_error (&error); goto done; } g_clear_error (&error); goto read_error; } /* 
client sent data on the GET channel, ignore it */ done: return TRUE; /* ERRORS */ eof: { if (watch->funcs.closed) watch->funcs.closed (watch, watch->user_data); /* the read connection was closed, stop the watch now */ watch->keep_running = FALSE; return FALSE; } read_error: { if (watch->funcs.error_full) watch->funcs.error_full (watch, GST_RTSP_ESYS, &watch->message, 0, watch->user_data); else if (watch->funcs.error) watch->funcs.error (watch, GST_RTSP_ESYS, watch->user_data); goto eof; } } static gboolean gst_rtsp_source_dispatch_read (GPollableInputStream * stream, GstRTSPWatch * watch) { GstRTSPResult res = GST_RTSP_ERROR; GstRTSPConnection *conn = watch->conn; /* if this connection was already closed, stop now */ if (G_POLLABLE_INPUT_STREAM (conn->input_stream) != stream) goto eof; res = build_next (&watch->builder, &watch->message, conn, FALSE); if (res == GST_RTSP_EINTR) goto done; else if (G_UNLIKELY (res == GST_RTSP_EEOF)) { g_mutex_lock (&watch->mutex); if (watch->readsrc) { if (!g_source_is_destroyed ((GSource *) watch)) g_source_remove_child_source ((GSource *) watch, watch->readsrc); g_source_unref (watch->readsrc); watch->readsrc = NULL; } if (conn->stream1) { g_object_unref (conn->stream1); conn->stream1 = NULL; conn->socket1 = NULL; conn->input_stream = NULL; } g_mutex_unlock (&watch->mutex); /* When we are in tunnelled mode, the read socket can be closed and we * should be prepared for a new POST method to reopen it */ if (conn->tstate == TUNNEL_STATE_COMPLETE) { /* remove the read connection for the tunnel */ /* we accept a new POST request */ conn->tstate = TUNNEL_STATE_GET; /* and signal that we lost our tunnel */ if (watch->funcs.tunnel_lost) res = watch->funcs.tunnel_lost (watch, watch->user_data); /* we add read source on the write socket able to detect when client closes get channel in tunneled mode */ g_mutex_lock (&watch->mutex); if (watch->conn->control_stream && !watch->controlsrc) { watch->controlsrc = 
g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (watch->conn->control_stream), NULL); g_source_set_callback (watch->controlsrc, (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL); g_source_add_child_source ((GSource *) watch, watch->controlsrc); } g_mutex_unlock (&watch->mutex); goto read_done; } else goto eof; } else if (G_LIKELY (res == GST_RTSP_OK)) { if (!conn->manual_http && watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { if (conn->tstate == TUNNEL_STATE_NONE && watch->message.type_data.request.method == GST_RTSP_GET) { GstRTSPMessage *response; GstRTSPStatusCode code; conn->tstate = TUNNEL_STATE_GET; if (watch->funcs.tunnel_start) code = watch->funcs.tunnel_start (watch, watch->user_data); else code = GST_RTSP_STS_OK; /* queue the response */ response = gen_tunnel_reply (conn, code, &watch->message); if (watch->funcs.tunnel_http_response) watch->funcs.tunnel_http_response (watch, &watch->message, response, watch->user_data); gst_rtsp_watch_send_message (watch, response, NULL); gst_rtsp_message_free (response); goto read_done; } else if (conn->tstate == TUNNEL_STATE_NONE && watch->message.type_data.request.method == GST_RTSP_POST) { conn->tstate = TUNNEL_STATE_POST; /* in the callback the connection should be tunneled with the * GET connection */ if (watch->funcs.tunnel_complete) { watch->funcs.tunnel_complete (watch, watch->user_data); } goto read_done; } } } else goto read_error; if (!conn->manual_http) { /* if manual HTTP support is not enabled, then restore the message to * what it would have looked like without the support for parsing HTTP * messages being present */ if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { watch->message.type = GST_RTSP_MESSAGE_REQUEST; watch->message.type_data.request.method = GST_RTSP_INVALID; if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0) watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID; res = GST_RTSP_EPARSE; } else if 
(watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) { watch->message.type = GST_RTSP_MESSAGE_RESPONSE; if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0) watch->message.type_data.response.version = GST_RTSP_VERSION_INVALID; res = GST_RTSP_EPARSE; } } if (G_LIKELY (res != GST_RTSP_OK)) goto read_error; if (watch->funcs.message_received) watch->funcs.message_received (watch, &watch->message, watch->user_data); read_done: gst_rtsp_message_unset (&watch->message); build_reset (&watch->builder); done: return TRUE; /* ERRORS */ eof: { if (watch->funcs.closed) watch->funcs.closed (watch, watch->user_data); /* we closed the read connection, stop the watch now */ watch->keep_running = FALSE; /* always stop when the input returns EOF in non-tunneled mode */ return FALSE; } read_error: { if (watch->funcs.error_full) watch->funcs.error_full (watch, res, &watch->message, 0, watch->user_data); else if (watch->funcs.error) watch->funcs.error (watch, res, watch->user_data); goto eof; } } static gboolean gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, gpointer user_data G_GNUC_UNUSED) { GstRTSPWatch *watch = (GstRTSPWatch *) source; GstRTSPConnection *conn = watch->conn; if (conn->initial_buffer != NULL) { gst_rtsp_source_dispatch_read (G_POLLABLE_INPUT_STREAM (conn->input_stream), watch); } return watch->keep_running; } static gboolean gst_rtsp_source_dispatch_write (GPollableOutputStream * stream, GstRTSPWatch * watch) { GstRTSPResult res = GST_RTSP_ERROR; GstRTSPConnection *conn = watch->conn; /* if this connection was already closed, stop now */ if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream || !watch->messages) goto eof; g_mutex_lock (&watch->mutex); do { guint n_messages = gst_queue_array_get_length (watch->messages); GOutputVector *vectors; GstMapInfo *map_infos; guint *ids; gsize bytes_to_write, bytes_written; guint n_vectors, n_memories, n_ids, drop_messages; gint i, j, l, n_mmap; 
GstRTSPSerializedMessage *msg; /* if this connection was already closed, stop now */ if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream || !watch->messages) { g_mutex_unlock (&watch->mutex); goto eof; } if (n_messages == 0) { if (watch->writesrc) { if (!g_source_is_destroyed ((GSource *) watch)) g_source_remove_child_source ((GSource *) watch, watch->writesrc); g_source_unref (watch->writesrc); watch->writesrc = NULL; /* we create and add the write source again when we actually have * something to write */ /* since write source is now removed we add read source on the write * socket instead to be able to detect when client closes get channel * in tunneled mode */ if (watch->conn->control_stream) { watch->controlsrc = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (watch->conn->control_stream), NULL); g_source_set_callback (watch->controlsrc, (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL); g_source_add_child_source ((GSource *) watch, watch->controlsrc); } else { watch->controlsrc = NULL; } } break; } for (i = 0, n_vectors = 0, n_memories = 0, n_ids = 0; i < n_messages; i++) { msg = gst_queue_array_peek_nth_struct (watch->messages, i); if (msg->id != 0) n_ids++; if (msg->data_offset < msg->data_size) n_vectors++; if (msg->body_data && msg->body_offset < msg->body_data_size) { n_vectors++; } else if (msg->body_buffer) { guint m, n; guint offset = 0; n = gst_buffer_n_memory (msg->body_buffer); for (m = 0; m < n; m++) { GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m); /* Skip all memories we already wrote */ if (offset + mem->size < msg->body_offset) { offset += mem->size; continue; } offset += mem->size; n_memories++; n_vectors++; } } } vectors = g_newa (GOutputVector, n_vectors); map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL; ids = n_ids ? 
g_newa (guint, n_ids + 1) : NULL; if (ids) memset (ids, 0, sizeof (guint) * (n_ids + 1)); for (i = 0, j = 0, n_mmap = 0, l = 0, bytes_to_write = 0; i < n_messages; i++) { msg = gst_queue_array_peek_nth_struct (watch->messages, i); if (msg->data_offset < msg->data_size) { vectors[j].buffer = (msg->data_is_data_header ? msg->data_header : msg->data) + msg->data_offset; vectors[j].size = msg->data_size - msg->data_offset; bytes_to_write += vectors[j].size; j++; } if (msg->body_data) { if (msg->body_offset < msg->body_data_size) { vectors[j].buffer = msg->body_data + msg->body_offset; vectors[j].size = msg->body_data_size - msg->body_offset; bytes_to_write += vectors[j].size; j++; } } else if (msg->body_buffer) { guint m, n; guint offset = 0; n = gst_buffer_n_memory (msg->body_buffer); for (m = 0; m < n; m++) { GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m); guint off; /* Skip all memories we already wrote */ if (offset + mem->size <= msg->body_offset) { offset += mem->size; continue; } if (offset < msg->body_offset) off = msg->body_offset - offset; else off = 0; offset += mem->size; g_assert (off < mem->size); gst_memory_map (mem, &map_infos[n_mmap], GST_MAP_READ); vectors[j].buffer = map_infos[n_mmap].data + off; vectors[j].size = map_infos[n_mmap].size - off; bytes_to_write += vectors[j].size; n_mmap++; j++; } } } res = writev_bytes (watch->conn->output_stream, vectors, n_vectors, &bytes_written, FALSE, watch->conn->cancellable); g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK); /* First unmap all memories here, this simplifies the code below * as we don't have to skip all memories that were already written * before */ for (i = 0; i < n_mmap; i++) { gst_memory_unmap (map_infos[i].memory, &map_infos[i]); } if (bytes_written == bytes_to_write) { /* fast path, just unmap all memories, free memory, drop all messages and notify them */ l = 0; while ((msg = gst_queue_array_pop_head_struct (watch->messages))) { if (msg->id) { ids[l] = 
msg->id; l++; } gst_rtsp_serialized_message_clear (msg); } g_assert (watch->messages_bytes >= bytes_written); watch->messages_bytes -= bytes_written; } else if (bytes_written > 0) { /* not done, let's skip all messages that were sent already and free them */ for (i = 0, drop_messages = 0; i < n_messages; i++) { msg = gst_queue_array_peek_nth_struct (watch->messages, i); if (bytes_written >= 0) { if (bytes_written >= msg->data_size - msg->data_offset) { guint body_size; /* all data of this message is sent, check body and otherwise * skip the whole message for next time */ bytes_written -= (msg->data_size - msg->data_offset); msg->data_offset = msg->data_size; if (msg->body_data) { body_size = msg->body_data_size; } else if (msg->body_buffer) { body_size = gst_buffer_get_size (msg->body_buffer); } else { body_size = 0; } if (bytes_written + msg->body_offset >= body_size) { /* body written, drop this message */ bytes_written -= body_size - msg->body_offset; msg->body_offset = body_size; drop_messages++; if (msg->id) { ids[l] = msg->id; l++; } gst_rtsp_serialized_message_clear (msg); } else { msg->body_offset += bytes_written; bytes_written = 0; } } else { /* Need to continue sending from the data of this message */ msg->data_offset = bytes_written; bytes_written = 0; } } } while (drop_messages > 0) { msg = gst_queue_array_pop_head_struct (watch->messages); g_assert (msg); drop_messages--; } g_assert (watch->messages_bytes >= bytes_written); watch->messages_bytes -= bytes_written; } if (!IS_BACKLOG_FULL (watch)) g_cond_signal (&watch->queue_not_full); g_mutex_unlock (&watch->mutex); /* notify all messages that were successfully written */ if (ids) { while (*ids) { /* only decrease the counter for messages that have an id. 
Only * the last message of a messages chunk is counted */ watch->messages_count--; if (watch->funcs.message_sent) watch->funcs.message_sent (watch, *ids, watch->user_data); ids++; } } if (res == GST_RTSP_EINTR) { goto write_blocked; } else if (G_UNLIKELY (res != GST_RTSP_OK)) { goto write_error; } g_mutex_lock (&watch->mutex); } while (TRUE); g_mutex_unlock (&watch->mutex); write_blocked: return TRUE; /* ERRORS */ eof: { return FALSE; } write_error: { if (watch->funcs.error_full) { guint i, n_messages; n_messages = gst_queue_array_get_length (watch->messages); for (i = 0; i < n_messages; i++) { GstRTSPSerializedMessage *msg = gst_queue_array_peek_nth_struct (watch->messages, i); if (msg->id) watch->funcs.error_full (watch, res, NULL, msg->id, watch->user_data); } } else if (watch->funcs.error) { watch->funcs.error (watch, res, watch->user_data); } return FALSE; } } static void gst_rtsp_source_finalize (GSource * source) { GstRTSPWatch *watch = (GstRTSPWatch *) source; GstRTSPSerializedMessage *msg; if (watch->notify) watch->notify (watch->user_data); build_reset (&watch->builder); gst_rtsp_message_unset (&watch->message); while ((msg = gst_queue_array_pop_head_struct (watch->messages))) { gst_rtsp_serialized_message_clear (msg); } gst_queue_array_free (watch->messages); watch->messages = NULL; watch->messages_bytes = 0; watch->messages_count = 0; g_cond_clear (&watch->queue_not_full); if (watch->readsrc) g_source_unref (watch->readsrc); if (watch->writesrc) g_source_unref (watch->writesrc); if (watch->controlsrc) g_source_unref (watch->controlsrc); g_mutex_clear (&watch->mutex); } static GSourceFuncs gst_rtsp_source_funcs = { gst_rtsp_source_prepare, gst_rtsp_source_check, gst_rtsp_source_dispatch, gst_rtsp_source_finalize, NULL, NULL }; /** * gst_rtsp_watch_new: (skip) * @conn: a #GstRTSPConnection * @funcs: watch functions * @user_data: user data to pass to @funcs * @notify: notify when @user_data is not referenced anymore * * Create a watch object for @conn. 
The functions provided in @funcs will be * called with @user_data when activity happened on the watch. * * The new watch is usually created so that it can be attached to a * maincontext with gst_rtsp_watch_attach(). * * @conn must exist for the entire lifetime of the watch. * * Returns: a #GstRTSPWatch that can be used for asynchronous RTSP * communication. Free with gst_rtsp_watch_unref () after usage. */ GstRTSPWatch * gst_rtsp_watch_new (GstRTSPConnection * conn, GstRTSPWatchFuncs * funcs, gpointer user_data, GDestroyNotify notify) { GstRTSPWatch *result; g_return_val_if_fail (conn != NULL, NULL); g_return_val_if_fail (funcs != NULL, NULL); g_return_val_if_fail (conn->read_socket != NULL, NULL); g_return_val_if_fail (conn->write_socket != NULL, NULL); result = (GstRTSPWatch *) g_source_new (&gst_rtsp_source_funcs, sizeof (GstRTSPWatch)); result->conn = conn; result->builder.state = STATE_START; g_mutex_init (&result->mutex); result->messages = gst_queue_array_new_for_struct (sizeof (GstRTSPSerializedMessage), 10); g_cond_init (&result->queue_not_full); gst_rtsp_watch_reset (result); result->keep_running = TRUE; result->flushing = FALSE; result->funcs = *funcs; result->user_data = user_data; result->notify = notify; return result; } /** * gst_rtsp_watch_reset: * @watch: a #GstRTSPWatch * * Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel() * when the file descriptors of the connection might have changed. 
*/
void
gst_rtsp_watch_reset (GstRTSPWatch * watch)
{
  /* same NULL guard as the other public gst_rtsp_watch_*() entry points;
   * without it a NULL @watch is dereferenced by g_mutex_lock() below */
  g_return_if_fail (watch != NULL);

  g_mutex_lock (&watch->mutex);

  /* detach and release the child sources created for the old streams */
  if (watch->readsrc) {
    g_source_remove_child_source ((GSource *) watch, watch->readsrc);
    g_source_unref (watch->readsrc);
    /* never keep a pointer to a source we no longer hold a reference on */
    watch->readsrc = NULL;
  }
  if (watch->writesrc) {
    g_source_remove_child_source ((GSource *) watch, watch->writesrc);
    g_source_unref (watch->writesrc);
    watch->writesrc = NULL;
  }
  if (watch->controlsrc) {
    g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
    g_source_unref (watch->controlsrc);
    watch->controlsrc = NULL;
  }

  /* read source on the connection's current input stream, if any */
  if (watch->conn->input_stream) {
    watch->readsrc =
        g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
        (watch->conn->input_stream), NULL);
    g_source_set_callback (watch->readsrc,
        (GSourceFunc) gst_rtsp_source_dispatch_read, watch, NULL);
    g_source_add_child_source ((GSource *) watch, watch->readsrc);
  } else {
    watch->readsrc = NULL;
  }

  /* we create and add the write source when we actually have something to
   * write */

  /* when write source is not added we add read source on the write socket
   * instead to be able to detect when client closes get channel in tunneled
   * mode */
  if (watch->conn->control_stream) {
    watch->controlsrc =
        g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
        (watch->conn->control_stream), NULL);
    g_source_set_callback (watch->controlsrc,
        (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL);
    g_source_add_child_source ((GSource *) watch, watch->controlsrc);
  } else {
    watch->controlsrc = NULL;
  }

  g_mutex_unlock (&watch->mutex);
}

/**
 * gst_rtsp_watch_attach:
 * @watch: a #GstRTSPWatch
 * @context: a GMainContext (if NULL, the default context will be used)
 *
 * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
 *
 * Returns: the ID (greater than 0) for the watch within the GMainContext.
*/ guint gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context) { g_return_val_if_fail (watch != NULL, 0); return g_source_attach ((GSource *) watch, context); } /** * gst_rtsp_watch_unref: * @watch: a #GstRTSPWatch * * Decreases the reference count of @watch by one. If the resulting reference * count is zero the watch and associated memory will be destroyed. */ void gst_rtsp_watch_unref (GstRTSPWatch * watch) { g_return_if_fail (watch != NULL); g_source_unref ((GSource *) watch); } /** * gst_rtsp_watch_set_send_backlog: * @watch: a #GstRTSPWatch * @bytes: maximum bytes * @messages: maximum messages * * Set the maximum amount of bytes and messages that will be queued in @watch. * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM. * * A value of 0 for @bytes or @messages means no limits. * * Since: 1.2 */ void gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch, gsize bytes, guint messages) { g_return_if_fail (watch != NULL); g_mutex_lock (&watch->mutex); watch->max_bytes = bytes; watch->max_messages = messages; if (!IS_BACKLOG_FULL (watch)) g_cond_signal (&watch->queue_not_full); g_mutex_unlock (&watch->mutex); GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u", bytes, messages); } /** * gst_rtsp_watch_get_send_backlog: * @watch: a #GstRTSPWatch * @bytes: (out) (allow-none): maximum bytes * @messages: (out) (allow-none): maximum messages * * Get the maximum amount of bytes and messages that will be queued in @watch. * See gst_rtsp_watch_set_send_backlog(). 
* * Since: 1.2 */ void gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch, gsize * bytes, guint * messages) { g_return_if_fail (watch != NULL); g_mutex_lock (&watch->mutex); if (bytes) *bytes = watch->max_bytes; if (messages) *messages = watch->max_messages; g_mutex_unlock (&watch->mutex); } static GstRTSPResult gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch, GstRTSPSerializedMessage * messages, guint n_messages, guint * id) { GstRTSPResult res; GMainContext *context = NULL; gint i; g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (messages != NULL, GST_RTSP_EINVAL); g_mutex_lock (&watch->mutex); if (watch->flushing) goto flushing; /* try to send the message synchronously first */ if (gst_queue_array_get_length (watch->messages) == 0) { gint j, k; GOutputVector *vectors; GstMapInfo *map_infos; gsize bytes_to_write, bytes_written; guint n_vectors, n_memories, drop_messages; for (i = 0, n_vectors = 0, n_memories = 0; i < n_messages; i++) { n_vectors++; if (messages[i].body_data) { n_vectors++; } else if (messages[i].body_buffer) { n_vectors += gst_buffer_n_memory (messages[i].body_buffer); n_memories += gst_buffer_n_memory (messages[i].body_buffer); } } vectors = g_newa (GOutputVector, n_vectors); map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL; for (i = 0, j = 0, k = 0, bytes_to_write = 0; i < n_messages; i++) { vectors[j].buffer = messages[i].data_is_data_header ? 
messages[i].data_header : messages[i].data; vectors[j].size = messages[i].data_size; bytes_to_write += vectors[j].size; j++; if (messages[i].body_data) { vectors[j].buffer = messages[i].body_data; vectors[j].size = messages[i].body_data_size; bytes_to_write += vectors[j].size; j++; } else if (messages[i].body_buffer) { gint l, n; n = gst_buffer_n_memory (messages[i].body_buffer); for (l = 0; l < n; l++) { GstMemory *mem = gst_buffer_peek_memory (messages[i].body_buffer, l); gst_memory_map (mem, &map_infos[k], GST_MAP_READ); vectors[j].buffer = map_infos[k].data; vectors[j].size = map_infos[k].size; bytes_to_write += vectors[j].size; k++; j++; } } } res = writev_bytes (watch->conn->output_stream, vectors, n_vectors, &bytes_written, FALSE, watch->conn->cancellable); g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK); /* At this point we sent everything we could without blocking or * error and updated the offsets inside the message accordingly */ /* First of all unmap all memories. 
This simplifies the code below */ for (k = 0; k < n_memories; k++) { gst_memory_unmap (map_infos[k].memory, &map_infos[k]); } if (res != GST_RTSP_EINTR) { /* actual error or done completely */ if (id != NULL) *id = 0; /* free everything */ for (i = 0, k = 0; i < n_messages; i++) { gst_rtsp_serialized_message_clear (&messages[i]); } goto done; } /* not done, let's skip all messages that were sent already and free them */ for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) { if (bytes_written >= 0) { if (bytes_written >= messages[i].data_size) { guint body_size; /* all data of this message is sent, check body and otherwise * skip the whole message for next time */ messages[i].data_offset = messages[i].data_size; bytes_written -= messages[i].data_size; if (messages[i].body_data) { body_size = messages[i].body_data_size; } else if (messages[i].body_buffer) { body_size = gst_buffer_get_size (messages[i].body_buffer); } else { body_size = 0; } if (bytes_written >= body_size) { /* body written, drop this message */ messages[i].body_offset = body_size; bytes_written -= body_size; drop_messages++; gst_rtsp_serialized_message_clear (&messages[i]); } else { messages[i].body_offset = bytes_written; bytes_written = 0; } } else { /* Need to continue sending from the data of this message */ messages[i].data_offset = bytes_written; bytes_written = 0; } } } g_assert (n_messages > drop_messages); messages += drop_messages; n_messages -= drop_messages; } /* check limits */ if (IS_BACKLOG_FULL (watch)) goto too_much_backlog; for (i = 0; i < n_messages; i++) { GstRTSPSerializedMessage local_message; /* make a record with the data and id for sending async */ local_message = messages[i]; /* copy the body data or take an additional reference to the body buffer * we don't own them here */ if (local_message.body_data) { local_message.body_data = g_memdup (local_message.body_data, local_message.body_data_size); } else if (local_message.body_buffer) { gst_buffer_ref 
(local_message.body_buffer); } local_message.borrowed = FALSE; /* set an id for the very last message */ if (i == n_messages - 1) { do { /* make sure rec->id is never 0 */ local_message.id = ++watch->id; } while (G_UNLIKELY (local_message.id == 0)); if (id != NULL) *id = local_message.id; } else { local_message.id = 0; } /* add the record to a queue. */ gst_queue_array_push_tail_struct (watch->messages, &local_message); watch->messages_bytes += (local_message.data_size - local_message.data_offset); if (local_message.body_data) watch->messages_bytes += (local_message.body_data_size - local_message.body_offset); else if (local_message.body_buffer) watch->messages_bytes += (gst_buffer_get_size (local_message.body_buffer) - local_message.body_offset); } /* each message chunks is one unit */ watch->messages_count++; /* make sure the main context will now also check for writability on the * socket */ context = ((GSource *) watch)->context; if (!watch->writesrc) { /* remove the read source on the write socket, we will be able to detect * errors while writing */ if (watch->controlsrc) { g_source_remove_child_source ((GSource *) watch, watch->controlsrc); g_source_unref (watch->controlsrc); watch->controlsrc = NULL; } watch->writesrc = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (watch->conn->output_stream), NULL); g_source_set_callback (watch->writesrc, (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL); g_source_add_child_source ((GSource *) watch, watch->writesrc); } res = GST_RTSP_OK; done: g_mutex_unlock (&watch->mutex); if (context) g_main_context_wakeup (context); return res; /* ERRORS */ flushing: { GST_DEBUG ("we are flushing"); g_mutex_unlock (&watch->mutex); for (i = 0; i < n_messages; i++) { gst_rtsp_serialized_message_clear (&messages[i]); } return GST_RTSP_EINTR; } too_much_backlog: { GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %" G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes, 
watch->messages_bytes, watch->max_messages, watch->messages_count); g_mutex_unlock (&watch->mutex); for (i = 0; i < n_messages; i++) { gst_rtsp_serialized_message_clear (&messages[i]); } return GST_RTSP_ENOMEM; } return GST_RTSP_OK; } /** * gst_rtsp_watch_write_data: * @watch: a #GstRTSPWatch * @data: (array length=size) (transfer full): the data to queue * @size: the size of @data * @id: (out) (allow-none): location for a message ID or %NULL * * Write @data using the connection of the @watch. If it cannot be sent * immediately, it will be queued for transmission in @watch. The contents of * @message will then be serialized and transmitted when the connection of the * @watch becomes writable. In case the @message is queued, the ID returned in * @id will be non-zero and used as the ID argument in the message_sent * callback. * * This function will take ownership of @data and g_free() it after use. * * If the amount of queued data exceeds the limits set with * gst_rtsp_watch_set_send_backlog(), this function will return * #GST_RTSP_ENOMEM. * * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits * are reached. #GST_RTSP_EINTR when @watch was flushing. */ /* FIXME 2.0: This should've been static! */ GstRTSPResult gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, guint size, guint * id) { GstRTSPSerializedMessage serialized_message; memset (&serialized_message, 0, sizeof (serialized_message)); serialized_message.data = (guint8 *) data; serialized_message.data_size = size; return gst_rtsp_watch_write_serialized_messages (watch, &serialized_message, 1, id); } /** * gst_rtsp_watch_send_message: * @watch: a #GstRTSPWatch * @message: a #GstRTSPMessage * @id: (out) (allow-none): location for a message ID or %NULL * * Send a @message using the connection of the @watch. If it cannot be sent * immediately, it will be queued for transmission in @watch. 
The contents of * @message will then be serialized and transmitted when the connection of the * @watch becomes writable. In case the @message is queued, the ID returned in * @id will be non-zero and used as the ID argument in the message_sent * callback. * * Returns: #GST_RTSP_OK on success. */ GstRTSPResult gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message, guint * id) { g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); return gst_rtsp_watch_send_messages (watch, message, 1, id); } /** * gst_rtsp_watch_send_messages: * @watch: a #GstRTSPWatch * @messages: (array length=n_messages): the messages to send * @n_messages: the number of messages to send * @id: (out) (allow-none): location for a message ID or %NULL * * Sends @messages using the connection of the @watch. If they cannot be sent * immediately, they will be queued for transmission in @watch. The contents of * @messages will then be serialized and transmitted when the connection of the * @watch becomes writable. In case the @messages are queued, the ID returned in * @id will be non-zero and used as the ID argument in the message_sent * callback once the last message is sent. The callback will only be called * once for the last message. * * Returns: #GST_RTSP_OK on success. 
* * Since: 1.16 */ GstRTSPResult gst_rtsp_watch_send_messages (GstRTSPWatch * watch, GstRTSPMessage * messages, guint n_messages, guint * id) { GstRTSPSerializedMessage *serialized_messages; gint i; g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL); serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages); memset (serialized_messages, 0, sizeof (GstRTSPSerializedMessage) * n_messages); for (i = 0; i < n_messages; i++) { if (!serialize_message (watch->conn, &messages[i], &serialized_messages[i])) goto error; } return gst_rtsp_watch_write_serialized_messages (watch, serialized_messages, n_messages, id); error: for (i = 0; i < n_messages; i++) { gst_rtsp_serialized_message_clear (&serialized_messages[i]); } return GST_RTSP_EINVAL; } /** * gst_rtsp_watch_wait_backlog: * @watch: a #GstRTSPWatch * @timeout: a #GTimeVal timeout * * Wait until there is place in the backlog queue, @timeout is reached * or @watch is set to flushing. * * If @timeout is %NULL this function can block forever. If @timeout * contains a valid timeout, this function will return %GST_RTSP_ETIMEOUT * after the timeout expired. * * The typically use of this function is when gst_rtsp_watch_write_data * returns %GST_RTSP_ENOMEM. The caller then calls this function to wait for * free space in the backlog queue and try again. * * Returns: %GST_RTSP_OK when if there is room in queue. * %GST_RTSP_ETIMEOUT when @timeout was reached. * %GST_RTSP_EINTR when @watch is flushing * %GST_RTSP_EINVAL when called with invalid parameters. * * Since: 1.4 */ GstRTSPResult gst_rtsp_watch_wait_backlog (GstRTSPWatch * watch, GTimeVal * timeout) { gint64 end_time; GstClockTime to; g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); to = timeout ? 
GST_TIMEVAL_TO_TIME (*timeout) : 0; end_time = g_get_monotonic_time () + GST_TIME_AS_USECONDS (to); g_mutex_lock (&watch->mutex); if (watch->flushing) goto flushing; while (IS_BACKLOG_FULL (watch)) { gboolean res; res = g_cond_wait_until (&watch->queue_not_full, &watch->mutex, end_time); if (watch->flushing) goto flushing; if (!res) goto timeout; } g_mutex_unlock (&watch->mutex); return GST_RTSP_OK; /* ERRORS */ flushing: { GST_DEBUG ("we are flushing"); g_mutex_unlock (&watch->mutex); return GST_RTSP_EINTR; } timeout: { GST_DEBUG ("we timed out"); g_mutex_unlock (&watch->mutex); return GST_RTSP_ETIMEOUT; } } /** * gst_rtsp_watch_set_flushing: * @watch: a #GstRTSPWatch * @flushing: new flushing state * * When @flushing is %TRUE, abort a call to gst_rtsp_watch_wait_backlog() * and make sure gst_rtsp_watch_write_data() returns immediately with * #GST_RTSP_EINTR. And empty the queue. * * Since: 1.4 */ void gst_rtsp_watch_set_flushing (GstRTSPWatch * watch, gboolean flushing) { g_return_if_fail (watch != NULL); g_mutex_lock (&watch->mutex); watch->flushing = flushing; g_cond_signal (&watch->queue_not_full); if (flushing) { GstRTSPSerializedMessage *msg; while ((msg = gst_queue_array_pop_head_struct (watch->messages))) { gst_rtsp_serialized_message_clear (msg); } } g_mutex_unlock (&watch->mutex); }