rtsp-server: Add support for buffer lists

This adds new functions for passing buffer lists through the different
layers without breaking API/ABI, and enables the appsink to actually
provide buffer lists.

This should already reduce CPU usage and potentially context switches a
bit by passing a whole buffer list from the appsink instead of
individual buffers. As a next step it would be necessary to
  a) Add support for a vector of data for the GstRTSPMessage body
  b) Add support for sending multiple messages at once to the
    GstRTSPWatch and let it be handled internally
  c) Adding API to GOutputStream that works like writev()

Fixes https://gitlab.freedesktop.org/gstreamer/gst-rtsp-server/issues/29
This commit is contained in:
Sebastian Dröge 2018-06-27 12:17:07 +02:00
parent 621e140a8e
commit d708f9736b
7 changed files with 250 additions and 11 deletions

View file

@ -680,6 +680,8 @@ gst_rtsp_stream_transport_get_rtpinfo
GstRTSPSendFunc
gst_rtsp_stream_transport_set_callbacks
GstRTSPSendListFunc
gst_rtsp_stream_transport_set_list_callbacks
GstRTSPKeepAliveFunc
gst_rtsp_stream_transport_set_keepalive
@ -690,8 +692,10 @@ gst_rtsp_stream_transport_set_active
gst_rtsp_stream_transport_set_timed_out
gst_rtsp_stream_transport_is_timed_out
gst_rtsp_stream_transport_send_rtcp
gst_rtsp_stream_transport_send_rtp
gst_rtsp_stream_transport_send_rtp_list
gst_rtsp_stream_transport_send_rtcp
gst_rtsp_stream_transport_send_rtcp_list
<SUBSECTION Standard>
GST_RTSP_STREAM_TRANSPORT_CAST

View file

@ -1198,6 +1198,27 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
return ret;
}
static gboolean
do_send_data_list (GstBufferList * buffer_list, guint8 channel,
GstRTSPClient * client)
{
gboolean ret = TRUE;
guint i, n = gst_buffer_list_length (buffer_list);
/* TODO: Needs support for a) queueing up multiple messages on the
* GstRTSPWatch in do_send_data() above and b) for one message having a body
* consisting of multiple parts here */
for (i = 0; i < n; i++) {
GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
ret = do_send_data (buffer, channel, client);
if (!ret)
break;
}
return ret;
}
/**
* gst_rtsp_client_close:
* @client: a #GstRTSPClient
@ -2527,6 +2548,9 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx)
gst_rtsp_stream_transport_set_callbacks (trans,
(GstRTSPSendFunc) do_send_data,
(GstRTSPSendFunc) do_send_data, client, NULL);
gst_rtsp_stream_transport_set_list_callbacks (trans,
(GstRTSPSendListFunc) do_send_data_list,
(GstRTSPSendListFunc) do_send_data_list, client, NULL);
g_hash_table_insert (priv->transports,
GINT_TO_POINTER (ct->interleaved.min), trans);
@ -4628,7 +4652,8 @@ gst_rtsp_client_attach (GstRTSPClient * client, GMainContext * context)
/* create watch for the connection and attach */
priv->watch = gst_rtsp_watch_new (priv->connection, &watch_funcs,
g_object_ref (client), (GDestroyNotify) client_watch_notify);
gst_rtsp_client_set_send_func (client, do_send_message, priv->watch,
gst_rtsp_client_set_send_func (client, do_send_message,
g_source_ref ((GSource *) priv->watch),
(GDestroyNotify) gst_rtsp_watch_unref);
gst_rtsp_watch_set_send_backlog (priv->watch, 0, WATCH_BACKLOG_SIZE);

View file

@ -2260,8 +2260,9 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
}
g_object_set (appsrc, "block", TRUE, "format", GST_FORMAT_TIME, "is-live",
TRUE, NULL);
g_object_set (appsink, "sync", FALSE, "async", FALSE, NULL);
TRUE, "emit-signals", FALSE, NULL);
g_object_set (appsink, "sync", FALSE, "async", FALSE, "emit-signals",
FALSE, "buffer-list", TRUE, NULL);
data = g_new0 (AppSinkSrcData, 1);
data->appsink = appsink;

View file

