rtsp-stream: marshal calls to send_tcp_message to a single thread

In order to address the race condition pointed out at
https://gitlab.freedesktop.org/gstreamer/gst-rtsp-server/merge_requests/108#note_403579
we get rid of the send thread pool, and instead spawn and manage
a single thread to pull samples from app sinks and add them to
the transport's backlogs.

Additionally, we now also always go through the backlogs in order
to simplify the logic.
This commit is contained in:
Mathieu Duponchelle 2020-02-06 22:46:18 +01:00 committed by Mathieu Duponchelle
parent 50ecbb1596
commit 54b6b3bcab

View file

@ -190,6 +190,18 @@ struct _GstRTSPStreamPrivate
gint dscp_qos;
/* Sending logic for TCP */
GThread *send_thread;
GCond send_cond;
GMutex send_lock;
/* @send_lock is released when pushing data out, we use
* a cookie to decide whether we should wait on @send_cond
* before checking the transports' backlogs again
*/
guint send_cookie;
/* Used to control shutdown of @send_thread */
gboolean continue_sending;
/* stream blocking */
gulong blocked_id[2];
gboolean blocking;
@ -314,6 +326,11 @@ gst_rtsp_stream_init (GstRTSPStream * stream)
g_mutex_init (&priv->lock);
priv->continue_sending = TRUE;
priv->send_cookie = 0;
g_cond_init (&priv->send_cond);
g_mutex_init (&priv->send_lock);
priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, (GDestroyNotify) gst_caps_unref);
priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
@ -400,6 +417,9 @@ gst_rtsp_stream_finalize (GObject * obj)
g_hash_table_unref (priv->keys);
g_hash_table_destroy (priv->ptmap);
g_mutex_clear (&priv->send_lock);
g_cond_clear (&priv->send_cond);
G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
}
@ -2548,6 +2568,33 @@ ensure_cached_transports (GstRTSPStream * stream)
}
}
/* Must be called *without* priv->lock */
static void
check_transport_backlog (GstRTSPStream * stream, GstRTSPStreamTransport * trans)
{
gst_rtsp_stream_transport_lock_backlog (trans);
if (!gst_rtsp_stream_transport_backlog_is_empty (trans)) {
GstBuffer *buffer;
GstBufferList *buffer_list;
gboolean is_rtp;
gboolean popped;
popped =
gst_rtsp_stream_transport_backlog_pop (trans, &buffer, &buffer_list,
&is_rtp);
g_assert (popped == TRUE);
push_data (stream, trans, buffer, buffer_list, is_rtp);
gst_clear_buffer (&buffer);
gst_clear_buffer_list (&buffer_list);
}
gst_rtsp_stream_transport_unlock_backlog (trans);
}
/* Must be called with priv->lock */
static void
send_tcp_message (GstRTSPStream * stream, gint idx)
@ -2598,6 +2645,33 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
if (transports)
g_ptr_array_ref (transports);
if (transports) {
gint index;
for (index = 0; index < transports->len; index++) {
GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index);
GstBuffer *buf_ref = NULL;
GstBufferList *buflist_ref = NULL;
gst_rtsp_stream_transport_lock_backlog (tr);
if (buffer)
buf_ref = gst_buffer_ref (buffer);
if (buffer_list)
buflist_ref = gst_buffer_list_ref (buffer_list);
if (!gst_rtsp_stream_transport_backlog_push (tr,
buf_ref, buflist_ref, is_rtp)) {
GST_ERROR_OBJECT (stream,
"Dropping slow transport %" GST_PTR_FORMAT, tr);
update_transport (stream, tr, FALSE);
}
gst_rtsp_stream_transport_unlock_backlog (tr);
}
}
gst_sample_unref (sample);
g_mutex_unlock (&priv->lock);
if (transports) {
@ -2606,51 +2680,32 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
for (index = 0; index < transports->len; index++) {
GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index);
gst_rtsp_stream_transport_lock_backlog (tr);
if (gst_rtsp_stream_transport_backlog_is_empty (tr)
&& !gst_rtsp_stream_transport_check_back_pressure (tr, idx)) {
push_data (stream, tr, buffer, buffer_list, is_rtp);
} else {
GstBuffer *buf_ref = NULL;
GstBufferList *buflist_ref = NULL;
if (buffer)
buf_ref = gst_buffer_ref (buffer);
if (buffer_list)
buflist_ref = gst_buffer_list_ref (buffer_list);
if (!gst_rtsp_stream_transport_backlog_push (tr,
buf_ref, buflist_ref, is_rtp)) {
GST_WARNING_OBJECT (stream,
"Dropping slow transport %" GST_PTR_FORMAT, tr);
g_mutex_lock (&priv->lock);
update_transport (stream, tr, FALSE);
g_mutex_unlock (&priv->lock);
}
}
gst_rtsp_stream_transport_unlock_backlog (tr);
check_transport_backlog (stream, tr);
}
g_ptr_array_unref (transports);
}
gst_sample_unref (sample);
g_mutex_lock (&priv->lock);
}
static void
send_thread_main (gpointer data, gpointer user_data)
static gpointer
send_func (GstRTSPStream * stream)
{
GstRTSPStream *stream = user_data;
GstRTSPStreamPrivate *priv = stream->priv;
gint idx;
gint i;
gboolean cont = TRUE;
g_mutex_lock (&priv->lock);
g_mutex_lock (&priv->send_lock);
while (cont) {
int i;
int idx = -1;
guint cookie;
cookie = priv->send_cookie;
g_mutex_unlock (&priv->send_lock);
g_mutex_lock (&priv->lock);
do {
idx = -1;
/* iterate from 1 and down, so we prioritize RTCP over RTP */
for (i = 1; i >= 0; i--) {
if (priv->have_buffer[i]) {
@ -2660,12 +2715,22 @@ send_thread_main (gpointer data, gpointer user_data)
}
}
if (idx != -1)
if (idx != -1) {
send_tcp_message (stream, idx);
} while (idx != -1);
}
GST_DEBUG_OBJECT (stream, "send thread done");
g_mutex_unlock (&priv->lock);
g_mutex_unlock (&priv->lock);
g_mutex_lock (&priv->send_lock);
while (cookie == priv->send_cookie) {
g_cond_wait (&priv->send_cond, &priv->send_lock);
}
cont = priv->continue_sending;
}
g_mutex_unlock (&priv->send_lock);
return NULL;
}
static GstFlowReturn
@ -2674,28 +2739,27 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
GstRTSPStream *stream = user_data;
GstRTSPStreamPrivate *priv = stream->priv;
int i;
int idx = -1;
g_mutex_lock (&priv->lock);
if (priv->send_pool == NULL) {
GST_DEBUG_OBJECT (stream, "create thread pool");
priv->send_pool =
g_thread_pool_new (send_thread_main, user_data, 1, TRUE, NULL);
}
for (i = 0; i < 2; i++)
for (i = 0; i < 2; i++) {
if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) {
priv->have_buffer[i] = TRUE;
idx = i;
break;
}
}
if (idx != -1)
send_tcp_message (stream, idx);
if (priv->send_thread == NULL) {
priv->send_thread = g_thread_new (NULL, (GThreadFunc) send_func, user_data);
}
g_mutex_unlock (&priv->lock);
g_mutex_lock (&priv->send_lock);
priv->send_cookie++;
g_cond_signal (&priv->send_cond);
g_mutex_unlock (&priv->send_lock);
return GST_FLOW_OK;
}
@ -3904,6 +3968,16 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
priv = stream->priv;
g_mutex_lock (&priv->send_lock);
priv->continue_sending = FALSE;
priv->send_cookie++;
g_cond_signal (&priv->send_cond);
g_mutex_unlock (&priv->send_lock);
if (priv->send_thread) {
g_thread_join (priv->send_thread);
}
g_mutex_lock (&priv->lock);
if (priv->joined_bin == NULL)
goto was_not_joined;
@ -4557,55 +4631,17 @@ mcast_error:
static void
on_message_sent (GstRTSPStreamTransport * trans, gpointer user_data)
{
GstRTSPStream *stream = user_data;
GstRTSPStream *stream = GST_RTSP_STREAM (user_data);
GstRTSPStreamPrivate *priv = stream->priv;
gint idx = -1;
gint i;
GST_DEBUG_OBJECT (stream, "message send complete");
gst_rtsp_stream_transport_lock_backlog (trans);
check_transport_backlog (stream, trans);
if (!gst_rtsp_stream_transport_backlog_is_empty (trans)) {
GstBuffer *buffer;
GstBufferList *buffer_list;
gboolean is_rtp;
gboolean popped;
popped =
gst_rtsp_stream_transport_backlog_pop (trans, &buffer, &buffer_list,
&is_rtp);
g_assert (popped == TRUE);
push_data (stream, trans, buffer, buffer_list, is_rtp);
gst_clear_buffer (&buffer);
gst_clear_buffer_list (&buffer_list);
} else {
g_mutex_lock (&priv->lock);
/* iterate from 1 and down, so we prioritize RTCP over RTP */
for (i = 1; i >= 0; i--) {
if (priv->have_buffer[i]) {
/* send message */
idx = i;
break;
}
}
if (idx != -1) {
gint dummy;
if (priv->send_pool) {
GST_DEBUG_OBJECT (stream, "start thread");
g_thread_pool_push (priv->send_pool, &dummy, NULL);
}
}
g_mutex_unlock (&priv->lock);
}
gst_rtsp_stream_transport_unlock_backlog (trans);
g_mutex_lock (&priv->send_lock);
priv->send_cookie++;
g_cond_signal (&priv->send_cond);
g_mutex_unlock (&priv->send_lock);
}
/**
@ -6027,8 +6063,8 @@ gst_rtsp_stream_set_rate_control (GstRTSPStream * stream, gboolean enabled)
if (stream->priv->appsink[0])
g_object_set (stream->priv->appsink[0], "sync", enabled, NULL);
if (stream->priv->payloader
&& g_object_class_find_property (G_OBJECT_GET_CLASS (stream->
priv->payloader), "onvif-no-rate-control"))
&& g_object_class_find_property (G_OBJECT_GET_CLASS (stream->priv->
payloader), "onvif-no-rate-control"))
g_object_set (stream->priv->payloader, "onvif-no-rate-control", !enabled,
NULL);
if (stream->priv->session) {