diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index fa0d09d986..c0f3ec7a9d 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -293,9 +293,6 @@ gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip, g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); - /* set to non-blocking mode so that we can cancel the communication */ - g_socket_set_blocking (socket, FALSE); - if (!collect_addresses (socket, &local_ip, NULL, FALSE, &err)) goto getnameinfo_failed; @@ -479,7 +476,6 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout, gchar * uri) goto connect_failed; socket = g_socket_connection_get_socket (connection); - g_socket_set_blocking (socket, FALSE); /* get remote address */ g_free (conn->remote_ip); @@ -616,7 +612,6 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout) /* get remote address */ socket = g_socket_connection_get_socket (connection); - g_socket_set_blocking (socket, FALSE); if (!collect_addresses (socket, &remote_ip, NULL, TRUE, &error)) goto remote_address_failed; @@ -813,9 +808,11 @@ gen_date_string (gchar * date_string, guint len) static GstRTSPResult write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx, - guint size, GCancellable * cancellable) + guint size, gboolean block, GCancellable * cancellable) { guint left; + gssize r; + GError *err = NULL; if (G_UNLIKELY (*idx > size)) return GST_RTSP_ERROR; @@ -823,31 +820,41 @@ write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx, left = size - *idx; while (left) { - GError *err = NULL; - gssize r; + r = g_pollable_stream_write (stream, (gchar *) & buffer[*idx], left, + block, cancellable, &err); + if (G_UNLIKELY (r < 0)) + goto error; - r = g_output_stream_write (stream, (gchar *) & buffer[*idx], left, - cancellable, &err); - if (G_UNLIKELY (r == 0)) { - return GST_RTSP_EINTR; - } else if (G_UNLIKELY (r < 0)) { - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { - g_clear_error (&err); - return GST_RTSP_EINTR; - } - g_clear_error (&err); - return GST_RTSP_ESYS; - } else { - left -= r; - *idx += r; - } + left -= r; + *idx += r; } return GST_RTSP_OK; + + /* ERRORS */ +error: + { + if (G_UNLIKELY (r == 0)) + return GST_RTSP_EEOF; + + GST_DEBUG ("%s", err->message); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + g_clear_error (&err); + return GST_RTSP_EINTR; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&err); + return GST_RTSP_EINTR; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&err); + return GST_RTSP_ETIMEOUT; + } + g_clear_error (&err); + return GST_RTSP_ESYS; + } } static gint fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, - GError ** err) + gboolean block, GError ** err) { gint out = 0; @@ -868,11 +875,17 @@ fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, if (G_LIKELY (size > (guint) out)) { gssize r; - r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out], - size - out, conn->cancellable, err); - if (r <= 0) { - if (out == 0) + r = g_pollable_stream_read (conn->input_stream, + (gchar *) & buffer[out], size - out, block, conn->cancellable, err); + + if (G_UNLIKELY (r < 0)) { + if (out == 0) { + /* propagate the error */ out = r; + } else { + /* we have some data ignore error */ + g_clear_error (err); + } } else out += r; } @@ -882,7 +895,7 @@ fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, static gint fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, - GError ** err) + gboolean block, GError ** err) { DecodeCtx *ctx = conn->ctxp; gint out = 0; @@ -904,7 +917,7 @@ fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, break; /* try to read more bytes */ - r = fill_raw_bytes (conn, in, sizeof (in), err); + r = fill_raw_bytes (conn, in, sizeof (in), block, err); if (r <= 0) { if (out == 0) out = r; @@ -917,16 +930,18 @@ fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size, &ctx->save); } } else { - out = fill_raw_bytes (conn, buffer, size, err); + out = fill_raw_bytes (conn, buffer, size, block, err); } return out; } static GstRTSPResult -read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) +read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size, + gboolean block) { guint left; + gint r; GError *err = NULL; if (G_UNLIKELY (*idx > size)) @@ -935,24 +950,35 @@ read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) left = size - *idx; while (left) { - gint r; + r = fill_bytes (conn, &buffer[*idx], left, block, &err); + if (G_UNLIKELY (r <= 0)) + goto error; - r = fill_bytes (conn, &buffer[*idx], left, &err); - if (G_UNLIKELY (r == 0)) { - return GST_RTSP_EEOF; - } else if (G_UNLIKELY (r < 0)) { - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { - g_clear_error (&err); - return GST_RTSP_EINTR; - } - g_clear_error (&err); - return GST_RTSP_ESYS; - } else { - left -= r; - *idx += r; - } + left -= r; + *idx += r; } return GST_RTSP_OK; + + /* ERRORS */ +error: + { + if (G_UNLIKELY (r == 0)) + return GST_RTSP_EEOF; + + GST_DEBUG ("%s", err->message); + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + g_clear_error (&err); + return GST_RTSP_EINTR; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&err); + return GST_RTSP_EINTR; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { + g_clear_error (&err); + return GST_RTSP_ETIMEOUT; + } + g_clear_error (&err); + return GST_RTSP_ESYS; + } } /* The code below tries to handle clients using \r, \n or \r\n to indicate the @@ -962,13 +988,14 @@ read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) * the method used in RTSP (and HTTP) to break long lines. */ static GstRTSPResult -read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) +read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size, + gboolean block) { - GError *err = NULL; + GstRTSPResult res; while (TRUE) { guint8 c; - gint r; + guint i; if (conn->read_ahead == READ_AHEAD_EOH) { /* the last call to read_line() already determined that we have reached @@ -987,18 +1014,10 @@ read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) conn->read_ahead = 0; } else { /* read the next character */ - r = fill_bytes (conn, &c, 1, &err); - if (G_UNLIKELY (r == 0)) { - return GST_RTSP_EEOF; - } else if (G_UNLIKELY (r < 0)) { - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { - g_clear_error (&err); - return GST_RTSP_EINTR; - } - - g_clear_error (&err); - return GST_RTSP_ESYS; - } + i = 0; + res = read_bytes (conn, &c, &i, 1, block); + if (G_UNLIKELY (res != GST_RTSP_OK)) + return res; } /* special treatment of line endings */ @@ -1007,21 +1026,10 @@ read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size) retry: /* need to read ahead one more character to know what to do... */ - r = fill_bytes (conn, &read_ahead, 1, &err); - if (G_UNLIKELY (r == 0)) { - return GST_RTSP_EEOF; - } else if (G_UNLIKELY (r < 0)) { - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { - /* remember the original character we read and try again next time */ - if (conn->read_ahead == 0) - conn->read_ahead = c; - g_clear_error (&err); - return GST_RTSP_EINTR; - } - - g_clear_error (&err); - return GST_RTSP_ESYS; - } + i = 0; + res = read_bytes (conn, &read_ahead, &i, 1, block); + if (G_UNLIKELY (res != GST_RTSP_OK)) + return res; if (read_ahead == ' ' || read_ahead == '\t') { if (conn->read_ahead == READ_AHEAD_CRLFCR) { @@ -1106,62 +1114,22 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data, guint offset; GstClockTime to; GstRTSPResult res; - GError *err = NULL; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL); g_return_val_if_fail (conn->output_stream != NULL, GST_RTSP_EINVAL); - to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; - offset = 0; - while (TRUE) { - /* try to write */ - res = - write_bytes (conn->output_stream, data, &offset, size, - conn->cancellable); - if (G_LIKELY (res == GST_RTSP_OK)) - break; - if (G_UNLIKELY (res != GST_RTSP_EINTR)) - goto write_error; - /* not all is written, wait until we can write more */ - g_socket_set_timeout (conn->write_socket, - (to + GST_SECOND - 1) / GST_SECOND); - if (!g_socket_condition_wait (conn->write_socket, - G_IO_OUT | G_IO_ERR | G_IO_HUP, conn->cancellable, &err)) { - g_socket_set_timeout (conn->write_socket, 0); - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) { - g_clear_error (&err); - goto stopped; - } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { - g_clear_error (&err); - goto timeout; - } - g_clear_error (&err); - goto select_error; - } - g_socket_set_timeout (conn->write_socket, 0); - } - return GST_RTSP_OK; + to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0; - /* ERRORS */ -timeout: - { - return GST_RTSP_ETIMEOUT; - } -select_error: - { - return GST_RTSP_ESYS; - } -stopped: - { - return GST_RTSP_EINTR; - } -write_error: - { - return res; - } + g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND); + res = + write_bytes (conn->output_stream, data, &offset, size, TRUE, + conn->cancellable); + g_socket_set_timeout (conn->write_socket, 0); + + return res; } static GString * @@ -1618,7 +1586,7 @@ normalize_line (guint8 * buffer) */ static GstRTSPResult build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, - GstRTSPConnection * conn) + GstRTSPConnection * conn, gboolean block) { GstRTSPResult res; @@ -1630,7 +1598,8 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, builder->offset = 0; res = - read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1); + read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1, + block); if (res != GST_RTSP_OK) goto done; @@ -1653,7 +1622,8 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, case STATE_DATA_HEADER: { res = - read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4); + read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4, + block); if (res != GST_RTSP_OK) goto done; @@ -1670,7 +1640,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, { res = read_bytes (conn, builder->body_data, &builder->offset, - builder->body_len); + builder->body_len, block); if (res != GST_RTSP_OK) goto done; @@ -1687,7 +1657,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message, case STATE_READ_LINES: { res = read_line (conn, builder->buffer, &builder->offset, - sizeof (builder->buffer)); + sizeof (builder->buffer), block); if (res != GST_RTSP_OK) goto done; @@ -1838,7 +1808,6 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size, guint offset; GstClockTime to; GstRTSPResult res; - GError *err = NULL; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); @@ -1850,58 +1819,13 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size, offset = 0; /* configure timeout if any */ - to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; + to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0; - while (TRUE) { - res = read_bytes (conn, data, &offset, size); - if (G_UNLIKELY (res == GST_RTSP_EEOF)) - goto eof; - if (G_LIKELY (res == GST_RTSP_OK)) - break; - if (G_UNLIKELY (res != GST_RTSP_EINTR)) - goto read_error; + g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND); + res = read_bytes (conn, data, &offset, size, TRUE); + g_socket_set_timeout (conn->read_socket, 0); - g_socket_set_timeout (conn->read_socket, - (to + GST_SECOND - 1) / GST_SECOND); - if (!g_socket_condition_wait (conn->read_socket, - G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable, - &err)) { - g_socket_set_timeout (conn->read_socket, 0); - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) { - g_clear_error (&err); - goto stopped; - } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { - g_clear_error (&err); - goto select_timeout; - } - g_clear_error (&err); - goto select_error; - } - g_socket_set_timeout (conn->read_socket, 0); - } - return GST_RTSP_OK; - - /* ERRORS */ -select_error: - { - return GST_RTSP_ESYS; - } -select_timeout: - { - return GST_RTSP_ETIMEOUT; - } -stopped: - { - return GST_RTSP_EINTR; - } -eof: - { - return GST_RTSP_EEOF; - } -read_error: - { - return res; - } + return res; } static GstRTSPMessage * @@ -1951,7 +1875,7 @@ no_message: * Attempt to read into @message from the connected @conn, blocking up to * the specified @timeout. @timeout can be #NULL, in which case this function * might block forever. - * + * * This function can be cancelled with gst_rtsp_connection_flush(). * * Returns: #GST_RTSP_OK on success. @@ -1963,75 +1887,53 @@ gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message, GstRTSPResult res; GstRTSPBuilder builder; GstClockTime to; - GError *err = NULL; g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); /* configure timeout if any */ - to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE; + to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0; + g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND); memset (&builder, 0, sizeof (GstRTSPBuilder)); - while (TRUE) { - res = build_next (&builder, message, conn); - if (G_UNLIKELY (res == GST_RTSP_EEOF)) - goto eof; - else if (G_LIKELY (res == GST_RTSP_OK)) { - if (!conn->manual_http) { - if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) { - if (conn->tstate == TUNNEL_STATE_NONE && - message->type_data.request.method == GST_RTSP_GET) { - GstRTSPMessage *response; + res = build_next (&builder, message, conn, TRUE); + g_socket_set_timeout (conn->read_socket, 0); - conn->tstate = TUNNEL_STATE_GET; + if (G_UNLIKELY (res != GST_RTSP_OK)) + goto read_error; - /* tunnel GET request, we can reply now */ - response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message); - res = gst_rtsp_connection_send (conn, response, timeout); - gst_rtsp_message_free (response); - if (res == GST_RTSP_OK) - res = GST_RTSP_ETGET; - goto cleanup; - } else if (conn->tstate == TUNNEL_STATE_NONE && - message->type_data.request.method == GST_RTSP_POST) { - conn->tstate = TUNNEL_STATE_POST; + if (!conn->manual_http) { + if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + if (conn->tstate == TUNNEL_STATE_NONE && + message->type_data.request.method == GST_RTSP_GET) { + GstRTSPMessage *response; - /* tunnel POST request, the caller now has to link the two - * connections. */ - res = GST_RTSP_ETPOST; - goto cleanup; - } else { - res = GST_RTSP_EPARSE; - goto cleanup; - } - } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) { - res = GST_RTSP_EPARSE; - goto cleanup; - } + conn->tstate = TUNNEL_STATE_GET; + + /* tunnel GET request, we can reply now */ + response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message); + res = gst_rtsp_connection_send (conn, response, timeout); + gst_rtsp_message_free (response); + if (res == GST_RTSP_OK) + res = GST_RTSP_ETGET; + goto cleanup; + } else if (conn->tstate == TUNNEL_STATE_NONE && + message->type_data.request.method == GST_RTSP_POST) { + conn->tstate = TUNNEL_STATE_POST; + + /* tunnel POST request, the caller now has to link the two + * connections. */ + res = GST_RTSP_ETPOST; + goto cleanup; + } else { + res = GST_RTSP_EPARSE; + goto cleanup; } - - break; - } else if (G_UNLIKELY (res != GST_RTSP_EINTR)) - goto read_error; - - g_socket_set_timeout (conn->read_socket, - (to + GST_SECOND - 1) / GST_SECOND); - if (!g_socket_condition_wait (conn->read_socket, - G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable, - &err)) { - g_socket_set_timeout (conn->read_socket, 0); - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { - g_clear_error (&err); - goto stopped; - } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) { - g_clear_error (&err); - goto select_timeout; - } - g_clear_error (&err); - goto select_error; + } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) { + res = GST_RTSP_EPARSE; + goto cleanup; } - g_socket_set_timeout (conn->read_socket, 0); } /* we have a message here */ @@ -2040,26 +1942,6 @@ gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message, return GST_RTSP_OK; /* ERRORS */ -select_error: - { - res = GST_RTSP_ESYS; - goto cleanup; - } -select_timeout: - { - res = GST_RTSP_ETIMEOUT; - goto cleanup; - } -stopped: - { - res = GST_RTSP_EINTR; - goto cleanup; - } -eof: - { - res = GST_RTSP_EEOF; - goto cleanup; - } read_error: cleanup: { @@ -2907,7 +2789,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, if (watch->readfd.revents & READ_ERR) goto read_error; - res = build_next (&watch->builder, &watch->message, watch->conn); + res = build_next (&watch->builder, &watch->message, watch->conn, FALSE); if (res == GST_RTSP_EINTR) break; else if (G_UNLIKELY (res == GST_RTSP_EEOF)) { @@ -3022,7 +2904,8 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, } res = write_bytes (watch->conn->output_stream, watch->write_data, - &watch->write_off, watch->write_size, watch->conn->cancellable); + &watch->write_off, watch->write_size, FALSE, + watch->conn->cancellable); g_mutex_unlock (&watch->mutex); if (res == GST_RTSP_EINTR) @@ -3343,7 +3226,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, if (watch->messages->length == 0 && watch->write_data == NULL) { res = write_bytes (watch->conn->output_stream, data, &off, size, - watch->conn->cancellable); + FALSE, watch->conn->cancellable); if (res != GST_RTSP_EINTR) { if (id != NULL) *id = 0;