gstreamer/gst/rtsp-server/rtsp-stream-transport.c
Adam x Nilsson 9c5ca231a6 rtsp-stream: Removing invalid transports returns false
When removing transports an assertion was that the transports passed in
for removal are present in the list, however that can't be assumed.
As an example if a transport was removed from a thread running
send_tcp_message, the main thread can try to remove the same transport
again if it gets a handle_pause_request. This will not affect the
transport list but it will affect n_tcp_transports, as it will be
decremented and then have the wrong value.
2019-11-25 19:12:10 +01:00

902 lines
24 KiB
C

/* GStreamer
* Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:rtsp-stream-transport
* @short_description: A media stream transport configuration
* @see_also: #GstRTSPStream, #GstRTSPSessionMedia
*
* The #GstRTSPStreamTransport configures the transport used by a
* #GstRTSPStream. It is usually managed by a #GstRTSPSessionMedia object.
*
* With gst_rtsp_stream_transport_set_callbacks(), callbacks can be configured
* to handle the RTP and RTCP packets from the stream, for example when they
* need to be sent over TCP.
*
* With gst_rtsp_stream_transport_set_active() the transports are added and
* removed from the stream.
*
* A #GstRTSPStream will call gst_rtsp_stream_transport_keep_alive() when RTCP
* is received from the client. It will also call
* gst_rtsp_stream_transport_set_timed_out() when a receiver has timed out.
*
* A #GstRTSPClient will call gst_rtsp_stream_transport_message_sent() when it
* has sent a data message for the transport.
*
* Last reviewed on 2013-07-16 (1.0.0)
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <string.h>
#include <stdlib.h>
#include "rtsp-stream-transport.h"
#include "rtsp-server-internal.h"
/* Private instance data of a #GstRTSPStreamTransport. */
struct _GstRTSPStreamTransportPrivate
{
/* stream handled by this transport; a reference is taken in
 * gst_rtsp_stream_transport_new() and dropped in finalize */
GstRTSPStream *stream;
/* per-buffer send callbacks, their shared user data and its destroy notify
 * (see gst_rtsp_stream_transport_set_callbacks()) */
GstRTSPSendFunc send_rtp;
GstRTSPSendFunc send_rtcp;
gpointer user_data;
GDestroyNotify notify;
/* buffer-list send callbacks, their shared user data and its destroy notify
 * (see gst_rtsp_stream_transport_set_list_callbacks()) */
GstRTSPSendListFunc send_rtp_list;
GstRTSPSendListFunc send_rtcp_list;
gpointer list_user_data;
GDestroyNotify list_notify;
/* back-pressure query callback with its user data and destroy notify */
GstRTSPBackPressureFunc back_pressure_func;
gpointer back_pressure_func_data;
GDestroyNotify back_pressure_func_notify;
/* keep-alive callback with its user data and destroy notify */
GstRTSPKeepAliveFunc keep_alive;
gpointer ka_user_data;
GDestroyNotify ka_notify;
/* flag written by gst_rtsp_stream_transport_set_timed_out() */
gboolean timed_out;
/* message-sent callback with its user data and destroy notify */
GstRTSPMessageSentFunc message_sent;
gpointer ms_user_data;
GDestroyNotify ms_notify;
/* client transport; owned, released with gst_rtsp_transport_free() */
GstRTSPTransport *transport;
/* client url; owned copy or NULL, released with gst_rtsp_url_free() */
GstRTSPUrl *url;
/* NOTE(review): not referenced by the code visible in this file */
GObject *rtpsource;
/* TCP backlog */
/* timestamp of the oldest RTP item in @items, or GST_CLOCK_TIME_NONE when
 * the backlog holds no RTP item */
GstClockTime first_rtp_timestamp;
/* queue of BackLogItem structs */
GstQueueArray *items;
};
/* Backlog limits checked by gst_rtsp_stream_transport_backlog_push(): a push
 * reports FALSE only when BOTH the time span between the first queued RTP item
 * and the pushed one exceeds MAX_BACKLOG_DURATION and the number of queued
 * items exceeds MAX_BACKLOG_SIZE. */
#define MAX_BACKLOG_DURATION (10 * GST_SECOND)
#define MAX_BACKLOG_SIZE 100
/* One entry of the TCP backlog queue. It carries a single buffer and/or a
 * buffer list, plus a flag telling whether the payload is RTP (TRUE) or not
 * (FALSE). */
