rtsp-stream : fix race condition in send_tcp_message

If one thread is inside the send_tcp_message function and are done
sending rtp or rtcp messages so the n_outstanding variable is zero
however have not exit the loop sending the messages. While sending its
messages, transports have been added or removed to the transport list,
so the cache should be updated. If now an additional thread comes to
the function send_tcp_message and trying to send rtp messages it will
first destroy the rtp cache that is still being iterated trough by the
first thread.

Fixes #81
This commit is contained in:
Adam x Nilsson 2019-10-07 12:13:47 +02:00 committed by Mathieu Duponchelle
parent 6b3bd23e40
commit 0b1b6670c8

View file

@ -163,10 +163,8 @@ struct _GstRTSPStreamPrivate
guint n_active;
GList *transports;
guint transports_cookie;
GList *tr_cache_rtp;
GList *tr_cache_rtcp;
guint tr_cache_cookie_rtp;
guint tr_cache_cookie_rtcp;
GPtrArray *tr_cache;
guint tr_cache_cookie;
guint n_tcp_transports;
gboolean have_buffer[2];
guint n_outstanding;
@ -2445,17 +2443,11 @@ on_sender_ssrc_active (GObject * session, GObject * source,
}
static void
clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
clear_tr_cache (GstRTSPStreamPrivate * priv)
{
if (is_rtp) {
g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
g_list_free (priv->tr_cache_rtp);
priv->tr_cache_rtp = NULL;
} else {
g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
g_list_free (priv->tr_cache_rtcp);
priv->tr_cache_rtcp = NULL;
}
if (priv->tr_cache)
g_ptr_array_unref (priv->tr_cache);
priv->tr_cache = NULL;
}
/* Must be called with priv->lock */
@ -2470,6 +2462,7 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
GstBufferList *buffer_list;
guint n_messages = 0;
gboolean is_rtp;
GPtrArray *transports;
if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) {
return;
@ -2500,71 +2493,46 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
is_rtp = (idx == 0);
if (is_rtp) {
if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
clear_tr_cache (priv, is_rtp);
for (walk = priv->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
const GstRTSPTransport *t =
gst_rtsp_stream_transport_get_transport (tr);
if (priv->tr_cache_cookie != priv->transports_cookie) {
clear_tr_cache (priv);
priv->tr_cache =
g_ptr_array_new_full (priv->n_tcp_transports, g_object_unref);
if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
continue;
for (walk = priv->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr);
priv->tr_cache_rtp =
g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
}
priv->tr_cache_cookie_rtp = priv->transports_cookie;
}
} else {
if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
clear_tr_cache (priv, is_rtp);
for (walk = priv->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
const GstRTSPTransport *t =
gst_rtsp_stream_transport_get_transport (tr);
if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
continue;
priv->tr_cache_rtcp =
g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
}
priv->tr_cache_cookie_rtcp = priv->transports_cookie;
if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
continue;
g_ptr_array_add (priv->tr_cache, g_object_ref (tr));
}
priv->tr_cache_cookie = priv->transports_cookie;
}
transports = priv->tr_cache;
g_ptr_array_ref (transports);
priv->n_outstanding += n_messages * priv->n_tcp_transports;
g_mutex_unlock (&priv->lock);
if (is_rtp) {
for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
if (transports) {
for (gint index = 0; index < transports->len; index++) {
GstRTSPStreamTransport *tr =
(GstRTSPStreamTransport *) g_ptr_array_index (transports, index);
gboolean send_ret = TRUE;
if (buffer)
send_ret = gst_rtsp_stream_transport_send_rtp (tr, buffer);
if (buffer_list)
send_ret = gst_rtsp_stream_transport_send_rtp_list (tr, buffer_list);
if (!send_ret) {
/* remove transport on send error */
g_mutex_lock (&priv->lock);
priv->n_outstanding -= n_messages;
update_transport (stream, tr, FALSE);
g_mutex_unlock (&priv->lock);
}
}
} else {
for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
gboolean send_ret = TRUE;
if (buffer)
send_ret = gst_rtsp_stream_transport_send_rtcp (tr, buffer);
if (buffer_list)
send_ret = gst_rtsp_stream_transport_send_rtcp_list (tr, buffer_list);
if (is_rtp) {
if (buffer)
send_ret = gst_rtsp_stream_transport_send_rtp (tr, buffer);
if (buffer_list)
send_ret = gst_rtsp_stream_transport_send_rtp_list (tr, buffer_list);
} else {
if (buffer)
send_ret = gst_rtsp_stream_transport_send_rtcp (tr, buffer);
if (buffer_list)
send_ret = gst_rtsp_stream_transport_send_rtcp_list (tr, buffer_list);
}
if (!send_ret) {
/* remove transport on send error */
@ -2574,6 +2542,7 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
g_mutex_unlock (&priv->lock);
}
}
g_ptr_array_unref (transports);
}
gst_sample_unref (sample);
@ -3869,8 +3838,7 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
g_mutex_lock (&priv->lock);
}
clear_tr_cache (priv, TRUE);
clear_tr_cache (priv, FALSE);
clear_tr_cache (priv);
GST_INFO ("stream %p leaving bin", stream);