gstreamer/subprojects/gst-rtsp-server/gst/rtsp-server/rtsp-stream-transport.c
Bruce Liang 657cc3e6d6 gst-rtsp-server: Fix pushing backlog to client
Check back pressure of a stream transport before popping buffer from its backlog.

If the stream transport is not experiencing back pressure, the buffer can be popped from backlog and pushed to client.

Fixes:#1298

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2936>
2022-09-02 16:04:06 +00:00

1004 lines
26 KiB
C

/* GStreamer
* Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:rtsp-stream-transport
* @short_description: A media stream transport configuration
* @see_also: #GstRTSPStream, #GstRTSPSessionMedia
*
* The #GstRTSPStreamTransport configures the transport used by a
* #GstRTSPStream. It is usually managed by a #GstRTSPSessionMedia object.
*
* With gst_rtsp_stream_transport_set_callbacks(), callbacks can be configured
* to handle the RTP and RTCP packets from the stream, for example when they
* need to be sent over TCP.
*
* With gst_rtsp_stream_transport_set_active() the transports are added and
* removed from the stream.
*
* A #GstRTSPStream will call gst_rtsp_stream_transport_keep_alive() when RTCP
* is received from the client. It will also call
* gst_rtsp_stream_transport_set_timed_out() when a receiver has timed out.
*
* A #GstRTSPClient will call gst_rtsp_stream_transport_message_sent() when it
* has sent a data message for the transport.
*
* Last reviewed on 2013-07-16 (1.0.0)
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <string.h>
#include <stdlib.h>
#include "rtsp-stream-transport.h"
#include "rtsp-server-internal.h"
/* Private instance data of a #GstRTSPStreamTransport. */
struct _GstRTSPStreamTransportPrivate
{
/* stream this transport was created for; a reference is taken in
 * gst_rtsp_stream_transport_new() and dropped in finalize */
GstRTSPStream *stream;
/* per-buffer send callbacks, sharing one user data / destroy notify
 * (see gst_rtsp_stream_transport_set_callbacks()) */
GstRTSPSendFunc send_rtp;
GstRTSPSendFunc send_rtcp;
gpointer user_data;
GDestroyNotify notify;
/* buffer-list send callbacks, sharing one user data / destroy notify
 * (see gst_rtsp_stream_transport_set_list_callbacks()) */
GstRTSPSendListFunc send_rtp_list;
GstRTSPSendListFunc send_rtcp_list;
gpointer list_user_data;
GDestroyNotify list_notify;
/* callback queried by gst_rtsp_stream_transport_check_back_pressure() */
GstRTSPBackPressureFunc back_pressure_func;
gpointer back_pressure_func_data;
GDestroyNotify back_pressure_func_notify;
/* callback invoked by gst_rtsp_stream_transport_keep_alive() */
GstRTSPKeepAliveFunc keep_alive;
gpointer ka_user_data;
GDestroyNotify ka_notify;
/* flag written by set_timed_out() and read by is_timed_out() */
gboolean timed_out;
/* callbacks invoked by gst_rtsp_stream_transport_message_sent() */
GstRTSPMessageSentFunc message_sent;
gpointer ms_user_data;
GDestroyNotify ms_notify;
GstRTSPMessageSentFuncFull message_sent_full;
gpointer msf_user_data;
GDestroyNotify msf_notify;
/* owned transport configuration, freed with gst_rtsp_transport_free() */
GstRTSPTransport *transport;
/* owned copy of the client url (may be NULL), freed with gst_rtsp_url_free() */
GstRTSPUrl *url;
/* NOTE(review): not referenced by any code visible in this file */
GObject *rtpsource;
/* TCP backlog */
/* timestamp of the first RTP item in the backlog, or GST_CLOCK_TIME_NONE */
GstClockTime first_rtp_timestamp;
/* queue of BackLogItem structs */
GstQueueArray *items;
/* recursive mutex taken by gst_rtsp_stream_transport_lock_backlog() */
GRecMutex backlog_lock;
};
/* Backlog limits checked by gst_rtsp_stream_transport_backlog_push(): a push
 * reports failure only when the queued RTP duration exceeds
 * MAX_BACKLOG_DURATION and the number of queued items exceeds
 * MAX_BACKLOG_SIZE. */
#define MAX_BACKLOG_DURATION (10 * GST_SECOND)
#define MAX_BACKLOG_SIZE 100
/* One entry of the TCP backlog. It carries a single buffer and/or a buffer
 * list (whichever the pusher supplied) plus a flag telling whether the data
 * is RTP (TRUE) or not (FALSE). */
typedef struct
{
GstBuffer *buffer;
GstBufferList *buffer_list;
gboolean is_rtp;
} BackLogItem;
/* GObject property ids. No properties are installed by the class
 * initializer in this file. */
