mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-23 15:48:23 +00:00
rtspconnection: protect cancellable by a mutex
It is entirely possible for the cancellable to be cancelled (and freed) in gst_rtsp_connection_flush() while there may be an ongoing read/write operation. Nothing prevents gst_rtsp_connection_flush() from waiting for the outstanding read/writes. This could lead to a crash like (where cancellable has been freed within gst_rtsp_connection_flush()): #0 0x00007ffff4351096 in g_output_stream_writev (stream=stream@entry=0x7fff30002950, vectors=vectors@entry=0x7ffe2c6afa80, n_vectors=n_vectors@entry=3, bytes_written=bytes_written@entry=0x7ffe2c6af950, cancellable=cancellable@entry=0x7fff300288a0, error=error@entry=0x7ffe2c6af958) at ../subprojects/glib/gio/goutputstream.c:377 #1 0x00007ffff44b2c38 in writev_bytes (stream=0x7fff30002950, vectors=vectors@entry=0x7ffe2c6afa80, n_vectors=n_vectors@entry=3, bytes_written=bytes_written@entry=0x7ffe2c6afb90, block=block@entry=1, cancellable=0x7fff300288a0) at ../subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c:1320 #2 0x00007ffff44b583e in gst_rtsp_connection_send_messages_usec (conn=0x7fff30001370, messages=messages@entry=0x7ffe2c6afcc0, n_messages=n_messages@entry=1, timeout=timeout@entry=3000000) at ../subprojects/gst-plugins-base/gst-libs/gst/rtsp/gstrtspconnection.c:2056 #3 0x00007ffff44d2669 in gst_rtsp_client_sink_connection_send_messages (sink=0x7fffac0192c0, timeout=3000000, n_messages=1, messages=0x7ffe2c6afcc0, conninfo=0x7fffac019610) at ../subprojects/gst-rtsp-server/gst/rtsp-sink/gstrtspclientsink.c:1929 #4 gst_rtsp_client_sink_try_send (sink=sink@entry=0x7fffac0192c0, conninfo=conninfo@entry=0x7fffac019610, requests=requests@entry=0x7ffe2c6afcc0, n_requests=n_requests@entry=1, response=response@entry=0x0, code=code@entry=0x0) at ../subprojects/gst-rtsp-server/gst/rtsp-sink/gstrtspclientsink.c:2845 #5 0x00007ffff44d3077 in do_send_data (buffer=0x7fff38075c60, channel=<optimized out>, context=0x7fffac042640) at ../subprojects/gst-rtsp-server/gst/rtsp-sink/gstrtspclientsink.c:3896 #6 0x00007ffff4281cc6 in gst_rtsp_stream_transport_send_rtp (trans=trans@entry=0x7fff20061f80, buffer=<optimized out>) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream-transport.c:632 #7 0x00007ffff4278e9b in push_data (stream=0x7fff40019bf0, is_rtp=<optimized out>, buffer_list=0x0, buffer=<optimized out>, trans=0x7fff20061f80) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2586 #8 check_transport_backlog (stream=0x7fff40019bf0, trans=0x7fff20061f80) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2645 #9 0x00007ffff42793b3 in send_tcp_message (idx=<optimized out>, stream=0x7fff40019bf0) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2741 #10 send_func (stream=0x7fff40019bf0) at ../subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream.c:2776 #11 0x00007ffff7d59fad in g_thread_proxy (data=0x7fffbc062920) at ../subprojects/glib/glib/gthread.c:827 #12 0x00007ffff7a8ce2d in start_thread () from /lib64/libc.so.6 #13 0x00007ffff7b12620 in clone3 () from /lib64/libc.so.6 Fix by adding a cancellable lock and returning an extra reference used across all read/write operations. gst_rtsp_connection_flush() can free the in-use cancellable and it will no longer affect any in progress read/write. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2816>
This commit is contained in:
parent
3907c8d72f
commit
91d0a48eea
1 changed files with 79 additions and 18 deletions
|
@ -172,7 +172,8 @@ struct _GstRTSPConnection
|
|||
GMutex socket_use_mutex;
|
||||
gboolean manual_http;
|
||||
gboolean may_cancel;
|
||||
GCancellable *cancellable;
|
||||
GMutex cancellable_mutex;
|
||||
GCancellable *cancellable; /* protected by cancellable_mutex */
|
||||
|
||||
gchar tunnelid[TUNNELID_LEN];
|
||||
gboolean tunneled;
|
||||
|
@ -352,6 +353,20 @@ socket_client_event (GSocketClient * client, GSocketClientEvent event,
|
|||
}
|
||||
}
|
||||
|
||||
/* transfer full */
|
||||
static GCancellable *
|
||||
get_cancellable (GstRTSPConnection * conn)
|
||||
{
|
||||
GCancellable *cancellable = NULL;
|
||||
|
||||
g_mutex_lock (&conn->cancellable_mutex);
|
||||
if (conn->cancellable)
|
||||
cancellable = g_object_ref (conn->cancellable);
|
||||
g_mutex_unlock (&conn->cancellable_mutex);
|
||||
|
||||
return cancellable;
|
||||
}
|
||||
|
||||
/**
|
||||
* gst_rtsp_connection_create:
|
||||
* @url: a #GstRTSPUrl
|
||||
|
@ -377,6 +392,7 @@ gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
|
|||
|
||||
newconn->may_cancel = TRUE;
|
||||
newconn->cancellable = g_cancellable_new ();
|
||||
g_mutex_init (&newconn->cancellable_mutex);
|
||||
newconn->client = g_socket_client_new ();
|
||||
|
||||
if (url->transports & GST_RTSP_LOWER_TRANS_TLS)
|
||||
|
@ -839,6 +855,7 @@ setup_tunneling (GstRTSPConnection * conn, gint64 timeout, gchar * uri,
|
|||
gchar *connection_uri = NULL;
|
||||
gchar *request_uri = NULL;
|
||||
gchar *host = NULL;
|
||||
GCancellable *cancellable;
|
||||
|
||||
url = conn->url;
|
||||
|
||||
|
@ -896,18 +913,23 @@ setup_tunneling (GstRTSPConnection * conn, gint64 timeout, gchar * uri,
|
|||
|
||||
connection_uri = get_tunneled_connection_uri_strdup (url, url_port);
|
||||
|
||||
cancellable = get_cancellable (conn);
|
||||
|
||||
/* connect to the host/port */
|
||||
if (conn->proxy_host) {
|
||||
connection = g_socket_client_connect_to_host (conn->client,
|
||||
conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
|
||||
conn->proxy_host, conn->proxy_port, cancellable, &error);
|
||||
request_uri = g_strdup (connection_uri);
|
||||
} else {
|
||||
connection = g_socket_client_connect_to_uri (conn->client,
|
||||
connection_uri, 0, conn->cancellable, &error);
|
||||
connection_uri, 0, cancellable, &error);
|
||||
request_uri =
|
||||
g_strdup_printf ("%s%s%s", url->abspath,
|
||||
url->query ? "?" : "", url->query ? url->query : "");
|
||||
}
|
||||
|
||||
g_clear_object (&cancellable);
|
||||
|
||||
if (connection == NULL)
|
||||
goto connect_failed;
|
||||
|
||||
|
@ -1033,6 +1055,7 @@ gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn,
|
|||
GstClockTime to;
|
||||
guint16 url_port;
|
||||
GstRTSPUrl *url;
|
||||
GCancellable *cancellable;
|
||||
|
||||
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
|
||||
g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
|
||||
|
@ -1052,18 +1075,23 @@ gst_rtsp_connection_connect_with_response_usec (GstRTSPConnection * conn,
|
|||
connection_uri = gst_rtsp_url_get_request_uri (url);
|
||||
}
|
||||
|
||||
cancellable = get_cancellable (conn);
|
||||
|
||||
if (conn->proxy_host) {
|
||||
connection = g_socket_client_connect_to_host (conn->client,
|
||||
conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
|
||||
conn->proxy_host, conn->proxy_port, cancellable, &error);
|
||||
request_uri = g_strdup (connection_uri);
|
||||
} else {
|
||||
connection = g_socket_client_connect_to_uri (conn->client,
|
||||
connection_uri, url_port, conn->cancellable, &error);
|
||||
connection_uri, url_port, cancellable, &error);
|
||||
|
||||
/* use the relative component of the uri for non-proxy connections */
|
||||
request_uri = g_strdup_printf ("%s%s%s", url->abspath,
|
||||
url->query ? "?" : "", url->query ? url->query : "");
|
||||
}
|
||||
|
||||
g_clear_object (&cancellable);
|
||||
|
||||
if (connection == NULL)
|
||||
goto connect_failed;
|
||||
|
||||
|
@ -1289,6 +1317,8 @@ write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
|
|||
/* ERRORS */
|
||||
error:
|
||||
{
|
||||
g_object_unref (cancellable);
|
||||
|
||||
if (G_UNLIKELY (r == 0))
|
||||
return GST_RTSP_EEOF;
|
||||
|
||||
|
@ -1422,13 +1452,19 @@ fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
|
|||
if (G_LIKELY (size > (guint) out)) {
|
||||
gssize r;
|
||||
gsize count = size - out;
|
||||
GCancellable *cancellable;
|
||||
|
||||
cancellable = conn->may_cancel ? get_cancellable (conn) : NULL;
|
||||
|
||||
if (block)
|
||||
r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
|
||||
count, conn->may_cancel ? conn->cancellable : NULL, err);
|
||||
count, cancellable, err);
|
||||
else
|
||||
r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM
|
||||
(conn->input_stream), (gchar *) & buffer[out], count,
|
||||
conn->may_cancel ? conn->cancellable : NULL, err);
|
||||
cancellable, err);
|
||||
|
||||
g_clear_object (&cancellable);
|
||||
|
||||
if (G_UNLIKELY (r < 0)) {
|
||||
if (out == 0) {
|
||||
|
@ -1732,6 +1768,7 @@ gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data,
|
|||
{
|
||||
guint offset;
|
||||
GstRTSPResult res;
|
||||
GCancellable *cancellable;
|
||||
|
||||
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
|
||||
g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
|
||||
|
@ -1741,9 +1778,10 @@ gst_rtsp_connection_write_usec (GstRTSPConnection * conn, const guint8 * data,
|
|||
|
||||
set_write_socket_timeout (conn, timeout);
|
||||
|
||||
cancellable = get_cancellable (conn);
|
||||
res =
|
||||
write_bytes (conn->output_stream, data, &offset, size, TRUE,
|
||||
conn->cancellable);
|
||||
write_bytes (conn->output_stream, data, &offset, size, TRUE, cancellable);
|
||||
g_clear_object (&cancellable);
|
||||
|
||||
clear_write_socket_timeout (conn);
|
||||
|
||||
|
@ -1937,6 +1975,7 @@ gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn,
|
|||
guint n_vectors, n_memories;
|
||||
gint i, j, k;
|
||||
gsize bytes_to_write, bytes_written;
|
||||
GCancellable *cancellable;
|
||||
|
||||
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
|
||||
g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
|
||||
|
@ -2053,9 +2092,11 @@ gst_rtsp_connection_send_messages_usec (GstRTSPConnection * conn,
|
|||
/* write request: this is synchronous */
|
||||
set_write_socket_timeout (conn, timeout);
|
||||
|
||||
cancellable = get_cancellable (conn);
|
||||
res =
|
||||
writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
|
||||
TRUE, conn->cancellable);
|
||||
TRUE, cancellable);
|
||||
g_clear_object (&cancellable);
|
||||
|
||||
clear_write_socket_timeout (conn);
|
||||
|
||||
|
@ -2928,8 +2969,10 @@ gst_rtsp_connection_free (GstRTSPConnection * conn)
|
|||
|
||||
res = gst_rtsp_connection_close (conn);
|
||||
|
||||
if (conn->cancellable)
|
||||
g_object_unref (conn->cancellable);
|
||||
g_mutex_lock (&conn->cancellable_mutex);
|
||||
g_clear_object (&conn->cancellable);
|
||||
g_mutex_unlock (&conn->cancellable_mutex);
|
||||
g_mutex_clear (&conn->cancellable_mutex);
|
||||
if (conn->client)
|
||||
g_object_unref (conn->client);
|
||||
if (conn->tls_database)
|
||||
|
@ -2975,6 +3018,7 @@ gst_rtsp_connection_poll_usec (GstRTSPConnection * conn, GstRTSPEvent events,
|
|||
GMainContext *ctx;
|
||||
GSource *rs, *ws, *ts;
|
||||
GIOCondition condition;
|
||||
GCancellable *cancellable;
|
||||
|
||||
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
|
||||
g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
|
||||
|
@ -2992,21 +3036,22 @@ gst_rtsp_connection_poll_usec (GstRTSPConnection * conn, GstRTSPEvent events,
|
|||
g_source_unref (ts);
|
||||
}
|
||||
|
||||
cancellable = get_cancellable (conn);
|
||||
if (events & GST_RTSP_EV_READ) {
|
||||
rs = g_socket_create_source (conn->read_socket, G_IO_IN | G_IO_PRI,
|
||||
conn->cancellable);
|
||||
cancellable);
|
||||
g_source_set_dummy_callback (rs);
|
||||
g_source_attach (rs, ctx);
|
||||
g_source_unref (rs);
|
||||
}
|
||||
|
||||
if (events & GST_RTSP_EV_WRITE) {
|
||||
ws = g_socket_create_source (conn->write_socket, G_IO_OUT,
|
||||
conn->cancellable);
|
||||
ws = g_socket_create_source (conn->write_socket, G_IO_OUT, cancellable);
|
||||
g_source_set_dummy_callback (ws);
|
||||
g_source_attach (ws, ctx);
|
||||
g_source_unref (ws);
|
||||
}
|
||||
g_clear_object (&cancellable);
|
||||
|
||||
/* Returns after handling all pending events */
|
||||
while (!g_main_context_iteration (ctx, TRUE));
|
||||
|
@ -3115,10 +3160,14 @@ gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
|
|||
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
|
||||
|
||||
if (flush) {
|
||||
g_cancellable_cancel (conn->cancellable);
|
||||
GCancellable *cancellable = get_cancellable (conn);
|
||||
g_cancellable_cancel (cancellable);
|
||||
g_clear_object (&cancellable);
|
||||
} else {
|
||||
g_mutex_lock (&conn->cancellable_mutex);
|
||||
g_object_unref (conn->cancellable);
|
||||
conn->cancellable = g_cancellable_new ();
|
||||
g_mutex_unlock (&conn->cancellable_mutex);
|
||||
}
|
||||
|
||||
return GST_RTSP_OK;
|
||||
|
@ -3632,6 +3681,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
|
|||
}
|
||||
|
||||
/* clean up some of the state of conn2 */
|
||||
g_mutex_lock (&conn2->cancellable_mutex);
|
||||
g_cancellable_cancel (conn2->cancellable);
|
||||
conn2->write_socket = conn2->read_socket = NULL;
|
||||
conn2->socket0 = NULL;
|
||||
|
@ -3642,6 +3692,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
|
|||
conn2->control_stream = NULL;
|
||||
g_object_unref (conn2->cancellable);
|
||||
conn2->cancellable = NULL;
|
||||
g_mutex_unlock (&conn2->cancellable_mutex);
|
||||
|
||||
/* We make socket0 the write socket and socket1 the read socket. */
|
||||
conn->write_socket = conn->socket0;
|
||||
|
@ -4000,6 +4051,7 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
|
|||
guint n_vectors, n_memories, n_ids, drop_messages;
|
||||
gint i, j, l, n_mmap;
|
||||
GstRTSPSerializedMessage *msg;
|
||||
GCancellable *cancellable;
|
||||
|
||||
/* if this connection was already closed, stop now */
|
||||
if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
|
||||
|
@ -4125,11 +4177,15 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
|
|||
}
|
||||
}
|
||||
|
||||
cancellable = get_cancellable (watch->conn);
|
||||
|
||||
res =
|
||||
writev_bytes (watch->conn->output_stream, vectors, n_vectors,
|
||||
&bytes_written, FALSE, watch->conn->cancellable);
|
||||
&bytes_written, FALSE, cancellable);
|
||||
g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
|
||||
|
||||
g_clear_object (&cancellable);
|
||||
|
||||
/* First unmap all memories here, this simplifies the code below
|
||||
* as we don't have to skip all memories that were already written
|
||||
* before */
|
||||
|
@ -4505,6 +4561,7 @@ gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
|
|||
{
|
||||
GstRTSPResult res;
|
||||
GMainContext *context = NULL;
|
||||
GCancellable *cancellable;
|
||||
gint i;
|
||||
|
||||
g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
|
||||
|
@ -4565,11 +4622,15 @@ gst_rtsp_watch_write_serialized_messages (GstRTSPWatch * watch,
|
|||
}
|
||||
}
|
||||
|
||||
cancellable = get_cancellable (watch->conn);
|
||||
|
||||
res =
|
||||
writev_bytes (watch->conn->output_stream, vectors, n_vectors,
|
||||
&bytes_written, FALSE, watch->conn->cancellable);
|
||||
&bytes_written, FALSE, cancellable);
|
||||
g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
|
||||
|
||||
g_clear_object (&cancellable);
|
||||
|
||||
/* At this point we sent everything we could without blocking or
|
||||
* error and updated the offsets inside the message accordingly */
|
||||
|
||||
|
|
Loading…
Reference in a new issue