typedef struct
{
/* queued buffer, or NULL */
GstBuffer *buffer;
/* queued buffer list, or NULL */
GstBufferList *buffer_list;
/* TRUE for RTP payload; only RTP items take part in the backlog duration
 * accounting */
gboolean is_rtp;
} BackLogItem;
/* GObject property identifiers. No property is defined between the PROP_0
 * and PROP_LAST markers. */
enum
{
PROP_0,
PROP_LAST
};
/* Debug category of this file; it is initialized in class_init and made the
 * default category for the GST_* logging macros used below. */
GST_DEBUG_CATEGORY_STATIC (rtsp_stream_transport_debug);
#define GST_CAT_DEFAULT rtsp_stream_transport_debug
/* forward declaration: class_init references it before its definition */
static void gst_rtsp_stream_transport_finalize (GObject * obj);
/* Defines the GstRTSPStreamTransport type, derived from G_TYPE_OBJECT, with
 * private instance data. */
G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStreamTransport, gst_rtsp_stream_transport,
G_TYPE_OBJECT);
/* Class initializer: installs the finalize handler and sets up the debug
 * category used by the logging macros in this file. */
static void
gst_rtsp_stream_transport_class_init (GstRTSPStreamTransportClass * klass)
{
  GObjectClass *object_class = G_OBJECT_CLASS (klass);

  object_class->finalize = gst_rtsp_stream_transport_finalize;

  GST_DEBUG_CATEGORY_INIT (rtsp_stream_transport_debug, "rtspmediatransport",
      0, "GstRTSPStreamTransport");
}
/* Clear function of the backlog queue: clears the buffer and the buffer list
 * held by @item. */
static void
clear_backlog_item (BackLogItem * item)
{
  GstBuffer **buffer_slot = &item->buffer;
  GstBufferList **list_slot = &item->buffer_list;

  gst_clear_buffer (buffer_slot);
  gst_clear_buffer_list (list_slot);
}
/* Instance initializer: binds the private data and creates an empty backlog
 * queue whose items are cleared with clear_backlog_item(). */
static void
gst_rtsp_stream_transport_init (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv =
      gst_rtsp_stream_transport_get_instance_private (trans);

  trans->priv = priv;

  priv->items = gst_queue_array_new_for_struct (sizeof (BackLogItem), 0);
  /* no RTP item is queued yet */
  priv->first_rtp_timestamp = GST_CLOCK_TIME_NONE;
  gst_queue_array_set_clear_func (priv->items,
      (GDestroyNotify) clear_backlog_item);
}
/* GObject finalize handler: uninstalls every callback (which runs the
 * destroy notify registered with it), drops the stream reference, frees the
 * transport, the url and the backlog queue, then chains up to the parent
 * class. */
static void
gst_rtsp_stream_transport_finalize (GObject * obj)
{
  GstRTSPStreamTransportPrivate *priv;
  GstRTSPStreamTransport *trans;

  trans = GST_RTSP_STREAM_TRANSPORT (obj);
  priv = trans->priv;

  /* remove callbacks now. Each setter invokes the destroy notify that was
   * installed before, so every callback family must be reset here; the list
   * and back-pressure callbacks were previously skipped, which meant their
   * notify was never called and the user data was never released. */
  gst_rtsp_stream_transport_set_callbacks (trans, NULL, NULL, NULL, NULL);
  gst_rtsp_stream_transport_set_list_callbacks (trans, NULL, NULL, NULL, NULL);
  gst_rtsp_stream_transport_set_back_pressure_callback (trans, NULL, NULL,
      NULL);
  gst_rtsp_stream_transport_set_keepalive (trans, NULL, NULL, NULL);
  gst_rtsp_stream_transport_set_message_sent (trans, NULL, NULL, NULL);

  if (priv->stream)
    g_object_unref (priv->stream);

  if (priv->transport)
    gst_rtsp_transport_free (priv->transport);

  if (priv->url)
    gst_rtsp_url_free (priv->url);

  gst_queue_array_free (priv->items);

  G_OBJECT_CLASS (gst_rtsp_stream_transport_parent_class)->finalize (obj);
}
/**
 * gst_rtsp_stream_transport_new:
 * @stream: a #GstRTSPStream
 * @tr: (transfer full): a GstRTSPTransport
 *
 * Construct a #GstRTSPStreamTransport for managing @stream over the
 * transport @tr. A reference to @stream is taken and ownership of @tr moves
 * to the new object.
 *
 * Returns: (transfer full): a new #GstRTSPStreamTransport
 */