enum
{
PROP_0,
PROP_LAST
};
/* Debug category used by the GST_DEBUG() calls in this file; it is
 * initialized in the class initializer. */
GST_DEBUG_CATEGORY_STATIC (rtsp_stream_transport_debug);
#define GST_CAT_DEFAULT rtsp_stream_transport_debug
/* forward declaration, installed as GObjectClass::finalize */
static void gst_rtsp_stream_transport_finalize (GObject * obj);
/* Registers the GstRTSPStreamTransport type (a G_TYPE_OBJECT subclass)
 * together with its private data struct. */
G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStreamTransport, gst_rtsp_stream_transport,
G_TYPE_OBJECT);
/* Class initializer: registers the debug category of this file and hooks
 * up the finalize handler. */
static void
gst_rtsp_stream_transport_class_init (GstRTSPStreamTransportClass * klass)
{
  GObjectClass *object_class = G_OBJECT_CLASS (klass);

  GST_DEBUG_CATEGORY_INIT (rtsp_stream_transport_debug, "rtspmediatransport",
      0, "GstRTSPStreamTransport");

  object_class->finalize = gst_rtsp_stream_transport_finalize;
}
/* Clear function of the backlog queue: drops whatever buffer and buffer
 * list @item still holds and resets both pointers. */
static void
clear_backlog_item (BackLogItem * item)
{
  gst_clear_buffer_list (&item->buffer_list);
  gst_clear_buffer (&item->buffer);
}
/* Instance initializer: binds the private struct and sets up an empty
 * backlog together with its lock. */
static void
gst_rtsp_stream_transport_init (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv =
      gst_rtsp_stream_transport_get_instance_private (trans);

  trans->priv = priv;

  /* the backlog starts empty, so there is no first RTP timestamp yet */
  priv->first_rtp_timestamp = GST_CLOCK_TIME_NONE;
  priv->items = gst_queue_array_new_for_struct (sizeof (BackLogItem), 0);
  gst_queue_array_set_clear_func (priv->items,
      (GDestroyNotify) clear_backlog_item);

  g_rec_mutex_init (&priv->backlog_lock);
}
/* GObject finalize handler: releases every resource owned by the transport. */
static void
gst_rtsp_stream_transport_finalize (GObject * obj)
{
  GstRTSPStreamTransport *trans = GST_RTSP_STREAM_TRANSPORT (obj);
  GstRTSPStreamTransportPrivate *priv = trans->priv;

  /* remove callbacks now. Every setter runs the previously installed destroy
   * notify, so all six callback groups have to be reset here; otherwise the
   * user data of the list, back-pressure and message-sent-full callbacks
   * would never be released. */
  gst_rtsp_stream_transport_set_callbacks (trans, NULL, NULL, NULL, NULL);
  gst_rtsp_stream_transport_set_list_callbacks (trans, NULL, NULL, NULL, NULL);
  gst_rtsp_stream_transport_set_back_pressure_callback (trans, NULL, NULL,
      NULL);
  gst_rtsp_stream_transport_set_keepalive (trans, NULL, NULL, NULL);
  gst_rtsp_stream_transport_set_message_sent (trans, NULL, NULL, NULL);
  gst_rtsp_stream_transport_set_message_sent_full (trans, NULL, NULL, NULL);

  if (priv->stream)
    g_object_unref (priv->stream);

  if (priv->transport)
    gst_rtsp_transport_free (priv->transport);

  if (priv->url)
    gst_rtsp_url_free (priv->url);

  /* items still queued are released through the queue's clear function */
  gst_queue_array_free (priv->items);

  g_rec_mutex_clear (&priv->backlog_lock);

  G_OBJECT_CLASS (gst_rtsp_stream_transport_parent_class)->finalize (obj);
}
/**
 * gst_rtsp_stream_transport_new:
 * @stream: a #GstRTSPStream
 * @tr: (transfer full): a GstRTSPTransport
 *
 * Create a new #GstRTSPStreamTransport that manages @stream using the
 * transport configuration @tr. A reference to @stream is taken and the new
 * object takes ownership of @tr.
 *
 * Returns: (transfer full): a new #GstRTSPStreamTransport
 */
GstRTSPStreamTransport *
gst_rtsp_stream_transport_new (GstRTSPStream * stream, GstRTSPTransport * tr)
{
  GstRTSPStreamTransport *result;

  g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
  g_return_val_if_fail (tr != NULL, NULL);

  result = g_object_new (GST_TYPE_RTSP_STREAM_TRANSPORT, NULL);
  result->priv->stream = g_object_ref (stream);
  result->priv->transport = tr;

  return result;
}
/**
 * gst_rtsp_stream_transport_get_stream:
 * @trans: a #GstRTSPStreamTransport
 *
 * Retrieve the #GstRTSPStream that @trans was constructed with.
 *
 * Returns: (transfer none) (nullable): the stream used when constructing @trans.
 */
