From 54b6b3bcab504b28ce48c557ed735871e9c1bdc9 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Thu, 6 Feb 2020 22:46:18 +0100 Subject: [PATCH] rtsp-stream: marshal calls to send_tcp_message to a single thread In order to address the race condition pointed out at https://gitlab.freedesktop.org/gstreamer/gst-rtsp-server/merge_requests/108#note_403579 we get rid of the send thread pool, and instead spawn and manage a single thread to pull samples from app sinks and add them to the transport's backlogs. Additionally, we now also always go through the backlogs in order to simplify the logic. --- gst/rtsp-server/rtsp-stream.c | 226 ++++++++++++++++++++-------------- 1 file changed, 131 insertions(+), 95 deletions(-) diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 7fd171d356..dc484e30a2 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -190,6 +190,18 @@ struct _GstRTSPStreamPrivate gint dscp_qos; + /* Sending logic for TCP */ + GThread *send_thread; + GCond send_cond; + GMutex send_lock; + /* @send_lock is released when pushing data out, we use + * a cookie to decide whether we should wait on @send_cond + * before checking the transports' backlogs again + */ + guint send_cookie; + /* Used to control shutdown of @send_thread */ + gboolean continue_sending; + /* stream blocking */ gulong blocked_id[2]; gboolean blocking; @@ -314,6 +326,11 @@ gst_rtsp_stream_init (GstRTSPStream * stream) g_mutex_init (&priv->lock); + priv->continue_sending = TRUE; + priv->send_cookie = 0; + g_cond_init (&priv->send_cond); + g_mutex_init (&priv->send_lock); + priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_caps_unref); priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL, @@ -400,6 +417,9 @@ gst_rtsp_stream_finalize (GObject * obj) g_hash_table_unref (priv->keys); g_hash_table_destroy (priv->ptmap); + g_mutex_clear (&priv->send_lock); + g_cond_clear (&priv->send_cond); + G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj); } @@ -2548,6 +2568,33 @@ ensure_cached_transports (GstRTSPStream * stream) } } +/* Must be called *without* priv->lock */ +static void +check_transport_backlog (GstRTSPStream * stream, GstRTSPStreamTransport * trans) +{ + gst_rtsp_stream_transport_lock_backlog (trans); + + if (!gst_rtsp_stream_transport_backlog_is_empty (trans)) { + GstBuffer *buffer; + GstBufferList *buffer_list; + gboolean is_rtp; + gboolean popped; + + popped = + gst_rtsp_stream_transport_backlog_pop (trans, &buffer, &buffer_list, + &is_rtp); + + g_assert (popped == TRUE); + + push_data (stream, trans, buffer, buffer_list, is_rtp); + + gst_clear_buffer (&buffer); + gst_clear_buffer_list (&buffer_list); + } + + gst_rtsp_stream_transport_unlock_backlog (trans); +} + /* Must be called with priv->lock */ static void send_tcp_message (GstRTSPStream * stream, gint idx) @@ -2598,6 +2645,33 @@ send_tcp_message (GstRTSPStream * stream, gint idx) if (transports) g_ptr_array_ref (transports); + if (transports) { + gint index; + + for (index = 0; index < transports->len; index++) { + GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index); + GstBuffer *buf_ref = NULL; + GstBufferList *buflist_ref = NULL; + + gst_rtsp_stream_transport_lock_backlog (tr); + + if (buffer) + buf_ref = gst_buffer_ref (buffer); + if (buffer_list) + buflist_ref = gst_buffer_list_ref (buffer_list); + + if (!gst_rtsp_stream_transport_backlog_push (tr, + buf_ref, buflist_ref, is_rtp)) { + GST_ERROR_OBJECT (stream, + "Dropping slow transport %" GST_PTR_FORMAT, tr); + update_transport (stream, tr, FALSE); + } + + gst_rtsp_stream_transport_unlock_backlog (tr); + } + } + gst_sample_unref (sample); + g_mutex_unlock (&priv->lock); if (transports) { @@ -2606,51 +2680,32 @@ send_tcp_message (GstRTSPStream * stream, gint idx) for (index = 0; index < transports->len; index++) { GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index); - gst_rtsp_stream_transport_lock_backlog (tr); - - if (gst_rtsp_stream_transport_backlog_is_empty (tr) - && !gst_rtsp_stream_transport_check_back_pressure (tr, idx)) { - push_data (stream, tr, buffer, buffer_list, is_rtp); - } else { - GstBuffer *buf_ref = NULL; - GstBufferList *buflist_ref = NULL; - - if (buffer) - buf_ref = gst_buffer_ref (buffer); - if (buffer_list) - buflist_ref = gst_buffer_list_ref (buffer_list); - - if (!gst_rtsp_stream_transport_backlog_push (tr, - buf_ref, buflist_ref, is_rtp)) { - GST_WARNING_OBJECT (stream, - "Dropping slow transport %" GST_PTR_FORMAT, tr); - g_mutex_lock (&priv->lock); - update_transport (stream, tr, FALSE); - g_mutex_unlock (&priv->lock); - } - } - - gst_rtsp_stream_transport_unlock_backlog (tr); + check_transport_backlog (stream, tr); } g_ptr_array_unref (transports); } - gst_sample_unref (sample); g_mutex_lock (&priv->lock); } -static void -send_thread_main (gpointer data, gpointer user_data) +static gpointer +send_func (GstRTSPStream * stream) { - GstRTSPStream *stream = user_data; GstRTSPStreamPrivate *priv = stream->priv; - gint idx; - gint i; + gboolean cont = TRUE; - g_mutex_lock (&priv->lock); + g_mutex_lock (&priv->send_lock); + + while (cont) { + int i; + int idx = -1; + guint cookie; + + cookie = priv->send_cookie; + g_mutex_unlock (&priv->send_lock); + + g_mutex_lock (&priv->lock); - do { - idx = -1; /* iterate from 1 and down, so we prioritize RTCP over RTP */ for (i = 1; i >= 0; i--) { if (priv->have_buffer[i]) { @@ -2660,12 +2715,22 @@ send_thread_main (gpointer data, gpointer user_data) } } - if (idx != -1) + if (idx != -1) { send_tcp_message (stream, idx); - } while (idx != -1); + } - GST_DEBUG_OBJECT (stream, "send thread done"); - g_mutex_unlock (&priv->lock); + g_mutex_unlock (&priv->lock); + + g_mutex_lock (&priv->send_lock); + while (cookie == priv->send_cookie) { + g_cond_wait (&priv->send_cond, &priv->send_lock); + } + cont = priv->continue_sending; + } + + g_mutex_unlock (&priv->send_lock); + + return NULL; } static GstFlowReturn @@ -2674,28 +2739,27 @@ handle_new_sample (GstAppSink * sink, gpointer user_data) GstRTSPStream *stream = user_data; GstRTSPStreamPrivate *priv = stream->priv; int i; - int idx = -1; g_mutex_lock (&priv->lock); - if (priv->send_pool == NULL) { - GST_DEBUG_OBJECT (stream, "create thread pool"); - priv->send_pool = - g_thread_pool_new (send_thread_main, user_data, 1, TRUE, NULL); - } - - for (i = 0; i < 2; i++) + for (i = 0; i < 2; i++) { if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) { priv->have_buffer[i] = TRUE; - idx = i; break; } + } - if (idx != -1) - send_tcp_message (stream, idx); + if (priv->send_thread == NULL) { + priv->send_thread = g_thread_new (NULL, (GThreadFunc) send_func, user_data); + } g_mutex_unlock (&priv->lock); + g_mutex_lock (&priv->send_lock); + priv->send_cookie++; + g_cond_signal (&priv->send_cond); + g_mutex_unlock (&priv->send_lock); + return GST_FLOW_OK; } @@ -3904,6 +3968,16 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin, priv = stream->priv; + g_mutex_lock (&priv->send_lock); + priv->continue_sending = FALSE; + priv->send_cookie++; + g_cond_signal (&priv->send_cond); + g_mutex_unlock (&priv->send_lock); + + if (priv->send_thread) { + g_thread_join (priv->send_thread); + } + g_mutex_lock (&priv->lock); if (priv->joined_bin == NULL) goto was_not_joined; @@ -4557,55 +4631,17 @@ mcast_error: static void on_message_sent (GstRTSPStreamTransport * trans, gpointer user_data) { - GstRTSPStream *stream = user_data; + GstRTSPStream *stream = GST_RTSP_STREAM (user_data); GstRTSPStreamPrivate *priv = stream->priv; - gint idx = -1; - gint i; GST_DEBUG_OBJECT (stream, "message send complete"); - gst_rtsp_stream_transport_lock_backlog (trans); + check_transport_backlog (stream, trans); - if (!gst_rtsp_stream_transport_backlog_is_empty (trans)) { - GstBuffer *buffer; - GstBufferList *buffer_list; - gboolean is_rtp; - gboolean popped; - - popped = - gst_rtsp_stream_transport_backlog_pop (trans, &buffer, &buffer_list, - &is_rtp); - - g_assert (popped == TRUE); - - push_data (stream, trans, buffer, buffer_list, is_rtp); - - gst_clear_buffer (&buffer); - gst_clear_buffer_list (&buffer_list); - } else { - g_mutex_lock (&priv->lock); - /* iterate from 1 and down, so we prioritize RTCP over RTP */ - for (i = 1; i >= 0; i--) { - if (priv->have_buffer[i]) { - /* send message */ - idx = i; - break; - } - } - - if (idx != -1) { - gint dummy; - - if (priv->send_pool) { - GST_DEBUG_OBJECT (stream, "start thread"); - g_thread_pool_push (priv->send_pool, &dummy, NULL); - } - } - - g_mutex_unlock (&priv->lock); - } - - gst_rtsp_stream_transport_unlock_backlog (trans); + g_mutex_lock (&priv->send_lock); + priv->send_cookie++; + g_cond_signal (&priv->send_cond); + g_mutex_unlock (&priv->send_lock); } /** @@ -6027,8 +6063,8 @@ gst_rtsp_stream_set_rate_control (GstRTSPStream * stream, gboolean enabled) if (stream->priv->appsink[0]) g_object_set (stream->priv->appsink[0], "sync", enabled, NULL); if (stream->priv->payloader - && g_object_class_find_property (G_OBJECT_GET_CLASS (stream-> - priv->payloader), "onvif-no-rate-control")) + && g_object_class_find_property (G_OBJECT_GET_CLASS (stream->priv-> + payloader), "onvif-no-rate-control")) g_object_set (stream->priv->payloader, "onvif-no-rate-control", !enabled, NULL); if (stream->priv->session) {