stream: refactor TCP backpressure handling

The previous implementation stopped sending TCP messages to
all clients when a single one stopped consuming them, which
obviously created problems for shared media.

Instead, we now manage a backlog in stream-transport, and slow
clients are removed once this backlog exceeds a maximum duration,
currently hardcoded.

Fixes #80
This commit is contained in:
Mathieu Duponchelle 2019-10-15 19:08:32 +02:00
parent 95ce953e34
commit dd32924eb0
5 changed files with 406 additions and 76 deletions

View file

@ -53,6 +53,7 @@
#include "rtsp-client.h"
#include "rtsp-sdp.h"
#include "rtsp-params.h"
#include "rtsp-server-internal.h"
typedef enum
{
@ -1202,6 +1203,12 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
return ret;
}
static gboolean
do_check_back_pressure (guint8 channel, GstRTSPClient * client)
{
return get_data_seq (client, channel) != 0;
}
static gboolean
do_send_data_list (GstBufferList * buffer_list, guint8 channel,
GstRTSPClient * client)
@ -2854,6 +2861,9 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx)
(GstRTSPSendListFunc) do_send_data_list,
(GstRTSPSendListFunc) do_send_data_list, client, NULL);
gst_rtsp_stream_transport_set_back_pressure_callback (trans,
(GstRTSPBackPressureFunc) do_check_back_pressure, client, NULL);
g_hash_table_insert (priv->transports,
GINT_TO_POINTER (ct->interleaved.min), trans);
g_object_ref (trans);

View file

@ -0,0 +1,55 @@
/* GStreamer
* Copyright (C) 2019 Mathieu Duponchelle <mathieu@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_RTSP_SERVER_INTERNAL_H__
#define __GST_RTSP_SERVER_INTERNAL_H__
#include <glib.h>
G_BEGIN_DECLS
#include "rtsp-stream-transport.h"
/* Internal GstRTSPStreamTransport interface */
typedef gboolean (*GstRTSPBackPressureFunc) (guint8 channel, gpointer user_data);
gboolean gst_rtsp_stream_transport_backlog_push (GstRTSPStreamTransport *trans,
GstBuffer *buffer,
GstBufferList *buffer_list,
gboolean is_rtp);
gboolean gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamTransport *trans,
GstBuffer **buffer,
GstBufferList **buffer_list,
gboolean *is_rtp);
gboolean gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport *trans);
void gst_rtsp_stream_transport_set_back_pressure_callback (GstRTSPStreamTransport *trans,
GstRTSPBackPressureFunc back_pressure_func,
gpointer user_data,
GDestroyNotify notify);
gboolean gst_rtsp_stream_transport_check_back_pressure (GstRTSPStreamTransport *trans,
guint8 channel);
G_END_DECLS
#endif /* __GST_RTSP_SERVER_INTERNAL_H__ */

View file