GstRTSPStream *
gst_rtsp_stream_transport_get_stream (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), NULL);

  priv = trans->priv;

  return priv->stream;
}
/**
 * gst_rtsp_stream_transport_set_callbacks:
 * @trans: a #GstRTSPStreamTransport
 * @send_rtp: (scope notified): a callback called when RTP should be sent
 * @send_rtcp: (scope notified): a callback called when RTCP should be sent
 * @user_data: (closure): user data passed to callbacks
 * @notify: (allow-none): called with the user_data when no longer needed.
 *
 * Install the callbacks used to deliver the data of a stream to a client.
 * This is usually used when sending RTP/RTCP over TCP. The destroy notify
 * installed by an earlier call, if any, is invoked on the old user data.
 */
void
gst_rtsp_stream_transport_set_callbacks (GstRTSPStreamTransport * trans,
    GstRTSPSendFunc send_rtp, GstRTSPSendFunc send_rtcp,
    gpointer user_data, GDestroyNotify notify)
{
  GstRTSPStreamTransportPrivate *priv;
  GDestroyNotify old_notify;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;

  priv->send_rtp = send_rtp;
  priv->send_rtcp = send_rtcp;

  /* release the previous user data before replacing it */
  old_notify = priv->notify;
  if (old_notify != NULL)
    old_notify (priv->user_data);

  priv->user_data = user_data;
  priv->notify = notify;
}
/**
 * gst_rtsp_stream_transport_set_list_callbacks:
 * @trans: a #GstRTSPStreamTransport
 * @send_rtp_list: (scope notified): a callback called when RTP should be sent
 * @send_rtcp_list: (scope notified): a callback called when RTCP should be sent
 * @user_data: (closure): user data passed to callbacks
 * @notify: (allow-none): called with the user_data when no longer needed.
 *
 * Install the buffer-list callbacks used to deliver the data of a stream to
 * a client. This is usually used when sending RTP/RTCP over TCP. The destroy
 * notify installed by an earlier call, if any, is invoked on the old user
 * data.
 *
 * Since: 1.16
 */
void
gst_rtsp_stream_transport_set_list_callbacks (GstRTSPStreamTransport * trans,
    GstRTSPSendListFunc send_rtp_list, GstRTSPSendListFunc send_rtcp_list,
    gpointer user_data, GDestroyNotify notify)
{
  GstRTSPStreamTransportPrivate *priv;
  GDestroyNotify old_notify;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;

  priv->send_rtp_list = send_rtp_list;
  priv->send_rtcp_list = send_rtcp_list;

  /* release the previous user data before replacing it */
  old_notify = priv->list_notify;
  if (old_notify != NULL)
    old_notify (priv->list_user_data);

  priv->list_user_data = user_data;
  priv->list_notify = notify;
}
/* Install @back_pressure_func as the callback queried by
 * gst_rtsp_stream_transport_check_back_pressure(). @user_data is passed to
 * the callback and @notify (may be NULL) is called with it once it is
 * replaced by a later call. */
void
gst_rtsp_stream_transport_set_back_pressure_callback (GstRTSPStreamTransport *
    trans, GstRTSPBackPressureFunc back_pressure_func, gpointer user_data,
    GDestroyNotify notify)
{
  GstRTSPStreamTransportPrivate *priv;
  GDestroyNotify old_notify;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;

  priv->back_pressure_func = back_pressure_func;

  /* release the previous user data before replacing it */
  old_notify = priv->back_pressure_func_notify;
  if (old_notify != NULL)
    old_notify (priv->back_pressure_func_data);

  priv->back_pressure_func_data = user_data;
  priv->back_pressure_func_notify = notify;
}
/* Ask the installed back-pressure callback about the interleaved channel of
 * @trans that carries RTP (@is_rtp TRUE, lower channel) or RTCP (@is_rtp
 * FALSE, upper channel). Returns the callback's answer, or FALSE when no
 * callback is installed. */
gboolean
gst_rtsp_stream_transport_check_back_pressure (GstRTSPStreamTransport * trans,
    gboolean is_rtp)
{
  GstRTSPStreamTransportPrivate *priv = trans->priv;
  guint8 channel =
      is_rtp ? priv->transport->interleaved.min : priv->transport->
      interleaved.max;

  if (priv->back_pressure_func == NULL)
    return FALSE;

  return priv->back_pressure_func (channel, priv->back_pressure_func_data);
}
/**
 * gst_rtsp_stream_transport_set_keepalive:
 * @trans: a #GstRTSPStreamTransport
 * @keep_alive: (scope notified): a callback called when the receiver is active
 * @user_data: (closure): user data passed to callback
 * @notify: (allow-none): called with the user_data when no longer needed.
 *
 * Install the callback that is invoked when RTCP packets are received from
 * the receiver of @trans. The destroy notify installed by an earlier call,
 * if any, is invoked on the old user data.
 */
