rtsp: don't use sockets for blocking

Use the blocking and non-blocking API of the input/output streams instead
of polling the sockets directly. This also allows us to simplify some
code.
This commit is contained in:
Wim Taymans 2013-05-29 15:27:37 +02:00
parent 909e119a23
commit 4f660c388c

View file

@ -293,9 +293,6 @@ gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip,
g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
/* set to non-blocking mode so that we can cancel the communication */
g_socket_set_blocking (socket, FALSE);
if (!collect_addresses (socket, &local_ip, NULL, FALSE, &err))
goto getnameinfo_failed;
@ -479,7 +476,6 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout, gchar * uri)
goto connect_failed;
socket = g_socket_connection_get_socket (connection);
g_socket_set_blocking (socket, FALSE);
/* get remote address */
g_free (conn->remote_ip);
@ -616,7 +612,6 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
/* get remote address */
socket = g_socket_connection_get_socket (connection);
g_socket_set_blocking (socket, FALSE);
if (!collect_addresses (socket, &remote_ip, NULL, TRUE, &error))
goto remote_address_failed;
@ -813,9 +808,11 @@ gen_date_string (gchar * date_string, guint len)
static GstRTSPResult
write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
guint size, GCancellable * cancellable)
guint size, gboolean block, GCancellable * cancellable)
{
guint left;
gssize r;
GError *err = NULL;
if (G_UNLIKELY (*idx > size))
return GST_RTSP_ERROR;
@ -823,31 +820,41 @@ write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
left = size - *idx;
while (left) {
GError *err = NULL;
gssize r;
r = g_pollable_stream_write (stream, (gchar *) & buffer[*idx], left,
block, cancellable, &err);
if (G_UNLIKELY (r < 0))
goto error;
r = g_output_stream_write (stream, (gchar *) & buffer[*idx], left,
cancellable, &err);
if (G_UNLIKELY (r == 0)) {
return GST_RTSP_EINTR;
} else if (G_UNLIKELY (r < 0)) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_clear_error (&err);
return GST_RTSP_EINTR;
}
g_clear_error (&err);
return GST_RTSP_ESYS;
} else {
left -= r;
*idx += r;
}
}
return GST_RTSP_OK;
/* ERRORS */
error:
{
if (G_UNLIKELY (r == 0))
return GST_RTSP_EEOF;
GST_DEBUG ("%s", err->message);
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
g_clear_error (&err);
return GST_RTSP_EINTR;
} else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_clear_error (&err);
return GST_RTSP_EINTR;
} else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
g_clear_error (&err);
return GST_RTSP_ETIMEOUT;
}
g_clear_error (&err);
return GST_RTSP_ESYS;
}
}
static gint
fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
GError ** err)
gboolean block, GError ** err)
{
gint out = 0;
@ -868,11 +875,17 @@ fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
if (G_LIKELY (size > (guint) out)) {
gssize r;
r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
size - out, conn->cancellable, err);
if (r <= 0) {
if (out == 0)
r = g_pollable_stream_read (conn->input_stream,
(gchar *) & buffer[out], size - out, block, conn->cancellable, err);
if (G_UNLIKELY (r < 0)) {
if (out == 0) {
/* propagate the error */
out = r;
} else {
/* we have some data ignore error */
g_clear_error (err);
}
} else
out += r;
}
@ -882,7 +895,7 @@ fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
static gint
fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
GError ** err)
gboolean block, GError ** err)
{
DecodeCtx *ctx = conn->ctxp;
gint out = 0;
@ -904,7 +917,7 @@ fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
break;
/* try to read more bytes */
r = fill_raw_bytes (conn, in, sizeof (in), err);
r = fill_raw_bytes (conn, in, sizeof (in), block, err);
if (r <= 0) {
if (out == 0)
out = r;
@ -917,16 +930,18 @@ fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
&ctx->save);
}
} else {
out = fill_raw_bytes (conn, buffer, size, err);
out = fill_raw_bytes (conn, buffer, size, block, err);
}
return out;
}
static GstRTSPResult
read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
gboolean block)
{
guint left;
gint r;
GError *err = NULL;
if (G_UNLIKELY (*idx > size))
@ -935,24 +950,35 @@ read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
left = size - *idx;
while (left) {
gint r;
r = fill_bytes (conn, &buffer[*idx], left, block, &err);
if (G_UNLIKELY (r <= 0))
goto error;
r = fill_bytes (conn, &buffer[*idx], left, &err);
if (G_UNLIKELY (r == 0)) {
return GST_RTSP_EEOF;
} else if (G_UNLIKELY (r < 0)) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_clear_error (&err);
return GST_RTSP_EINTR;
}
g_clear_error (&err);
return GST_RTSP_ESYS;
} else {
left -= r;
*idx += r;
}
}
return GST_RTSP_OK;
/* ERRORS */
error:
{
if (G_UNLIKELY (r == 0))
return GST_RTSP_EEOF;
GST_DEBUG ("%s", err->message);
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
g_clear_error (&err);
return GST_RTSP_EINTR;
} else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_clear_error (&err);
return GST_RTSP_EINTR;
} else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
g_clear_error (&err);
return GST_RTSP_ETIMEOUT;
}
g_clear_error (&err);
return GST_RTSP_ESYS;
}
}
/* The code below tries to handle clients using \r, \n or \r\n to indicate the
@ -962,13 +988,14 @@ read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
* the method used in RTSP (and HTTP) to break long lines.
*/
static GstRTSPResult
read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
gboolean block)
{
GError *err = NULL;
GstRTSPResult res;
while (TRUE) {
guint8 c;
gint r;
guint i;
if (conn->read_ahead == READ_AHEAD_EOH) {
/* the last call to read_line() already determined that we have reached
@ -987,18 +1014,10 @@ read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
conn->read_ahead = 0;
} else {
/* read the next character */
r = fill_bytes (conn, &c, 1, &err);
if (G_UNLIKELY (r == 0)) {
return GST_RTSP_EEOF;
} else if (G_UNLIKELY (r < 0)) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_clear_error (&err);
return GST_RTSP_EINTR;
}
g_clear_error (&err);
return GST_RTSP_ESYS;
}
i = 0;
res = read_bytes (conn, &c, &i, 1, block);
if (G_UNLIKELY (res != GST_RTSP_OK))
return res;
}
/* special treatment of line endings */
@ -1007,21 +1026,10 @@ read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
retry:
/* need to read ahead one more character to know what to do... */
r = fill_bytes (conn, &read_ahead, 1, &err);
if (G_UNLIKELY (r == 0)) {
return GST_RTSP_EEOF;
} else if (G_UNLIKELY (r < 0)) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
/* remember the original character we read and try again next time */
if (conn->read_ahead == 0)
conn->read_ahead = c;
g_clear_error (&err);
return GST_RTSP_EINTR;
}
g_clear_error (&err);
return GST_RTSP_ESYS;
}
i = 0;
res = read_bytes (conn, &read_ahead, &i, 1, block);
if (G_UNLIKELY (res != GST_RTSP_OK))
return res;
if (read_ahead == ' ' || read_ahead == '\t') {
if (conn->read_ahead == READ_AHEAD_CRLFCR) {
@ -1106,63 +1114,23 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
guint offset;
GstClockTime to;
GstRTSPResult res;
GError *err = NULL;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
g_return_val_if_fail (conn->output_stream != NULL, GST_RTSP_EINVAL);
to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
offset = 0;
while (TRUE) {
/* try to write */
res =
write_bytes (conn->output_stream, data, &offset, size,
conn->cancellable);
if (G_LIKELY (res == GST_RTSP_OK))
break;
if (G_UNLIKELY (res != GST_RTSP_EINTR))
goto write_error;
/* not all is written, wait until we can write more */
g_socket_set_timeout (conn->write_socket,
(to + GST_SECOND - 1) / GST_SECOND);
if (!g_socket_condition_wait (conn->write_socket,
G_IO_OUT | G_IO_ERR | G_IO_HUP, conn->cancellable, &err)) {
g_socket_set_timeout (conn->write_socket, 0);
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) {
g_clear_error (&err);
goto stopped;
} else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
g_clear_error (&err);
goto timeout;
}
g_clear_error (&err);
goto select_error;
}
g_socket_set_timeout (conn->write_socket, 0);
}
return GST_RTSP_OK;
to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
res =
write_bytes (conn->output_stream, data, &offset, size, TRUE,
conn->cancellable);
g_socket_set_timeout (conn->write_socket, 0);
/* ERRORS */
timeout:
{
return GST_RTSP_ETIMEOUT;
}
select_error:
{
return GST_RTSP_ESYS;
}
stopped:
{
return GST_RTSP_EINTR;
}
write_error:
{
return res;
}
}
static GString *
message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
@ -1618,7 +1586,7 @@ normalize_line (guint8 * buffer)
*/
static GstRTSPResult
build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
GstRTSPConnection * conn)
GstRTSPConnection * conn, gboolean block)
{
GstRTSPResult res;
@ -1630,7 +1598,8 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
builder->offset = 0;
res =
read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1);
read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1,
block);
if (res != GST_RTSP_OK)
goto done;
@ -1653,7 +1622,8 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
case STATE_DATA_HEADER:
{
res =
read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4);
read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4,
block);
if (res != GST_RTSP_OK)
goto done;
@ -1670,7 +1640,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
{
res =
read_bytes (conn, builder->body_data, &builder->offset,
builder->body_len);
builder->body_len, block);
if (res != GST_RTSP_OK)
goto done;
@ -1687,7 +1657,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
case STATE_READ_LINES:
{
res = read_line (conn, builder->buffer, &builder->offset,
sizeof (builder->buffer));
sizeof (builder->buffer), block);
if (res != GST_RTSP_OK)
goto done;
@ -1838,7 +1808,6 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
guint offset;
GstClockTime to;
GstRTSPResult res;
GError *err = NULL;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
@ -1850,59 +1819,14 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
offset = 0;
/* configure timeout if any */
to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
while (TRUE) {
res = read_bytes (conn, data, &offset, size);
if (G_UNLIKELY (res == GST_RTSP_EEOF))
goto eof;
if (G_LIKELY (res == GST_RTSP_OK))
break;
if (G_UNLIKELY (res != GST_RTSP_EINTR))
goto read_error;
g_socket_set_timeout (conn->read_socket,
(to + GST_SECOND - 1) / GST_SECOND);
if (!g_socket_condition_wait (conn->read_socket,
G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable,
&err)) {
g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
res = read_bytes (conn, data, &offset, size, TRUE);
g_socket_set_timeout (conn->read_socket, 0);
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) {
g_clear_error (&err);
goto stopped;
} else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
g_clear_error (&err);
goto select_timeout;
}
g_clear_error (&err);
goto select_error;
}
g_socket_set_timeout (conn->read_socket, 0);
}
return GST_RTSP_OK;
/* ERRORS */
select_error:
{
return GST_RTSP_ESYS;
}
select_timeout:
{
return GST_RTSP_ETIMEOUT;
}
stopped:
{
return GST_RTSP_EINTR;
}
eof:
{
return GST_RTSP_EEOF;
}
read_error:
{
return res;
}
}
static GstRTSPMessage *
gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code,
@ -1963,21 +1887,22 @@ gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
GstRTSPResult res;
GstRTSPBuilder builder;
GstClockTime to;
GError *err = NULL;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
/* configure timeout if any */
to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
memset (&builder, 0, sizeof (GstRTSPBuilder));
while (TRUE) {
res = build_next (&builder, message, conn);
if (G_UNLIKELY (res == GST_RTSP_EEOF))
goto eof;
else if (G_LIKELY (res == GST_RTSP_OK)) {
res = build_next (&builder, message, conn, TRUE);
g_socket_set_timeout (conn->read_socket, 0);
if (G_UNLIKELY (res != GST_RTSP_OK))
goto read_error;
if (!conn->manual_http) {
if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
if (conn->tstate == TUNNEL_STATE_NONE &&
@ -2011,55 +1936,12 @@ gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
}
}
break;
} else if (G_UNLIKELY (res != GST_RTSP_EINTR))
goto read_error;
g_socket_set_timeout (conn->read_socket,
(to + GST_SECOND - 1) / GST_SECOND);
if (!g_socket_condition_wait (conn->read_socket,
G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable,
&err)) {
g_socket_set_timeout (conn->read_socket, 0);
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
g_clear_error (&err);
goto stopped;
} else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
g_clear_error (&err);
goto select_timeout;
}
g_clear_error (&err);
goto select_error;
}
g_socket_set_timeout (conn->read_socket, 0);
}
/* we have a message here */
build_reset (&builder);
return GST_RTSP_OK;
/* ERRORS */
select_error:
{
res = GST_RTSP_ESYS;
goto cleanup;
}
select_timeout:
{
res = GST_RTSP_ETIMEOUT;
goto cleanup;
}
stopped:
{
res = GST_RTSP_EINTR;
goto cleanup;
}
eof:
{
res = GST_RTSP_EEOF;
goto cleanup;
}
read_error:
cleanup:
{
@ -2907,7 +2789,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
if (watch->readfd.revents & READ_ERR)
goto read_error;
res = build_next (&watch->builder, &watch->message, watch->conn);
res = build_next (&watch->builder, &watch->message, watch->conn, FALSE);
if (res == GST_RTSP_EINTR)
break;
else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
@ -3022,7 +2904,8 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
}
res = write_bytes (watch->conn->output_stream, watch->write_data,
&watch->write_off, watch->write_size, watch->conn->cancellable);
&watch->write_off, watch->write_size, FALSE,
watch->conn->cancellable);
g_mutex_unlock (&watch->mutex);
if (res == GST_RTSP_EINTR)
@ -3343,7 +3226,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
if (watch->messages->length == 0 && watch->write_data == NULL) {
res =
write_bytes (watch->conn->output_stream, data, &off, size,
watch->conn->cancellable);
FALSE, watch->conn->cancellable);
if (res != GST_RTSP_EINTR) {
if (id != NULL)
*id = 0;