GstRTSPStreamTransport *
gst_rtsp_stream_transport_new (GstRTSPStream * stream, GstRTSPTransport * tr)
{
  GstRTSPStreamTransport *self;

  g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
  g_return_val_if_fail (tr != NULL, NULL);

  self = g_object_new (GST_TYPE_RTSP_STREAM_TRANSPORT, NULL);

  self->priv->stream = g_object_ref (stream);
  self->priv->transport = tr;

  return self;
}
/**
 * gst_rtsp_stream_transport_get_stream:
 * @trans: a #GstRTSPStreamTransport
 *
 * Retrieve the #GstRTSPStream that @trans was constructed with.
 *
 * Returns: (transfer none) (nullable): the stream used when constructing @trans.
 */
GstRTSPStream *
gst_rtsp_stream_transport_get_stream (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), NULL);

  priv = trans->priv;

  return priv->stream;
}
/**
 * gst_rtsp_stream_transport_set_callbacks:
 * @trans: a #GstRTSPStreamTransport
 * @send_rtp: (scope notified): a callback called when RTP should be sent
 * @send_rtcp: (scope notified): a callback called when RTCP should be sent
 * @user_data: (closure): user data passed to callbacks
 * @notify: (allow-none): called with the user_data when no longer needed.
 *
 * Install the per-buffer callbacks that deliver stream data to a client,
 * typically for RTP/RTCP over TCP. The destroy notify of any previously
 * installed user data is invoked before the new user data is stored.
 */
void
gst_rtsp_stream_transport_set_callbacks (GstRTSPStreamTransport * trans,
    GstRTSPSendFunc send_rtp, GstRTSPSendFunc send_rtcp,
    gpointer user_data, GDestroyNotify notify)
{
  GstRTSPStreamTransportPrivate *priv;
  GDestroyNotify old_notify;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;
  old_notify = priv->notify;

  priv->send_rtp = send_rtp;
  priv->send_rtcp = send_rtcp;

  /* release the user data that belonged to the replaced callbacks */
  if (old_notify != NULL)
    old_notify (priv->user_data);

  priv->user_data = user_data;
  priv->notify = notify;
}
/**
 * gst_rtsp_stream_transport_set_list_callbacks:
 * @trans: a #GstRTSPStreamTransport
 * @send_rtp_list: (scope notified): a callback called when RTP should be sent
 * @send_rtcp_list: (scope notified): a callback called when RTCP should be sent
 * @user_data: (closure): user data passed to callbacks
 * @notify: (allow-none): called with the user_data when no longer needed.
 *
 * Install the buffer-list callbacks that deliver stream data to a client,
 * typically for RTP/RTCP over TCP. The destroy notify of any previously
 * installed list user data is invoked before the new user data is stored.
 *
 * Since: 1.16
 */
void
gst_rtsp_stream_transport_set_list_callbacks (GstRTSPStreamTransport * trans,
    GstRTSPSendListFunc send_rtp_list, GstRTSPSendListFunc send_rtcp_list,
    gpointer user_data, GDestroyNotify notify)
{
  GstRTSPStreamTransportPrivate *priv;
  GDestroyNotify old_notify;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;
  old_notify = priv->list_notify;

  priv->send_rtp_list = send_rtp_list;
  priv->send_rtcp_list = send_rtcp_list;

  /* release the user data that belonged to the replaced callbacks */
  if (old_notify != NULL)
    old_notify (priv->list_user_data);

  priv->list_user_data = user_data;
  priv->list_notify = notify;
}
/* Install @back_pressure_func as the callback queried by
 * gst_rtsp_stream_transport_check_back_pressure(). The destroy notify of the
 * previously installed user data, if any, is invoked before @user_data and
 * @notify are stored. */