@ -48,6 +48,7 @@
#include <stdlib.h>
#include "rtsp-stream-transport.h"
#include "rtsp-server-internal.h"
struct _GstRTSPStreamTransportPrivate
{
@ -63,6 +64,10 @@ struct _GstRTSPStreamTransportPrivate
gpointer list_user_data;
GDestroyNotify list_notify;
GstRTSPBackPressureFunc back_pressure_func;
gpointer back_pressure_func_data;
GDestroyNotify back_pressure_func_notify;
GstRTSPKeepAliveFunc keep_alive;
gpointer ka_user_data;
GDestroyNotify ka_notify;
@ -77,8 +82,23 @@ struct _GstRTSPStreamTransportPrivate
GstRTSPUrl *url;
GObject *rtpsource;
/* TCP backlog */
GstClockTime first_rtp_timestamp;
GstQueueArray *items;
};
#define MAX_BACKLOG_DURATION (10 * GST_SECOND)
#define MAX_BACKLOG_SIZE 100
typedef struct
{
GstBuffer *buffer;
GstBufferList *buffer_list;
gboolean is_rtp;
} BackLogItem;
enum
{
PROP_0,
@ -106,10 +126,21 @@ gst_rtsp_stream_transport_class_init (GstRTSPStreamTransportClass * klass)
0, "GstRTSPStreamTransport");
}
static void
clear_backlog_item (BackLogItem * item)
{
gst_clear_buffer (&item->buffer);
gst_clear_buffer_list (&item->buffer_list);
}
static void
gst_rtsp_stream_transport_init (GstRTSPStreamTransport * trans)
{
trans->priv = gst_rtsp_stream_transport_get_instance_private (trans);
trans->priv->items = gst_queue_array_new_for_struct (sizeof (BackLogItem), 0);
trans->priv->first_rtp_timestamp = GST_CLOCK_TIME_NONE;
gst_queue_array_set_clear_func (trans->priv->items,
(GDestroyNotify) clear_backlog_item);
}
static void
@ -135,6 +166,8 @@ gst_rtsp_stream_transport_finalize (GObject * obj)
if (priv->url)
gst_rtsp_url_free (priv->url);
gst_queue_array_free (priv->items);
G_OBJECT_CLASS (gst_rtsp_stream_transport_parent_class)->finalize (obj);
}
@ -244,6 +277,39 @@ gst_rtsp_stream_transport_set_list_callbacks (GstRTSPStreamTransport * trans,
priv->list_notify = notify;
}
void
gst_rtsp_stream_transport_set_back_pressure_callback (GstRTSPStreamTransport *
trans, GstRTSPBackPressureFunc back_pressure_func, gpointer user_data,
GDestroyNotify notify)
{
GstRTSPStreamTransportPrivate *priv;
g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));
priv = trans->priv;
priv->back_pressure_func = back_pressure_func;
if (priv->back_pressure_func_notify)
priv->back_pressure_func_notify (priv->back_pressure_func_data);
priv->back_pressure_func_data = user_data;
priv->back_pressure_func_notify = notify;
}
gboolean
gst_rtsp_stream_transport_check_back_pressure (GstRTSPStreamTransport * trans,
guint8 channel)
{
GstRTSPStreamTransportPrivate *priv;
gboolean ret = FALSE;
priv = trans->priv;
if (priv->back_pressure_func)
ret = priv->back_pressure_func (channel, priv->back_pressure_func_data);
return ret;
}
/**
* gst_rtsp_stream_transport_set_keepalive:
* @trans: a #GstRTSPStreamTransport
@ -693,7 +759,7 @@ gst_rtsp_stream_transport_message_sent (GstRTSPStreamTransport * trans)
priv = trans->priv;
if (priv->message_sent)
priv->message_sent (priv->ms_user_data);
priv->message_sent (trans, priv->ms_user_data);
}
/**
@ -729,3 +795,115 @@ gst_rtsp_stream_transport_recv_data (GstRTSPStreamTransport * trans,
}
return res;
}
static GstClockTime
get_backlog_item_timestamp (BackLogItem * item)
{
GstClockTime ret = GST_CLOCK_TIME_NONE;
if (item->buffer) {
ret = GST_BUFFER_DTS_OR_PTS (item->buffer);
} else if (item->buffer_list) {
g_assert (gst_buffer_list_length (item->buffer_list) > 0);
ret = GST_BUFFER_DTS_OR_PTS (gst_buffer_list_get (item->buffer_list, 0));
}
return ret;
}
static GstClockTime
get_first_backlog_timestamp (GstRTSPStreamTransport * trans)
{
GstRTSPStreamTransportPrivate *priv = trans->priv;
GstClockTime ret = GST_CLOCK_TIME_NONE;
guint i, l;
l = gst_queue_array_get_length (priv->items);
for (i = 0; i < l; i++) {
BackLogItem *item = (BackLogItem *)
gst_queue_array_peek_nth_struct (priv->items, i);
if (item->is_rtp) {
ret = get_backlog_item_timestamp (item);
break;
}
}
return ret;
}
/* Not MT-safe, caller should ensure consistent locking. Ownership
* of @buffer and @buffer_list is transfered to the transport */
gboolean
gst_rtsp_stream_transport_backlog_push (GstRTSPStreamTransport * trans,
GstBuffer * buffer, GstBufferList * buffer_list, gboolean is_rtp)
{
gboolean ret = TRUE;
BackLogItem item = { 0, };
GstClockTime item_timestamp;
GstRTSPStreamTransportPrivate *priv;
priv = trans->priv;
if (buffer)
item.buffer = buffer;
if (buffer_list)
item.buffer_list = buffer_list;
item.is_rtp = is_rtp;
gst_queue_array_push_tail_struct (priv->items, &item);
item_timestamp = get_backlog_item_timestamp (&item);
if (is_rtp && priv->first_rtp_timestamp != GST_CLOCK_TIME_NONE) {
GstClockTimeDiff queue_duration;
g_assert (GST_CLOCK_TIME_IS_VALID (item_timestamp));
queue_duration = GST_CLOCK_DIFF (priv->first_rtp_timestamp, item_timestamp);
g_assert (queue_duration >= 0);
if (queue_duration > MAX_BACKLOG_DURATION &&
gst_queue_array_get_length (priv->items) > MAX_BACKLOG_SIZE) {
ret = FALSE;
}
} else if (is_rtp) {
priv->first_rtp_timestamp = item_timestamp;
}
return ret;
}
/* Not MT-safe, caller should ensure consistent locking. Ownership
* of @buffer and @buffer_list is transfered back to the caller */
gboolean
gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamTransport * trans,
GstBuffer ** buffer, GstBufferList ** buffer_list, gboolean * is_rtp)
{
BackLogItem *item;
GstRTSPStreamTransportPrivate *priv;
g_return_val_if_fail (!gst_rtsp_stream_transport_backlog_is_empty (trans),
FALSE);
priv = trans->priv;
item = (BackLogItem *) gst_queue_array_pop_head_struct (priv->items);
priv->first_rtp_timestamp = get_first_backlog_timestamp (trans);
*buffer = item->buffer;
*buffer_list = item->buffer_list;
*is_rtp = item->is_rtp;
return TRUE;
}
/* Not MT-safe, caller should ensure consistent locking. */
gboolean
gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport * trans)
{
return gst_queue_array_is_empty (trans->priv->items);
}

