Limit queued TCP data messages to one per stream

Before, the watch backlog size in GstRTSPClient was changed
dynamically between unlimited and a fixed size, trying to avoid both
unlimited memory usage and deadlocks while waiting for place in the
queue. (Some of the deadlocks were described in a long comment in
handle_request().)

In the previous commit, we changed to a fixed backlog size of 100.
This is possible, because we now handle RTP/RTCP data messages differently
from RTSP request/response messages.

The data messages are messages tunneled over TCP. We allow at most one
queued data message per stream in GstRTSPClient at a time, and
successfully sent data messages are acked by sending a "message-sent"
callback from the GstStreamTransport. Until that ack comes, the
GstRTSPStream does not call pull_sample() on its appsink, and
therefore the streaming thread in the pipeline will not be blocked
inside GstRTSPClient, waiting for a place in the queue.

pull_sample() is called when we have both an ack and a "new-sample"
signal from the appsink. Then, we know there is a buffer to write.

RTSP request/response messages are not acked in the same way as data
messages. The rest of the 100 places in the queue are used for
them. If the queue becomes full of request/response messages, we
return an error and close the connection to the client.

Change-Id: I275310bc90a219ceb2473c098261acc78be84c97
This commit is contained in:
David Svensson Fors 2018-06-28 11:22:21 +02:00 committed by Sebastian Dröge
parent 287345f6ac
commit 12169f1e84
4 changed files with 372 additions and 42 deletions

View file