void
gst_rtsp_stream_transport_set_keepalive (GstRTSPStreamTransport * trans,
    GstRTSPKeepAliveFunc keep_alive, gpointer user_data, GDestroyNotify notify)
{
  GstRTSPStreamTransportPrivate *priv;
  GDestroyNotify old_notify;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;

  priv->keep_alive = keep_alive;

  /* release the previous user data before replacing it */
  old_notify = priv->ka_notify;
  if (old_notify != NULL)
    old_notify (priv->ka_user_data);

  priv->ka_user_data = user_data;
  priv->ka_notify = notify;
}
/**
 * gst_rtsp_stream_transport_set_message_sent:
 * @trans: a #GstRTSPStreamTransport
 * @message_sent: (scope notified): a callback called when a message has been sent
 * @user_data: (closure): user data passed to callback
 * @notify: (allow-none): called with the user_data when no longer needed
 *
 * Install the callback that is invoked once a message has been sent on
 * @trans. The destroy notify installed by an earlier call, if any, is
 * invoked on the old user data.
 */
void
gst_rtsp_stream_transport_set_message_sent (GstRTSPStreamTransport * trans,
    GstRTSPMessageSentFunc message_sent, gpointer user_data,
    GDestroyNotify notify)
{
  GstRTSPStreamTransportPrivate *priv;
  GDestroyNotify old_notify;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;

  priv->message_sent = message_sent;

  /* release the previous user data before replacing it */
  old_notify = priv->ms_notify;
  if (old_notify != NULL)
    old_notify (priv->ms_user_data);

  priv->ms_user_data = user_data;
  priv->ms_notify = notify;
}
/**
 * gst_rtsp_stream_transport_set_message_sent_full:
 * @trans: a #GstRTSPStreamTransport
 * @message_sent: (scope notified): a callback called when a message has been sent
 * @user_data: (closure): user data passed to callback
 * @notify: (allow-none): called with the user_data when no longer needed
 *
 * Install the callback that is invoked once a message has been sent on
 * @trans; this variant of the callback also receives @trans itself. The
 * destroy notify installed by an earlier call, if any, is invoked on the
 * old user data.
 *
 * Since: 1.18
 */
void
gst_rtsp_stream_transport_set_message_sent_full (GstRTSPStreamTransport * trans,
    GstRTSPMessageSentFuncFull message_sent, gpointer user_data,
    GDestroyNotify notify)
{
  GstRTSPStreamTransportPrivate *priv;
  GDestroyNotify old_notify;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;

  priv->message_sent_full = message_sent;

  /* release the previous user data before replacing it */
  old_notify = priv->msf_notify;
  if (old_notify != NULL)
    old_notify (priv->msf_user_data);

  priv->msf_user_data = user_data;
  priv->msf_notify = notify;
}
/**
 * gst_rtsp_stream_transport_set_transport:
 * @trans: a #GstRTSPStreamTransport
 * @tr: (transfer full): a client #GstRTSPTransport
 *
 * Replace the client transport of @trans with @tr. The previously
 * configured transport is freed and @trans takes ownership of @tr.
 */
void
gst_rtsp_stream_transport_set_transport (GstRTSPStreamTransport * trans,
    GstRTSPTransport * tr)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));
  g_return_if_fail (tr != NULL);

  priv = trans->priv;

  /* dispose of the transport that is being replaced */
  if (priv->transport != NULL)
    gst_rtsp_transport_free (priv->transport);

  priv->transport = tr;
}
/**
 * gst_rtsp_stream_transport_get_transport:
 * @trans: a #GstRTSPStreamTransport
 *
 * Retrieve the transport configuration currently stored in @trans.
 *
 * Returns: (transfer none) (nullable): the transport configured in @trans. It remains
 * valid for as long as @trans is valid.
 */
const GstRTSPTransport *
gst_rtsp_stream_transport_get_transport (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), NULL);

  priv = trans->priv;

  return priv->transport;
}
/**
 * gst_rtsp_stream_transport_set_url:
 * @trans: a #GstRTSPStreamTransport
 * @url: (transfer none) (nullable): a client #GstRTSPUrl
 *
 * Set @url as the client url. A copy of @url is stored; passing %NULL
 * clears the stored url.
 */
