From 0b1b6670c8b73ad0311f90a6387bdd372cea5896 Mon Sep 17 00:00:00 2001 From: Adam x Nilsson Date: Mon, 7 Oct 2019 12:13:47 +0200 Subject: [PATCH] rtsp-stream : fix race condition in send_tcp_message If one thread is inside the send_tcp_message function and are done sending rtp or rtcp messages so the n_outstanding variable is zero however have not exit the loop sending the messages. While sending its messages, transports have been added or removed to the transport list, so the cache should be updated. If now an additional thread comes to the function send_tcp_message and trying to send rtp messages it will first destroy the rtp cache that is still being iterated trough by the first thread. Fixes #81 --- gst/rtsp-server/rtsp-stream.c | 108 ++++++++++++---------------------- 1 file changed, 38 insertions(+), 70 deletions(-) diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 4a5efcf77b..e13bfbd04e 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -163,10 +163,8 @@ struct _GstRTSPStreamPrivate guint n_active; GList *transports; guint transports_cookie; - GList *tr_cache_rtp; - GList *tr_cache_rtcp; - guint tr_cache_cookie_rtp; - guint tr_cache_cookie_rtcp; + GPtrArray *tr_cache; + guint tr_cache_cookie; guint n_tcp_transports; gboolean have_buffer[2]; guint n_outstanding; @@ -2445,17 +2443,11 @@ on_sender_ssrc_active (GObject * session, GObject * source, } static void -clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp) +clear_tr_cache (GstRTSPStreamPrivate * priv) { - if (is_rtp) { - g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL); - g_list_free (priv->tr_cache_rtp); - priv->tr_cache_rtp = NULL; - } else { - g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL); - g_list_free (priv->tr_cache_rtcp); - priv->tr_cache_rtcp = NULL; - } + if (priv->tr_cache) + g_ptr_array_unref (priv->tr_cache); + priv->tr_cache = NULL; } /* Must be called with priv->lock */ @@ -2470,6 +2462,7 @@ send_tcp_message (GstRTSPStream * stream, gint idx) GstBufferList *buffer_list; guint n_messages = 0; gboolean is_rtp; + GPtrArray *transports; if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) { return; @@ -2500,71 +2493,46 @@ send_tcp_message (GstRTSPStream * stream, gint idx) is_rtp = (idx == 0); - if (is_rtp) { - if (priv->tr_cache_cookie_rtp != priv->transports_cookie) { - clear_tr_cache (priv, is_rtp); - for (walk = priv->transports; walk; walk = g_list_next (walk)) { - GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; - const GstRTSPTransport *t = - gst_rtsp_stream_transport_get_transport (tr); + if (priv->tr_cache_cookie != priv->transports_cookie) { + clear_tr_cache (priv); + priv->tr_cache = + g_ptr_array_new_full (priv->n_tcp_transports, g_object_unref); - if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP) - continue; + for (walk = priv->transports; walk; walk = g_list_next (walk)) { + GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; + const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr); - priv->tr_cache_rtp = - g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr)); - } - priv->tr_cache_cookie_rtp = priv->transports_cookie; - } - } else { - if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) { - clear_tr_cache (priv, is_rtp); - for (walk = priv->transports; walk; walk = g_list_next (walk)) { - GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; - const GstRTSPTransport *t = - gst_rtsp_stream_transport_get_transport (tr); - - if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP) - continue; - - priv->tr_cache_rtcp = - g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr)); - } - priv->tr_cache_cookie_rtcp = priv->transports_cookie; + if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP) + continue; + + g_ptr_array_add (priv->tr_cache, g_object_ref (tr)); } + priv->tr_cache_cookie = priv->transports_cookie; } + transports = priv->tr_cache; + g_ptr_array_ref (transports); priv->n_outstanding += n_messages * priv->n_tcp_transports; g_mutex_unlock (&priv->lock); - if (is_rtp) { - for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) { - GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; + if (transports) { + for (gint index = 0; index < transports->len; index++) { + GstRTSPStreamTransport *tr = + (GstRTSPStreamTransport *) g_ptr_array_index (transports, index); gboolean send_ret = TRUE; - if (buffer) - send_ret = gst_rtsp_stream_transport_send_rtp (tr, buffer); - if (buffer_list) - send_ret = gst_rtsp_stream_transport_send_rtp_list (tr, buffer_list); - - if (!send_ret) { - /* remove transport on send error */ - g_mutex_lock (&priv->lock); - priv->n_outstanding -= n_messages; - update_transport (stream, tr, FALSE); - g_mutex_unlock (&priv->lock); - } - } - } else { - for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) { - GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data; - gboolean send_ret = TRUE; - - if (buffer) - send_ret = gst_rtsp_stream_transport_send_rtcp (tr, buffer); - if (buffer_list) - send_ret = gst_rtsp_stream_transport_send_rtcp_list (tr, buffer_list); + if (is_rtp) { + if (buffer) + send_ret = gst_rtsp_stream_transport_send_rtp (tr, buffer); + if (buffer_list) + send_ret = gst_rtsp_stream_transport_send_rtp_list (tr, buffer_list); + } else { + if (buffer) + send_ret = gst_rtsp_stream_transport_send_rtcp (tr, buffer); + if (buffer_list) + send_ret = gst_rtsp_stream_transport_send_rtcp_list (tr, buffer_list); + } if (!send_ret) { /* remove transport on send error */ @@ -2574,6 +2542,7 @@ send_tcp_message (GstRTSPStream * stream, gint idx) g_mutex_unlock (&priv->lock); } } + g_ptr_array_unref (transports); } gst_sample_unref (sample); @@ -3869,8 +3838,7 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, g_mutex_lock (&priv->lock); } - clear_tr_cache (priv, TRUE); - clear_tr_cache (priv, FALSE); + clear_tr_cache (priv); GST_INFO ("stream %p leaving bin", stream);