From 50ecbb15965639c89e5c127e749e1e2eef5d0302 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Wed, 5 Feb 2020 20:28:19 +0100 Subject: [PATCH] rtsp-stream: properly protect TCP backlog access Fixes #97 We cannot hold stream->lock while pushing data, but need to consistently check the state of the backlog both from the send_tcp_message function and the on_message_sent function, which may or may not be called from the same thread. This commit introduces internal API to allow for potentially recursive locking of transport streams, addressing a race condition where the RTSP stream could push items out of order when popping them from the backlog. --- gst/rtsp-server/rtsp-server-internal.h | 4 ++++ gst/rtsp-server/rtsp-stream-transport.c | 28 ++++++++++++++++++++++--- gst/rtsp-server/rtsp-stream.c | 18 +++++++++------- 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/gst/rtsp-server/rtsp-server-internal.h b/gst/rtsp-server/rtsp-server-internal.h index 13defeefef..03b02e2b20 100644 --- a/gst/rtsp-server/rtsp-server-internal.h +++ b/gst/rtsp-server/rtsp-server-internal.h @@ -42,6 +42,10 @@ gboolean gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamT gboolean gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport *trans); +void gst_rtsp_stream_transport_lock_backlog (GstRTSPStreamTransport * trans); + +void gst_rtsp_stream_transport_unlock_backlog (GstRTSPStreamTransport * trans); + void gst_rtsp_stream_transport_set_back_pressure_callback (GstRTSPStreamTransport *trans, GstRTSPBackPressureFunc back_pressure_func, gpointer user_data, diff --git a/gst/rtsp-server/rtsp-stream-transport.c b/gst/rtsp-server/rtsp-stream-transport.c index e30e44096c..8ce16ddcd5 100644 --- a/gst/rtsp-server/rtsp-stream-transport.c +++ b/gst/rtsp-server/rtsp-stream-transport.c @@ -85,6 +85,7 @@ struct _GstRTSPStreamTransportPrivate /* TCP backlog */ GstClockTime first_rtp_timestamp; GstQueueArray *items; + GRecMutex backlog_lock; }; #define MAX_BACKLOG_DURATION (10 * GST_SECOND) @@ -140,6 +141,7 @@ gst_rtsp_stream_transport_init (GstRTSPStreamTransport * trans) trans->priv->first_rtp_timestamp = GST_CLOCK_TIME_NONE; gst_queue_array_set_clear_func (trans->priv->items, (GDestroyNotify) clear_backlog_item); + g_rec_mutex_init (&trans->priv->backlog_lock); } static void @@ -167,6 +169,8 @@ gst_rtsp_stream_transport_finalize (GObject * obj) gst_queue_array_free (priv->items); + g_rec_mutex_clear (&priv->backlog_lock); + G_OBJECT_CLASS (gst_rtsp_stream_transport_parent_class)->finalize (obj); } @@ -832,7 +836,8 @@ get_first_backlog_timestamp (GstRTSPStreamTransport * trans) return ret; } -/* Not MT-safe, caller should ensure consistent locking. Ownership +/* Not MT-safe, caller should ensure consistent locking (see + * gst_rtsp_stream_transport_lock_backlog()). Ownership * of @buffer and @buffer_list is transfered to the transport */ gboolean gst_rtsp_stream_transport_backlog_push (GstRTSPStreamTransport * trans, @@ -875,7 +880,8 @@ gst_rtsp_stream_transport_backlog_push (GstRTSPStreamTransport * trans, return ret; } -/* Not MT-safe, caller should ensure consistent locking. Ownership +/* Not MT-safe, caller should ensure consistent locking (see + * gst_rtsp_stream_transport_lock_backlog()). Ownership * of @buffer and @buffer_list is transfered back to the caller */ gboolean gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamTransport * trans, @@ -900,9 +906,25 @@ gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamTransport * trans, return TRUE; } -/* Not MT-safe, caller should ensure consistent locking. */ +/* Not MT-safe, caller should ensure consistent locking. + * See gst_rtsp_stream_transport_lock_backlog() */ gboolean gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport * trans) { return gst_queue_array_is_empty (trans->priv->items); } + +/* Internal API, protects access to the TCP backlog. Safe to + * call recursively */ +void +gst_rtsp_stream_transport_lock_backlog (GstRTSPStreamTransport * trans) +{ + g_rec_mutex_lock (&trans->priv->backlog_lock); +} + +/* See gst_rtsp_stream_transport_lock_backlog() */ +void +gst_rtsp_stream_transport_unlock_backlog (GstRTSPStreamTransport * trans) +{ + g_rec_mutex_unlock (&trans->priv->backlog_lock); +} diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c index 30203ad3f5..7fd171d356 100644 --- a/gst/rtsp-server/rtsp-stream.c +++ b/gst/rtsp-server/rtsp-stream.c @@ -202,6 +202,7 @@ struct _GstRTSPStreamPrivate GstRTSPPublishClockMode publish_clock_mode; GThreadPool *send_pool; + guint32 last_seqnum; }; #define DEFAULT_CONTROL NULL @@ -2605,6 +2606,8 @@ send_tcp_message (GstRTSPStream * stream, gint idx) for (index = 0; index < transports->len; index++) { GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index); + gst_rtsp_stream_transport_lock_backlog (tr); + if (gst_rtsp_stream_transport_backlog_is_empty (tr) && !gst_rtsp_stream_transport_check_back_pressure (tr, idx)) { push_data (stream, tr, buffer, buffer_list, is_rtp); @@ -2612,8 +2615,6 @@ send_tcp_message (GstRTSPStream * stream, gint idx) GstBuffer *buf_ref = NULL; GstBufferList *buflist_ref = NULL; - g_mutex_lock (&priv->lock); - if (buffer) buf_ref = gst_buffer_ref (buffer); if (buffer_list) @@ -2623,11 +2624,13 @@ send_tcp_message (GstRTSPStream * stream, gint idx) buf_ref, buflist_ref, is_rtp)) { GST_WARNING_OBJECT (stream, "Dropping slow transport %" GST_PTR_FORMAT, tr); + g_mutex_lock (&priv->lock); update_transport (stream, tr, FALSE); + g_mutex_unlock (&priv->lock); } - - g_mutex_unlock (&priv->lock); } + + gst_rtsp_stream_transport_unlock_backlog (tr); } g_ptr_array_unref (transports); } @@ -4561,7 +4564,7 @@ on_message_sent (GstRTSPStreamTransport * trans, gpointer user_data) GST_DEBUG_OBJECT (stream, "message send complete"); - g_mutex_lock (&priv->lock); + gst_rtsp_stream_transport_lock_backlog (trans); if (!gst_rtsp_stream_transport_backlog_is_empty (trans)) { GstBuffer *buffer; @@ -4575,13 +4578,12 @@ on_message_sent (GstRTSPStreamTransport * trans, gpointer user_data) g_assert (popped == TRUE); - g_mutex_unlock (&priv->lock); - push_data (stream, trans, buffer, buffer_list, is_rtp); gst_clear_buffer (&buffer); gst_clear_buffer_list (&buffer_list); } else { + g_mutex_lock (&priv->lock); /* iterate from 1 and down, so we prioritize RTCP over RTP */ for (i = 1; i >= 0; i--) { if (priv->have_buffer[i]) { @@ -4602,6 +4604,8 @@ on_message_sent (GstRTSPStreamTransport * trans, gpointer user_data) g_mutex_unlock (&priv->lock); } + + gst_rtsp_stream_transport_unlock_backlog (trans); } /**