/* GStreamer * Copyright (C) 2008 Wim Taymans * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ /** * SECTION:rtsp-stream-transport * @short_description: A media stream transport configuration * @see_also: #GstRTSPStream, #GstRTSPSessionMedia * * The #GstRTSPStreamTransport configures the transport used by a * #GstRTSPStream. It is usually manages by a #GstRTSPSessionMedia object. * * With gst_rtsp_stream_transport_set_callbacks(), callbacks can be configured * to handle the RTP and RTCP packets from the stream, for example when they * need to be sent over TCP. * * With gst_rtsp_stream_transport_set_active() the transports are added and * removed from the stream. * * A #GstRTSPStream will call gst_rtsp_stream_transport_keep_alive() when RTCP * is received from the client. It will also call * gst_rtsp_stream_transport_set_timed_out() when a receiver has timed out. * * A #GstRTSPClient will call gst_rtsp_stream_transport_message_sent() when it * has sent a data message for the transport. * * Last reviewed on 2013-07-16 (1.0.0) */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include #include #include "rtsp-stream-transport.h" #include "rtsp-server-internal.h" struct _GstRTSPStreamTransportPrivate { GstRTSPStream *stream; GstRTSPSendFunc send_rtp; GstRTSPSendFunc send_rtcp; gpointer user_data; GDestroyNotify notify; GstRTSPSendListFunc send_rtp_list; GstRTSPSendListFunc send_rtcp_list; gpointer list_user_data; GDestroyNotify list_notify; GstRTSPBackPressureFunc back_pressure_func; gpointer back_pressure_func_data; GDestroyNotify back_pressure_func_notify; GstRTSPKeepAliveFunc keep_alive; gpointer ka_user_data; GDestroyNotify ka_notify; gboolean timed_out; GstRTSPMessageSentFunc message_sent; gpointer ms_user_data; GDestroyNotify ms_notify; GstRTSPTransport *transport; GstRTSPUrl *url; GObject *rtpsource; /* TCP backlog */ GstClockTime first_rtp_timestamp; GstQueueArray *items; GRecMutex backlog_lock; }; #define MAX_BACKLOG_DURATION (10 * GST_SECOND) #define MAX_BACKLOG_SIZE 100 typedef struct { GstBuffer *buffer; GstBufferList *buffer_list; gboolean is_rtp; } BackLogItem; enum { PROP_0, PROP_LAST }; GST_DEBUG_CATEGORY_STATIC (rtsp_stream_transport_debug); #define GST_CAT_DEFAULT rtsp_stream_transport_debug static void gst_rtsp_stream_transport_finalize (GObject * obj); G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStreamTransport, gst_rtsp_stream_transport, G_TYPE_OBJECT); static void gst_rtsp_stream_transport_class_init (GstRTSPStreamTransportClass * klass) { GObjectClass *gobject_class; gobject_class = G_OBJECT_CLASS (klass); gobject_class->finalize = gst_rtsp_stream_transport_finalize; GST_DEBUG_CATEGORY_INIT (rtsp_stream_transport_debug, "rtspmediatransport", 0, "GstRTSPStreamTransport"); } static void clear_backlog_item (BackLogItem * item) { gst_clear_buffer (&item->buffer); gst_clear_buffer_list (&item->buffer_list); } static void gst_rtsp_stream_transport_init (GstRTSPStreamTransport * trans) { trans->priv = gst_rtsp_stream_transport_get_instance_private (trans); trans->priv->items = gst_queue_array_new_for_struct (sizeof (BackLogItem), 0); trans->priv->first_rtp_timestamp = GST_CLOCK_TIME_NONE; gst_queue_array_set_clear_func (trans->priv->items, (GDestroyNotify) clear_backlog_item); g_rec_mutex_init (&trans->priv->backlog_lock); } static void gst_rtsp_stream_transport_finalize (GObject * obj) { GstRTSPStreamTransportPrivate *priv; GstRTSPStreamTransport *trans; trans = GST_RTSP_STREAM_TRANSPORT (obj); priv = trans->priv; /* remove callbacks now */ gst_rtsp_stream_transport_set_callbacks (trans, NULL, NULL, NULL, NULL); gst_rtsp_stream_transport_set_keepalive (trans, NULL, NULL, NULL); gst_rtsp_stream_transport_set_message_sent (trans, NULL, NULL, NULL); if (priv->stream) g_object_unref (priv->stream); if (priv->transport) gst_rtsp_transport_free (priv->transport); if (priv->url) gst_rtsp_url_free (priv->url); gst_queue_array_free (priv->items); g_rec_mutex_clear (&priv->backlog_lock); G_OBJECT_CLASS (gst_rtsp_stream_transport_parent_class)->finalize (obj); } /** * gst_rtsp_stream_transport_new: * @stream: a #GstRTSPStream * @tr: (transfer full): a GstRTSPTransport * * Create a new #GstRTSPStreamTransport that can be used to manage * @stream with transport @tr. * * Returns: (transfer full): a new #GstRTSPStreamTransport */ GstRTSPStreamTransport * gst_rtsp_stream_transport_new (GstRTSPStream * stream, GstRTSPTransport * tr) { GstRTSPStreamTransportPrivate *priv; GstRTSPStreamTransport *trans; g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL); g_return_val_if_fail (tr != NULL, NULL); trans = g_object_new (GST_TYPE_RTSP_STREAM_TRANSPORT, NULL); priv = trans->priv; priv->stream = stream; priv->stream = g_object_ref (priv->stream); priv->transport = tr; return trans; } /** * gst_rtsp_stream_transport_get_stream: * @trans: a #GstRTSPStreamTransport * * Get the #GstRTSPStream used when constructing @trans. * * Returns: (transfer none) (nullable): the stream used when constructing @trans. */ GstRTSPStream * gst_rtsp_stream_transport_get_stream (GstRTSPStreamTransport * trans) { g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), NULL); return trans->priv->stream; } /** * gst_rtsp_stream_transport_set_callbacks: * @trans: a #GstRTSPStreamTransport * @send_rtp: (scope notified): a callback called when RTP should be sent * @send_rtcp: (scope notified): a callback called when RTCP should be sent * @user_data: (closure): user data passed to callbacks * @notify: (allow-none): called with the user_data when no longer needed. * * Install callbacks that will be called when data for a stream should be sent * to a client. This is usually used when sending RTP/RTCP over TCP. */ void gst_rtsp_stream_transport_set_callbacks (GstRTSPStreamTransport * trans, GstRTSPSendFunc send_rtp, GstRTSPSendFunc send_rtcp, gpointer user_data, GDestroyNotify notify) { GstRTSPStreamTransportPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans)); priv = trans->priv; priv->send_rtp = send_rtp; priv->send_rtcp = send_rtcp; if (priv->notify) priv->notify (priv->user_data); priv->user_data = user_data; priv->notify = notify; } /** * gst_rtsp_stream_transport_set_list_callbacks: * @trans: a #GstRTSPStreamTransport * @send_rtp_list: (scope notified): a callback called when RTP should be sent * @send_rtcp_list: (scope notified): a callback called when RTCP should be sent * @user_data: (closure): user data passed to callbacks * @notify: (allow-none): called with the user_data when no longer needed. * * Install callbacks that will be called when data for a stream should be sent * to a client. This is usually used when sending RTP/RTCP over TCP. * * Since: 1.16 */ void gst_rtsp_stream_transport_set_list_callbacks (GstRTSPStreamTransport * trans, GstRTSPSendListFunc send_rtp_list, GstRTSPSendListFunc send_rtcp_list, gpointer user_data, GDestroyNotify notify) { GstRTSPStreamTransportPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans)); priv = trans->priv; priv->send_rtp_list = send_rtp_list; priv->send_rtcp_list = send_rtcp_list; if (priv->list_notify) priv->list_notify (priv->list_user_data); priv->list_user_data = user_data; priv->list_notify = notify; } void gst_rtsp_stream_transport_set_back_pressure_callback (GstRTSPStreamTransport * trans, GstRTSPBackPressureFunc back_pressure_func, gpointer user_data, GDestroyNotify notify) { GstRTSPStreamTransportPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans)); priv = trans->priv; priv->back_pressure_func = back_pressure_func; if (priv->back_pressure_func_notify) priv->back_pressure_func_notify (priv->back_pressure_func_data); priv->back_pressure_func_data = user_data; priv->back_pressure_func_notify = notify; } gboolean gst_rtsp_stream_transport_check_back_pressure (GstRTSPStreamTransport * trans, gboolean is_rtp) { GstRTSPStreamTransportPrivate *priv; gboolean ret = FALSE; guint8 channel; priv = trans->priv; if (is_rtp) channel = priv->transport->interleaved.min; else channel = priv->transport->interleaved.max; if (priv->back_pressure_func) ret = priv->back_pressure_func (channel, priv->back_pressure_func_data); return ret; } /** * gst_rtsp_stream_transport_set_keepalive: * @trans: a #GstRTSPStreamTransport * @keep_alive: (scope notified): a callback called when the receiver is active * @user_data: (closure): user data passed to callback * @notify: (allow-none): called with the user_data when no longer needed. * * Install callbacks that will be called when RTCP packets are received from the * receiver of @trans. */ void gst_rtsp_stream_transport_set_keepalive (GstRTSPStreamTransport * trans, GstRTSPKeepAliveFunc keep_alive, gpointer user_data, GDestroyNotify notify) { GstRTSPStreamTransportPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans)); priv = trans->priv; priv->keep_alive = keep_alive; if (priv->ka_notify) priv->ka_notify (priv->ka_user_data); priv->ka_user_data = user_data; priv->ka_notify = notify; } /** * gst_rtsp_stream_transport_set_message_sent: * @trans: a #GstRTSPStreamTransport * @message_sent: (scope notified): a callback called when a message has been sent * @user_data: (closure): user data passed to callback * @notify: (allow-none): called with the user_data when no longer needed * * Install a callback that will be called when a message has been sent on @trans. */ void gst_rtsp_stream_transport_set_message_sent (GstRTSPStreamTransport * trans, GstRTSPMessageSentFunc message_sent, gpointer user_data, GDestroyNotify notify) { GstRTSPStreamTransportPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans)); priv = trans->priv; priv->message_sent = message_sent; if (priv->ms_notify) priv->ms_notify (priv->ms_user_data); priv->ms_user_data = user_data; priv->ms_notify = notify; } /** * gst_rtsp_stream_transport_set_transport: * @trans: a #GstRTSPStreamTransport * @tr: (transfer full): a client #GstRTSPTransport * * Set @tr as the client transport. This function takes ownership of the * passed @tr. */ void gst_rtsp_stream_transport_set_transport (GstRTSPStreamTransport * trans, GstRTSPTransport * tr) { GstRTSPStreamTransportPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans)); g_return_if_fail (tr != NULL); priv = trans->priv; /* keep track of the transports in the stream. */ if (priv->transport) gst_rtsp_transport_free (priv->transport); priv->transport = tr; } /** * gst_rtsp_stream_transport_get_transport: * @trans: a #GstRTSPStreamTransport * * Get the transport configured in @trans. * * Returns: (transfer none) (nullable): the transport configured in @trans. It remains * valid for as long as @trans is valid. */ const GstRTSPTransport * gst_rtsp_stream_transport_get_transport (GstRTSPStreamTransport * trans) { g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), NULL); return trans->priv->transport; } /** * gst_rtsp_stream_transport_set_url: * @trans: a #GstRTSPStreamTransport * @url: (transfer none) (nullable): a client #GstRTSPUrl * * Set @url as the client url. */ void gst_rtsp_stream_transport_set_url (GstRTSPStreamTransport * trans, const GstRTSPUrl * url) { GstRTSPStreamTransportPrivate *priv; g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans)); priv = trans->priv; /* keep track of the transports in the stream. */ if (priv->url) gst_rtsp_url_free (priv->url); priv->url = (url ? gst_rtsp_url_copy (url) : NULL); } /** * gst_rtsp_stream_transport_get_url: * @trans: a #GstRTSPStreamTransport * * Get the url configured in @trans. * * Returns: (transfer none) (nullable): the url configured in @trans. * It remains valid for as long as @trans is valid. */ const GstRTSPUrl * gst_rtsp_stream_transport_get_url (GstRTSPStreamTransport * trans) { g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), NULL); return trans->priv->url; } /** * gst_rtsp_stream_transport_get_rtpinfo: * @trans: a #GstRTSPStreamTransport * @start_time: a star time * * Get the RTP-Info string for @trans and @start_time. * * Returns: (transfer full) (nullable): the RTPInfo string for @trans * and @start_time or %NULL when the RTP-Info could not be * determined. g_free() after usage. */ gchar * gst_rtsp_stream_transport_get_rtpinfo (GstRTSPStreamTransport * trans, GstClockTime start_time) { GstRTSPStreamTransportPrivate *priv; gchar *url_str; GString *rtpinfo; guint rtptime, seq, clock_rate; GstClockTime running_time = GST_CLOCK_TIME_NONE; g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), NULL); priv = trans->priv; if (!gst_rtsp_stream_is_sender (priv->stream)) return NULL; if (!gst_rtsp_stream_get_rtpinfo (priv->stream, &rtptime, &seq, &clock_rate, &running_time)) return NULL; GST_DEBUG ("RTP time %u, seq %u, rate %u, running-time %" GST_TIME_FORMAT, rtptime, seq, clock_rate, GST_TIME_ARGS (running_time)); if (GST_CLOCK_TIME_IS_VALID (running_time) && GST_CLOCK_TIME_IS_VALID (start_time)) { if (running_time > start_time) { rtptime -= gst_util_uint64_scale_int (running_time - start_time, clock_rate, GST_SECOND); } else { rtptime += gst_util_uint64_scale_int (start_time - running_time, clock_rate, GST_SECOND); } } GST_DEBUG ("RTP time %u, for start-time %" GST_TIME_FORMAT, rtptime, GST_TIME_ARGS (start_time)); rtpinfo = g_string_new (""); url_str = gst_rtsp_url_get_request_uri (trans->priv->url); g_string_append_printf (rtpinfo, "url=%s;seq=%u;rtptime=%u", url_str, seq, rtptime); g_free (url_str); return g_string_free (rtpinfo, FALSE); } /** * gst_rtsp_stream_transport_set_active: * @trans: a #GstRTSPStreamTransport * @active: new state of @trans * * Activate or deactivate datatransfer configured in @trans. * * Returns: %TRUE when the state was changed. */ gboolean gst_rtsp_stream_transport_set_active (GstRTSPStreamTransport * trans, gboolean active) { GstRTSPStreamTransportPrivate *priv; gboolean res; g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE); priv = trans->priv; if (active) res = gst_rtsp_stream_add_transport (priv->stream, trans); else res = gst_rtsp_stream_remove_transport (priv->stream, trans); return res; } /** * gst_rtsp_stream_transport_set_timed_out: * @trans: a #GstRTSPStreamTransport * @timedout: timed out value * * Set the timed out state of @trans to @timedout */ void gst_rtsp_stream_transport_set_timed_out (GstRTSPStreamTransport * trans, gboolean timedout) { g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans)); trans->priv->timed_out = timedout; } /** * gst_rtsp_stream_transport_is_timed_out: * @trans: a #GstRTSPStreamTransport * * Check if @trans is timed out. * * Returns: %TRUE if @trans timed out. */ gboolean gst_rtsp_stream_transport_is_timed_out (GstRTSPStreamTransport * trans) { g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE); return trans->priv->timed_out; } /** * gst_rtsp_stream_transport_send_rtp: * @trans: a #GstRTSPStreamTransport * @buffer: (transfer none): a #GstBuffer * * Send @buffer to the installed RTP callback for @trans. * * Returns: %TRUE on success */ gboolean gst_rtsp_stream_transport_send_rtp (GstRTSPStreamTransport * trans, GstBuffer * buffer) { GstRTSPStreamTransportPrivate *priv; gboolean res = FALSE; g_return_val_if_fail (GST_IS_BUFFER (buffer), FALSE); priv = trans->priv; if (priv->send_rtp) res = priv->send_rtp (buffer, priv->transport->interleaved.min, priv->user_data); if (res) gst_rtsp_stream_transport_keep_alive (trans); return res; } /** * gst_rtsp_stream_transport_send_rtcp: * @trans: a #GstRTSPStreamTransport * @buffer: (transfer none): a #GstBuffer * * Send @buffer to the installed RTCP callback for @trans. * * Returns: %TRUE on success */ gboolean gst_rtsp_stream_transport_send_rtcp (GstRTSPStreamTransport * trans, GstBuffer * buffer) { GstRTSPStreamTransportPrivate *priv; gboolean res = FALSE; g_return_val_if_fail (GST_IS_BUFFER (buffer), FALSE); priv = trans->priv; if (priv->send_rtcp) res = priv->send_rtcp (buffer, priv->transport->interleaved.max, priv->user_data); if (res) gst_rtsp_stream_transport_keep_alive (trans); return res; } /** * gst_rtsp_stream_transport_send_rtp_list: * @trans: a #GstRTSPStreamTransport * @buffer_list: (transfer none): a #GstBufferList * * Send @buffer_list to the installed RTP callback for @trans. * * Returns: %TRUE on success * * Since: 1.16 */ gboolean gst_rtsp_stream_transport_send_rtp_list (GstRTSPStreamTransport * trans, GstBufferList * buffer_list) { GstRTSPStreamTransportPrivate *priv; gboolean res = FALSE; g_return_val_if_fail (GST_IS_BUFFER_LIST (buffer_list), FALSE); priv = trans->priv; if (priv->send_rtp_list) { res = priv->send_rtp_list (buffer_list, priv->transport->interleaved.min, priv->list_user_data); } else if (priv->send_rtp) { guint n = gst_buffer_list_length (buffer_list), i; for (i = 0; i < n; i++) { GstBuffer *buffer = gst_buffer_list_get (buffer_list, i); res = priv->send_rtp (buffer, priv->transport->interleaved.min, priv->user_data); if (!res) break; } } if (res) gst_rtsp_stream_transport_keep_alive (trans); return res; } /** * gst_rtsp_stream_transport_send_rtcp_list: * @trans: a #GstRTSPStreamTransport * @buffer_list: (transfer none): a #GstBuffer * * Send @buffer_list to the installed RTCP callback for @trans. * * Returns: %TRUE on success * * Since: 1.16 */ gboolean gst_rtsp_stream_transport_send_rtcp_list (GstRTSPStreamTransport * trans, GstBufferList * buffer_list) { GstRTSPStreamTransportPrivate *priv; gboolean res = FALSE; g_return_val_if_fail (GST_IS_BUFFER_LIST (buffer_list), FALSE); priv = trans->priv; if (priv->send_rtcp_list) { res = priv->send_rtcp_list (buffer_list, priv->transport->interleaved.max, priv->list_user_data); } else if (priv->send_rtcp) { guint n = gst_buffer_list_length (buffer_list), i; for (i = 0; i < n; i++) { GstBuffer *buffer = gst_buffer_list_get (buffer_list, i); res = priv->send_rtcp (buffer, priv->transport->interleaved.max, priv->user_data); if (!res) break; } } if (res) gst_rtsp_stream_transport_keep_alive (trans); return res; } /** * gst_rtsp_stream_transport_keep_alive: * @trans: a #GstRTSPStreamTransport * * Signal the installed keep_alive callback for @trans. */ void gst_rtsp_stream_transport_keep_alive (GstRTSPStreamTransport * trans) { GstRTSPStreamTransportPrivate *priv; priv = trans->priv; if (priv->keep_alive) priv->keep_alive (priv->ka_user_data); } /** * gst_rtsp_stream_transport_message_sent: * @trans: a #GstRTSPStreamTransport * * Signal the installed message_sent callback for @trans. * * Since: 1.16 */ void gst_rtsp_stream_transport_message_sent (GstRTSPStreamTransport * trans) { GstRTSPStreamTransportPrivate *priv; priv = trans->priv; if (priv->message_sent) priv->message_sent (trans, priv->ms_user_data); } /** * gst_rtsp_stream_transport_recv_data: * @trans: a #GstRTSPStreamTransport * @channel: a channel * @buffer: (transfer full): a #GstBuffer * * Receive @buffer on @channel @trans. * * Returns: a #GstFlowReturn. Returns GST_FLOW_NOT_LINKED when @channel is not * configured in the transport of @trans. */ GstFlowReturn gst_rtsp_stream_transport_recv_data (GstRTSPStreamTransport * trans, guint channel, GstBuffer * buffer) { GstRTSPStreamTransportPrivate *priv; const GstRTSPTransport *tr; GstFlowReturn res; g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); priv = trans->priv; tr = priv->transport; if (tr->interleaved.min == channel) { res = gst_rtsp_stream_recv_rtp (priv->stream, buffer); } else if (tr->interleaved.max == channel) { res = gst_rtsp_stream_recv_rtcp (priv->stream, buffer); } else { res = GST_FLOW_NOT_LINKED; } return res; } static GstClockTime get_backlog_item_timestamp (BackLogItem * item) { GstClockTime ret = GST_CLOCK_TIME_NONE; if (item->buffer) { ret = GST_BUFFER_DTS_OR_PTS (item->buffer); } else if (item->buffer_list) { g_assert (gst_buffer_list_length (item->buffer_list) > 0); ret = GST_BUFFER_DTS_OR_PTS (gst_buffer_list_get (item->buffer_list, 0)); } return ret; } static GstClockTime get_first_backlog_timestamp (GstRTSPStreamTransport * trans) { GstRTSPStreamTransportPrivate *priv = trans->priv; GstClockTime ret = GST_CLOCK_TIME_NONE; guint i, l; l = gst_queue_array_get_length (priv->items); for (i = 0; i < l; i++) { BackLogItem *item = (BackLogItem *) gst_queue_array_peek_nth_struct (priv->items, i); if (item->is_rtp) { ret = get_backlog_item_timestamp (item); break; } } return ret; } /* Not MT-safe, caller should ensure consistent locking (see * gst_rtsp_stream_transport_lock_backlog()). Ownership * of @buffer and @buffer_list is transfered to the transport */ gboolean gst_rtsp_stream_transport_backlog_push (GstRTSPStreamTransport * trans, GstBuffer * buffer, GstBufferList * buffer_list, gboolean is_rtp) { gboolean ret = TRUE; BackLogItem item = { 0, }; GstClockTime item_timestamp; GstRTSPStreamTransportPrivate *priv; priv = trans->priv; if (buffer) item.buffer = buffer; if (buffer_list) item.buffer_list = buffer_list; item.is_rtp = is_rtp; gst_queue_array_push_tail_struct (priv->items, &item); item_timestamp = get_backlog_item_timestamp (&item); if (is_rtp && priv->first_rtp_timestamp != GST_CLOCK_TIME_NONE) { GstClockTimeDiff queue_duration; g_assert (GST_CLOCK_TIME_IS_VALID (item_timestamp)); queue_duration = GST_CLOCK_DIFF (priv->first_rtp_timestamp, item_timestamp); g_assert (queue_duration >= 0); if (queue_duration > MAX_BACKLOG_DURATION && gst_queue_array_get_length (priv->items) > MAX_BACKLOG_SIZE) { ret = FALSE; } } else if (is_rtp) { priv->first_rtp_timestamp = item_timestamp; } return ret; } /* Not MT-safe, caller should ensure consistent locking (see * gst_rtsp_stream_transport_lock_backlog()). Ownership * of @buffer and @buffer_list is transfered back to the caller, * if either of those is NULL the underlying object is unreffed */ gboolean gst_rtsp_stream_transport_backlog_pop (GstRTSPStreamTransport * trans, GstBuffer ** buffer, GstBufferList ** buffer_list, gboolean * is_rtp) { BackLogItem *item; GstRTSPStreamTransportPrivate *priv; g_return_val_if_fail (!gst_rtsp_stream_transport_backlog_is_empty (trans), FALSE); priv = trans->priv; item = (BackLogItem *) gst_queue_array_pop_head_struct (priv->items); priv->first_rtp_timestamp = get_first_backlog_timestamp (trans); if (buffer) *buffer = item->buffer; else if (item->buffer) gst_buffer_unref (item->buffer); if (buffer_list) *buffer_list = item->buffer_list; else if (item->buffer_list) gst_buffer_list_unref (item->buffer_list); if (is_rtp) *is_rtp = item->is_rtp; return TRUE; } /* Not MT-safe, caller should ensure consistent locking. * See gst_rtsp_stream_transport_lock_backlog() */ gboolean gst_rtsp_stream_transport_backlog_is_empty (GstRTSPStreamTransport * trans) { return gst_queue_array_is_empty (trans->priv->items); } /* Not MT-safe, caller should ensure consistent locking. * See gst_rtsp_stream_transport_lock_backlog() */ void gst_rtsp_stream_transport_clear_backlog (GstRTSPStreamTransport * trans) { while (!gst_rtsp_stream_transport_backlog_is_empty (trans)) { gst_rtsp_stream_transport_backlog_pop (trans, NULL, NULL, NULL); } } /* Internal API, protects access to the TCP backlog. Safe to * call recursively */ void gst_rtsp_stream_transport_lock_backlog (GstRTSPStreamTransport * trans) { g_rec_mutex_lock (&trans->priv->backlog_lock); } /* See gst_rtsp_stream_transport_lock_backlog() */ void gst_rtsp_stream_transport_unlock_backlog (GstRTSPStreamTransport * trans) { g_rec_mutex_unlock (&trans->priv->backlog_lock); }