rtspconnection: Only reset timeout when socket is unused

After sending or retrieving data, gstrtspconnection resets the socket's
timeout to 0 (infinite). This could cause problems if sending and
receiving at the same time. For example, if RTCP data is sent from the
streaming thread while gstrtspsrc is already retrieving data.

With this patch, timeout is only reset to 0 if there is no other
thread using the socket.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1260>
This commit is contained in:
Tobias Ronge 2021-09-07 13:55:08 +02:00
parent 92338e3d80
commit 3ec9795a28

View file

@ -167,6 +167,9 @@ struct _GstRTSPConnection
GSocket *read_socket; GSocket *read_socket;
GSocket *write_socket; GSocket *write_socket;
GSocket *socket0, *socket1; GSocket *socket0, *socket1;
gboolean read_socket_used;
gboolean write_socket_used;
GMutex socket_use_mutex;
gboolean manual_http; gboolean manual_http;
gboolean may_cancel; gboolean may_cancel;
GCancellable *cancellable; GCancellable *cancellable;
@ -478,6 +481,9 @@ gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip,
newconn->socket0 = socket; newconn->socket0 = socket;
newconn->stream0 = stream; newconn->stream0 = stream;
newconn->write_socket = newconn->read_socket = newconn->socket0; newconn->write_socket = newconn->read_socket = newconn->socket0;
newconn->read_socket_used = FALSE;
newconn->write_socket_used = FALSE;
g_mutex_init (&newconn->socket_use_mutex);
newconn->input_stream = g_io_stream_get_input_stream (stream); newconn->input_stream = g_io_stream_get_input_stream (stream);
newconn->output_stream = g_io_stream_get_output_stream (stream); newconn->output_stream = g_io_stream_get_output_stream (stream);
newconn->control_stream = NULL; newconn->control_stream = NULL;
@ -1074,6 +1080,8 @@ gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn,
/* this is our read socket */ /* this is our read socket */
conn->read_socket = conn->socket0; conn->read_socket = conn->socket0;
conn->write_socket = conn->socket0; conn->write_socket = conn->socket0;
conn->read_socket_used = FALSE;
conn->write_socket_used = FALSE;
conn->input_stream = g_io_stream_get_input_stream (conn->stream0); conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
conn->output_stream = g_io_stream_get_output_stream (conn->stream0); conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
conn->control_stream = NULL; conn->control_stream = NULL;
@ -1632,6 +1640,74 @@ read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
return GST_RTSP_OK; return GST_RTSP_OK;
} }
static void
set_read_socket_timeout (GstRTSPConnection * conn, gint64 timeout)
{
GstClockTime to_nsecs;
guint to_secs;
g_mutex_lock (&conn->socket_use_mutex);
g_assert (!conn->read_socket_used);
conn->read_socket_used = TRUE;
to_nsecs = timeout * 1000;
to_secs = (to_nsecs + GST_SECOND - 1) / GST_SECOND;
if (to_secs > g_socket_get_timeout (conn->read_socket)) {
g_socket_set_timeout (conn->read_socket, to_secs);
}
g_mutex_unlock (&conn->socket_use_mutex);
}
static void
set_write_socket_timeout (GstRTSPConnection * conn, gint64 timeout)
{
GstClockTime to_nsecs;
guint to_secs;
g_mutex_lock (&conn->socket_use_mutex);
g_assert (!conn->write_socket_used);
conn->write_socket_used = TRUE;
to_nsecs = timeout * 1000;
to_secs = (to_nsecs + GST_SECOND - 1) / GST_SECOND;
if (to_secs > g_socket_get_timeout (conn->write_socket)) {
g_socket_set_timeout (conn->write_socket, to_secs);
}
g_mutex_unlock (&conn->socket_use_mutex);
}
static void
clear_read_socket_timeout (GstRTSPConnection * conn)
{
g_mutex_lock (&conn->socket_use_mutex);
conn->read_socket_used = FALSE;
if (conn->read_socket != conn->write_socket || !conn->write_socket_used) {
g_socket_set_timeout (conn->read_socket, 0);
}
g_mutex_unlock (&conn->socket_use_mutex);
}
static void
clear_write_socket_timeout (GstRTSPConnection * conn)
{
g_mutex_lock (&conn->socket_use_mutex);
conn->write_socket_used = FALSE;
if (conn->write_socket != conn->read_socket || !conn->read_socket_used) {
g_socket_set_timeout (conn->write_socket, 0);
}
g_mutex_unlock (&conn->socket_use_mutex);
}
/** /**
* gst_rtsp_connection_write_usec: * gst_rtsp_connection_write_usec:
* @conn: a #GstRTSPConnection * @conn: a #GstRTSPConnection
@ -1655,7 +1731,6 @@ gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data,
guint size, gint64 timeout) guint size, gint64 timeout)
{ {
guint offset; guint offset;
GstClockTime to;
GstRTSPResult res; GstRTSPResult res;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
@ -1664,13 +1739,13 @@ gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data,
offset = 0; offset = 0;
to = timeout * 1000; set_write_socket_timeout (conn, timeout);
g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
res = res =
write_bytes (conn->output_stream, data, &offset, size, TRUE, write_bytes (conn->output_stream, data, &offset, size, TRUE,
conn->cancellable); conn->cancellable);
g_socket_set_timeout (conn->write_socket, 0);
clear_write_socket_timeout (conn);
return res; return res;
} }
@ -1855,7 +1930,6 @@ GstRTSPResult
gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn, gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn,
GstRTSPMessage * messages, guint n_messages, gint64 timeout) GstRTSPMessage * messages, guint n_messages, gint64 timeout)
{ {
GstClockTime to;
GstRTSPResult res; GstRTSPResult res;
GstRTSPSerializedMessage *serialized_messages; GstRTSPSerializedMessage *serialized_messages;
GOutputVector *vectors; GOutputVector *vectors;
@ -1977,13 +2051,13 @@ gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn,
} }
/* write request: this is synchronous */ /* write request: this is synchronous */
to = timeout * 1000; set_write_socket_timeout (conn, timeout);
g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
res = res =
writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written, writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
TRUE, conn->cancellable); TRUE, conn->cancellable);
g_socket_set_timeout (conn->write_socket, 0);
clear_write_socket_timeout (conn);
g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK); g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
@ -2630,7 +2704,6 @@ gst_rtsp_connection_read_usec (GstRTSPConnection * conn, guint8 * data,
guint size, gint64 timeout) guint size, gint64 timeout)
{ {
guint offset; guint offset;
GstClockTime to;
GstRTSPResult res; GstRTSPResult res;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
@ -2643,11 +2716,11 @@ gst_rtsp_connection_read_usec (GstRTSPConnection * conn, guint8 * data,
offset = 0; offset = 0;
/* configure timeout if any */ /* configure timeout if any */
to = timeout * 1000; set_read_socket_timeout (conn, timeout);
g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
res = read_bytes (conn, data, &offset, size, TRUE); res = read_bytes (conn, data, &offset, size, TRUE);
g_socket_set_timeout (conn->read_socket, 0);
clear_read_socket_timeout (conn);
return res; return res;
} }
@ -2712,19 +2785,18 @@ gst_rtsp_connection_receive_usec (GstRTSPConnection * conn,
{ {
GstRTSPResult res; GstRTSPResult res;
GstRTSPBuilder builder; GstRTSPBuilder builder;
GstClockTime to;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
/* configure timeout if any */ /* configure timeout if any */
to = timeout * 1000; set_read_socket_timeout (conn, timeout);
g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
memset (&builder, 0, sizeof (GstRTSPBuilder)); memset (&builder, 0, sizeof (GstRTSPBuilder));
res = build_next (&builder, message, conn, TRUE); res = build_next (&builder, message, conn, TRUE);
g_socket_set_timeout (conn->read_socket, 0);
clear_read_socket_timeout (conn);
if (G_UNLIKELY (res != GST_RTSP_OK)) if (G_UNLIKELY (res != GST_RTSP_OK))
goto read_error; goto read_error;
@ -2822,6 +2894,8 @@ gst_rtsp_connection_close (GstRTSPConnection * conn)
conn->write_socket = NULL; conn->write_socket = NULL;
conn->read_socket = NULL; conn->read_socket = NULL;
conn->write_socket_used = FALSE;
conn->read_socket_used = FALSE;
conn->tunneled = FALSE; conn->tunneled = FALSE;
conn->tstate = TUNNEL_STATE_NONE; conn->tstate = TUNNEL_STATE_NONE;
conn->ctxp = NULL; conn->ctxp = NULL;