@ -58,6 +58,11 @@ struct _GstRTSPStreamTransportPrivate
gpointer user_data;
GDestroyNotify notify;
GstRTSPSendListFunc send_rtp_list;
GstRTSPSendListFunc send_rtcp_list;
gpointer list_user_data;
GDestroyNotify list_notify;
GstRTSPKeepAliveFunc keep_alive;
gpointer ka_user_data;
GDestroyNotify ka_notify;
@ -207,6 +212,38 @@ gst_rtsp_stream_transport_set_callbacks (GstRTSPStreamTransport * trans,
priv->notify = notify;
}
/**
* gst_rtsp_stream_transport_set_list_callbacks:
* @trans: a #GstRTSPStreamTransport
* @send_rtp_list: (scope notified): a callback called when RTP should be sent
* @send_rtcp_list: (scope notified): a callback called when RTCP should be sent
* @user_data: (closure): user data passed to callbacks
* @notify: (allow-none): called with the user_data when no longer needed.
*
* Install callbacks that will be called when data for a stream should be sent
* to a client. This is usually used when sending RTP/RTCP over TCP.
*
* Since: 1.16
*/
void
gst_rtsp_stream_transport_set_list_callbacks (GstRTSPStreamTransport * trans,
GstRTSPSendListFunc send_rtp_list, GstRTSPSendListFunc send_rtcp_list,
gpointer user_data, GDestroyNotify notify)
{
GstRTSPStreamTransportPrivate *priv;
g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));
priv = trans->priv;
priv->send_rtp_list = send_rtp_list;
priv->send_rtcp_list = send_rtcp_list;
if (priv->list_notify)
priv->list_notify (priv->list_user_data);
priv->list_user_data = user_data;
priv->list_notify = notify;
}
/**
* gst_rtsp_stream_transport_set_keepalive:
* @trans: a #GstRTSPStreamTransport
@ -531,6 +568,98 @@ gst_rtsp_stream_transport_send_rtcp (GstRTSPStreamTransport * trans,
return res;
}
/**
* gst_rtsp_stream_transport_send_rtp_list:
* @trans: a #GstRTSPStreamTransport
* @buffer_list: (transfer none): a #GstBufferList
*
* Send @buffer_list to the installed RTP callback for @trans.
*
* Returns: %TRUE on success
*
* Since: 1.16
*/
gboolean
gst_rtsp_stream_transport_send_rtp_list (GstRTSPStreamTransport * trans,
GstBufferList * buffer_list)
{
GstRTSPStreamTransportPrivate *priv;
gboolean res = FALSE;
g_return_val_if_fail (GST_IS_BUFFER_LIST (buffer_list), FALSE);
priv = trans->priv;
if (priv->send_rtp_list) {
res =
priv->send_rtp_list (buffer_list, priv->transport->interleaved.min,
priv->list_user_data);
} else if (priv->send_rtp) {
guint n = gst_buffer_list_length (buffer_list), i;
for (i = 0; i < n; i++) {
GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
res =
priv->send_rtp (buffer, priv->transport->interleaved.min,
priv->user_data);
if (!res)
break;
}
}
if (res)
gst_rtsp_stream_transport_keep_alive (trans);
return res;
}
/**
* gst_rtsp_stream_transport_send_rtcp_list:
* @trans: a #GstRTSPStreamTransport
* @buffer_list: (transfer none): a #GstBuffer
*
* Send @buffer_list to the installed RTCP callback for @trans.
*
* Returns: %TRUE on success
*
* Since: 1.16
*/
gboolean
gst_rtsp_stream_transport_send_rtcp_list (GstRTSPStreamTransport * trans,
GstBufferList * buffer_list)
{
GstRTSPStreamTransportPrivate *priv;
gboolean res = FALSE;
g_return_val_if_fail (GST_IS_BUFFER_LIST (buffer_list), FALSE);
priv = trans->priv;
if (priv->send_rtcp_list) {
res =
priv->send_rtcp_list (buffer_list, priv->transport->interleaved.max,
priv->list_user_data);
} else if (priv->send_rtcp) {
guint n = gst_buffer_list_length (buffer_list), i;
for (i = 0; i < n; i++) {
GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
res =
priv->send_rtcp (buffer, priv->transport->interleaved.max,
priv->user_data);
if (!res)
break;
}
}
if (res)
gst_rtsp_stream_transport_keep_alive (trans);
return res;
}
/**
* gst_rtsp_stream_transport_keep_alive:
* @trans: a #GstRTSPStreamTransport

View file

@ -56,6 +56,22 @@ typedef struct _GstRTSPStreamTransportPrivate GstRTSPStreamTransportPrivate;
* Returns: %TRUE on success
*/
typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpointer user_data);
/**
* GstRTSPSendListFunc:
* @buffer_list: a #GstBufferList
* @channel: a channel
* @user_data: user data
*
* Function registered with gst_rtsp_stream_transport_set_callbacks() and
* called when @buffer_list must be sent on @channel.
*
* Returns: %TRUE on success
*
* Since: 1.16
*/
typedef gboolean (*GstRTSPSendListFunc) (GstBufferList *buffer_list, guint8 channel, gpointer user_data);
/**
* GstRTSPKeepAliveFunc:
* @user_data: user data
@ -131,6 +147,13 @@ void gst_rtsp_stream_transport_set_callbacks (GstRTSPStreamT
gpointer user_data,
GDestroyNotify notify);
GST_RTSP_SERVER_API
void gst_rtsp_stream_transport_set_list_callbacks (GstRTSPStreamTransport *trans,
GstRTSPSendListFunc send_rtp_list,
GstRTSPSendListFunc send_rtcp_list,
gpointer user_data,
GDestroyNotify notify);
GST_RTSP_SERVER_API
void gst_rtsp_stream_transport_set_keepalive (GstRTSPStreamTransport *trans,
GstRTSPKeepAliveFunc keep_alive,
@ -170,6 +193,14 @@ GST_RTSP_SERVER_API
gboolean gst_rtsp_stream_transport_send_rtcp (GstRTSPStreamTransport *trans,
GstBuffer *buffer);
GST_RTSP_SERVER_API
gboolean gst_rtsp_stream_transport_send_rtp_list (GstRTSPStreamTransport *trans,
GstBufferList *buffer_list);
GST_RTSP_SERVER_API
gboolean gst_rtsp_stream_transport_send_rtcp_list(GstRTSPStreamTransport *trans,
GstBufferList *buffer_list);
GST_RTSP_SERVER_API
GstFlowReturn gst_rtsp_stream_transport_recv_data (GstRTSPStreamTransport *trans,
guint channel, GstBuffer *buffer);

View file

@ -2456,6 +2456,8 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
GList *walk;
GstSample *sample;
GstBuffer *buffer;
GstBufferList *buffer_list;
guint n_messages = 0;
gboolean is_rtp;
if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) {
@ -2476,6 +2478,14 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
}
buffer = gst_sample_get_buffer (sample);
buffer_list = gst_sample_get_buffer_list (sample);
/* We will get one message-sent notification per message,
* i.e. per buffer that is actually sent out */
if (buffer)
n_messages += 1;
if (buffer_list)
n_messages += gst_buffer_list_length (buffer_list);
is_rtp = (idx == 0);
@ -2513,17 +2523,24 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
}
}
priv->n_outstanding += priv->n_tcp_transports;
priv->n_outstanding += n_messages * priv->n_tcp_transports;
g_mutex_unlock (&priv->lock);
if (is_rtp) {
for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
if (!gst_rtsp_stream_transport_send_rtp (tr, buffer)) {
gboolean send_ret = TRUE;
if (buffer)
send_ret = gst_rtsp_stream_transport_send_rtp (tr, buffer);
if (buffer_list)
send_ret = gst_rtsp_stream_transport_send_rtp_list (tr, buffer_list);
if (!send_ret) {
/* remove transport on send error */
g_mutex_lock (&priv->lock);
priv->n_outstanding--;
priv->n_outstanding -= n_messages;
update_transport (stream, tr, FALSE);
g_mutex_unlock (&priv->lock);
}
@ -2531,10 +2548,17 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
} else {
for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
if (!gst_rtsp_stream_transport_send_rtcp (tr, buffer)) {
gboolean send_ret = TRUE;
if (buffer)
send_ret = gst_rtsp_stream_transport_send_rtcp (tr, buffer);
if (buffer_list)
send_ret = gst_rtsp_stream_transport_send_rtcp_list (tr, buffer_list);
if (!send_ret) {
/* remove transport on send error */
g_mutex_lock (&priv->lock);
priv->n_outstanding--;
priv->n_outstanding -= n_messages;
update_transport (stream, tr, FALSE);
g_mutex_unlock (&priv->lock);
}
@ -3369,8 +3393,8 @@ create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
} else if (is_tcp && !priv->appsink[i]) {
/* make appsink */
priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
g_object_set (priv->appsink[i], "emit-signals", FALSE, "max-buffers", 1,
NULL);
g_object_set (priv->appsink[i], "emit-signals", FALSE, "buffer-list",
TRUE, "max-buffers", 1, NULL);
/* we need to set sync and preroll to FALSE for the sink to avoid
* deadlock. This is only needed for sink sending RTCP data. */

View file

@ -3850,6 +3850,27 @@ do_send_data (GstBuffer * buffer, guint8 channel,
return res == GST_RTSP_OK;
}
static gboolean
do_send_data_list (GstBufferList * buffer_list, guint8 channel,
GstRTSPStreamContext * context)
{
gboolean ret = TRUE;
guint i, n = gst_buffer_list_length (buffer_list);
/* TODO: Needs support for a) queueing up multiple messages on the
* GstRTSPWatch in do_send_data() above and b) for one message having a body
* consisting of multiple parts here */
for (i = 0; i < n; i++) {
GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
ret = do_send_data (buffer, channel, context);
if (!ret)
break;
}
return ret;
}
static GstRTSPResult
gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
{
@ -4147,6 +4168,10 @@ gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
gst_rtsp_stream_transport_set_callbacks (context->stream_transport,
(GstRTSPSendFunc) do_send_data,
(GstRTSPSendFunc) do_send_data, context, NULL);
gst_rtsp_stream_transport_set_list_callbacks
(context->stream_transport,
(GstRTSPSendListFunc) do_send_data_list,
(GstRTSPSendListFunc) do_send_data_list, context, NULL);
}
/* The stream_transport now owns the transport */