View file

@ -18,6 +18,7 @@
*/
#include <gst/gst.h>
#include <gst/base/base.h>
#include <gst/rtsp/gstrtsprange.h>
#include <gst/rtsp/gstrtspurl.h>
@ -88,7 +89,7 @@ typedef void (*GstRTSPKeepAliveFunc) (gpointer user_data);
* Function registered with gst_rtsp_stream_transport_set_message_sent()
* and called when a message has been sent on the transport.
*/
typedef void (*GstRTSPMessageSentFunc) (gpointer user_data);
typedef void (*GstRTSPMessageSentFunc) (GstRTSPStreamTransport *trans, gpointer user_data);
/**
* GstRTSPStreamTransport:
@ -183,8 +184,6 @@ void gst_rtsp_stream_transport_set_timed_out (GstRTSPStreamT
GST_RTSP_SERVER_API
gboolean gst_rtsp_stream_transport_is_timed_out (GstRTSPStreamTransport *trans);
GST_RTSP_SERVER_API
gboolean gst_rtsp_stream_transport_send_rtp (GstRTSPStreamTransport *trans,
GstBuffer *buffer);

View file

@ -43,7 +43,26 @@
* stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
* the destination again.
*
* Last reviewed on 2013-07-16 (1.0.0)
* Each #GstRTSPStreamTransport spawns one queue that will serve as a backlog of a
* controllable maximum size when the reflux from the TCP connection's backpressure
* starts spilling all over.
*
* Unlike the backlog in rtspconnection, which we have decided should only contain
* at most one RTP and one RTCP data message in order to allow control messages to
* go through unobstructed, this backlog only consists of data messages, allowing
* us to fill it up without concern.
*
* When multiple TCP transports exist, for example in the context of a shared media,
* we only pop samples from our appsinks when at least one of the transports doesn't
* experience back pressure: this allows us to pace our sample popping to the speed
* of the fastest client.
*
* When a sample is popped, it is either sent directly on transports that don't
* experience backpressure, or queued on the transport's backlog otherwise. Samples
* are then popped from that backlog when the transport reports it has sent the message.
*
* Once the backlog reaches an overly large duration, the transport is dropped as
* the client was deemed too slow.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
@ -61,6 +80,7 @@
#include <gst/rtp/gstrtpbuffer.h>
#include "rtsp-stream.h"
#include "rtsp-server-internal.h"
struct _GstRTSPStreamPrivate
{
@ -167,7 +187,6 @@ struct _GstRTSPStreamPrivate
guint tr_cache_cookie;
guint n_tcp_transports;
gboolean have_buffer[2];
guint n_outstanding;
gint dscp_qos;
@ -2450,13 +2469,93 @@ clear_tr_cache (GstRTSPStreamPrivate * priv)
priv->tr_cache = NULL;
}
/* With lock taken */
static gboolean
any_transport_ready (GstRTSPStream * stream, guint8 channel)
{
gboolean ret = TRUE;
GstRTSPStreamPrivate *priv = stream->priv;
GPtrArray *transports;
gint index;
transports = priv->tr_cache;
if (!transports)
goto done;
for (index = 0; index < transports->len; index++) {
GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index);
if (!gst_rtsp_stream_transport_check_back_pressure (tr, channel)) {
ret = TRUE;
break;
} else {
ret = FALSE;
}
}
done:
return ret;
}
/* Must be called *without* priv->lock */
static void
push_data (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
GstBuffer * buffer, GstBufferList * buffer_list, gboolean is_rtp)
{
GstRTSPStreamPrivate *priv = stream->priv;
gboolean send_ret = TRUE;
if (is_rtp) {
if (buffer)
send_ret = gst_rtsp_stream_transport_send_rtp (trans, buffer);
if (buffer_list)
send_ret = gst_rtsp_stream_transport_send_rtp_list (trans, buffer_list);
} else {
if (buffer)
send_ret = gst_rtsp_stream_transport_send_rtcp (trans, buffer);
if (buffer_list)
send_ret = gst_rtsp_stream_transport_send_rtcp_list (trans, buffer_list);
}
if (!send_ret) {
/* remove transport on send error */
g_mutex_lock (&priv->lock);
update_transport (stream, trans, FALSE);
g_mutex_unlock (&priv->lock);
}
}
/* With priv->lock */
static void
ensure_cached_transports (GstRTSPStream * stream)
{
GstRTSPStreamPrivate *priv = stream->priv;
GList *walk;
if (priv->tr_cache_cookie != priv->transports_cookie) {
clear_tr_cache (priv);
priv->tr_cache =
g_ptr_array_new_full (priv->n_tcp_transports, g_object_unref);
for (walk = priv->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr);
if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
continue;
g_ptr_array_add (priv->tr_cache, g_object_ref (tr));
}
priv->tr_cache_cookie = priv->transports_cookie;
}
}
/* Must be called with priv->lock */
static void
send_tcp_message (GstRTSPStream * stream, gint idx)
{
GstRTSPStreamPrivate *priv = stream->priv;
GstAppSink *sink;
GList *walk;
GstSample *sample;
GstBuffer *buffer;
GstBufferList *buffer_list;
@ -2464,9 +2563,13 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
gboolean is_rtp;
GPtrArray *transports;
if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) {
if (!priv->have_buffer[idx])
return;
ensure_cached_transports (stream);
if (!any_transport_ready (stream, idx))
return;
}
priv->have_buffer[idx] = FALSE;
@ -2493,52 +2596,38 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
is_rtp = (idx == 0);
if (priv->tr_cache_cookie != priv->transports_cookie) {
clear_tr_cache (priv);
priv->tr_cache =
g_ptr_array_new_full (priv->n_tcp_transports, g_object_unref);
for (walk = priv->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr);
if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
continue;
g_ptr_array_add (priv->tr_cache, g_object_ref (tr));
}
priv->tr_cache_cookie = priv->transports_cookie;
}
transports = priv->tr_cache;
g_ptr_array_ref (transports);
priv->n_outstanding += n_messages * priv->n_tcp_transports;
g_mutex_unlock (&priv->lock);
if (transports) {
for (gint index = 0; index < transports->len; index++) {
GstRTSPStreamTransport *tr =
(GstRTSPStreamTransport *) g_ptr_array_index (transports, index);
gboolean send_ret = TRUE;
gint index;
if (is_rtp) {
if (buffer)
send_ret = gst_rtsp_stream_transport_send_rtp (tr, buffer);
if (buffer_list)
send_ret = gst_rtsp_stream_transport_send_rtp_list (tr, buffer_list);
for (index = 0; index < transports->len; index++) {
GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index);
if (gst_rtsp_stream_transport_backlog_is_empty (tr)
&& !gst_rtsp_stream_transport_check_back_pressure (tr, idx)) {
push_data (stream, tr, buffer, buffer_list, is_rtp);
} else {
if (buffer)
send_ret = gst_rtsp_stream_transport_send_rtcp (tr, buffer);
if (buffer_list)
send_ret = gst_rtsp_stream_transport_send_rtcp_list (tr, buffer_list);
}
GstBuffer *buf_ref = NULL;
GstBufferList *buflist_ref = NULL;
if (!send_ret) {
/* remove transport on send error */
g_mutex_lock (&priv->lock);
priv->n_outstanding -= n_messages;
update_transport (stream, tr, FALSE);
if (buffer)
buf_ref = gst_buffer_ref (buffer);
if (buffer_list)
buflist_ref = gst_buffer_list_ref (buffer_list);
if (!gst_rtsp_stream_transport_backlog_push (tr,
buf_ref, buflist_ref, is_rtp)) {
GST_WARNING_OBJECT (stream,
"Dropping slow transport %" GST_PTR_FORMAT, tr);
update_transport (stream, tr, FALSE);
}
g_mutex_unlock (&priv->lock);
}
}
@ -2558,6 +2647,7 @@ send_thread_main (gpointer data, gpointer user_data)
gint i;
g_mutex_lock (&priv->lock);
do {
idx = -1;
/* iterate from 1 and down, so we prioritize RTCP over RTP */
@ -2569,9 +2659,9 @@ send_thread_main (gpointer data, gpointer user_data)
}
}
if (idx != -1 && priv->n_outstanding == 0)
if (idx != -1)
send_tcp_message (stream, idx);
} while (idx != -1 && priv->n_outstanding == 0);
} while (idx != -1);
GST_DEBUG_OBJECT (stream, "send thread done");
g_mutex_unlock (&priv->lock);
@ -2596,10 +2686,7 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
for (i = 0; i < 2; i++)
if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) {
priv->have_buffer[i] = TRUE;
if (priv->n_outstanding == 0) {
/* send message */
idx = i;
}
idx = i;
break;
}
@ -4459,25 +4546,36 @@ mcast_error:
}
static void
on_message_sent (gpointer user_data)
on_message_sent (GstRTSPStreamTransport * trans, gpointer user_data)
{
GstRTSPStream *stream = user_data;
GstRTSPStreamPrivate *priv = stream->priv;
gint idx = -1;
gint i;
GST_DEBUG_OBJECT (stream, "message send complete");
g_mutex_lock (&priv->lock);
g_assert (priv->n_outstanding >= 0);
if (!gst_rtsp_stream_transport_backlog_is_empty (trans)) {
GstBuffer *buffer;
GstBufferList *buffer_list;
gboolean is_rtp;
gboolean popped;
if (priv->n_outstanding == 0)
goto no_outstanding;
popped =
gst_rtsp_stream_transport_backlog_pop (trans, &buffer, &buffer_list,
&is_rtp);
priv->n_outstanding--;
if (priv->n_outstanding == 0) {
gint i;
g_assert (popped == TRUE);
g_mutex_unlock (&priv->lock);
push_data (stream, trans, buffer, buffer_list, is_rtp);
gst_clear_buffer (&buffer);
gst_clear_buffer_list (&buffer_list);
} else {
/* iterate from 1 and down, so we prioritize RTCP over RTP */
for (i = 1; i >= 0; i--) {
if (priv->have_buffer[i]) {
@ -4486,27 +4584,17 @@ on_message_sent (gpointer user_data)
break;
}
}
}
if (idx != -1) {
gint dummy;
if (idx != -1) {
gint dummy;
if (priv->send_pool) {
GST_DEBUG_OBJECT (stream, "start thread");
g_thread_pool_push (priv->send_pool, &dummy, NULL);
if (priv->send_pool) {
GST_DEBUG_OBJECT (stream, "start thread");
g_thread_pool_push (priv->send_pool, &dummy, NULL);
}
}
}
g_mutex_unlock (&priv->lock);
return;
/* ERRORS */
no_outstanding:
{
GST_INFO ("no outstanding messages");
g_mutex_unlock (&priv->lock);
return;
}
}
@ -5928,8 +6016,8 @@ gst_rtsp_stream_set_rate_control (GstRTSPStream * stream, gboolean enabled)
if (stream->priv->appsink[0])
g_object_set (stream->priv->appsink[0], "sync", enabled, NULL);
if (stream->priv->payloader
&& g_object_class_find_property (G_OBJECT_GET_CLASS (stream->priv->
payloader), "onvif-no-rate-control"))
&& g_object_class_find_property (G_OBJECT_GET_CLASS (stream->
priv->payloader), "onvif-no-rate-control"))
g_object_set (stream->priv->payloader, "onvif-no-rate-control", !enabled,
NULL);
if (stream->priv->session) {