void
gst_rtsp_stream_transport_set_back_pressure_callback (GstRTSPStreamTransport *
    trans, GstRTSPBackPressureFunc back_pressure_func, gpointer user_data,
    GDestroyNotify notify)
{
  GstRTSPStreamTransportPrivate *priv;
  GDestroyNotify old_notify;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;
  old_notify = priv->back_pressure_func_notify;

  priv->back_pressure_func = back_pressure_func;

  if (old_notify != NULL)
    old_notify (priv->back_pressure_func_data);

  priv->back_pressure_func_data = user_data;
  priv->back_pressure_func_notify = notify;
}
/* Ask the installed back-pressure callback about @channel. Returns the
 * callback's result, or FALSE when no callback is installed. */
gboolean
gst_rtsp_stream_transport_check_back_pressure (GstRTSPStreamTransport * trans,
    guint8 channel)
{
  GstRTSPStreamTransportPrivate *priv = trans->priv;

  if (priv->back_pressure_func == NULL)
    return FALSE;

  return priv->back_pressure_func (channel, priv->back_pressure_func_data);
}
/**
 * gst_rtsp_stream_transport_set_keepalive:
 * @trans: a #GstRTSPStreamTransport
 * @keep_alive: (scope notified): a callback called when the receiver is active
 * @user_data: (closure): user data passed to callback
 * @notify: (allow-none): called with the user_data when no longer needed.
 *
 * Install the callback invoked by gst_rtsp_stream_transport_keep_alive(),
 * e.g. when RTCP packets are received from the receiver of @trans. The
 * destroy notify of any previously installed user data is invoked first.
 */
void
gst_rtsp_stream_transport_set_keepalive (GstRTSPStreamTransport * trans,
    GstRTSPKeepAliveFunc keep_alive, gpointer user_data, GDestroyNotify notify)
{
  GstRTSPStreamTransportPrivate *priv;
  GDestroyNotify old_notify;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;
  old_notify = priv->ka_notify;

  priv->keep_alive = keep_alive;

  if (old_notify != NULL)
    old_notify (priv->ka_user_data);

  priv->ka_user_data = user_data;
  priv->ka_notify = notify;
}
/**
 * gst_rtsp_stream_transport_set_message_sent:
 * @trans: a #GstRTSPStreamTransport
 * @message_sent: (scope notified): a callback called when a message has been sent
 * @user_data: (closure): user data passed to callback
 * @notify: (allow-none): called with the user_data when no longer needed
 *
 * Install the callback invoked by gst_rtsp_stream_transport_message_sent()
 * once a message has been sent on @trans. The destroy notify of any
 * previously installed user data is invoked first.
 */
void
gst_rtsp_stream_transport_set_message_sent (GstRTSPStreamTransport * trans,
    GstRTSPMessageSentFunc message_sent, gpointer user_data,
    GDestroyNotify notify)
{
  GstRTSPStreamTransportPrivate *priv;
  GDestroyNotify old_notify;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;
  old_notify = priv->ms_notify;

  priv->message_sent = message_sent;

  if (old_notify != NULL)
    old_notify (priv->ms_user_data);

  priv->ms_user_data = user_data;
  priv->ms_notify = notify;
}
/**
 * gst_rtsp_stream_transport_set_transport:
 * @trans: a #GstRTSPStreamTransport
 * @tr: (transfer full): a client #GstRTSPTransport
 *
 * Replace the client transport of @trans with @tr. The previously configured
 * transport is freed and @trans takes ownership of @tr.
 */
void
gst_rtsp_stream_transport_set_transport (GstRTSPStreamTransport * trans,
    GstRTSPTransport * tr)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));
  g_return_if_fail (tr != NULL);

  priv = trans->priv;

  /* drop the transport that is being replaced */
  if (priv->transport != NULL) {
    gst_rtsp_transport_free (priv->transport);
  }

  priv->transport = tr;
}
/**
 * gst_rtsp_stream_transport_get_transport:
 * @trans: a #GstRTSPStreamTransport
 *
 * Retrieve the transport currently configured in @trans.
 *
 * Returns: (transfer none) (nullable): the transport configured in @trans. It remains
 * valid for as long as @trans is valid.
 */
const GstRTSPTransport *
gst_rtsp_stream_transport_get_transport (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), NULL);

  priv = trans->priv;

  return priv->transport;
}
/**
 * gst_rtsp_stream_transport_set_url:
 * @trans: a #GstRTSPStreamTransport
 * @url: (transfer none) (nullable): a client #GstRTSPUrl
 *
 * Set @url as the client url. @trans stores its own copy of @url; passing
 * %NULL clears the configured url. It is safe to pass the url previously
 * obtained from gst_rtsp_stream_transport_get_url().
 */