void
gst_rtsp_stream_transport_set_url (GstRTSPStreamTransport * trans,
    const GstRTSPUrl * url)
{
  GstRTSPStreamTransportPrivate *priv;
  GstRTSPUrl *new_url;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;

  /* Copy before freeing: @url may be the very pointer handed out by
   * gst_rtsp_stream_transport_get_url(), in which case freeing first would
   * make the copy read released memory. */
  new_url = (url ? gst_rtsp_url_copy (url) : NULL);

  if (priv->url)
    gst_rtsp_url_free (priv->url);

  priv->url = new_url;
}
/**
 * gst_rtsp_stream_transport_get_url:
 * @trans: a #GstRTSPStreamTransport
 *
 * Retrieve the client url currently stored in @trans.
 *
 * Returns: (transfer none) (nullable): the url configured in @trans.
 * It remains valid for as long as @trans is valid.
 */
const GstRTSPUrl *
gst_rtsp_stream_transport_get_url (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), NULL);

  priv = trans->priv;

  return priv->url;
}
/**
 * gst_rtsp_stream_transport_get_rtpinfo:
 * @trans: a #GstRTSPStreamTransport
 * @start_time: a start time
 *
 * Get the RTP-Info string for @trans and @start_time.
 *
 * Returns: (transfer full) (nullable): the RTPInfo string for @trans
 * and @start_time or %NULL when the RTP-Info could not be
 * determined (the stream is not a sender, no url is configured or the
 * stream cannot provide its RTP info). g_free() after usage.
 */
gchar *
gst_rtsp_stream_transport_get_rtpinfo (GstRTSPStreamTransport * trans,
    GstClockTime start_time)
{
  GstRTSPStreamTransportPrivate *priv;
  gchar *url_str;
  gchar *rtpinfo;
  guint rtptime, seq, clock_rate;
  GstClockTime running_time = GST_CLOCK_TIME_NONE;

  g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), NULL);

  priv = trans->priv;

  if (!gst_rtsp_stream_is_sender (priv->stream))
    return NULL;

  /* gst_rtsp_stream_transport_set_url() accepts NULL; without a url the
   * "url=" field cannot be produced, so report the info as undetermined
   * instead of formatting a NULL string. */
  if (priv->url == NULL)
    return NULL;

  if (!gst_rtsp_stream_get_rtpinfo (priv->stream, &rtptime, &seq, &clock_rate,
          &running_time))
    return NULL;

  GST_DEBUG ("RTP time %u, seq %u, rate %u, running-time %" GST_TIME_FORMAT,
      rtptime, seq, clock_rate, GST_TIME_ARGS (running_time));

  /* move rtptime from running_time to start_time; the time difference is
   * converted into clock-rate units first */
  if (GST_CLOCK_TIME_IS_VALID (running_time)
      && GST_CLOCK_TIME_IS_VALID (start_time)) {
    if (running_time > start_time) {
      rtptime -=
          gst_util_uint64_scale_int (running_time - start_time, clock_rate,
          GST_SECOND);
    } else {
      rtptime +=
          gst_util_uint64_scale_int (start_time - running_time, clock_rate,
          GST_SECOND);
    }
  }

  GST_DEBUG ("RTP time %u, for start-time %" GST_TIME_FORMAT,
      rtptime, GST_TIME_ARGS (start_time));

  url_str = gst_rtsp_url_get_request_uri (priv->url);
  rtpinfo = g_strdup_printf ("url=%s;seq=%u;rtptime=%u", url_str, seq, rtptime);
  g_free (url_str);

  return rtpinfo;
}
/**
 * gst_rtsp_stream_transport_set_active:
 * @trans: a #GstRTSPStreamTransport
 * @active: new state of @trans
 *
 * Activate or deactivate the data transfer configured in @trans by adding
 * @trans to, or removing it from, its stream.
 *
 * Returns: %TRUE when the state was changed.
 */
gboolean
gst_rtsp_stream_transport_set_active (GstRTSPStreamTransport * trans,
    gboolean active)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);

  priv = trans->priv;

  if (!active)
    return gst_rtsp_stream_remove_transport (priv->stream, trans);

  return gst_rtsp_stream_add_transport (priv->stream, trans);
}
/**
 * gst_rtsp_stream_transport_set_timed_out:
 * @trans: a #GstRTSPStreamTransport
 * @timedout: timed out value
 *
 * Store @timedout as the timed out state of @trans.
 */
void
gst_rtsp_stream_transport_set_timed_out (GstRTSPStreamTransport * trans,
    gboolean timedout)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;
  priv->timed_out = timedout;
}
/**
 * gst_rtsp_stream_transport_is_timed_out:
 * @trans: a #GstRTSPStreamTransport
 *
 * Query the timed out state stored in @trans.
 *
 * Returns: %TRUE if @trans timed out.
 */
gboolean
gst_rtsp_stream_transport_is_timed_out (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);

  priv = trans->priv;

  return priv->timed_out;
}
/**
 * gst_rtsp_stream_transport_send_rtp:
 * @trans: a #GstRTSPStreamTransport
 * @buffer: (transfer none): a #GstBuffer
 *
 * Hand @buffer to the RTP callback installed on @trans, using the lower
 * interleaved channel of the transport. A successful send also signals the
 * keep-alive callback.
 *
 * Returns: %TRUE on success
 */
