rtsp-stream: properly protect TCP backlog access

Fixes #97

We cannot hold stream->lock while pushing data, but need
to consistently check the state of the backlog both from
the send_tcp_message function and the on_message_sent function,
which may or may not be called from the same thread.

This commit introduces internal API to allow for potentially
recursive locking of transport streams, addressing a race
condition where the RTSP stream could push items out of order
when popping them from the backlog.
This commit is contained in:
Mathieu Duponchelle 2020-02-05 20:28:19 +01:00 committed by Mathieu Duponchelle
parent a2ba3639a5
commit 50ecbb1596
3 changed files with 40 additions and 10 deletions

View file

@ -42,6 +42,10 @@ gboolean gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamT
gboolean gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport *trans); gboolean gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport *trans);
void gst_rtsp_stream_transport_lock_backlog (GstRTSPStreamTransport * trans);
void gst_rtsp_stream_transport_unlock_backlog (GstRTSPStreamTransport * trans);
void gst_rtsp_stream_transport_set_back_pressure_callback (GstRTSPStreamTransport *trans, void gst_rtsp_stream_transport_set_back_pressure_callback (GstRTSPStreamTransport *trans,
GstRTSPBackPressureFunc back_pressure_func, GstRTSPBackPressureFunc back_pressure_func,
gpointer user_data, gpointer user_data,

View file

@ -85,6 +85,7 @@ struct _GstRTSPStreamTransportPrivate
/* TCP backlog */ /* TCP backlog */
GstClockTime first_rtp_timestamp; GstClockTime first_rtp_timestamp;
GstQueueArray *items; GstQueueArray *items;
GRecMutex backlog_lock;
}; };
#define MAX_BACKLOG_DURATION (10 * GST_SECOND) #define MAX_BACKLOG_DURATION (10 * GST_SECOND)
@ -140,6 +141,7 @@ gst_rtsp_stream_transport_init (GstRTSPStreamTransport * trans)
trans->priv->first_rtp_timestamp = GST_CLOCK_TIME_NONE; trans->priv->first_rtp_timestamp = GST_CLOCK_TIME_NONE;
gst_queue_array_set_clear_func (trans->priv->items, gst_queue_array_set_clear_func (trans->priv->items,
(GDestroyNotify) clear_backlog_item); (GDestroyNotify) clear_backlog_item);
g_rec_mutex_init (&trans->priv->backlog_lock);
} }
static void static void
@ -167,6 +169,8 @@ gst_rtsp_stream_transport_finalize (GObject * obj)
gst_queue_array_free (priv->items); gst_queue_array_free (priv->items);
g_rec_mutex_clear (&priv->backlog_lock);
G_OBJECT_CLASS (gst_rtsp_stream_transport_parent_class)->finalize (obj); G_OBJECT_CLASS (gst_rtsp_stream_transport_parent_class)->finalize (obj);
} }
@ -832,7 +836,8 @@ get_first_backlog_timestamp (GstRTSPStreamTransport * trans)
return ret; return ret;
} }
/* Not MT-safe, caller should ensure consistent locking. Ownership /* Not MT-safe, caller should ensure consistent locking (see
* gst_rtsp_stream_transport_lock_backlog()). Ownership
* of @buffer and @buffer_list is transfered to the transport */ * of @buffer and @buffer_list is transfered to the transport */
gboolean gboolean
gst_rtsp_stream_transport_backlog_push (GstRTSPStreamTransport * trans, gst_rtsp_stream_transport_backlog_push (GstRTSPStreamTransport * trans,
@ -875,7 +880,8 @@ gst_rtsp_stream_transport_backlog_push (GstRTSPStreamTransport * trans,
return ret; return ret;
} }
/* Not MT-safe, caller should ensure consistent locking. Ownership /* Not MT-safe, caller should ensure consistent locking (see
* gst_rtsp_stream_transport_lock_backlog()). Ownership
* of @buffer and @buffer_list is transfered back to the caller */ * of @buffer and @buffer_list is transfered back to the caller */
gboolean gboolean
gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamTransport * trans, gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamTransport * trans,
@ -900,9 +906,25 @@ gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamTransport * trans,
return TRUE; return TRUE;
} }
/* Not MT-safe, caller should ensure consistent locking. */ /* Not MT-safe, caller should ensure consistent locking.
* See gst_rtsp_stream_transport_lock_backlog() */
gboolean gboolean
gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport * trans) gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport * trans)
{ {
return gst_queue_array_is_empty (trans->priv->items); return gst_queue_array_is_empty (trans->priv->items);
} }
/* Internal API, protects access to the TCP backlog. Safe to
* call recursively */
void
gst_rtsp_stream_transport_lock_backlog (GstRTSPStreamTransport * trans)
{
g_rec_mutex_lock (&trans->priv->backlog_lock);
}
/* See gst_rtsp_stream_transport_lock_backlog() */
void
gst_rtsp_stream_transport_unlock_backlog (GstRTSPStreamTransport * trans)
{
g_rec_mutex_unlock (&trans->priv->backlog_lock);
}