void
gst_rtsp_stream_transport_set_url (GstRTSPStreamTransport * trans,
    const GstRTSPUrl * url)
{
  GstRTSPStreamTransportPrivate *priv;
  GstRTSPUrl *new_url;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;

  /* copy before freeing: @url may be the very url stored in @trans, in which
   * case freeing first would make the copy read released memory */
  new_url = (url ? gst_rtsp_url_copy (url) : NULL);

  if (priv->url)
    gst_rtsp_url_free (priv->url);

  priv->url = new_url;
}
/**
 * gst_rtsp_stream_transport_get_url:
 * @trans: a #GstRTSPStreamTransport
 *
 * Retrieve the url currently configured in @trans.
 *
 * Returns: (transfer none) (nullable): the url configured in @trans.
 * It remains valid for as long as @trans is valid.
 */
const GstRTSPUrl *
gst_rtsp_stream_transport_get_url (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), NULL);

  priv = trans->priv;

  return priv->url;
}
/**
 * gst_rtsp_stream_transport_get_rtpinfo:
 * @trans: a #GstRTSPStreamTransport
 * @start_time: a start time
 *
 * Get the RTP-Info string for @trans and @start_time.
 *
 * Returns: (transfer full) (nullable): the RTPInfo string for @trans
 * and @start_time or %NULL when the RTP-Info could not be
 * determined (the stream is not a sender, the stream could not provide
 * its RTP info, or no url is configured in @trans). g_free() after usage.
 */
gchar *
gst_rtsp_stream_transport_get_rtpinfo (GstRTSPStreamTransport * trans,
    GstClockTime start_time)
{
  GstRTSPStreamTransportPrivate *priv;
  gchar *url_str;
  GString *rtpinfo;
  guint rtptime, seq, clock_rate;
  GstClockTime running_time = GST_CLOCK_TIME_NONE;

  g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), NULL);

  priv = trans->priv;

  if (!gst_rtsp_stream_is_sender (priv->stream))
    return NULL;
  if (!gst_rtsp_stream_get_rtpinfo (priv->stream, &rtptime, &seq, &clock_rate,
          &running_time))
    return NULL;

  GST_DEBUG ("RTP time %u, seq %u, rate %u, running-time %" GST_TIME_FORMAT,
      rtptime, seq, clock_rate, GST_TIME_ARGS (running_time));

  /* rtptime belongs to running_time; shift it by the distance between
   * running_time and start_time, converted to clock_rate units */
  if (GST_CLOCK_TIME_IS_VALID (running_time)
      && GST_CLOCK_TIME_IS_VALID (start_time)) {
    if (running_time > start_time) {
      rtptime -=
          gst_util_uint64_scale_int (running_time - start_time, clock_rate,
          GST_SECOND);
    } else {
      rtptime +=
          gst_util_uint64_scale_int (start_time - running_time, clock_rate,
          GST_SECOND);
    }
  }
  GST_DEBUG ("RTP time %u, for start-time %" GST_TIME_FORMAT,
      rtptime, GST_TIME_ARGS (start_time));

  /* the url is nullable (see gst_rtsp_stream_transport_set_url()); without
   * one there is nothing valid to put in the "url=" field */
  if (priv->url == NULL) {
    GST_DEBUG ("no url configured, can't create RTP-Info");
    return NULL;
  }

  rtpinfo = g_string_new ("");

  url_str = gst_rtsp_url_get_request_uri (priv->url);
  g_string_append_printf (rtpinfo, "url=%s;seq=%u;rtptime=%u",
      url_str, seq, rtptime);
  g_free (url_str);

  /* hand the character data over to the caller, freeing only the wrapper */
  return g_string_free (rtpinfo, FALSE);
}
/**
 * gst_rtsp_stream_transport_set_active:
 * @trans: a #GstRTSPStreamTransport
 * @active: new state of @trans
 *
 * Activate or deactivate the data transfer configured in @trans by adding
 * @trans to, or removing it from, its stream.
 *
 * Returns: %TRUE when the state was changed.
 */
