gstreamer/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.c
Matthew Waters 206021e4d4 rtpmanager/rtx: implement initial support for reading/writing rid extensions
Two RTP Header extensions are very relevant for rtprtxsend/receive.
1. "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id": will always be removed
2. "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id": will be written
    instead of the "rtp-stream-id" header extension.

Currently it's only a simple replacement of one header extension for
another however a future change would only add the relevant extension
based on some heuristics (like, video frames only on one of the rtp key
frame buffers, or only until the rtx ssrc has been validated by the peer)
in order to reduce the required bandwidth.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1759>
2022-03-21 03:18:18 +00:00

1296 lines
41 KiB
C

/* RTP Retransmission sender element for GStreamer
*
* gstrtprtxsend.c:
*
* Copyright (C) 2013 Collabora Ltd.
* @author Julien Isorce <julien.isorce@collabora.co.uk>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-rtprtxsend
* @title: rtprtxsend
*
* See #GstRtpRtxReceive for examples
*
* The purpose of the sender RTX object is to keep a history of RTP packets up
* to a configurable limit (max-size-time or max-size-packets). It will listen
* for upstream custom retransmission events (GstRTPRetransmissionRequest) that
* comes from downstream (#GstRtpSession). When receiving a request it will
* look up the requested seqnum in its list of stored packets. If the packet
* is available, it will create a RTX packet according to RFC 4588 and send
* this as an auxiliary stream. RTX is SSRC-multiplexed
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst.h>
#include <string.h>
#include <stdlib.h>
#include "gstrtprtxsend.h"
GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_send_debug);
#define GST_CAT_DEFAULT gst_rtp_rtx_send_debug
#define DEFAULT_RTX_PAYLOAD_TYPE 0
#define DEFAULT_MAX_SIZE_TIME 0
#define DEFAULT_MAX_SIZE_PACKETS 100
enum
{
PROP_0,
PROP_SSRC_MAP,
PROP_PAYLOAD_TYPE_MAP,
PROP_MAX_SIZE_TIME,
PROP_MAX_SIZE_PACKETS,
PROP_NUM_RTX_REQUESTS,
PROP_NUM_RTX_PACKETS,
PROP_CLOCK_RATE_MAP,
};
enum
{
SIGNAL_0,
SIGNAL_ADD_EXTENSION,
SIGNAL_CLEAR_EXTENSIONS,
LAST_SIGNAL
};
static guint gst_rtp_rtx_send_signals[LAST_SIGNAL] = { 0, };
#define RTPHDREXT_BUNDLE_MID GST_RTP_HDREXT_BASE "sdes:mid"
#define RTPHDREXT_STREAM_ID GST_RTP_HDREXT_BASE "sdes:rtp-stream-id"
#define RTPHDREXT_REPAIRED_STREAM_ID GST_RTP_HDREXT_BASE "sdes:repaired-rtp-stream-id"
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/x-rtp")
);
static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/x-rtp")
);
static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
guint visible, guint bytes, guint64 time, gpointer checkdata);
static gboolean gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer);
static GstFlowReturn gst_rtp_rtx_send_chain_list (GstPad * pad,
GstObject * parent, GstBufferList * list);
static void gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx);
static gboolean gst_rtp_rtx_send_activate_mode (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active);
static GstStateChangeReturn gst_rtp_rtx_send_change_state (GstElement *
element, GstStateChange transition);
static void gst_rtp_rtx_send_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_rtp_rtx_send_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_rtp_rtx_send_finalize (GObject * object);
static void
gst_rtp_rtx_send_add_extension (GstRtpRtxSend * rtx,
GstRTPHeaderExtension * ext)
{
g_return_if_fail (GST_IS_RTP_HEADER_EXTENSION (ext));
g_return_if_fail (gst_rtp_header_extension_get_id (ext) > 0);
GST_OBJECT_LOCK (rtx);
if (g_strcmp0 (gst_rtp_header_extension_get_uri (ext),
RTPHDREXT_STREAM_ID) == 0) {
gst_clear_object (&rtx->rid_stream);
rtx->rid_stream = gst_object_ref (ext);
} else if (g_strcmp0 (gst_rtp_header_extension_get_uri (ext),
RTPHDREXT_REPAIRED_STREAM_ID) == 0) {
gst_clear_object (&rtx->rid_repaired);
rtx->rid_repaired = gst_object_ref (ext);
} else {
g_warning ("rtprtxsend (%s) doesn't know how to deal with the "
"RTP Header Extension with URI \'%s\'", GST_OBJECT_NAME (rtx),
gst_rtp_header_extension_get_uri (ext));
}
/* XXX: check for other duplicate ids? */
GST_OBJECT_UNLOCK (rtx);
}
static void
gst_rtp_rtx_send_clear_extensions (GstRtpRtxSend * rtx)
{
GST_OBJECT_LOCK (rtx);
gst_clear_object (&rtx->rid_stream);
gst_clear_object (&rtx->rid_repaired);
GST_OBJECT_UNLOCK (rtx);
}
G_DEFINE_TYPE_WITH_CODE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT,
GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_send_debug, "rtprtxsend", 0,
"rtp retransmission sender"));
GST_ELEMENT_REGISTER_DEFINE (rtprtxsend, "rtprtxsend", GST_RANK_NONE,
GST_TYPE_RTP_RTX_SEND);
#define IS_RTX_ENABLED(rtx) (g_hash_table_size ((rtx)->rtx_pt_map) > 0)
typedef struct
{
guint16 seqnum;
guint32 timestamp;
GstBuffer *buffer;
} BufferQueueItem;
static void
buffer_queue_item_free (BufferQueueItem * item)
{
gst_buffer_unref (item->buffer);
g_slice_free (BufferQueueItem, item);
}
typedef struct
{
guint32 rtx_ssrc;
guint16 seqnum_base, next_seqnum;
gint clock_rate;
/* history of rtp packets */
GSequence *queue;
} SSRCRtxData;
static SSRCRtxData *
ssrc_rtx_data_new (guint32 rtx_ssrc)
{
SSRCRtxData *data = g_slice_new0 (SSRCRtxData);
data->rtx_ssrc = rtx_ssrc;
data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16);
data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
return data;
}
static void
ssrc_rtx_data_free (SSRCRtxData * data)
{
g_sequence_free (data->queue);
g_slice_free (SSRCRtxData, data);
}
typedef enum
{
RTX_TASK_START,
RTX_TASK_PAUSE,
RTX_TASK_STOP,
} RtxTaskState;
static void
gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
{
GST_OBJECT_LOCK (rtx);
gst_data_queue_set_flushing (rtx->queue, flush);
gst_data_queue_flush (rtx->queue);
GST_OBJECT_UNLOCK (rtx);
}
static gboolean
gst_rtp_rtx_send_set_task_state (GstRtpRtxSend * rtx, RtxTaskState task_state)
{
GstTask *task = GST_PAD_TASK (rtx->srcpad);
GstPadMode mode = GST_PAD_MODE (rtx->srcpad);
gboolean ret = TRUE;
switch (task_state) {
case RTX_TASK_START:
{
gboolean active = task && GST_TASK_STATE (task) == GST_TASK_STARTED;
if (IS_RTX_ENABLED (rtx) && mode != GST_PAD_MODE_NONE && !active) {
GST_DEBUG_OBJECT (rtx, "Starting RTX task");
gst_rtp_rtx_send_set_flushing (rtx, FALSE);
ret = gst_pad_start_task (rtx->srcpad,
(GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
}
break;
}
case RTX_TASK_PAUSE:
if (task) {
GST_DEBUG_OBJECT (rtx, "Pausing RTX task");
gst_rtp_rtx_send_set_flushing (rtx, TRUE);
ret = gst_pad_pause_task (rtx->srcpad);
}
break;
case RTX_TASK_STOP:
if (task) {
GST_DEBUG_OBJECT (rtx, "Stopping RTX task");
gst_rtp_rtx_send_set_flushing (rtx, TRUE);
ret = gst_pad_stop_task (rtx->srcpad);
}
break;
}
return ret;
}
static void
gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
gobject_class->get_property = gst_rtp_rtx_send_get_property;
gobject_class->set_property = gst_rtp_rtx_send_set_property;
gobject_class->finalize = gst_rtp_rtx_send_finalize;
g_object_class_install_property (gobject_class, PROP_SSRC_MAP,
g_param_spec_boxed ("ssrc-map", "SSRC Map",
"Map of SSRCs to their retransmission SSRCs for SSRC-multiplexed mode"
" (default = random)", GST_TYPE_STRUCTURE,
G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
g_param_spec_boxed ("payload-type-map", "Payload Type Map",
"Map of original payload types to their retransmission payload types",
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
g_param_spec_uint ("max-size-time", "Max Size Time",
"Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
g_param_spec_uint ("max-size-packets", "Max Size Packets",
"Amount of packets to queue (0 = unlimited)", 0, G_MAXINT16,
DEFAULT_MAX_SIZE_PACKETS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
"Number of retransmission events received", 0, G_MAXUINT,
0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
" Number of retransmission packets sent", 0, G_MAXUINT,
0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_CLOCK_RATE_MAP,
g_param_spec_boxed ("clock-rate-map", "Clock Rate Map",
"Map of payload types to their clock rates",
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* rtprtxsend::add-extension:
*
* Add @ext as an extension for writing part of an RTP header extension onto
* outgoing RTP packets. Currently only supports using the following
* extension URIs. All other RTP header extensions are copied as-is.
* - "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id": will be removed
* - "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id": will be
* written instead of the "rtp-stream-id" header extension.
*
* Since: 1.22
*/
gst_rtp_rtx_send_signals[SIGNAL_ADD_EXTENSION] =
g_signal_new_class_handler ("add-extension", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_CALLBACK (gst_rtp_rtx_send_add_extension), NULL, NULL, NULL,
G_TYPE_NONE, 1, GST_TYPE_RTP_HEADER_EXTENSION);
/**
* rtprtxsend::clear-extensions:
* @object: the #GstRTPBasePayload
*
* Clear all RTP header extensions used by this rtprtxsend.
*
* Since: 1.22
*/
gst_rtp_rtx_send_signals[SIGNAL_CLEAR_EXTENSIONS] =
g_signal_new_class_handler ("clear-extensions", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_CALLBACK (gst_rtp_rtx_send_clear_extensions), NULL, NULL, NULL,
G_TYPE_NONE, 0);
gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
gst_element_class_set_static_metadata (gstelement_class,
"RTP Retransmission Sender", "Codec",
"Retransmit RTP packets when needed, according to RFC4588",
"Julien Isorce <julien.isorce@collabora.co.uk>");
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_change_state);
}
static void
gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx)
{
GST_OBJECT_LOCK (rtx);
gst_data_queue_flush (rtx->queue);
g_hash_table_remove_all (rtx->ssrc_data);
g_hash_table_remove_all (rtx->rtx_ssrcs);
rtx->num_rtx_requests = 0;
rtx->num_rtx_packets = 0;
GST_OBJECT_UNLOCK (rtx);
}
static void
gst_rtp_rtx_send_finalize (GObject * object)
{
GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (object);
g_hash_table_unref (rtx->ssrc_data);
g_hash_table_unref (rtx->rtx_ssrcs);
if (rtx->external_ssrc_map)
gst_structure_free (rtx->external_ssrc_map);
g_hash_table_unref (rtx->rtx_pt_map);
if (rtx->rtx_pt_map_structure)
gst_structure_free (rtx->rtx_pt_map_structure);
g_hash_table_unref (rtx->clock_rate_map);
if (rtx->clock_rate_map_structure)
gst_structure_free (rtx->clock_rate_map_structure);
g_object_unref (rtx->queue);
gst_clear_object (&rtx->rid_stream);
gst_clear_object (&rtx->rid_repaired);
gst_clear_buffer (&rtx->dummy_writable);
G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
}
static void
gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
{
GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
rtx->srcpad =
gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
"src"), "src");
GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
gst_pad_set_event_function (rtx->srcpad,
GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_src_event));
gst_pad_set_activatemode_function (rtx->srcpad,
GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_activate_mode));
gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
rtx->sinkpad =
gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
"sink"), "sink");
GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
gst_pad_set_event_function (rtx->sinkpad,
GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_sink_event));
gst_pad_set_chain_function (rtx->sinkpad,
GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain));
gst_pad_set_chain_list_function (rtx->sinkpad,
GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain_list));
gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
rtx->queue = gst_data_queue_new (gst_rtp_rtx_send_queue_check_full, NULL,
NULL, rtx);
rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, (GDestroyNotify) ssrc_rtx_data_free);
rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
rtx->clock_rate_map = g_hash_table_new (g_direct_hash, g_direct_equal);
rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
rtx->dummy_writable = gst_buffer_new ();
}
static gboolean
gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
guint visible, guint bytes, guint64 time, gpointer checkdata)
{
return FALSE;
}
static void
gst_rtp_rtx_data_queue_item_free (gpointer item)
{
GstDataQueueItem *data = item;
if (data->object)
gst_mini_object_unref (data->object);
g_slice_free (GstDataQueueItem, data);
}
static gboolean
gst_rtp_rtx_send_push_out (GstRtpRtxSend * rtx, gpointer object)
{
GstDataQueueItem *data;
gboolean success;
data = g_slice_new0 (GstDataQueueItem);
data->object = GST_MINI_OBJECT (object);
data->size = 1;
data->duration = 1;
data->visible = TRUE;
data->destroy = gst_rtp_rtx_data_queue_item_free;
success = gst_data_queue_push (rtx->queue, data);
if (!success)
data->destroy (data);
return success;
}
static guint32
gst_rtp_rtx_send_choose_ssrc (GstRtpRtxSend * rtx, guint32 choice,
gboolean consider_choice)
{
guint32 ssrc = consider_choice ? choice : g_random_int ();
/* make sure to be different than any other */
while (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc)) ||
g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
ssrc = g_random_int ();
}
return ssrc;
}
static SSRCRtxData *
gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc)
{
SSRCRtxData *data;
guint32 rtx_ssrc = 0;
gboolean consider = FALSE;
if (G_UNLIKELY (!g_hash_table_contains (rtx->ssrc_data,
GUINT_TO_POINTER (ssrc)))) {
if (rtx->external_ssrc_map) {
gchar *ssrc_str;
ssrc_str = g_strdup_printf ("%" G_GUINT32_FORMAT, ssrc);
consider = gst_structure_get_uint (rtx->external_ssrc_map, ssrc_str,
&rtx_ssrc);
g_free (ssrc_str);
}
rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, rtx_ssrc, consider);
data = ssrc_rtx_data_new (rtx_ssrc);
g_hash_table_insert (rtx->ssrc_data, GUINT_TO_POINTER (ssrc), data);
g_hash_table_insert (rtx->rtx_ssrcs, GUINT_TO_POINTER (rtx_ssrc),
GUINT_TO_POINTER (ssrc));
} else {
data = g_hash_table_lookup (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
}
return data;
}
static GstMemory *
rewrite_header_extensions (GstRtpRtxSend * rtx, GstRTPBuffer * rtp)
{
gsize out_size = rtp->size[1] + 32;
guint16 bit_pattern;
guint8 *pdata;
guint wordlen;
GstMemory *mem;
GstMapInfo map;
mem = gst_allocator_alloc (NULL, out_size, NULL);
gst_memory_map (mem, &map, GST_MAP_READWRITE);
if (gst_rtp_buffer_get_extension_data (rtp, &bit_pattern, (gpointer) & pdata,
&wordlen)) {
GstRTPHeaderExtensionFlags ext_flags = 0;
gsize bytelen = wordlen * 4;
guint hdr_unit_bytes;
gsize read_offset = 0, write_offset = 4;
if (bit_pattern == 0xBEDE) {
/* one byte extensions */
hdr_unit_bytes = 1;
ext_flags |= GST_RTP_HEADER_EXTENSION_ONE_BYTE;
} else if (bit_pattern >> 4 == 0x100) {
/* two byte extensions */
hdr_unit_bytes = 2;
ext_flags |= GST_RTP_HEADER_EXTENSION_TWO_BYTE;
} else {
GST_DEBUG_OBJECT (rtx, "unknown extension bit pattern 0x%02x%02x",
bit_pattern >> 8, bit_pattern & 0xff);
goto copy_as_is;
}
GST_WRITE_UINT16_BE (map.data, bit_pattern);
while (TRUE) {
guint8 read_id, read_len;
if (read_offset + hdr_unit_bytes >= bytelen)
/* not enough remaning data */
break;
if (ext_flags & GST_RTP_HEADER_EXTENSION_ONE_BYTE) {
read_id = GST_READ_UINT8 (pdata + read_offset) >> 4;
read_len = (GST_READ_UINT8 (pdata + read_offset) & 0x0F) + 1;
read_offset += 1;
if (read_id == 0)
/* padding */
continue;
if (read_id == 15)
/* special id for possible future expansion */
break;
} else {
read_id = GST_READ_UINT8 (pdata + read_offset);
read_offset += 1;
if (read_id == 0)
/* padding */
continue;
read_len = GST_READ_UINT8 (pdata + read_offset);
read_offset += 1;
}
GST_TRACE_OBJECT (rtx, "found rtp header extension with id %u and "
"length %u", read_id, read_len);
/* Ignore extension headers where the size does not fit */
if (read_offset + read_len > bytelen) {
GST_WARNING_OBJECT (rtx, "Extension length extends past the "
"size of the extension data");
break;
}
/* rewrite the rtp-stream-id into a repaired-stream-id */
if (rtx->rid_stream
&& read_id == gst_rtp_header_extension_get_id (rtx->rid_stream)) {
if (!gst_rtp_header_extension_read (rtx->rid_stream, ext_flags,
&pdata[read_offset], read_len, rtx->dummy_writable)) {
GST_WARNING_OBJECT (rtx, "RTP header extension (%s) could "
"not read payloaded data", GST_OBJECT_NAME (rtx->rid_stream));
goto copy_as_is;
}
if (rtx->rid_repaired) {
guint8 write_id = gst_rtp_header_extension_get_id (rtx->rid_repaired);
gsize written;
char *rid;
g_object_get (rtx->rid_stream, "rid", &rid, NULL);
g_object_set (rtx->rid_repaired, "rid", rid, NULL);
g_clear_pointer (&rid, g_free);
written =
gst_rtp_header_extension_write (rtx->rid_repaired, rtp->buffer,
ext_flags, rtx->dummy_writable,
&map.data[write_offset + hdr_unit_bytes],
map.size - write_offset - hdr_unit_bytes);
GST_TRACE_OBJECT (rtx->rid_repaired, "wrote %" G_GSIZE_FORMAT,
written);
if (written <= 0) {
GST_WARNING_OBJECT (rtx, "Failed to rewrite RID for RTX");
goto copy_as_is;
} else {
if (ext_flags & GST_RTP_HEADER_EXTENSION_ONE_BYTE) {
map.data[write_offset] =
((write_id & 0x0F) << 4) | ((written - 1) & 0x0F);
} else if (ext_flags & GST_RTP_HEADER_EXTENSION_TWO_BYTE) {
map.data[write_offset] = write_id & 0xFF;
map.data[write_offset + 1] = written & 0xFF;
} else {
g_assert_not_reached ();
goto copy_as_is;
}
write_offset += written + hdr_unit_bytes;
}
}
} else {
/* TODO: may need to write mid at different times to the original
* buffer to account for the difference in timing of acknowledgement
* of the rtx ssrc from the original ssrc. This may add extra data to
* the header extension space that needs to be accounted for.
*/
memcpy (&map.data[write_offset],
&map.data[read_offset - hdr_unit_bytes], read_len + hdr_unit_bytes);
write_offset += read_len + hdr_unit_bytes;
}
read_offset += read_len;
}
/* subtract the ext header */
wordlen = write_offset / 4 + ((write_offset % 4) ? 1 : 0);
/* wordlen in the ext data doesn't include the 4-byte header */
GST_WRITE_UINT16_BE (map.data + 2, wordlen - 1);
if (wordlen * 4 > write_offset)
memset (&map.data[write_offset], 0, wordlen * 4 - write_offset);
GST_MEMDUMP_OBJECT (rtx, "generated ext data", map.data, wordlen * 4);
} else {
copy_as_is:
wordlen = rtp->size[1] / 4;
memcpy (map.data, rtp->data[1], rtp->size[1]);
GST_LOG_OBJECT (rtx, "copying data as-is");
}
gst_memory_unmap (mem, &map);
gst_memory_resize (mem, 0, wordlen * 4);
return mem;
}
/* Copy fixed header and extension. Add OSN before to copy payload
* Copy memory to avoid to manually copy each rtp buffer field.
*/
static GstBuffer *
gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
{
GstMemory *mem = NULL;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
GstBuffer *new_buffer = gst_buffer_new ();
GstMapInfo map;
guint payload_len = 0;
SSRCRtxData *data;
guint32 ssrc;
guint16 seqnum;
guint8 fmtp;
gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
/* get needed data from GstRtpRtxSend */
ssrc = gst_rtp_buffer_get_ssrc (&rtp);
data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
ssrc = data->rtx_ssrc;
seqnum = data->next_seqnum++;
fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
GST_DEBUG_OBJECT (rtx, "creating rtx buffer, orig seqnum: %u, "
"rtx seqnum: %u, rtx ssrc: %X", gst_rtp_buffer_get_seq (&rtp),
seqnum, ssrc);
/* gst_rtp_buffer_map does not map the payload so do it now */
gst_rtp_buffer_get_payload (&rtp);
/* copy fixed header */
mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
gst_buffer_append_memory (new_buffer, mem);
/* copy extension if any */
if (rtp.size[1]) {
mem = rewrite_header_extensions (rtx, &rtp);
gst_buffer_append_memory (new_buffer, mem);
}
/* copy payload and add OSN just before */
payload_len = 2 + rtp.size[2];
mem = gst_allocator_alloc (NULL, payload_len, NULL);
gst_memory_map (mem, &map, GST_MAP_WRITE);
GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
if (rtp.size[2])
memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
gst_memory_unmap (mem, &map);
gst_buffer_append_memory (new_buffer, mem);
/* everything needed is copied */
gst_rtp_buffer_unmap (&rtp);
/* set ssrc, seqnum and fmtp */
gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
gst_rtp_buffer_set_seq (&new_rtp, seqnum);
gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
/* RFC 4588: let other elements do the padding, as normal */
gst_rtp_buffer_set_padding (&new_rtp, FALSE);
gst_rtp_buffer_unmap (&new_rtp);
/* Copy over timestamps */
gst_buffer_copy_into (new_buffer, buffer, GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
return new_buffer;
}
static gint
buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
gpointer user_data)
{
/* gst_rtp_buffer_compare_seqnum returns the opposite of what we want,
* it returns negative when seqnum1 > seqnum2 and we want negative
* when b > a, i.e. a is smaller, so it comes first in the sequence */
return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum);
}
static gboolean
gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
gboolean res;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CUSTOM_UPSTREAM:
{
const GstStructure *s = gst_event_get_structure (event);
/* This event usually comes from the downstream gstrtpsession */
if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
guint seqnum = 0;
guint ssrc = 0;
GstBuffer *rtx_buf = NULL;
/* retrieve seqnum of the packet that need to be retransmitted */
if (!gst_structure_get_uint (s, "seqnum", &seqnum))
seqnum = -1;
/* retrieve ssrc of the packet that need to be retransmitted */
if (!gst_structure_get_uint (s, "ssrc", &ssrc))
ssrc = -1;
GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
seqnum, ssrc);
GST_OBJECT_LOCK (rtx);
/* check if request is for us */
if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
SSRCRtxData *data;
GSequenceIter *iter;
BufferQueueItem search_item;
/* update statistics */
++rtx->num_rtx_requests;
data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
search_item.seqnum = seqnum;
iter = g_sequence_lookup (data->queue, &search_item,
(GCompareDataFunc) buffer_queue_items_cmp, NULL);
if (iter) {
BufferQueueItem *item = g_sequence_get (iter);
GST_LOG_OBJECT (rtx, "found %u", item->seqnum);
rtx_buf = gst_rtp_rtx_buffer_new (rtx, item->buffer);
}
#ifndef GST_DISABLE_DEBUG
else {
BufferQueueItem *item = NULL;
iter = g_sequence_get_begin_iter (data->queue);
if (!g_sequence_iter_is_end (iter))
item = g_sequence_get (iter);
if (item && seqnum < item->seqnum) {
GST_DEBUG_OBJECT (rtx, "requested seqnum %u has already been "
"removed from the rtx queue; the first available is %u",
seqnum, item->seqnum);
} else {
GST_WARNING_OBJECT (rtx, "requested seqnum %u has not been "
"transmitted yet in the original stream; either the remote end "
"is not configured correctly, or the source is too slow",
seqnum);
}
}
#endif
}
GST_OBJECT_UNLOCK (rtx);
if (rtx_buf)
gst_rtp_rtx_send_push_out (rtx, rtx_buf);
gst_event_unref (event);
res = TRUE;
/* This event usually comes from the downstream gstrtpsession */
} else if (gst_structure_has_name (s, "GstRTPCollision")) {
guint ssrc = 0;
if (!gst_structure_get_uint (s, "ssrc", &ssrc))
ssrc = -1;
GST_DEBUG_OBJECT (rtx, "got ssrc collision, ssrc: %X", ssrc);
GST_OBJECT_LOCK (rtx);
/* choose another ssrc for our retransmitted stream */
if (g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
guint master_ssrc;
SSRCRtxData *data;
master_ssrc = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_ssrcs,
GUINT_TO_POINTER (ssrc)));
data = gst_rtp_rtx_send_get_ssrc_data (rtx, master_ssrc);
/* change rtx_ssrc and update the reverse map */
data->rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, 0, FALSE);
g_hash_table_remove (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc));
g_hash_table_insert (rtx->rtx_ssrcs,
GUINT_TO_POINTER (data->rtx_ssrc),
GUINT_TO_POINTER (master_ssrc));
GST_OBJECT_UNLOCK (rtx);
/* no need to forward to payloader because we make sure to have
* a different ssrc
*/
gst_event_unref (event);
res = TRUE;
} else {
/* if master ssrc has collided, remove it from our data, as it
* is not going to be used any longer */
if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
SSRCRtxData *data;
data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
g_hash_table_remove (rtx->rtx_ssrcs,
GUINT_TO_POINTER (data->rtx_ssrc));
g_hash_table_remove (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
}
GST_OBJECT_UNLOCK (rtx);
/* forward event to payloader in case collided ssrc is
* master stream */
res = gst_pad_event_default (pad, parent, event);
}
} else {
res = gst_pad_event_default (pad, parent, event);
}
break;
}
default:
res = gst_pad_event_default (pad, parent, event);
break;
}
return res;
}
static gboolean
gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
gst_pad_push_event (rtx->srcpad, event);
gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_PAUSE);
return TRUE;
case GST_EVENT_FLUSH_STOP:
gst_pad_push_event (rtx->srcpad, event);
gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START);
return TRUE;
case GST_EVENT_EOS:
GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
gst_rtp_rtx_send_push_out (rtx, event);
return TRUE;
case GST_EVENT_CAPS:
{
GstCaps *caps;
GstStructure *s;
guint ssrc;
gint payload;
gpointer rtx_payload;
SSRCRtxData *data;
gst_event_parse_caps (event, &caps);
s = gst_caps_get_structure (caps, 0);
if (!gst_structure_get_uint (s, "ssrc", &ssrc))
ssrc = -1;
if (!gst_structure_get_int (s, "payload", &payload))
payload = -1;
if (payload == -1 || ssrc == G_MAXUINT)
break;
if (payload == -1)
GST_WARNING_OBJECT (rtx, "No payload in caps");
GST_OBJECT_LOCK (rtx);
data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
if (!g_hash_table_lookup_extended (rtx->rtx_pt_map,
GUINT_TO_POINTER (payload), NULL, &rtx_payload))
rtx_payload = GINT_TO_POINTER (-1);
if (rtx->rtx_pt_map_structure && GPOINTER_TO_INT (rtx_payload) == -1
&& payload != -1)
GST_WARNING_OBJECT (rtx, "Payload %d not in rtx-pt-map", payload);
GST_DEBUG_OBJECT (rtx,
"got caps for payload: %d->%d, ssrc: %u->%u : %" GST_PTR_FORMAT,
payload, GPOINTER_TO_INT (rtx_payload), ssrc, data->rtx_ssrc, caps);
gst_structure_get_int (s, "clock-rate", &data->clock_rate);
caps = gst_caps_copy (caps);
/* The session might need to know the RTX ssrc */
if (GPOINTER_TO_INT (rtx_payload) != -1) {
gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc,
"rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL);
gst_caps_set_simple (caps, "rtx-payload", G_TYPE_INT,
GPOINTER_TO_INT (rtx_payload), NULL);
}
GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u",
data->clock_rate, ssrc);
GST_OBJECT_UNLOCK (rtx);
gst_event_unref (event);
event = gst_event_new_caps (caps);
gst_caps_unref (caps);
break;
}
default:
break;
}
return gst_pad_event_default (pad, parent, event);
}
/* like rtp_jitter_buffer_get_ts_diff() */
static guint32
gst_rtp_rtx_send_get_ts_diff (SSRCRtxData * data)
{
guint64 high_ts, low_ts;
BufferQueueItem *high_buf, *low_buf;
guint32 result;
high_buf =
g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter
(data->queue)));
low_buf = g_sequence_get (g_sequence_get_begin_iter (data->queue));
if (!high_buf || !low_buf || high_buf == low_buf)
return 0;
if (data->clock_rate) {
high_ts = high_buf->timestamp;
low_ts = low_buf->timestamp;
/* it needs to work if ts wraps */
if (high_ts >= low_ts) {
result = (guint32) (high_ts - low_ts);
} else {
result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts);
}
result = gst_util_uint64_scale_int (result, 1000, data->clock_rate);
} else {
high_ts = GST_BUFFER_PTS (high_buf->buffer);
low_ts = GST_BUFFER_PTS (low_buf->buffer);
result = gst_util_uint64_scale_int_round (high_ts - low_ts, 1, GST_MSECOND);
}
return result;
}
/* Must be called with lock */
static void
process_buffer (GstRtpRtxSend * rtx, GstBuffer * buffer)
{
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
BufferQueueItem *item;
SSRCRtxData *data;
guint16 seqnum;
guint8 payload_type;
guint32 ssrc, rtptime;
/* read the information we want from the buffer */
gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
seqnum = gst_rtp_buffer_get_seq (&rtp);
payload_type = gst_rtp_buffer_get_payload_type (&rtp);
ssrc = gst_rtp_buffer_get_ssrc (&rtp);
rtptime = gst_rtp_buffer_get_timestamp (&rtp);
gst_rtp_buffer_unmap (&rtp);
GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum,
ssrc);
/* do not store the buffer if it's payload type is unknown */
if (g_hash_table_contains (rtx->rtx_pt_map, GUINT_TO_POINTER (payload_type))) {
data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
if (data->clock_rate == 0 && rtx->clock_rate_map_structure) {
data->clock_rate =
GPOINTER_TO_INT (g_hash_table_lookup (rtx->clock_rate_map,
GUINT_TO_POINTER (payload_type)));
}
/* add current rtp buffer to queue history */
item = g_slice_new0 (BufferQueueItem);
item->seqnum = seqnum;
item->timestamp = rtptime;
item->buffer = gst_buffer_ref (buffer);
g_sequence_append (data->queue, item);
/* remove oldest packets from history if they are too many */
if (rtx->max_size_packets) {
while (g_sequence_get_length (data->queue) > rtx->max_size_packets)
g_sequence_remove (g_sequence_get_begin_iter (data->queue));
}
if (rtx->max_size_time) {
while (gst_rtp_rtx_send_get_ts_diff (data) > rtx->max_size_time)
g_sequence_remove (g_sequence_get_begin_iter (data->queue));
}
}
}
static GstFlowReturn
gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
GstFlowReturn ret;
GST_OBJECT_LOCK (rtx);
if (rtx->rtx_pt_map_structure)
process_buffer (rtx, buffer);
GST_OBJECT_UNLOCK (rtx);
ret = gst_pad_push (rtx->srcpad, buffer);
return ret;
}
static gboolean
process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
{
process_buffer (user_data, *buffer);
return TRUE;
}
static GstFlowReturn
gst_rtp_rtx_send_chain_list (GstPad * pad, GstObject * parent,
GstBufferList * list)
{
GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
GstFlowReturn ret;
GST_OBJECT_LOCK (rtx);
gst_buffer_list_foreach (list, process_buffer_from_list, rtx);
GST_OBJECT_UNLOCK (rtx);
ret = gst_pad_push_list (rtx->srcpad, list);
return ret;
}
static void
gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
{
GstDataQueueItem *data;
if (gst_data_queue_pop (rtx->queue, &data)) {
GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
if (G_LIKELY (GST_IS_BUFFER (data->object))) {
GST_OBJECT_LOCK (rtx);
/* Update statistics just before pushing. */
rtx->num_rtx_packets++;
GST_OBJECT_UNLOCK (rtx);
gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
} else if (GST_IS_EVENT (data->object)) {
gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object));
/* after EOS, we should not send any more buffers,
* even if there are more requests coming in */
if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) {
gst_rtp_rtx_send_set_flushing (rtx, TRUE);
}
} else {
g_assert_not_reached ();
}
data->object = NULL; /* we no longer own that object */
data->destroy (data);
} else {
GST_LOG_OBJECT (rtx, "flushing");
gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_PAUSE);
}
}
static gboolean
gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
GstPadMode mode, gboolean active)
{
GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
gboolean ret = FALSE;
switch (mode) {
case GST_PAD_MODE_PUSH:
if (active) {
ret = gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START);
} else {
ret = gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_STOP);
}
GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
break;
default:
break;
}
return ret;
}
static void
gst_rtp_rtx_send_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{
GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (object);
switch (prop_id) {
case PROP_PAYLOAD_TYPE_MAP:
GST_OBJECT_LOCK (rtx);
g_value_set_boxed (value, rtx->rtx_pt_map_structure);
GST_OBJECT_UNLOCK (rtx);
break;
case PROP_MAX_SIZE_TIME:
GST_OBJECT_LOCK (rtx);
g_value_set_uint (value, rtx->max_size_time);
GST_OBJECT_UNLOCK (rtx);
break;
case PROP_MAX_SIZE_PACKETS:
GST_OBJECT_LOCK (rtx);
g_value_set_uint (value, rtx->max_size_packets);
GST_OBJECT_UNLOCK (rtx);
break;
case PROP_NUM_RTX_REQUESTS:
GST_OBJECT_LOCK (rtx);
g_value_set_uint (value, rtx->num_rtx_requests);
GST_OBJECT_UNLOCK (rtx);
break;
case PROP_NUM_RTX_PACKETS:
GST_OBJECT_LOCK (rtx);
g_value_set_uint (value, rtx->num_rtx_packets);
GST_OBJECT_UNLOCK (rtx);
break;
case PROP_CLOCK_RATE_MAP:
GST_OBJECT_LOCK (rtx);
g_value_set_boxed (value, rtx->clock_rate_map_structure);
GST_OBJECT_UNLOCK (rtx);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static gboolean
structure_to_hash_table (GQuark field_id, const GValue * value, gpointer hash)
{
const gchar *field_str;
guint field_uint;
guint value_uint;
field_str = g_quark_to_string (field_id);
field_uint = atoi (field_str);
value_uint = g_value_get_uint (value);
g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (field_uint),
GUINT_TO_POINTER (value_uint));
return TRUE;
}
static void
gst_rtp_rtx_send_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (object);
switch (prop_id) {
case PROP_SSRC_MAP:
GST_OBJECT_LOCK (rtx);
if (rtx->external_ssrc_map)
gst_structure_free (rtx->external_ssrc_map);
rtx->external_ssrc_map = g_value_dup_boxed (value);
GST_OBJECT_UNLOCK (rtx);
break;
case PROP_PAYLOAD_TYPE_MAP:
GST_OBJECT_LOCK (rtx);
if (rtx->rtx_pt_map_structure)
gst_structure_free (rtx->rtx_pt_map_structure);
rtx->rtx_pt_map_structure = g_value_dup_boxed (value);
g_hash_table_remove_all (rtx->rtx_pt_map);
gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table,
rtx->rtx_pt_map);
GST_OBJECT_UNLOCK (rtx);
if (IS_RTX_ENABLED (rtx))
gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START);
else
gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_STOP);
break;
case PROP_MAX_SIZE_TIME:
GST_OBJECT_LOCK (rtx);
rtx->max_size_time = g_value_get_uint (value);
GST_OBJECT_UNLOCK (rtx);
break;
case PROP_MAX_SIZE_PACKETS:
GST_OBJECT_LOCK (rtx);
rtx->max_size_packets = g_value_get_uint (value);
GST_OBJECT_UNLOCK (rtx);
break;
case PROP_CLOCK_RATE_MAP:
GST_OBJECT_LOCK (rtx);
if (rtx->clock_rate_map_structure)
gst_structure_free (rtx->clock_rate_map_structure);
rtx->clock_rate_map_structure = g_value_dup_boxed (value);
g_hash_table_remove_all (rtx->clock_rate_map);
gst_structure_foreach (rtx->clock_rate_map_structure,
structure_to_hash_table, rtx->clock_rate_map);
GST_OBJECT_UNLOCK (rtx);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static GstStateChangeReturn
gst_rtp_rtx_send_change_state (GstElement * element, GstStateChange transition)
{
GstStateChangeReturn ret;
GstRtpRtxSend *rtx;
rtx = GST_RTP_RTX_SEND_CAST (element);
switch (transition) {
default:
break;
}
ret =
GST_ELEMENT_CLASS (gst_rtp_rtx_send_parent_class)->change_state (element,
transition);
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_rtp_rtx_send_reset (rtx);
break;
default:
break;
}
return ret;
}