View file

@ -202,6 +202,7 @@ struct _GstRTSPStreamPrivate
GstRTSPPublishClockMode publish_clock_mode; GstRTSPPublishClockMode publish_clock_mode;
GThreadPool *send_pool; GThreadPool *send_pool;
guint32 last_seqnum;
}; };
#define DEFAULT_CONTROL NULL #define DEFAULT_CONTROL NULL
@ -2605,6 +2606,8 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
for (index = 0; index < transports->len; index++) { for (index = 0; index < transports->len; index++) {
GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index); GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index);
gst_rtsp_stream_transport_lock_backlog (tr);
if (gst_rtsp_stream_transport_backlog_is_empty (tr) if (gst_rtsp_stream_transport_backlog_is_empty (tr)
&& !gst_rtsp_stream_transport_check_back_pressure (tr, idx)) { && !gst_rtsp_stream_transport_check_back_pressure (tr, idx)) {
push_data (stream, tr, buffer, buffer_list, is_rtp); push_data (stream, tr, buffer, buffer_list, is_rtp);
@ -2612,8 +2615,6 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
GstBuffer *buf_ref = NULL; GstBuffer *buf_ref = NULL;
GstBufferList *buflist_ref = NULL; GstBufferList *buflist_ref = NULL;
g_mutex_lock (&priv->lock);
if (buffer) if (buffer)
buf_ref = gst_buffer_ref (buffer); buf_ref = gst_buffer_ref (buffer);
if (buffer_list) if (buffer_list)
@ -2623,12 +2624,14 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
buf_ref, buflist_ref, is_rtp)) { buf_ref, buflist_ref, is_rtp)) {
GST_WARNING_OBJECT (stream, GST_WARNING_OBJECT (stream,
"Dropping slow transport %" GST_PTR_FORMAT, tr); "Dropping slow transport %" GST_PTR_FORMAT, tr);
g_mutex_lock (&priv->lock);
update_transport (stream, tr, FALSE); update_transport (stream, tr, FALSE);
}
g_mutex_unlock (&priv->lock); g_mutex_unlock (&priv->lock);
} }
} }
gst_rtsp_stream_transport_unlock_backlog (tr);
}
g_ptr_array_unref (transports); g_ptr_array_unref (transports);
} }
gst_sample_unref (sample); gst_sample_unref (sample);
@ -4561,7 +4564,7 @@ on_message_sent (GstRTSPStreamTransport * trans, gpointer user_data)
GST_DEBUG_OBJECT (stream, "message send complete"); GST_DEBUG_OBJECT (stream, "message send complete");
g_mutex_lock (&priv->lock); gst_rtsp_stream_transport_lock_backlog (trans);
if (!gst_rtsp_stream_transport_backlog_is_empty (trans)) { if (!gst_rtsp_stream_transport_backlog_is_empty (trans)) {
GstBuffer *buffer; GstBuffer *buffer;
@ -4575,13 +4578,12 @@ on_message_sent (GstRTSPStreamTransport * trans, gpointer user_data)
g_assert (popped == TRUE); g_assert (popped == TRUE);
g_mutex_unlock (&priv->lock);
push_data (stream, trans, buffer, buffer_list, is_rtp); push_data (stream, trans, buffer, buffer_list, is_rtp);
gst_clear_buffer (&buffer); gst_clear_buffer (&buffer);
gst_clear_buffer_list (&buffer_list); gst_clear_buffer_list (&buffer_list);
} else { } else {
g_mutex_lock (&priv->lock);
/* iterate from 1 and down, so we prioritize RTCP over RTP */ /* iterate from 1 and down, so we prioritize RTCP over RTP */
for (i = 1; i >= 0; i--) { for (i = 1; i >= 0; i--) {
if (priv->have_buffer[i]) { if (priv->have_buffer[i]) {
@ -4602,6 +4604,8 @@ on_message_sent (GstRTSPStreamTransport * trans, gpointer user_data)
g_mutex_unlock (&priv->lock); g_mutex_unlock (&priv->lock);
} }
gst_rtsp_stream_transport_unlock_backlog (trans);
} }
/** /**