gboolean
gst_rtsp_stream_transport_set_active (GstRTSPStreamTransport * trans,
    gboolean active)
{
  GstRTSPStream *stream;

  g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);

  stream = trans->priv->stream;

  if (!active)
    return gst_rtsp_stream_remove_transport (stream, trans);

  return gst_rtsp_stream_add_transport (stream, trans);
}
/**
 * gst_rtsp_stream_transport_set_timed_out:
 * @trans: a #GstRTSPStreamTransport
 * @timedout: timed out value
 *
 * Store @timedout as the timed out state of @trans.
 */
void
gst_rtsp_stream_transport_set_timed_out (GstRTSPStreamTransport * trans,
    gboolean timedout)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));

  priv = trans->priv;
  priv->timed_out = timedout;
}
/**
 * gst_rtsp_stream_transport_is_timed_out:
 * @trans: a #GstRTSPStreamTransport
 *
 * Query the timed out state of @trans.
 *
 * Returns: %TRUE if @trans timed out.
 */
gboolean
gst_rtsp_stream_transport_is_timed_out (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv;

  g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);

  priv = trans->priv;

  return priv->timed_out;
}
/**
 * gst_rtsp_stream_transport_send_rtp:
 * @trans: a #GstRTSPStreamTransport
 * @buffer: (transfer none): a #GstBuffer
 *
 * Hand @buffer to the RTP callback installed on @trans, using the lower
 * interleaved channel of the transport. A successful send also triggers the
 * keep-alive callback.
 *
 * Returns: %TRUE on success
 */
gboolean
gst_rtsp_stream_transport_send_rtp (GstRTSPStreamTransport * trans,
    GstBuffer * buffer)
{
  GstRTSPStreamTransportPrivate *priv;
  gboolean sent;

  g_return_val_if_fail (GST_IS_BUFFER (buffer), FALSE);

  priv = trans->priv;

  /* nothing can be sent without a callback */
  if (priv->send_rtp == NULL)
    return FALSE;

  sent = priv->send_rtp (buffer, priv->transport->interleaved.min,
      priv->user_data);
  if (sent) {
    gst_rtsp_stream_transport_keep_alive (trans);
  }

  return sent;
}
/**
 * gst_rtsp_stream_transport_send_rtcp:
 * @trans: a #GstRTSPStreamTransport
 * @buffer: (transfer none): a #GstBuffer
 *
 * Hand @buffer to the RTCP callback installed on @trans, using the upper
 * interleaved channel of the transport. A successful send also triggers the
 * keep-alive callback.
 *
 * Returns: %TRUE on success
 */
gboolean
gst_rtsp_stream_transport_send_rtcp (GstRTSPStreamTransport * trans,
    GstBuffer * buffer)
{
  GstRTSPStreamTransportPrivate *priv;
  gboolean sent;

  g_return_val_if_fail (GST_IS_BUFFER (buffer), FALSE);

  priv = trans->priv;

  /* nothing can be sent without a callback */
  if (priv->send_rtcp == NULL)
    return FALSE;

  sent = priv->send_rtcp (buffer, priv->transport->interleaved.max,
      priv->user_data);
  if (sent) {
    gst_rtsp_stream_transport_keep_alive (trans);
  }

  return sent;
}
/**
 * gst_rtsp_stream_transport_send_rtp_list:
 * @trans: a #GstRTSPStreamTransport
 * @buffer_list: (transfer none): a #GstBufferList
 *
 * Hand @buffer_list to the RTP list callback installed on @trans. When only
 * a per-buffer RTP callback is installed, the buffers of the list are sent
 * through it one by one, stopping at the first failure. A successful send
 * also triggers the keep-alive callback.
 *
 * Returns: %TRUE on success
 *
 * Since: 1.16
 */
gboolean
gst_rtsp_stream_transport_send_rtp_list (GstRTSPStreamTransport * trans,
    GstBufferList * buffer_list)
{
  GstRTSPStreamTransportPrivate *priv;
  gboolean sent = FALSE;

  g_return_val_if_fail (GST_IS_BUFFER_LIST (buffer_list), FALSE);

  priv = trans->priv;

  if (priv->send_rtp_list != NULL) {
    sent = priv->send_rtp_list (buffer_list, priv->transport->interleaved.min,
        priv->list_user_data);
  } else if (priv->send_rtp != NULL) {
    /* fall back to the per-buffer callback */
    const guint len = gst_buffer_list_length (buffer_list);
    guint idx = 0;

    while (idx < len) {
      sent = priv->send_rtp (gst_buffer_list_get (buffer_list, idx),
          priv->transport->interleaved.min, priv->user_data);
      if (!sent)
        break;
      idx++;
    }
  }

  if (sent) {
    gst_rtsp_stream_transport_keep_alive (trans);
  }

  return sent;
}
/**
 * gst_rtsp_stream_transport_send_rtcp_list:
 * @trans: a #GstRTSPStreamTransport
 * @buffer_list: (transfer none): a #GstBufferList
 *
 * Hand @buffer_list to the RTCP list callback installed on @trans. When only
 * a per-buffer RTCP callback is installed, the buffers of the list are sent
 * through it one by one, stopping at the first failure. A successful send
 * also triggers the keep-alive callback.
 *
 * Returns: %TRUE on success
 *
 * Since: 1.16
 */