@ -70,13 +70,15 @@ struct _GstRTSPClientPrivate
GstRTSPConnection *connection;
GstRTSPWatch *watch;
GMainContext *watch_context;
guint close_seq;
gchar *server_ip;
gboolean is_ipv6;
GstRTSPClientSendFunc send_func; /* protected by send_lock */
gpointer send_data; /* protected by send_lock */
GDestroyNotify send_notify; /* protected by send_lock */
/* protected by send_lock */
GstRTSPClientSendFunc send_func;
gpointer send_data;
GDestroyNotify send_notify;
guint close_seq;
GArray *data_seqs;
GstRTSPSessionPool *session_pool;
gulong session_removed_id;
@ -105,11 +107,15 @@ struct _GstRTSPClientPrivate
GstRTSPTunnelState tstate;
};
typedef struct
{
guint8 channel;
guint seq;
} DataSeq;
static GMutex tunnels_lock;
static GHashTable *tunnels; /* protected by tunnels_lock */
/* FIXME make this configurable. We don't want to do this yet because it will
* be superceeded by a cache object later */
#define WATCH_BACKLOG_SIZE 100
#define DEFAULT_SESSION_POOL NULL
@ -587,6 +593,7 @@ gst_rtsp_client_init (GstRTSPClient * client)
g_mutex_init (&priv->send_lock);
g_mutex_init (&priv->watch_lock);
priv->close_seq = 0;
priv->data_seqs = g_array_new (FALSE, FALSE, sizeof (DataSeq));
priv->drop_backlog = DEFAULT_DROP_BACKLOG;
priv->transports =
g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
@ -754,6 +761,7 @@ gst_rtsp_client_finalize (GObject * obj)
g_assert (priv->sessions == NULL);
g_assert (priv->session_removed_id == 0);
g_array_unref (priv->data_seqs);
g_hash_table_unref (priv->transports);
g_hash_table_unref (priv->pipelined_requests);
@ -1055,6 +1063,82 @@ no_prepare:
}
}
static inline DataSeq *
get_data_seq_element (GstRTSPClient * client, guint8 channel)
{
GstRTSPClientPrivate *priv = client->priv;
GArray *data_seqs = priv->data_seqs;
gint i = 0;
while (i < data_seqs->len) {
DataSeq *data_seq = &g_array_index (data_seqs, DataSeq, i);
if (data_seq->channel == channel)
return data_seq;
i++;
}
return NULL;
}
static void
add_data_seq (GstRTSPClient * client, guint8 channel)
{
GstRTSPClientPrivate *priv = client->priv;
DataSeq data_seq = {.channel = channel,.seq = 0 };
if (get_data_seq_element (client, channel) == NULL)
g_array_append_val (priv->data_seqs, data_seq);
}
static void
set_data_seq (GstRTSPClient * client, guint8 channel, guint seq)
{
DataSeq *data_seq;
data_seq = get_data_seq_element (client, channel);
g_assert_nonnull (data_seq);
data_seq->seq = seq;
}
static guint
get_data_seq (GstRTSPClient * client, guint8 channel)
{
DataSeq *data_seq;
data_seq = get_data_seq_element (client, channel);
g_assert_nonnull (data_seq);
return data_seq->seq;
}
static gboolean
get_data_channel (GstRTSPClient * client, guint seq, guint8 * channel)
{
GstRTSPClientPrivate *priv = client->priv;
GArray *data_seqs = priv->data_seqs;
gint i = 0;
while (i < data_seqs->len) {
DataSeq *data_seq = &g_array_index (data_seqs, DataSeq, i);
if (data_seq->seq == seq) {
*channel = data_seq->channel;
return TRUE;
}
i++;
}
return FALSE;
}
static gboolean
do_close (gpointer user_data)
{
GstRTSPClient *client = user_data;
gst_rtsp_client_close (client);
return G_SOURCE_REMOVE;
}
static gboolean
do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
{
@ -1074,6 +1158,11 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
g_mutex_lock (&priv->send_lock);
if (get_data_seq (client, channel) != 0) {
GST_WARNING ("already a queued data message for channel %d", channel);
g_mutex_unlock (&priv->send_lock);
return FALSE;
}
if (priv->send_func)
ret = priv->send_func (client, &message, FALSE, priv->send_data);
g_mutex_unlock (&priv->send_lock);
@ -1083,6 +1172,16 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
gst_rtsp_message_unset (&message);
if (!ret) {
GSource *idle_src;
/* close in watch context */
idle_src = g_idle_source_new ();
g_source_set_callback (idle_src, do_close, client, NULL);
g_source_attach (idle_src, priv->watch_context);
g_source_unref (idle_src);
}
return ret;
}
@ -2355,6 +2454,8 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx)
g_hash_table_insert (priv->transports,
GINT_TO_POINTER (ct->interleaved.max), trans);
g_object_ref (trans);
add_data_seq (client, ct->interleaved.min);
add_data_seq (client, ct->interleaved.max);
}
/* create and serialize the server transport */
@ -4058,33 +4159,48 @@ do_send_message (GstRTSPClient * client, GstRTSPMessage * message,
gboolean close, gpointer user_data)
{
GstRTSPClientPrivate *priv = client->priv;
guint id = 0;
GstRTSPResult ret;
GTimeVal time;
time.tv_sec = 1;
time.tv_usec = 0;
/* send the message */
ret = gst_rtsp_watch_send_message (priv->watch, message, &id);
if (ret != GST_RTSP_OK)
goto error;
do {
/* send the response and store the seq number so we can wait until it's
* written to the client to close the connection */
ret =
gst_rtsp_watch_send_message (priv->watch, message,
close ? &priv->close_seq : NULL);
if (ret == GST_RTSP_OK)
break;
/* if close flag is set, store the seq number so we can wait until it's
* written to the client to close the connection */
if (close)
priv->close_seq = id;
if (ret != GST_RTSP_ENOMEM)
if (gst_rtsp_message_get_type (message) == GST_RTSP_MESSAGE_DATA) {
guint8 channel = 0;
GstRTSPResult r;
r = gst_rtsp_message_parse_data (message, &channel);
if (r != GST_RTSP_OK) {
ret = r;
goto error;
}
/* drop backlog */
if (priv->drop_backlog)
break;
/* check if the message has been queued for transmission in watch */
if (id) {
/* store the seq number so we can wait until it has been sent */
GST_DEBUG_OBJECT (client, "wait for message %d, channel %d", id, channel);
set_data_seq (client, channel, id);
} else {
GstRTSPStreamTransport *trans;
/* queue was full, wait for more space */
GST_DEBUG_OBJECT (client, "waiting for backlog");
ret = gst_rtsp_watch_wait_backlog (priv->watch, &time);
GST_DEBUG_OBJECT (client, "Resend due to backlog full");
} while (ret != GST_RTSP_EINTR);
trans =
g_hash_table_lookup (priv->transports,
GINT_TO_POINTER ((gint) channel));
if (trans) {
GST_DEBUG_OBJECT (client, "emit 'message-sent' signal");
g_mutex_unlock (&priv->send_lock);
gst_rtsp_stream_transport_message_sent (trans);
g_mutex_lock (&priv->send_lock);
}
}
}
return ret == GST_RTSP_OK;
@ -4092,7 +4208,7 @@ do_send_message (GstRTSPClient * client, GstRTSPMessage * message,
error:
{
GST_DEBUG_OBJECT (client, "got error %d", ret);
return ret == GST_RTSP_OK;
return FALSE;
}
}
@ -4108,13 +4224,33 @@ message_sent (GstRTSPWatch * watch, guint cseq, gpointer user_data)
{
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
GstRTSPClientPrivate *priv = client->priv;
GstRTSPStreamTransport *trans = NULL;
guint8 channel = 0;
gboolean close = FALSE;
g_mutex_lock (&priv->send_lock);
if (get_data_channel (client, cseq, &channel)) {
trans = g_hash_table_lookup (priv->transports, GINT_TO_POINTER (channel));
set_data_seq (client, channel, 0);
}
if (priv->close_seq && priv->close_seq == cseq) {
GST_INFO ("client %p: send close message", client);
close = TRUE;
priv->close_seq = 0;
gst_rtsp_client_close (client);
}
g_mutex_unlock (&priv->send_lock);
if (trans) {
GST_DEBUG_OBJECT (client, "emit 'message-sent' signal");
gst_rtsp_stream_transport_message_sent (trans);
}
if (close)
gst_rtsp_client_close (client);
return GST_RTSP_OK;
}

View file

@ -35,6 +35,9 @@
* is received from the client. It will also call
* gst_rtsp_stream_transport_set_timed_out() when a receiver has timed out.
*
* A #GstRTSPClient will call gst_rtsp_stream_transport_message_sent() when it
* has sent a data message for the transport.
*
* Last reviewed on 2013-07-16 (1.0.0)
*/
@ -58,6 +61,10 @@ struct _GstRTSPStreamTransportPrivate
gboolean active;
gboolean timed_out;
GstRTSPMessageSentFunc message_sent;
gpointer ms_user_data;
GDestroyNotify ms_notify;
GstRTSPTransport *transport;
GstRTSPUrl *url;
@ -109,6 +116,7 @@ gst_rtsp_stream_transport_finalize (GObject * obj)
/* remove callbacks now */
gst_rtsp_stream_transport_set_callbacks (trans, NULL, NULL, NULL, NULL);
gst_rtsp_stream_transport_set_keepalive (trans, NULL, NULL, NULL);
gst_rtsp_stream_transport_set_message_sent (trans, NULL, NULL, NULL);
if (priv->stream)
g_object_unref (priv->stream);
@ -223,6 +231,33 @@ gst_rtsp_stream_transport_set_keepalive (GstRTSPStreamTransport * trans,
priv->ka_notify = notify;
}
/**
* gst_rtsp_stream_transport_set_message_sent:
* @trans: a #GstRTSPStreamTransport
* @message_sent: (scope notified): a callback called when a message has been sent
* @user_data: (closure): user data passed to callback
* @notify: (allow-none): called with the user_data when no longer needed
*
* Install a callback that will be called when a message has been sent on @trans.
*/
void
gst_rtsp_stream_transport_set_message_sent (GstRTSPStreamTransport * trans,
GstRTSPMessageSentFunc message_sent, gpointer user_data,
GDestroyNotify notify)
{
GstRTSPStreamTransportPrivate *priv;
g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));
priv = trans->priv;
priv->message_sent = message_sent;
if (priv->ms_notify)
priv->ms_notify (priv->ms_user_data);
priv->ms_user_data = user_data;
priv->ms_notify = notify;
}
/**
* gst_rtsp_stream_transport_set_transport:
@ -510,6 +545,23 @@ gst_rtsp_stream_transport_keep_alive (GstRTSPStreamTransport * trans)
priv->keep_alive (priv->ka_user_data);
}
/**
* gst_rtsp_stream_transport_message_sent:
* @trans: a #GstRTSPStreamTransport
*
* Signal the installed message_sent callback for @trans.
*/
void
gst_rtsp_stream_transport_message_sent (GstRTSPStreamTransport * trans)
{
GstRTSPStreamTransportPrivate *priv;
priv = trans->priv;
if (priv->message_sent)
priv->message_sent (priv->ms_user_data);
}
/**
* gst_rtsp_stream_transport_recv_data:
* @trans: a #GstRTSPStreamTransport

View file

@ -65,6 +65,15 @@ typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpo
*/
typedef void (*GstRTSPKeepAliveFunc) (gpointer user_data);
/**
* GstRTSPMessageSentFunc:
* @user_data: user data
*
* Function registered with gst_rtsp_stream_transport_set_message_sent()
* and called when a message has been sent on the transport.
*/
typedef void (*GstRTSPMessageSentFunc) (gpointer user_data);
/**
* GstRTSPStreamTransport:
* @parent: parent instance
@ -128,9 +137,18 @@ void gst_rtsp_stream_transport_set_keepalive (GstRTSPStreamT
gpointer user_data,
GDestroyNotify notify);
GST_RTSP_SERVER_API
void gst_rtsp_stream_transport_set_message_sent (GstRTSPStreamTransport *trans,
GstRTSPMessageSentFunc message_sent,
gpointer user_data,
GDestroyNotify notify);
GST_RTSP_SERVER_API
void gst_rtsp_stream_transport_keep_alive (GstRTSPStreamTransport *trans);
GST_RTSP_SERVER_API
void gst_rtsp_stream_transport_message_sent (GstRTSPStreamTransport *trans);
GST_RTSP_SERVER_API
gboolean gst_rtsp_stream_transport_set_active (GstRTSPStreamTransport *trans,
gboolean active);

View file

@ -158,6 +158,9 @@ struct _GstRTSPStreamPrivate
GList *tr_cache_rtcp;
guint tr_cache_cookie_rtp;
guint tr_cache_cookie_rtcp;
guint n_tcp_transports;
gboolean have_buffer[2];
guint n_outstanding;
gint dscp_qos;
@ -208,6 +211,10 @@ static void gst_rtsp_stream_set_property (GObject * object, guint propid,
static void gst_rtsp_stream_finalize (GObject * obj);
static gboolean
update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
gboolean add);
static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
@ -2115,32 +2122,52 @@ clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
}
}
static GstFlowReturn
handle_new_sample (GstAppSink * sink, gpointer user_data)
static void
send_tcp_message (GstRTSPStream * stream, gint idx)
{
GstRTSPStreamPrivate *priv;
GstRTSPStreamPrivate *priv = stream->priv;
GstAppSink *sink;
GList *walk;
GstSample *sample;
GstBuffer *buffer;
GstRTSPStream *stream;
gboolean is_rtp;
sample = gst_app_sink_pull_sample (sink);
if (!sample)
return GST_FLOW_OK;
g_mutex_lock (&priv->lock);
if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) {
g_mutex_unlock (&priv->lock);
return;
}
priv->have_buffer[idx] = FALSE;
if (priv->appsink[idx] == NULL) {
/* session expired */
g_mutex_unlock (&priv->lock);
return;
}
sink = GST_APP_SINK (priv->appsink[idx]);
sample = gst_app_sink_pull_sample (sink);
if (!sample) {
g_mutex_unlock (&priv->lock);
return;
}
stream = (GstRTSPStream *) user_data;
priv = stream->priv;
buffer = gst_sample_get_buffer (sample);
is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
is_rtp = (idx == 0);
g_mutex_lock (&priv->lock);
if (is_rtp) {
if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
clear_tr_cache (priv, is_rtp);
for (walk = priv->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr);
if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
continue;
priv->tr_cache_rtp =
g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
}
@ -2151,26 +2178,72 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
clear_tr_cache (priv, is_rtp);
for (walk = priv->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr);
if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
continue;
priv->tr_cache_rtcp =
g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
}
priv->tr_cache_cookie_rtcp = priv->transports_cookie;
}
}
priv->n_outstanding += priv->n_tcp_transports;
g_mutex_unlock (&priv->lock);
if (is_rtp) {
for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
gst_rtsp_stream_transport_send_rtp (tr, buffer);
if (!gst_rtsp_stream_transport_send_rtp (tr, buffer)) {
/* remove transport on send error */
g_mutex_lock (&priv->lock);
priv->n_outstanding--;
update_transport (stream, tr, FALSE);
g_mutex_unlock (&priv->lock);
}
}
} else {
for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
gst_rtsp_stream_transport_send_rtcp (tr, buffer);
if (!gst_rtsp_stream_transport_send_rtcp (tr, buffer)) {
/* remove transport on send error */
g_mutex_lock (&priv->lock);
priv->n_outstanding--;
update_transport (stream, tr, FALSE);
g_mutex_unlock (&priv->lock);
}
}
}
gst_sample_unref (sample);
}
static GstFlowReturn
handle_new_sample (GstAppSink * sink, gpointer user_data)
{
GstRTSPStream *stream = user_data;
GstRTSPStreamPrivate *priv = stream->priv;
int i;
int idx = -1;
g_mutex_lock (&priv->lock);
for (i = 0; i < 2; i++)
if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) {
priv->have_buffer[i] = TRUE;
if (priv->n_outstanding == 0) {
/* send message */
idx = i;
}
break;
}
g_mutex_unlock (&priv->lock);
if (idx != -1)
send_tcp_message (stream, idx);
return GST_FLOW_OK;
}
@ -2971,7 +3044,8 @@ create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
} else if (is_tcp && !priv->appsink[i]) {
/* make appsink */
priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
g_object_set (priv->appsink[i], "emit-signals", FALSE, "max-buffers", 1,
NULL);
/* we need to set sync and preroll to FALSE for the sink to avoid
* deadlock. This is only needed for sink sending RTCP data. */
@ -3913,9 +3987,11 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
if (add) {
GST_INFO ("adding TCP %s", tr->destination);
priv->transports = g_list_prepend (priv->transports, trans);
priv->n_tcp_transports++;
} else {
GST_INFO ("removing TCP %s", tr->destination);
priv->transports = g_list_remove (priv->transports, trans);
priv->n_tcp_transports--;
}
priv->transports_cookie++;
break;
@ -3936,6 +4012,51 @@ mcast_error:
}
}
static void
on_message_sent (gpointer user_data)
{
GstRTSPStream *stream = user_data;
GstRTSPStreamPrivate *priv = stream->priv;
gint idx = -1;
GST_DEBUG_OBJECT (stream, "message send complete");
g_mutex_lock (&priv->lock);
g_assert (priv->n_outstanding >= 0);
if (priv->n_outstanding == 0)
goto no_outstanding;
priv->n_outstanding--;
if (priv->n_outstanding == 0) {
gint i;
/* iterate from 1 and down, so we prioritize RTCP over RTP */
for (i = 1; i >= 0; i--) {
if (priv->have_buffer[i]) {
/* send message */
idx = i;
break;
}
}
}
g_mutex_unlock (&priv->lock);
if (idx != -1)
send_tcp_message (stream, idx);
return;
/* ERRORS */
no_outstanding:
{
GST_INFO ("no outstanding messages");
g_mutex_unlock (&priv->lock);
return;
}
}
/**
* gst_rtsp_stream_add_transport:
@ -3965,6 +4086,9 @@ gst_rtsp_stream_add_transport (GstRTSPStream * stream,
g_mutex_lock (&priv->lock);
res = update_transport (stream, trans, TRUE);
if (res)
gst_rtsp_stream_transport_set_message_sent (trans, on_message_sent, stream,
NULL);
g_mutex_unlock (&priv->lock);
return res;