rtpmanager/rtx: implement initial support for reading/writing rid extensions

Two RTP Header extensions are very relevant for rtprtxsend/receive.
1. "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id": will always be removed
2. "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id": will be written
    instead of the "rtp-stream-id" header extension.

Currently it's only a simple replacement of one header extension for
another however a future change would only add the relevant extension
based on some heuristics (like, video frames only on one of the rtp key
frame buffers, or only until the rtx ssrc has been validated by the peer)
in order to reduce the required bandwidth.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1759>
This commit is contained in:
Matthew Waters 2021-09-09 23:43:33 +10:00 committed by GStreamer Marge Bot
parent 33be3e5936
commit 206021e4d4
6 changed files with 690 additions and 16 deletions

View file

@ -18375,7 +18375,26 @@
"writable": true "writable": true
} }
}, },
"rank": "none" "rank": "none",
"signals": {
"add-extension": {
"action": true,
"args": [
{
"name": "arg0",
"type": "GstRTPHeaderExtension"
}
],
"return-type": "void",
"when": "last"
},
"clear-extensions": {
"action": true,
"args": [],
"return-type": "void",
"when": "last"
}
}
}, },
"rtprtxsend": { "rtprtxsend": {
"author": "Julien Isorce <julien.isorce@collabora.co.uk>", "author": "Julien Isorce <julien.isorce@collabora.co.uk>",
@ -18492,7 +18511,26 @@
"writable": true "writable": true
} }
}, },
"rank": "none" "rank": "none",
"signals": {
"add-extension": {
"action": true,
"args": [
{
"name": "arg0",
"type": "GstRTPHeaderExtension"
}
],
"return-type": "void",
"when": "last"
},
"clear-extensions": {
"action": true,
"args": [],
"return-type": "void",
"when": "last"
}
}
}, },
"rtpsession": { "rtpsession": {
"author": "Wim Taymans <wim.taymans@gmail.com>", "author": "Wim Taymans <wim.taymans@gmail.com>",

View file

@ -145,7 +145,7 @@
#endif #endif
#include <gst/gst.h> #include <gst/gst.h>
#include <gst/rtp/gstrtpbuffer.h> #include <gst/rtp/rtp.h>
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
@ -166,6 +166,19 @@ enum
PROP_NUM_RTX_ASSOC_PACKETS PROP_NUM_RTX_ASSOC_PACKETS
}; };
enum
{
SIGNAL_0,
SIGNAL_ADD_EXTENSION,
SIGNAL_CLEAR_EXTENSIONS,
LAST_SIGNAL
};
static guint gst_rtp_rtx_receive_signals[LAST_SIGNAL] = { 0, };
#define RTPHDREXT_STREAM_ID GST_RTP_HDREXT_BASE "sdes:rtp-stream-id"
#define RTPHDREXT_REPAIRED_STREAM_ID GST_RTP_HDREXT_BASE "sdes:repaired-rtp-stream-id"
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src", static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC, GST_PAD_SRC,
GST_PAD_ALWAYS, GST_PAD_ALWAYS,
@ -198,6 +211,40 @@ G_DEFINE_TYPE_WITH_CODE (GstRtpRtxReceive, gst_rtp_rtx_receive,
GST_ELEMENT_REGISTER_DEFINE (rtprtxreceive, "rtprtxreceive", GST_RANK_NONE, GST_ELEMENT_REGISTER_DEFINE (rtprtxreceive, "rtprtxreceive", GST_RANK_NONE,
GST_TYPE_RTP_RTX_RECEIVE); GST_TYPE_RTP_RTX_RECEIVE);
static void
gst_rtp_rtx_receive_add_extension (GstRtpRtxReceive * rtx,
GstRTPHeaderExtension * ext)
{
g_return_if_fail (GST_IS_RTP_HEADER_EXTENSION (ext));
g_return_if_fail (gst_rtp_header_extension_get_id (ext) > 0);
GST_OBJECT_LOCK (rtx);
if (g_strcmp0 (gst_rtp_header_extension_get_uri (ext),
RTPHDREXT_STREAM_ID) == 0) {
gst_clear_object (&rtx->rid_stream);
rtx->rid_stream = gst_object_ref (ext);
} else if (g_strcmp0 (gst_rtp_header_extension_get_uri (ext),
RTPHDREXT_REPAIRED_STREAM_ID) == 0) {
gst_clear_object (&rtx->rid_repaired);
rtx->rid_repaired = gst_object_ref (ext);
} else {
g_warning ("rtprtxsend (%s) doesn't know how to deal with the "
"RTP Header Extension with URI \'%s\'", GST_OBJECT_NAME (rtx),
gst_rtp_header_extension_get_uri (ext));
}
/* XXX: check for other duplicate ids? */
GST_OBJECT_UNLOCK (rtx);
}
static void
gst_rtp_rtx_receive_clear_extensions (GstRtpRtxReceive * rtx)
{
GST_OBJECT_LOCK (rtx);
gst_clear_object (&rtx->rid_stream);
gst_clear_object (&rtx->rid_repaired);
GST_OBJECT_UNLOCK (rtx);
}
static void static void
gst_rtp_rtx_receive_class_init (GstRtpRtxReceiveClass * klass) gst_rtp_rtx_receive_class_init (GstRtpRtxReceiveClass * klass)
{ {
@ -248,6 +295,38 @@ gst_rtp_rtx_receive_class_init (GstRtpRtxReceiveClass * klass)
"correctly associated with retransmission requests", 0, G_MAXUINT, "correctly associated with retransmission requests", 0, G_MAXUINT,
0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* rtprtxreceive::add-extension:
*
* Add @ext as an extension for writing part of an RTP header extension onto
* outgoing RTP packets. Currently only supports using the following
* extension URIs. All other RTP header extensions are copied as-is.
* - "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id": will be removed
* - "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id": will be
* written instead of the "rtp-stream-id" header extension.
*
* Since: 1.22
*/
gst_rtp_rtx_receive_signals[SIGNAL_ADD_EXTENSION] =
g_signal_new_class_handler ("add-extension", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_CALLBACK (gst_rtp_rtx_receive_add_extension), NULL, NULL, NULL,
G_TYPE_NONE, 1, GST_TYPE_RTP_HEADER_EXTENSION);
/**
* rtprtxreceive::clear-extensions:
* @object: the #GstRTPBasePayload
*
* Clear all RTP header extensions used by rtprtxreceive.
*
* Since: 1.22
*/
gst_rtp_rtx_receive_signals[SIGNAL_CLEAR_EXTENSIONS] =
g_signal_new_class_handler ("clear-extensions", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_CALLBACK (gst_rtp_rtx_receive_clear_extensions), NULL, NULL, NULL,
G_TYPE_NONE, 0);
gst_element_class_add_static_pad_template (gstelement_class, &src_factory); gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
gst_element_class_add_static_pad_template (gstelement_class, &sink_factory); gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
@ -285,6 +364,11 @@ gst_rtp_rtx_receive_finalize (GObject * object)
if (rtx->rtx_pt_map_structure) if (rtx->rtx_pt_map_structure)
gst_structure_free (rtx->rtx_pt_map_structure); gst_structure_free (rtx->rtx_pt_map_structure);
gst_clear_object (&rtx->rid_stream);
gst_clear_object (&rtx->rid_repaired);
gst_clear_buffer (&rtx->dummy_writable);
G_OBJECT_CLASS (gst_rtp_rtx_receive_parent_class)->finalize (object); G_OBJECT_CLASS (gst_rtp_rtx_receive_parent_class)->finalize (object);
} }
@ -339,6 +423,8 @@ gst_rtp_rtx_receive_init (GstRtpRtxReceive * rtx)
NULL, (GDestroyNotify) ssrc_assoc_free); NULL, (GDestroyNotify) ssrc_assoc_free);
rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal); rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
rtx->dummy_writable = gst_buffer_new ();
} }
static gboolean static gboolean
@ -465,13 +551,169 @@ gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent,
return res; return res;
} }
static GstMemory *
rewrite_header_extensions (GstRtpRtxReceive * rtx, GstRTPBuffer * rtp)
{
gsize out_size = rtp->size[1] + 32;
guint16 bit_pattern;
guint8 *pdata;
guint wordlen;
GstMemory *mem;
GstMapInfo map;
mem = gst_allocator_alloc (NULL, out_size, NULL);
gst_memory_map (mem, &map, GST_MAP_READWRITE);
if (gst_rtp_buffer_get_extension_data (rtp, &bit_pattern, (gpointer) & pdata,
&wordlen)) {
GstRTPHeaderExtensionFlags ext_flags = 0;
gsize bytelen = wordlen * 4;
guint hdr_unit_bytes;
gsize read_offset = 0, write_offset = 4;
if (bit_pattern == 0xBEDE) {
/* one byte extensions */
hdr_unit_bytes = 1;
ext_flags |= GST_RTP_HEADER_EXTENSION_ONE_BYTE;
} else if (bit_pattern >> 4 == 0x100) {
/* two byte extensions */
hdr_unit_bytes = 2;
ext_flags |= GST_RTP_HEADER_EXTENSION_TWO_BYTE;
} else {
GST_DEBUG_OBJECT (rtx, "unknown extension bit pattern 0x%02x%02x",
bit_pattern >> 8, bit_pattern & 0xff);
goto copy_as_is;
}
GST_WRITE_UINT16_BE (map.data, bit_pattern);
while (TRUE) {
guint8 read_id, read_len;
if (read_offset + hdr_unit_bytes >= bytelen)
/* not enough remaning data */
break;
if (ext_flags & GST_RTP_HEADER_EXTENSION_ONE_BYTE) {
read_id = GST_READ_UINT8 (pdata + read_offset) >> 4;
read_len = (GST_READ_UINT8 (pdata + read_offset) & 0x0F) + 1;
read_offset += 1;
if (read_id == 0)
/* padding */
continue;
if (read_id == 15)
/* special id for possible future expansion */
break;
} else {
read_id = GST_READ_UINT8 (pdata + read_offset);
read_offset += 1;
if (read_id == 0)
/* padding */
continue;
read_len = GST_READ_UINT8 (pdata + read_offset);
read_offset += 1;
}
GST_TRACE_OBJECT (rtx, "found rtp header extension with id %u and "
"length %u", read_id, read_len);
/* Ignore extension headers where the size does not fit */
if (read_offset + read_len > bytelen) {
GST_WARNING_OBJECT (rtx, "Extension length extends past the "
"size of the extension data");
break;
}
/* rewrite the rtp-stream-id into a repaired-stream-id */
if (rtx->rid_stream
&& read_id == gst_rtp_header_extension_get_id (rtx->rid_repaired)) {
if (!gst_rtp_header_extension_read (rtx->rid_repaired, ext_flags,
&pdata[read_offset], read_len, rtx->dummy_writable)) {
GST_WARNING_OBJECT (rtx, "RTP header extension (%s) could "
"not read payloaded data", GST_OBJECT_NAME (rtx->rid_stream));
goto copy_as_is;
}
if (rtx->rid_repaired) {
guint8 write_id = gst_rtp_header_extension_get_id (rtx->rid_stream);
gsize written;
char *rid;
g_object_get (rtx->rid_repaired, "rid", &rid, NULL);
g_object_set (rtx->rid_stream, "rid", rid, NULL);
g_clear_pointer (&rid, g_free);
written =
gst_rtp_header_extension_write (rtx->rid_stream, rtp->buffer,
ext_flags, rtx->dummy_writable,
&map.data[write_offset + hdr_unit_bytes],
map.size - write_offset - hdr_unit_bytes);
GST_TRACE_OBJECT (rtx->rid_repaired, "wrote %" G_GSIZE_FORMAT,
written);
if (written <= 0) {
GST_WARNING_OBJECT (rtx, "Failed to rewrite RID for RTX");
goto copy_as_is;
} else {
if (ext_flags & GST_RTP_HEADER_EXTENSION_ONE_BYTE) {
map.data[write_offset] =
((write_id & 0x0F) << 4) | ((written - 1) & 0x0F);
} else if (ext_flags & GST_RTP_HEADER_EXTENSION_TWO_BYTE) {
map.data[write_offset] = write_id & 0xFF;
map.data[write_offset + 1] = written & 0xFF;
} else {
g_assert_not_reached ();
goto copy_as_is;
}
write_offset += written + hdr_unit_bytes;
}
}
} else {
/* TODO: may need to write mid at different times to the original
* buffer to account for the difference in timing of acknowledgement
* of the rtx ssrc from the original ssrc. This may add extra data to
* the header extension space that needs to be accounted for.
*/
memcpy (&map.data[write_offset],
&map.data[read_offset - hdr_unit_bytes], read_len + hdr_unit_bytes);
write_offset += read_len + hdr_unit_bytes;
}
read_offset += read_len;
}
/* subtract the ext header */
wordlen = write_offset / 4 + ((write_offset % 4) ? 1 : 0);
/* wordlen in the ext data doesn't include the 4-byte header */
GST_WRITE_UINT16_BE (map.data + 2, wordlen - 1);
if (wordlen * 4 > write_offset)
memset (&map.data[write_offset], 0, wordlen * 4 - write_offset);
GST_MEMDUMP_OBJECT (rtx, "generated ext data", map.data, wordlen * 4);
} else {
copy_as_is:
wordlen = rtp->size[1] / 4;
memcpy (map.data, rtp->data[1], rtp->size[1]);
GST_LOG_OBJECT (rtx, "copying data as-is");
}
gst_memory_unmap (mem, &map);
gst_memory_resize (mem, 0, wordlen * 4);
return mem;
}
/* Copy fixed header and extension. Replace current ssrc by ssrc1, /* Copy fixed header and extension. Replace current ssrc by ssrc1,
* remove OSN and replace current seq num by OSN. * remove OSN and replace current seq num by OSN.
* Copy memory to avoid to manually copy each rtp buffer field. * Copy memory to avoid to manually copy each rtp buffer field.
*/ */
static GstBuffer * static GstBuffer *
_gst_rtp_buffer_new_from_rtx (GstRTPBuffer * rtp, guint32 ssrc1, _gst_rtp_buffer_new_from_rtx (GstRtpRtxReceive * rtx, GstRTPBuffer * rtp,
guint16 orign_seqnum, guint8 origin_payload_type) guint32 ssrc1, guint16 orign_seqnum, guint8 origin_payload_type)
{ {
GstMemory *mem = NULL; GstMemory *mem = NULL;
GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT; GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
@ -486,8 +728,7 @@ _gst_rtp_buffer_new_from_rtx (GstRTPBuffer * rtp, guint32 ssrc1,
/* copy extension if any */ /* copy extension if any */
if (rtp->size[1]) { if (rtp->size[1]) {
mem = gst_memory_copy (rtp->map[1].memory, mem = rewrite_header_extensions (rtx, rtp);
(guint8 *) rtp->data[1] - rtp->map[1].data, rtp->size[1]);
gst_buffer_append_memory (new_buffer, mem); gst_buffer_append_memory (new_buffer, mem);
} }
@ -556,6 +797,10 @@ gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
goto invalid_buffer; goto invalid_buffer;
GST_MEMDUMP_OBJECT (rtx, "rtp header", rtp.map[0].data, rtp.map[0].size);
GST_MEMDUMP_OBJECT (rtx, "rtp ext", rtp.map[1].data, rtp.map[1].size);
GST_MEMDUMP_OBJECT (rtx, "rtp payload", rtp.map[2].data, rtp.map[2].size);
ssrc = gst_rtp_buffer_get_ssrc (&rtp); ssrc = gst_rtp_buffer_get_ssrc (&rtp);
seqnum = gst_rtp_buffer_get_seq (&rtp); seqnum = gst_rtp_buffer_get_seq (&rtp);
payload_type = gst_rtp_buffer_get_payload_type (&rtp); payload_type = gst_rtp_buffer_get_payload_type (&rtp);
@ -690,7 +935,7 @@ gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
/* create the retransmission packet */ /* create the retransmission packet */
if (is_rtx) if (is_rtx)
new_buffer = new_buffer =
_gst_rtp_buffer_new_from_rtx (&rtp, GPOINTER_TO_UINT (ssrc1), _gst_rtp_buffer_new_from_rtx (rtx, &rtp, GPOINTER_TO_UINT (ssrc1),
orign_seqnum, origin_payload_type); orign_seqnum, origin_payload_type);
gst_rtp_buffer_unmap (&rtp); gst_rtp_buffer_unmap (&rtp);

View file

@ -25,7 +25,7 @@
#define __GST_RTP_RTX_RECEIVE_H__ #define __GST_RTP_RTX_RECEIVE_H__
#include <gst/gst.h> #include <gst/gst.h>
#include <gst/rtp/gstrtpbuffer.h> #include <gst/rtp/rtp.h>
G_BEGIN_DECLS G_BEGIN_DECLS
typedef struct _GstRtpRtxReceive GstRtpRtxReceive; typedef struct _GstRtpRtxReceive GstRtpRtxReceive;
@ -69,6 +69,11 @@ struct _GstRtpRtxReceive
guint num_rtx_assoc_packets; guint num_rtx_assoc_packets;
GstClockTime last_time; GstClockTime last_time;
GstRTPHeaderExtension *rid_stream;
GstRTPHeaderExtension *rid_repaired;
GstBuffer *dummy_writable;
}; };
struct _GstRtpRtxReceiveClass struct _GstRtpRtxReceiveClass

View file

@ -41,7 +41,6 @@
#endif #endif
#include <gst/gst.h> #include <gst/gst.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
@ -66,6 +65,20 @@ enum
PROP_CLOCK_RATE_MAP, PROP_CLOCK_RATE_MAP,
}; };
enum
{
SIGNAL_0,
SIGNAL_ADD_EXTENSION,
SIGNAL_CLEAR_EXTENSIONS,
LAST_SIGNAL
};
static guint gst_rtp_rtx_send_signals[LAST_SIGNAL] = { 0, };
#define RTPHDREXT_BUNDLE_MID GST_RTP_HDREXT_BASE "sdes:mid"
#define RTPHDREXT_STREAM_ID GST_RTP_HDREXT_BASE "sdes:rtp-stream-id"
#define RTPHDREXT_REPAIRED_STREAM_ID GST_RTP_HDREXT_BASE "sdes:repaired-rtp-stream-id"
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src", static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC, GST_PAD_SRC,
GST_PAD_ALWAYS, GST_PAD_ALWAYS,
@ -103,6 +116,40 @@ static void gst_rtp_rtx_send_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec); GValue * value, GParamSpec * pspec);
static void gst_rtp_rtx_send_finalize (GObject * object); static void gst_rtp_rtx_send_finalize (GObject * object);
static void
gst_rtp_rtx_send_add_extension (GstRtpRtxSend * rtx,
GstRTPHeaderExtension * ext)
{
g_return_if_fail (GST_IS_RTP_HEADER_EXTENSION (ext));
g_return_if_fail (gst_rtp_header_extension_get_id (ext) > 0);
GST_OBJECT_LOCK (rtx);
if (g_strcmp0 (gst_rtp_header_extension_get_uri (ext),
RTPHDREXT_STREAM_ID) == 0) {
gst_clear_object (&rtx->rid_stream);
rtx->rid_stream = gst_object_ref (ext);
} else if (g_strcmp0 (gst_rtp_header_extension_get_uri (ext),
RTPHDREXT_REPAIRED_STREAM_ID) == 0) {
gst_clear_object (&rtx->rid_repaired);
rtx->rid_repaired = gst_object_ref (ext);
} else {
g_warning ("rtprtxsend (%s) doesn't know how to deal with the "
"RTP Header Extension with URI \'%s\'", GST_OBJECT_NAME (rtx),
gst_rtp_header_extension_get_uri (ext));
}
/* XXX: check for other duplicate ids? */
GST_OBJECT_UNLOCK (rtx);
}
static void
gst_rtp_rtx_send_clear_extensions (GstRtpRtxSend * rtx)
{
GST_OBJECT_LOCK (rtx);
gst_clear_object (&rtx->rid_stream);
gst_clear_object (&rtx->rid_repaired);
GST_OBJECT_UNLOCK (rtx);
}
G_DEFINE_TYPE_WITH_CODE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT, G_DEFINE_TYPE_WITH_CODE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT,
GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_send_debug, "rtprtxsend", 0, GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_send_debug, "rtprtxsend", 0,
"rtp retransmission sender")); "rtp retransmission sender"));
@ -258,6 +305,38 @@ gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
"Map of payload types to their clock rates", "Map of payload types to their clock rates",
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* rtprtxsend::add-extension:
*
* Add @ext as an extension for writing part of an RTP header extension onto
* outgoing RTP packets. Currently only supports using the following
* extension URIs. All other RTP header extensions are copied as-is.
* - "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id": will be removed
* - "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id": will be
* written instead of the "rtp-stream-id" header extension.
*
* Since: 1.22
*/
gst_rtp_rtx_send_signals[SIGNAL_ADD_EXTENSION] =
g_signal_new_class_handler ("add-extension", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_CALLBACK (gst_rtp_rtx_send_add_extension), NULL, NULL, NULL,
G_TYPE_NONE, 1, GST_TYPE_RTP_HEADER_EXTENSION);
/**
* rtprtxsend::clear-extensions:
* @object: the #GstRTPBasePayload
*
* Clear all RTP header extensions used by this rtprtxsend.
*
* Since: 1.22
*/
gst_rtp_rtx_send_signals[SIGNAL_CLEAR_EXTENSIONS] =
g_signal_new_class_handler ("clear-extensions", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_CALLBACK (gst_rtp_rtx_send_clear_extensions), NULL, NULL, NULL,
G_TYPE_NONE, 0);
gst_element_class_add_static_pad_template (gstelement_class, &src_factory); gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
gst_element_class_add_static_pad_template (gstelement_class, &sink_factory); gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
@ -299,6 +378,11 @@ gst_rtp_rtx_send_finalize (GObject * object)
gst_structure_free (rtx->clock_rate_map_structure); gst_structure_free (rtx->clock_rate_map_structure);
g_object_unref (rtx->queue); g_object_unref (rtx->queue);
gst_clear_object (&rtx->rid_stream);
gst_clear_object (&rtx->rid_repaired);
gst_clear_buffer (&rtx->dummy_writable);
G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object); G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
} }
@ -341,6 +425,8 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
rtx->max_size_time = DEFAULT_MAX_SIZE_TIME; rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS; rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
rtx->dummy_writable = gst_buffer_new ();
} }
static gboolean static gboolean
@ -422,6 +508,162 @@ gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc)
return data; return data;
} }
static GstMemory *
rewrite_header_extensions (GstRtpRtxSend * rtx, GstRTPBuffer * rtp)
{
gsize out_size = rtp->size[1] + 32;
guint16 bit_pattern;
guint8 *pdata;
guint wordlen;
GstMemory *mem;
GstMapInfo map;
mem = gst_allocator_alloc (NULL, out_size, NULL);
gst_memory_map (mem, &map, GST_MAP_READWRITE);
if (gst_rtp_buffer_get_extension_data (rtp, &bit_pattern, (gpointer) & pdata,
&wordlen)) {
GstRTPHeaderExtensionFlags ext_flags = 0;
gsize bytelen = wordlen * 4;
guint hdr_unit_bytes;
gsize read_offset = 0, write_offset = 4;
if (bit_pattern == 0xBEDE) {
/* one byte extensions */
hdr_unit_bytes = 1;
ext_flags |= GST_RTP_HEADER_EXTENSION_ONE_BYTE;
} else if (bit_pattern >> 4 == 0x100) {
/* two byte extensions */
hdr_unit_bytes = 2;
ext_flags |= GST_RTP_HEADER_EXTENSION_TWO_BYTE;
} else {
GST_DEBUG_OBJECT (rtx, "unknown extension bit pattern 0x%02x%02x",
bit_pattern >> 8, bit_pattern & 0xff);
goto copy_as_is;
}
GST_WRITE_UINT16_BE (map.data, bit_pattern);
while (TRUE) {
guint8 read_id, read_len;
if (read_offset + hdr_unit_bytes >= bytelen)
/* not enough remaning data */
break;
if (ext_flags & GST_RTP_HEADER_EXTENSION_ONE_BYTE) {
read_id = GST_READ_UINT8 (pdata + read_offset) >> 4;
read_len = (GST_READ_UINT8 (pdata + read_offset) & 0x0F) + 1;
read_offset += 1;
if (read_id == 0)
/* padding */
continue;
if (read_id == 15)
/* special id for possible future expansion */
break;
} else {
read_id = GST_READ_UINT8 (pdata + read_offset);
read_offset += 1;
if (read_id == 0)
/* padding */
continue;
read_len = GST_READ_UINT8 (pdata + read_offset);
read_offset += 1;
}
GST_TRACE_OBJECT (rtx, "found rtp header extension with id %u and "
"length %u", read_id, read_len);
/* Ignore extension headers where the size does not fit */
if (read_offset + read_len > bytelen) {
GST_WARNING_OBJECT (rtx, "Extension length extends past the "
"size of the extension data");
break;
}
/* rewrite the rtp-stream-id into a repaired-stream-id */
if (rtx->rid_stream
&& read_id == gst_rtp_header_extension_get_id (rtx->rid_stream)) {
if (!gst_rtp_header_extension_read (rtx->rid_stream, ext_flags,
&pdata[read_offset], read_len, rtx->dummy_writable)) {
GST_WARNING_OBJECT (rtx, "RTP header extension (%s) could "
"not read payloaded data", GST_OBJECT_NAME (rtx->rid_stream));
goto copy_as_is;
}
if (rtx->rid_repaired) {
guint8 write_id = gst_rtp_header_extension_get_id (rtx->rid_repaired);
gsize written;
char *rid;
g_object_get (rtx->rid_stream, "rid", &rid, NULL);
g_object_set (rtx->rid_repaired, "rid", rid, NULL);
g_clear_pointer (&rid, g_free);
written =
gst_rtp_header_extension_write (rtx->rid_repaired, rtp->buffer,
ext_flags, rtx->dummy_writable,
&map.data[write_offset + hdr_unit_bytes],
map.size - write_offset - hdr_unit_bytes);
GST_TRACE_OBJECT (rtx->rid_repaired, "wrote %" G_GSIZE_FORMAT,
written);
if (written <= 0) {
GST_WARNING_OBJECT (rtx, "Failed to rewrite RID for RTX");
goto copy_as_is;
} else {
if (ext_flags & GST_RTP_HEADER_EXTENSION_ONE_BYTE) {
map.data[write_offset] =
((write_id & 0x0F) << 4) | ((written - 1) & 0x0F);
} else if (ext_flags & GST_RTP_HEADER_EXTENSION_TWO_BYTE) {
map.data[write_offset] = write_id & 0xFF;
map.data[write_offset + 1] = written & 0xFF;
} else {
g_assert_not_reached ();
goto copy_as_is;
}
write_offset += written + hdr_unit_bytes;
}
}
} else {
/* TODO: may need to write mid at different times to the original
* buffer to account for the difference in timing of acknowledgement
* of the rtx ssrc from the original ssrc. This may add extra data to
* the header extension space that needs to be accounted for.
*/
memcpy (&map.data[write_offset],
&map.data[read_offset - hdr_unit_bytes], read_len + hdr_unit_bytes);
write_offset += read_len + hdr_unit_bytes;
}
read_offset += read_len;
}
/* subtract the ext header */
wordlen = write_offset / 4 + ((write_offset % 4) ? 1 : 0);
/* wordlen in the ext data doesn't include the 4-byte header */
GST_WRITE_UINT16_BE (map.data + 2, wordlen - 1);
if (wordlen * 4 > write_offset)
memset (&map.data[write_offset], 0, wordlen * 4 - write_offset);
GST_MEMDUMP_OBJECT (rtx, "generated ext data", map.data, wordlen * 4);
} else {
copy_as_is:
wordlen = rtp->size[1] / 4;
memcpy (map.data, rtp->data[1], rtp->size[1]);
GST_LOG_OBJECT (rtx, "copying data as-is");
}
gst_memory_unmap (mem, &map);
gst_memory_resize (mem, 0, wordlen * 4);
return mem;
}
/* Copy fixed header and extension. Add OSN before to copy payload /* Copy fixed header and extension. Add OSN before to copy payload
* Copy memory to avoid to manually copy each rtp buffer field. * Copy memory to avoid to manually copy each rtp buffer field.
*/ */
@ -462,10 +704,7 @@ gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
/* copy extension if any */ /* copy extension if any */
if (rtp.size[1]) { if (rtp.size[1]) {
mem = gst_allocator_alloc (NULL, rtp.size[1], NULL); mem = rewrite_header_extensions (rtx, &rtp);
gst_memory_map (mem, &map, GST_MAP_WRITE);
memcpy (map.data, rtp.data[1], rtp.size[1]);
gst_memory_unmap (mem, &map);
gst_buffer_append_memory (new_buffer, mem); gst_buffer_append_memory (new_buffer, mem);
} }