gboolean
gst_rtsp_stream_transport_send_rtcp_list (GstRTSPStreamTransport * trans,
    GstBufferList * buffer_list)
{
  GstRTSPStreamTransportPrivate *priv;
  gboolean sent = FALSE;

  g_return_val_if_fail (GST_IS_BUFFER_LIST (buffer_list), FALSE);

  priv = trans->priv;

  if (priv->send_rtcp_list != NULL) {
    sent = priv->send_rtcp_list (buffer_list, priv->transport->interleaved.max,
        priv->list_user_data);
  } else if (priv->send_rtcp != NULL) {
    /* fall back to the per-buffer callback */
    const guint len = gst_buffer_list_length (buffer_list);
    guint idx = 0;

    while (idx < len) {
      sent = priv->send_rtcp (gst_buffer_list_get (buffer_list, idx),
          priv->transport->interleaved.max, priv->user_data);
      if (!sent)
        break;
      idx++;
    }
  }

  if (sent) {
    gst_rtsp_stream_transport_keep_alive (trans);
  }

  return sent;
}
/**
 * gst_rtsp_stream_transport_keep_alive:
 * @trans: a #GstRTSPStreamTransport
 *
 * Invoke the keep_alive callback installed on @trans, if there is one.
 */
void
gst_rtsp_stream_transport_keep_alive (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv = trans->priv;

  if (priv->keep_alive == NULL)
    return;

  priv->keep_alive (priv->ka_user_data);
}
/**
 * gst_rtsp_stream_transport_message_sent:
 * @trans: a #GstRTSPStreamTransport
 *
 * Invoke the message_sent callback installed on @trans, if there is one.
 *
 * Since: 1.16
 */
void
gst_rtsp_stream_transport_message_sent (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv = trans->priv;

  if (priv->message_sent == NULL)
    return;

  priv->message_sent (trans, priv->ms_user_data);
}
/**
 * gst_rtsp_stream_transport_recv_data:
 * @trans: a #GstRTSPStreamTransport
 * @channel: a channel
 * @buffer: (transfer full): a #GstBuffer
 *
 * Receive @buffer on @channel @trans. Data on the lower interleaved channel
 * of the transport is passed to the stream as RTP, data on the upper
 * interleaved channel as RTCP. Ownership of @buffer is taken in every case;
 * when @channel matches neither channel the buffer is released.
 *
 * Returns: a #GstFlowReturn. Returns GST_FLOW_NOT_LINKED when @channel is not
 * configured in the transport of @trans.
 */
GstFlowReturn
gst_rtsp_stream_transport_recv_data (GstRTSPStreamTransport * trans,
    guint channel, GstBuffer * buffer)
{
  GstRTSPStreamTransportPrivate *priv;
  const GstRTSPTransport *tr;
  GstFlowReturn res;

  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);

  priv = trans->priv;
  tr = priv->transport;

  if (tr->interleaved.min == channel) {
    res = gst_rtsp_stream_recv_rtp (priv->stream, buffer);
  } else if (tr->interleaved.max == channel) {
    res = gst_rtsp_stream_recv_rtcp (priv->stream, buffer);
  } else {
    /* @buffer is (transfer full): nobody consumed it on this path, so it has
     * to be released here or it would leak */
    gst_buffer_unref (buffer);
    res = GST_FLOW_NOT_LINKED;
  }

  return res;
}
/* Timestamp (DTS when valid, PTS otherwise) of @item: taken from its buffer
 * when present, else from the first buffer of its buffer list. An item that
 * carries neither yields GST_CLOCK_TIME_NONE. */