gboolean
gst_rtsp_stream_transport_send_rtp (GstRTSPStreamTransport * trans,
    GstBuffer * buffer)
{
  GstRTSPStreamTransportPrivate *priv;
  gboolean sent;

  g_return_val_if_fail (GST_IS_BUFFER (buffer), FALSE);

  priv = trans->priv;

  /* nothing can be sent without a callback */
  if (priv->send_rtp == NULL)
    return FALSE;

  sent = priv->send_rtp (buffer, priv->transport->interleaved.min,
      priv->user_data);
  if (sent)
    gst_rtsp_stream_transport_keep_alive (trans);

  return sent;
}
/**
 * gst_rtsp_stream_transport_send_rtcp:
 * @trans: a #GstRTSPStreamTransport
 * @buffer: (transfer none): a #GstBuffer
 *
 * Hand @buffer to the RTCP callback installed on @trans, using the upper
 * interleaved channel of the transport. A successful send also signals the
 * keep-alive callback.
 *
 * Returns: %TRUE on success
 */
gboolean
gst_rtsp_stream_transport_send_rtcp (GstRTSPStreamTransport * trans,
    GstBuffer * buffer)
{
  GstRTSPStreamTransportPrivate *priv;
  gboolean sent;

  g_return_val_if_fail (GST_IS_BUFFER (buffer), FALSE);

  priv = trans->priv;

  /* nothing can be sent without a callback */
  if (priv->send_rtcp == NULL)
    return FALSE;

  sent = priv->send_rtcp (buffer, priv->transport->interleaved.max,
      priv->user_data);
  if (sent)
    gst_rtsp_stream_transport_keep_alive (trans);

  return sent;
}
/**
 * gst_rtsp_stream_transport_send_rtp_list:
 * @trans: a #GstRTSPStreamTransport
 * @buffer_list: (transfer none): a #GstBufferList
 *
 * Hand @buffer_list to the RTP list callback installed on @trans. When only
 * a per-buffer RTP callback is installed, the buffers of the list are sent
 * one by one until one of them fails. A successful send also signals the
 * keep-alive callback.
 *
 * Returns: %TRUE on success
 *
 * Since: 1.16
 */
gboolean
gst_rtsp_stream_transport_send_rtp_list (GstRTSPStreamTransport * trans,
    GstBufferList * buffer_list)
{
  GstRTSPStreamTransportPrivate *priv;
  gboolean sent = FALSE;

  g_return_val_if_fail (GST_IS_BUFFER_LIST (buffer_list), FALSE);

  priv = trans->priv;

  if (priv->send_rtp_list != NULL) {
    sent = priv->send_rtp_list (buffer_list, priv->transport->interleaved.min,
        priv->list_user_data);
  } else if (priv->send_rtp != NULL) {
    /* fall back to the per-buffer callback, stopping at the first failure */
    const guint count = gst_buffer_list_length (buffer_list);
    guint idx = 0;

    while (idx < count) {
      sent = priv->send_rtp (gst_buffer_list_get (buffer_list, idx),
          priv->transport->interleaved.min, priv->user_data);
      if (!sent)
        break;
      idx++;
    }
  }

  if (sent)
    gst_rtsp_stream_transport_keep_alive (trans);

  return sent;
}
/**
 * gst_rtsp_stream_transport_send_rtcp_list:
 * @trans: a #GstRTSPStreamTransport
 * @buffer_list: (transfer none): a #GstBufferList
 *
 * Hand @buffer_list to the RTCP list callback installed on @trans. When only
 * a per-buffer RTCP callback is installed, the buffers of the list are sent
 * one by one until one of them fails. A successful send also signals the
 * keep-alive callback.
 *
 * Returns: %TRUE on success
 *
 * Since: 1.16
 */
gboolean
gst_rtsp_stream_transport_send_rtcp_list (GstRTSPStreamTransport * trans,
    GstBufferList * buffer_list)
{
  GstRTSPStreamTransportPrivate *priv;
  gboolean sent = FALSE;

  g_return_val_if_fail (GST_IS_BUFFER_LIST (buffer_list), FALSE);

  priv = trans->priv;

  if (priv->send_rtcp_list != NULL) {
    sent = priv->send_rtcp_list (buffer_list, priv->transport->interleaved.max,
        priv->list_user_data);
  } else if (priv->send_rtcp != NULL) {
    /* fall back to the per-buffer callback, stopping at the first failure */
    const guint count = gst_buffer_list_length (buffer_list);
    guint idx = 0;

    while (idx < count) {
      sent = priv->send_rtcp (gst_buffer_list_get (buffer_list, idx),
          priv->transport->interleaved.max, priv->user_data);
      if (!sent)
        break;
      idx++;
    }
  }

  if (sent)
    gst_rtsp_stream_transport_keep_alive (trans);

  return sent;
}
/**
 * gst_rtsp_stream_transport_keep_alive:
 * @trans: a #GstRTSPStreamTransport
 *
 * Invoke the keep_alive callback installed on @trans, if there is one.
 */