View file

@ -25,7 +25,7 @@
#define __GST_RTP_RTX_SEND_H__ #define __GST_RTP_RTX_SEND_H__
#include <gst/gst.h> #include <gst/gst.h>
#include <gst/rtp/gstrtpbuffer.h> #include <gst/rtp/rtp.h>
#include <gst/base/gstdataqueue.h> #include <gst/base/gstdataqueue.h>
G_BEGIN_DECLS G_BEGIN_DECLS
@ -77,6 +77,12 @@ struct _GstRtpRtxSend
/* statistics */ /* statistics */
guint num_rtx_requests; guint num_rtx_requests;
guint num_rtx_packets; guint num_rtx_packets;
/* list of relevant RTP Header Extensions */
GstRTPHeaderExtension *rid_stream;
GstRTPHeaderExtension *rid_repaired;
GstBuffer *dummy_writable;
}; };
struct _GstRtpRtxSendClass struct _GstRtpRtxSendClass

View file

@ -20,7 +20,7 @@
*/ */
#include <gst/check/gstcheck.h> #include <gst/check/gstcheck.h>
#include <gst/check/gstharness.h> #include <gst/check/gstharness.h>
#include <gst/rtp/gstrtpbuffer.h> #include <gst/rtp/rtp.h>
#define verify_buf(buf, is_rtx, expected_ssrc, expted_pt, expected_seqnum) \ #define verify_buf(buf, is_rtx, expected_ssrc, expted_pt, expected_seqnum) \
G_STMT_START { \ G_STMT_START { \
@ -94,6 +94,20 @@ compare_rtp_packets (GstBuffer * a, GstBuffer * b)
gst_rtp_buffer_get_payload (&rtp_b), gst_rtp_buffer_get_payload (&rtp_b),
gst_rtp_buffer_get_payload_len (&rtp_a)), 0); gst_rtp_buffer_get_payload_len (&rtp_a)), 0);
if (gst_rtp_buffer_get_extension (&rtp_a)) {
guint16 ext_bits_a, ext_bits_b;
guint8 *ext_data_a, *ext_data_b;
guint wordlen_a, wordlen_b;
fail_unless_equals_int (TRUE, gst_rtp_buffer_get_extension_data (&rtp_a,
&ext_bits_a, (gpointer) & ext_data_a, &wordlen_a));
fail_unless_equals_int (TRUE, gst_rtp_buffer_get_extension_data (&rtp_b,
&ext_bits_b, (gpointer) & ext_data_b, &wordlen_b));
fail_unless_equals_int (ext_bits_a, ext_bits_b);
fail_unless_equals_int (wordlen_a, wordlen_b);
fail_unless_equals_int (0, memcmp (ext_data_a, ext_data_b, wordlen_a * 4));
}
gst_rtp_buffer_unmap (&rtp_a); gst_rtp_buffer_unmap (&rtp_a);
gst_rtp_buffer_unmap (&rtp_b); gst_rtp_buffer_unmap (&rtp_b);
} }
@ -913,6 +927,132 @@ GST_START_TEST (test_rtxsender_clock_rate_map)
GST_END_TEST; GST_END_TEST;
#define RTPHDREXT_STREAM_ID GST_RTP_HDREXT_BASE "sdes:rtp-stream-id"
#define RTPHDREXT_REPAIRED_STREAM_ID GST_RTP_HDREXT_BASE "sdes:repaired-rtp-stream-id"
GST_START_TEST (test_rtxsend_header_extensions)
{
const guint packets_num = 5;
guint master_ssrc = 1234567;
guint master_pt = 96;
guint rtx_pt = 99;
GstStructure *pt_map;
GstBuffer *inbufs[5];
GstHarness *hrecv = gst_harness_new ("rtprtxreceive");
GstHarness *hsend = gst_harness_new ("rtprtxsend");
GstRTPHeaderExtension *send_stream_id, *send_repaired_stream_id;
GstRTPHeaderExtension *recv_stream_id, *recv_repaired_stream_id;
guint stream_hdr_id = 1, repaired_hdr_id = 2;
gint i;
pt_map = gst_structure_new ("application/x-rtp-pt-map",
"96", G_TYPE_UINT, rtx_pt, NULL);
g_object_set (hrecv->element, "payload-type-map", pt_map, NULL);
g_object_set (hsend->element, "payload-type-map", pt_map, NULL);
gst_harness_set_src_caps_str (hsend, "application/x-rtp, "
"media = (string)video, payload = (int)96, "
"ssrc = (uint)1234567, clock-rate = (int)90000, "
"encoding-name = (string)RAW");
gst_harness_set_src_caps_str (hrecv, "application/x-rtp, "
"media = (string)video, payload = (int)96, "
"ssrc = (uint)1234567, clock-rate = (int)90000, "
"encoding-name = (string)RAW");
send_stream_id =
gst_rtp_header_extension_create_from_uri (RTPHDREXT_STREAM_ID);
gst_rtp_header_extension_set_id (send_stream_id, stream_hdr_id);
g_object_set (send_stream_id, "rid", "0", NULL);
fail_unless (send_stream_id != NULL);
g_signal_emit_by_name (hsend->element, "add-extension", send_stream_id);
gst_clear_object (&send_stream_id);
send_repaired_stream_id =
gst_rtp_header_extension_create_from_uri (RTPHDREXT_REPAIRED_STREAM_ID);
g_object_set (send_repaired_stream_id, "rid", "0", NULL);
gst_rtp_header_extension_set_id (send_repaired_stream_id, repaired_hdr_id);
fail_unless (send_repaired_stream_id != NULL);
g_signal_emit_by_name (hsend->element, "add-extension",
send_repaired_stream_id);
gst_clear_object (&send_repaired_stream_id);
recv_stream_id =
gst_rtp_header_extension_create_from_uri (RTPHDREXT_STREAM_ID);
gst_rtp_header_extension_set_id (recv_stream_id, stream_hdr_id);
fail_unless (recv_stream_id != NULL);
g_signal_emit_by_name (hrecv->element, "add-extension", recv_stream_id);
gst_clear_object (&recv_stream_id);
recv_repaired_stream_id =
gst_rtp_header_extension_create_from_uri (RTPHDREXT_REPAIRED_STREAM_ID);
gst_rtp_header_extension_set_id (recv_repaired_stream_id, repaired_hdr_id);
fail_unless (recv_repaired_stream_id != NULL);
g_signal_emit_by_name (hrecv->element, "add-extension",
recv_repaired_stream_id);
gst_clear_object (&recv_repaired_stream_id);
/* Push 'packets_num' packets through rtxsend to rtxreceive */
for (i = 0; i < packets_num; ++i) {
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
inbufs[i] = create_rtp_buffer (master_ssrc, master_pt, 100 + i);
fail_unless (gst_rtp_buffer_map (inbufs[i], GST_MAP_READWRITE, &rtp));
fail_unless (gst_rtp_buffer_add_extension_onebyte_header (&rtp,
stream_hdr_id, "0", 1));
gst_rtp_buffer_unmap (&rtp);
gst_harness_push (hsend, gst_buffer_ref (inbufs[i]));
gst_harness_push (hrecv, gst_harness_pull (hsend));
pull_and_verify (hrecv, FALSE, master_ssrc, master_pt, 100 + i);
}
/* Getting rid of reconfigure event. Preparation before the next step */
gst_event_unref (gst_harness_pull_upstream_event (hrecv));
fail_unless_equals_int (gst_harness_upstream_events_in_queue (hrecv), 0);
/* Push 'packets_num' RTX events through rtxreceive to rtxsend.
Push RTX packets from rtxsend to rtxreceive and
check that the packet produced out of RTX packet is the same
as an original packet */
for (i = 0; i < packets_num; ++i) {
GstBuffer *outbuf;
gst_harness_push_upstream_event (hrecv,
create_rtx_event (master_ssrc, master_pt, 100 + i));
gst_harness_push_upstream_event (hsend,
gst_harness_pull_upstream_event (hrecv));
gst_harness_push (hrecv, gst_harness_pull (hsend));
outbuf = gst_harness_pull (hrecv);
compare_rtp_packets (inbufs[i], outbuf);
gst_buffer_unref (inbufs[i]);
gst_buffer_unref (outbuf);
}
/* Check RTX stats */
{
guint rtx_requests;
guint rtx_packets;
guint rtx_assoc_packets;
g_object_get (G_OBJECT (hsend->element),
"num-rtx-requests", &rtx_requests,
"num-rtx-packets", &rtx_packets, NULL);
fail_unless_equals_int (rtx_packets, packets_num);
fail_unless_equals_int (rtx_requests, packets_num);
g_object_get (G_OBJECT (hrecv->element),
"num-rtx-requests", &rtx_requests,
"num-rtx-packets", &rtx_packets,
"num-rtx-assoc-packets", &rtx_assoc_packets, NULL);
fail_unless_equals_int (rtx_packets, packets_num);
fail_unless_equals_int (rtx_requests, packets_num);
fail_unless_equals_int (rtx_assoc_packets, packets_num);
}
gst_structure_free (pt_map);
gst_harness_teardown (hrecv);
gst_harness_teardown (hsend);
}
GST_END_TEST;
static Suite * static Suite *
rtprtx_suite (void) rtprtx_suite (void)
{ {
@ -938,6 +1078,7 @@ rtprtx_suite (void)
tcase_add_test (tc_chain, test_rtxqueue_max_size_packets); tcase_add_test (tc_chain, test_rtxqueue_max_size_packets);
tcase_add_test (tc_chain, test_rtxqueue_max_size_time); tcase_add_test (tc_chain, test_rtxqueue_max_size_time);
tcase_add_test (tc_chain, test_rtxsender_clock_rate_map); tcase_add_test (tc_chain, test_rtxsender_clock_rate_map);
tcase_add_test (tc_chain, test_rtxsend_header_extensions);
return s; return s;
} }