static GstClockTime
get_backlog_item_timestamp (BackLogItem * item)
{
  GstBuffer *head;

  if (item->buffer != NULL)
    return GST_BUFFER_DTS_OR_PTS (item->buffer);

  if (item->buffer_list == NULL)
    return GST_CLOCK_TIME_NONE;

  g_assert (gst_buffer_list_length (item->buffer_list) > 0);
  head = gst_buffer_list_get (item->buffer_list, 0);

  return GST_BUFFER_DTS_OR_PTS (head);
}
/* Timestamp of the first RTP item found when walking the backlog from its
 * head, or GST_CLOCK_TIME_NONE when the backlog holds no RTP item. */
static GstClockTime
get_first_backlog_timestamp (GstRTSPStreamTransport * trans)
{
  GstQueueArray *queue = trans->priv->items;
  const guint count = gst_queue_array_get_length (queue);
  guint idx;

  for (idx = 0; idx < count; idx++) {
    BackLogItem *entry =
        (BackLogItem *) gst_queue_array_peek_nth_struct (queue, idx);

    if (entry->is_rtp)
      return get_backlog_item_timestamp (entry);
  }

  return GST_CLOCK_TIME_NONE;
}
/* Append @buffer and/or @buffer_list to the backlog of @trans. Returns FALSE
 * when an RTP item was pushed and the backlog exceeds both
 * MAX_BACKLOG_DURATION and MAX_BACKLOG_SIZE, TRUE otherwise; the item is
 * queued in either case.
 *
 * Not MT-safe, caller should ensure consistent locking. Ownership
 * of @buffer and @buffer_list is transferred to the transport */
gboolean
gst_rtsp_stream_transport_backlog_push (GstRTSPStreamTransport * trans,
    GstBuffer * buffer, GstBufferList * buffer_list, gboolean is_rtp)
{
  GstRTSPStreamTransportPrivate *priv = trans->priv;
  BackLogItem entry = { 0, };
  GstClockTime entry_ts;
  GstClockTimeDiff span;

  entry.buffer = buffer;
  entry.buffer_list = buffer_list;
  entry.is_rtp = is_rtp;

  /* the queue stores its own copy of the struct */
  gst_queue_array_push_tail_struct (priv->items, &entry);

  entry_ts = get_backlog_item_timestamp (&entry);

  /* only RTP items take part in the duration accounting */
  if (!is_rtp)
    return TRUE;

  if (priv->first_rtp_timestamp == GST_CLOCK_TIME_NONE) {
    /* first RTP item in the backlog: remember where the backlog starts */
    priv->first_rtp_timestamp = entry_ts;
    return TRUE;
  }

  g_assert (GST_CLOCK_TIME_IS_VALID (entry_ts));

  span = GST_CLOCK_DIFF (priv->first_rtp_timestamp, entry_ts);

  g_assert (span >= 0);

  if (span > MAX_BACKLOG_DURATION &&
      gst_queue_array_get_length (priv->items) > MAX_BACKLOG_SIZE)
    return FALSE;

  return TRUE;
}
/* Remove the head item of the backlog of @trans and return its contents
 * through @buffer, @buffer_list and @is_rtp. The backlog must not be empty.
 *
 * Not MT-safe, caller should ensure consistent locking. Ownership
 * of @buffer and @buffer_list is transferred back to the caller */
gboolean
gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamTransport * trans,
    GstBuffer ** buffer, GstBufferList ** buffer_list, gboolean * is_rtp)
{
  GstRTSPStreamTransportPrivate *priv;
  BackLogItem *head;

  g_return_val_if_fail (!gst_rtsp_stream_transport_backlog_is_empty (trans),
      FALSE);

  priv = trans->priv;

  head = (BackLogItem *) gst_queue_array_pop_head_struct (priv->items);

  *buffer = head->buffer;
  *buffer_list = head->buffer_list;
  *is_rtp = head->is_rtp;

  /* the oldest RTP item may just have left the queue; recompute it */
  priv->first_rtp_timestamp = get_first_backlog_timestamp (trans);

  return TRUE;
}
/* Tell whether the backlog of @trans holds no item.
 *
 * Not MT-safe, caller should ensure consistent locking. */
gboolean
gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport * trans)
{
  GstRTSPStreamTransportPrivate *priv = trans->priv;

  return gst_queue_array_is_empty (priv->items);
}