void
gst_rtsp_stream_transport_keep_alive (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv = trans->priv;

  if (priv->keep_alive == NULL)
    return;

  priv->keep_alive (priv->ka_user_data);
}
/**
 * gst_rtsp_stream_transport_message_sent:
 * @trans: a #GstRTSPStreamTransport
 *
 * Invoke the message_sent / message_sent_full callbacks installed on @trans.
 * The full variant, when present, is called first.
 *
 * Since: 1.16
 */
void
gst_rtsp_stream_transport_message_sent (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv = trans->priv;

  if (priv->message_sent_full != NULL)
    priv->message_sent_full (trans, priv->msf_user_data);

  if (priv->message_sent != NULL)
    priv->message_sent (priv->ms_user_data);
}
/**
 * gst_rtsp_stream_transport_recv_data:
 * @trans: a #GstRTSPStreamTransport
 * @channel: a channel
 * @buffer: (transfer full): a #GstBuffer
 *
 * Receive @buffer on @channel @trans. Data on the lower interleaved channel
 * is passed to the stream as RTP, data on the upper one as RTCP.
 *
 * Returns: a #GstFlowReturn. Returns GST_FLOW_NOT_LINKED when @channel is not
 * configured in the transport of @trans.
 */
GstFlowReturn
gst_rtsp_stream_transport_recv_data (GstRTSPStreamTransport * trans,
    guint channel, GstBuffer * buffer)
{
  GstRTSPStreamTransportPrivate *priv;
  const GstRTSPTransport *tr;
  GstFlowReturn res;

  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);

  priv = trans->priv;
  tr = priv->transport;

  if (tr->interleaved.min == channel) {
    res = gst_rtsp_stream_recv_rtp (priv->stream, buffer);
  } else if (tr->interleaved.max == channel) {
    res = gst_rtsp_stream_recv_rtcp (priv->stream, buffer);
  } else {
    /* @buffer is (transfer full): nobody else consumes it on this path, so
     * it has to be released here to avoid leaking it */
    gst_buffer_unref (buffer);
    res = GST_FLOW_NOT_LINKED;
  }

  return res;
}
/* Timestamp (DTS, falling back to PTS) of @item: taken from its buffer when
 * there is one, otherwise from the first buffer of its buffer list.
 * GST_CLOCK_TIME_NONE when @item holds neither. */
static GstClockTime
get_backlog_item_timestamp (BackLogItem * item)
{
  GstBuffer *first;

  if (item->buffer != NULL)
    return GST_BUFFER_DTS_OR_PTS (item->buffer);

  if (item->buffer_list == NULL)
    return GST_CLOCK_TIME_NONE;

  g_assert (gst_buffer_list_length (item->buffer_list) > 0);
  first = gst_buffer_list_get (item->buffer_list, 0);

  return GST_BUFFER_DTS_OR_PTS (first);
}
/* Timestamp of the oldest RTP item in the backlog of @trans, or
 * GST_CLOCK_TIME_NONE when the backlog contains no RTP item. */
static GstClockTime
get_first_backlog_timestamp (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv = trans->priv;
  const guint count = gst_queue_array_get_length (priv->items);
  guint idx = 0;

  while (idx < count) {
    BackLogItem *candidate = (BackLogItem *)
        gst_queue_array_peek_nth_struct (priv->items, idx);

    if (candidate->is_rtp)
      return get_backlog_item_timestamp (candidate);

    idx++;
  }

  return GST_CLOCK_TIME_NONE;
}
/* Append @buffer and/or @buffer_list to the backlog of @trans, tagged with
 * @is_rtp. The transport takes ownership of both.
 *
 * Returns FALSE when an RTP item was pushed and the backlog now exceeds both
 * MAX_BACKLOG_DURATION and MAX_BACKLOG_SIZE, TRUE otherwise. The item is
 * queued in either case.
 *
 * Not MT-safe, the caller is responsible for consistent locking (see
 * gst_rtsp_stream_transport_lock_backlog()). */
