rtp/redenc|ulpfecenc: add support for TWCC

In redenc, when input buffers have a header for the TWCC extension,
we now add one to our wrapper buffers.

In ulpfecenc we add one in that case to our protection buffers.

This makes TWCC functional when UlpRed is used in webrtcbin.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1414>
This commit is contained in:
Mathieu Duponchelle 2021-12-03 02:52:06 +01:00 committed by GStreamer Marge Bot
parent 49055f1cd5
commit 5dc280de9f
5 changed files with 211 additions and 6 deletions

View file

@ -185,6 +185,27 @@ _alloc_red_packet_and_fill_headers (GstRtpRedEnc * self,
rtp_red_block_set_payload_type (red_block_header, rtp_red_block_set_payload_type (red_block_header,
gst_rtp_buffer_get_payload_type (inp_rtp)); gst_rtp_buffer_get_payload_type (inp_rtp));
/* FIXME: remove that logic once https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/923
* has been addressed. */
if (self->twcc_ext_id != 0) {
guint8 appbits;
gpointer inp_data;
guint inp_size;
guint16 data;
/* If the input buffer was meant to hold a TWCC seqnum, we also do that
* for our wrapper */
if (gst_rtp_buffer_get_extension_onebyte_header (inp_rtp, self->twcc_ext_id,
0, &inp_data, &inp_size)) {
gst_rtp_buffer_add_extension_onebyte_header (&red_rtp, 1, &data,
sizeof (guint16));
} else if (gst_rtp_buffer_get_extension_twobytes_header (inp_rtp, &appbits,
self->twcc_ext_id, 0, &inp_data, &inp_size)) {
gst_rtp_buffer_add_extension_twobytes_header (&red_rtp, appbits,
self->twcc_ext_id, &data, sizeof (guint16));
}
}
gst_rtp_buffer_unmap (&red_rtp); gst_rtp_buffer_unmap (&red_rtp);
gst_buffer_copy_into (red, inp_rtp->buffer, GST_BUFFER_COPY_METADATA, 0, -1); gst_buffer_copy_into (red, inp_rtp->buffer, GST_BUFFER_COPY_METADATA, 0, -1);
@ -360,6 +381,31 @@ gst_rtp_red_enc_chain (GstPad G_GNUC_UNUSED * pad, GstObject * parent,
return _push_red_packet (self, &rtp, buffer, redundant_block, distance); return _push_red_packet (self, &rtp, buffer, redundant_block, distance);
} }
static guint8
_get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name)
{
guint i;
guint8 extmap_id = 0;
guint n_fields = gst_structure_n_fields (s);
for (i = 0; i < n_fields; i++) {
const gchar *field_name = gst_structure_nth_field_name (s, i);
if (g_str_has_prefix (field_name, "extmap-")) {
const gchar *str = gst_structure_get_string (s, field_name);
if (str && g_strcmp0 (str, ext_name) == 0) {
gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10);
if (id > 0 && id < 15) {
extmap_id = id;
break;
}
}
}
}
return extmap_id;
}
#define TWCC_EXTMAP_STR "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"
static gboolean static gboolean
gst_rtp_red_enc_event_sink (GstPad * pad, GstObject * parent, GstEvent * event) gst_rtp_red_enc_event_sink (GstPad * pad, GstObject * parent, GstEvent * event)
{ {
@ -368,12 +414,18 @@ gst_rtp_red_enc_event_sink (GstPad * pad, GstObject * parent, GstEvent * event)
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS: case GST_EVENT_CAPS:
{ {
GstCaps *caps;
GstStructure *s;
gboolean replace_with_red_caps = gboolean replace_with_red_caps =
self->is_current_caps_red || self->allow_no_red_blocks; self->is_current_caps_red || self->allow_no_red_blocks;
if (replace_with_red_caps) {
GstCaps *caps;
gst_event_parse_caps (event, &caps); gst_event_parse_caps (event, &caps);
s = gst_caps_get_structure (caps, 0);
self->twcc_ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR);
GST_INFO_OBJECT (self, "TWCC extension ID: %u", self->twcc_ext_id);
if (replace_with_red_caps) {
gst_event_take (&event, _create_caps_event (caps, self->pt)); gst_event_take (&event, _create_caps_event (caps, self->pt));
self->is_current_caps_red = TRUE; self->is_current_caps_red = TRUE;

View file

@ -56,6 +56,7 @@ struct _GstRtpRedEnc {
GQueue *rtp_history; GQueue *rtp_history;
gboolean send_caps; gboolean send_caps;
gboolean is_current_caps_red; gboolean is_current_caps_red;
guint8 twcc_ext_id;
}; };
GType gst_rtp_red_enc_get_type (void); GType gst_rtp_red_enc_get_type (void);

View file

@ -331,7 +331,8 @@ gst_rtp_ulpfec_enc_stream_ctx_prepend_to_fec_buffer (GstRtpUlpFecEncStreamCtx *
static GstFlowReturn static GstFlowReturn
gst_rtp_ulpfec_enc_stream_ctx_push_fec_packets (GstRtpUlpFecEncStreamCtx * ctx, gst_rtp_ulpfec_enc_stream_ctx_push_fec_packets (GstRtpUlpFecEncStreamCtx * ctx,
guint8 pt, guint16 seq, guint32 timestamp, guint32 ssrc) guint8 pt, guint16 seq, guint32 timestamp, guint32 ssrc, guint8 twcc_ext_id,
GstRTPHeaderExtensionFlags twcc_ext_flags, guint8 twcc_appbits)
{ {
GstFlowReturn ret = GST_FLOW_OK; GstFlowReturn ret = GST_FLOW_OK;
guint fec_packets_num = guint fec_packets_num =
@ -351,6 +352,31 @@ gst_rtp_ulpfec_enc_stream_ctx_push_fec_packets (GstRtpUlpFecEncStreamCtx * ctx,
gst_buffer_copy_into (fec, latest_packet, GST_BUFFER_COPY_TIMESTAMPS, 0, gst_buffer_copy_into (fec, latest_packet, GST_BUFFER_COPY_TIMESTAMPS, 0,
-1); -1);
/* If buffers in the stream we are protecting were meant to hold a TWCC seqnum,
* we also indicate that our protection buffers need one. At this point no seqnum
* has actually been set, we thus don't need to rewrite seqnums, simply indicate
* to RTPSession that the FEC buffers need one too */
/* FIXME: remove this logic once https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/923
* is addressed */
if (twcc_ext_id != 0) {
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
guint16 data;
if (!gst_rtp_buffer_map (fec, GST_MAP_READWRITE, &rtp))
g_assert_not_reached ();
if (twcc_ext_flags & GST_RTP_HEADER_EXTENSION_ONE_BYTE) {
gst_rtp_buffer_add_extension_onebyte_header (&rtp, twcc_ext_id,
&data, sizeof (guint16));
} else if (twcc_ext_flags & GST_RTP_HEADER_EXTENSION_TWO_BYTE) {
gst_rtp_buffer_add_extension_twobytes_header (&rtp, twcc_appbits,
twcc_ext_id, &data, sizeof (guint16));
}
gst_rtp_buffer_unmap (&rtp);
}
ret = gst_pad_push (ctx->srcpad, fec); ret = gst_pad_push (ctx->srcpad, fec);
if (GST_FLOW_OK == ret) if (GST_FLOW_OK == ret)
++fec_packets_pushed; ++fec_packets_pushed;
@ -468,12 +494,14 @@ gst_rtp_ulpfec_enc_stream_ctx_free (GstRtpUlpFecEncStreamCtx * ctx)
static GstFlowReturn static GstFlowReturn
gst_rtp_ulpfec_enc_stream_ctx_process (GstRtpUlpFecEncStreamCtx * ctx, gst_rtp_ulpfec_enc_stream_ctx_process (GstRtpUlpFecEncStreamCtx * ctx,
GstBuffer * buffer) GstBuffer * buffer, guint8 twcc_ext_id)
{ {
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
GstFlowReturn ret; GstFlowReturn ret;
gboolean push_fec = FALSE; gboolean push_fec = FALSE;
gboolean empty_packet_buffer = FALSE; gboolean empty_packet_buffer = FALSE;
GstRTPHeaderExtensionFlags twcc_ext_flags = 0;
guint8 twcc_appbits = 0;
ctx->num_packets_received++; ctx->num_packets_received++;
@ -490,6 +518,21 @@ gst_rtp_ulpfec_enc_stream_ctx_process (GstRtpUlpFecEncStreamCtx * ctx,
g_assert_not_reached (); g_assert_not_reached ();
} }
if (twcc_ext_id != 0) {
gpointer data;
guint size;
if (gst_rtp_buffer_get_extension_onebyte_header (&rtp, twcc_ext_id, 0,
&data, &size)) {
twcc_ext_flags |= GST_RTP_HEADER_EXTENSION_ONE_BYTE;
} else if (gst_rtp_buffer_get_extension_twobytes_header (&rtp,
&twcc_appbits, twcc_ext_id, 0, &data, &size)) {
twcc_ext_flags |= GST_RTP_HEADER_EXTENSION_TWO_BYTE;
} else {
twcc_ext_id = 0;
}
}
gst_rtp_ulpfec_enc_stream_ctx_cache_packet (ctx, &rtp, &empty_packet_buffer, gst_rtp_ulpfec_enc_stream_ctx_cache_packet (ctx, &rtp, &empty_packet_buffer,
&push_fec); &push_fec);
@ -504,7 +547,7 @@ gst_rtp_ulpfec_enc_stream_ctx_process (GstRtpUlpFecEncStreamCtx * ctx,
if (GST_FLOW_OK == ret) if (GST_FLOW_OK == ret)
ret = ret =
gst_rtp_ulpfec_enc_stream_ctx_push_fec_packets (ctx, ctx->pt, fec_seq, gst_rtp_ulpfec_enc_stream_ctx_push_fec_packets (ctx, ctx->pt, fec_seq,
fec_timestamp, fec_ssrc); fec_timestamp, fec_ssrc, twcc_ext_id, twcc_ext_flags, twcc_appbits);
} else { } else {
gst_rtp_buffer_unmap (&rtp); gst_rtp_buffer_unmap (&rtp);
ret = gst_pad_push (ctx->srcpad, buffer); ret = gst_pad_push (ctx->srcpad, buffer);
@ -558,7 +601,7 @@ gst_rtp_ulpfec_enc_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
ctx = gst_rtp_ulpfec_enc_aquire_ctx (fec, ssrc); ctx = gst_rtp_ulpfec_enc_aquire_ctx (fec, ssrc);
ret = gst_rtp_ulpfec_enc_stream_ctx_process (ctx, buffer); ret = gst_rtp_ulpfec_enc_stream_ctx_process (ctx, buffer, fec->twcc_ext_id);
/* FIXME: does not work for multiple ssrcs */ /* FIXME: does not work for multiple ssrcs */
fec->num_packets_protected = ctx->num_packets_protected; fec->num_packets_protected = ctx->num_packets_protected;
@ -577,6 +620,58 @@ gst_rtp_ulpfec_enc_configure_ctx (gpointer key, gpointer value,
fec->percentage, fec->percentage_important, fec->multipacket); fec->percentage, fec->percentage_important, fec->multipacket);
} }
static guint8
_get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name)
{
guint i;
guint8 extmap_id = 0;
guint n_fields = gst_structure_n_fields (s);
for (i = 0; i < n_fields; i++) {
const gchar *field_name = gst_structure_nth_field_name (s, i);
if (g_str_has_prefix (field_name, "extmap-")) {
const gchar *str = gst_structure_get_string (s, field_name);
if (str && g_strcmp0 (str, ext_name) == 0) {
gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10);
if (id > 0 && id < 15) {
extmap_id = id;
break;
}
}
}
}
return extmap_id;
}
#define TWCC_EXTMAP_STR "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"
static gboolean
gst_rtp_ulpfec_enc_event_sink (GstPad * pad, GstObject * parent,
GstEvent * event)
{
GstRtpUlpFecEnc *self = GST_RTP_ULPFEC_ENC (parent);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:
{
GstCaps *caps;
GstStructure *s;
gst_event_parse_caps (event, &caps);
s = gst_caps_get_structure (caps, 0);
self->twcc_ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR);
GST_INFO_OBJECT (self, "TWCC extension ID: %u", self->twcc_ext_id);
break;
}
default:
break;
}
return gst_pad_event_default (pad, parent, event);
}
static void static void
gst_rtp_ulpfec_enc_set_property (GObject * object, guint prop_id, gst_rtp_ulpfec_enc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec) const GValue * value, GParamSpec * pspec)
@ -655,6 +750,8 @@ gst_rtp_ulpfec_enc_init (GstRtpUlpFecEnc * fec)
GST_PAD_SET_PROXY_ALLOCATION (fec->sinkpad); GST_PAD_SET_PROXY_ALLOCATION (fec->sinkpad);
gst_pad_set_chain_function (fec->sinkpad, gst_pad_set_chain_function (fec->sinkpad,
GST_DEBUG_FUNCPTR (gst_rtp_ulpfec_enc_chain)); GST_DEBUG_FUNCPTR (gst_rtp_ulpfec_enc_chain));
gst_pad_set_event_function (fec->sinkpad,
GST_DEBUG_FUNCPTR (gst_rtp_ulpfec_enc_event_sink));
gst_element_add_pad (GST_ELEMENT (fec), fec->sinkpad); gst_element_add_pad (GST_ELEMENT (fec), fec->sinkpad);
fec->ssrc_to_ctx = g_hash_table_new_full (NULL, NULL, NULL, fec->ssrc_to_ctx = g_hash_table_new_full (NULL, NULL, NULL,

View file

@ -47,6 +47,7 @@ struct _GstRtpUlpFecEnc {
GstElement parent; GstElement parent;
GstPad *srcpad; GstPad *srcpad;
GstPad *sinkpad; GstPad *sinkpad;
guint8 twcc_ext_id;
GHashTable *ssrc_to_ctx; GHashTable *ssrc_to_ctx;

View file

@ -31,6 +31,7 @@
#define xstr(s) str(s) #define xstr(s) str(s)
#define str(s) #s #define str(s) #s
#define GST_RTP_RED_ENC_CAPS_STR "application/x-rtp, payload=" xstr(PT_MEDIA) #define GST_RTP_RED_ENC_CAPS_STR "application/x-rtp, payload=" xstr(PT_MEDIA)
#define GST_RTP_RED_ENC_TWCC_CAPS_STR "application/x-rtp, extmap-1=http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01, payload=" xstr(PT_MEDIA)
#define _check_red_received(h, expected) \ #define _check_red_received(h, expected) \
G_STMT_START { \ G_STMT_START { \
@ -662,6 +663,58 @@ GST_START_TEST (rtpredenc_with_redundant_block)
GST_END_TEST; GST_END_TEST;
GST_START_TEST (rtpredenc_transport_cc)
{
GstHarness *h = gst_harness_new ("rtpredenc");
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
GstBuffer *bufin;
GstBuffer *bufout;
guint16 data;
gpointer out_data;
guint out_size;
g_object_set (h->element, "pt", PT_RED, "allow-no-red-blocks", TRUE, NULL);
gst_harness_set_src_caps_str (h, GST_RTP_RED_ENC_TWCC_CAPS_STR);
/* When we push in a media buffer with a transport-cc extension, the output
* RED buffer must hold one too */
bufin =
_new_rtp_buffer (TRUE, 0, PT_MEDIA, 0, TIMESTAMP_NTH (0), 0xabe2b0b, 0);
fail_unless (gst_rtp_buffer_map (bufin, GST_MAP_READ, &rtp));
gst_rtp_buffer_add_extension_onebyte_header (&rtp, 1, &data, sizeof (data));
gst_rtp_buffer_unmap (&rtp);
bufout = gst_harness_push_and_pull (h, bufin);
fail_unless (gst_rtp_buffer_map (bufout, GST_MAP_READ, &rtp));
fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp), PT_RED);
fail_unless (gst_rtp_buffer_get_extension_onebyte_header (&rtp, 1, 0,
&out_data, &out_size));
gst_rtp_buffer_unmap (&rtp);
gst_buffer_unref (bufout);
/* And when the input media buffer doesn't hold the extension,
* the output buffer shouldn't either */
bufin =
_new_rtp_buffer (TRUE, 0, PT_MEDIA, 1, TIMESTAMP_NTH (1), 0xabe2b0b, 0);
bufout = gst_harness_push_and_pull (h, bufin);
fail_unless (gst_rtp_buffer_map (bufout, GST_MAP_READ, &rtp));
fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp), PT_RED);
fail_if (gst_rtp_buffer_get_extension_onebyte_header (&rtp, 1, 0, &out_data,
&out_size));
gst_rtp_buffer_unmap (&rtp);
gst_buffer_unref (bufout);
_check_red_sent (h, 2);
gst_harness_teardown (h);
}
GST_END_TEST;
static void static void
rtpredenc_cant_create_red_packet_base_test (GstBuffer * buffer0, rtpredenc_cant_create_red_packet_base_test (GstBuffer * buffer0,
GstBuffer * buffer1) GstBuffer * buffer1)
@ -833,6 +886,7 @@ rtpred_suite (void)
tcase_add_loop_test (tc_chain, rtpredenc_negative_timestamp_offset, 0, 2); tcase_add_loop_test (tc_chain, rtpredenc_negative_timestamp_offset, 0, 2);
tcase_add_loop_test (tc_chain, rtpredenc_too_large_timestamp_offset, 0, 2); tcase_add_loop_test (tc_chain, rtpredenc_too_large_timestamp_offset, 0, 2);
tcase_add_loop_test (tc_chain, rtpredenc_too_large_length, 0, 2); tcase_add_loop_test (tc_chain, rtpredenc_too_large_length, 0, 2);
tcase_add_test (tc_chain, rtpredenc_transport_cc);
return s; return s;
} }