gboolean
gst_rtsp_stream_transport_backlog_push (GstRTSPStreamTransport * trans,
    GstBuffer * buffer, GstBufferList * buffer_list, gboolean is_rtp)
{
  GstRTSPStreamTransportPrivate *priv = trans->priv;
  BackLogItem item = { 0, };
  GstClockTime item_timestamp;
  GstClockTimeDiff queue_duration;

  item.buffer = buffer;
  item.buffer_list = buffer_list;
  item.is_rtp = is_rtp;

  gst_queue_array_push_tail_struct (priv->items, &item);
  item_timestamp = get_backlog_item_timestamp (&item);

  /* only RTP items take part in the duration bookkeeping */
  if (!is_rtp)
    return TRUE;

  /* first RTP item in the backlog: remember where the backlog starts */
  if (priv->first_rtp_timestamp == GST_CLOCK_TIME_NONE) {
    priv->first_rtp_timestamp = item_timestamp;
    return TRUE;
  }

  g_assert (GST_CLOCK_TIME_IS_VALID (item_timestamp));
  queue_duration = GST_CLOCK_DIFF (priv->first_rtp_timestamp, item_timestamp);
  g_assert (queue_duration >= 0);

  /* the backlog counts as overrun only when both limits are exceeded */
  return !(queue_duration > MAX_BACKLOG_DURATION &&
      gst_queue_array_get_length (priv->items) > MAX_BACKLOG_SIZE);
}
/* Remove the oldest item from the backlog of @trans, which must not be
 * empty. Ownership of the item's buffer and buffer list moves to the caller
 * through @buffer and @buffer_list; when one of those is NULL the
 * corresponding object is unreffed instead. @is_rtp, when not NULL, receives
 * the RTP flag of the item.
 *
 * Not MT-safe, the caller is responsible for consistent locking (see
 * gst_rtsp_stream_transport_lock_backlog()). */
gboolean
gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamTransport * trans,
    GstBuffer ** buffer, GstBufferList ** buffer_list, gboolean * is_rtp)
{
  GstRTSPStreamTransportPrivate *priv;
  BackLogItem *head;

  g_return_val_if_fail (!gst_rtsp_stream_transport_backlog_is_empty (trans),
      FALSE);

  priv = trans->priv;

  head = (BackLogItem *) gst_queue_array_pop_head_struct (priv->items);

  /* the head is gone, recompute where the RTP backlog starts */
  priv->first_rtp_timestamp = get_first_backlog_timestamp (trans);

  if (buffer == NULL) {
    if (head->buffer != NULL)
      gst_buffer_unref (head->buffer);
  } else {
    *buffer = head->buffer;
  }

  if (buffer_list == NULL) {
    if (head->buffer_list != NULL)
      gst_buffer_list_unref (head->buffer_list);
  } else {
    *buffer_list = head->buffer_list;
  }

  if (is_rtp != NULL)
    *is_rtp = head->is_rtp;

  return TRUE;
}
/* Report whether the oldest item in the backlog of @trans is RTP, without
 * removing it. The backlog must not be empty.
 *
 * Not MT-safe, the caller is responsible for consistent locking.
 * See gst_rtsp_stream_transport_lock_backlog() */
gboolean
gst_rtsp_stream_transport_backlog_peek_is_rtp (GstRTSPStreamTransport * trans)
{
  BackLogItem *head;

  g_return_val_if_fail (!gst_rtsp_stream_transport_backlog_is_empty (trans),
      FALSE);

  head = (BackLogItem *) gst_queue_array_peek_head_struct (trans->priv->items);

  return head->is_rtp;
}
/* Report whether the backlog of @trans holds no items.
 *
 * Not MT-safe, the caller is responsible for consistent locking.
 * See gst_rtsp_stream_transport_lock_backlog() */
gboolean
gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv = trans->priv;

  return gst_queue_array_is_empty (priv->items);
}
/* Drop every item queued in the backlog of @trans, unreffing the buffers
 * and buffer lists they hold.
 *
 * Not MT-safe, the caller is responsible for consistent locking.
 * See gst_rtsp_stream_transport_lock_backlog() */
void
gst_rtsp_stream_transport_clear_backlog (GstRTSPStreamTransport * trans)
{
  for (;;) {
    if (gst_rtsp_stream_transport_backlog_is_empty (trans))
      break;

    /* NULL out-arguments make the pop release the item's data */
    gst_rtsp_stream_transport_backlog_pop (trans, NULL, NULL, NULL);
  }
}
/* Internal API: take the lock that guards the TCP backlog of @trans. The
 * lock is a recursive mutex, so nested calls from one thread are fine. */
void
gst_rtsp_stream_transport_lock_backlog (GstRTSPStreamTransport * trans)
{
  GRecMutex *lock = &trans->priv->backlog_lock;

  g_rec_mutex_lock (lock);
}
/* Internal API: release the backlog lock taken with
 * gst_rtsp_stream_transport_lock_backlog(). */
void
gst_rtsp_stream_transport_unlock_backlog (GstRTSPStreamTransport * trans)
{
  GRecMutex *lock = &trans->priv->backlog_lock;

  g_rec_mutex_unlock (lock);
}