From 591af0f38aa5504e2a9d7a200b6ef52c2733d152 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Tue, 6 Oct 2020 03:03:13 +0200 Subject: [PATCH] rtpmanager: implement SMPTE 2022-1 FEC encoder + improve integration of FEC encoders in rtpbin Part-of: --- docs/gst_plugins_cache.json | 116 ++++ gst/rtpmanager/gstrtpbin.c | 183 +++++- gst/rtpmanager/gstrtpbin.h | 3 + gst/rtpmanager/gstrtpmanager.c | 5 + gst/rtpmanager/gstrtpst2022-1-fecenc.c | 766 ++++++++++++++++++++++ gst/rtpmanager/gstrtpst2022-1-fecenc.h | 37 ++ gst/rtpmanager/meson.build | 3 +- tests/check/elements/rtpst2022-1-fecenc.c | 229 +++++++ tests/check/meson.build | 1 + 9 files changed, 1341 insertions(+), 2 deletions(-) create mode 100644 gst/rtpmanager/gstrtpst2022-1-fecenc.c create mode 100644 gst/rtpmanager/gstrtpst2022-1-fecenc.h create mode 100644 tests/check/elements/rtpst2022-1-fecenc.c diff --git a/docs/gst_plugins_cache.json b/docs/gst_plugins_cache.json index b0f18c2333..8d92bbedea 100644 --- a/docs/gst_plugins_cache.json +++ b/docs/gst_plugins_cache.json @@ -16088,6 +16088,11 @@ "direction": "src", "presence": "sometimes" }, + "send_fec_src_%%u_%%u": { + "caps": "application/x-rtp:\n", + "direction": "src", + "presence": "sometimes" + }, "send_rtcp_src_%%u": { "caps": "application/x-rtcp:\napplication/x-srtcp:\n", "direction": "src", @@ -16189,6 +16194,18 @@ "type": "GstStructure", "writable": true }, + "fec-encoders": { + "blurb": "GstStructure mapping from session index to FEC encoder factory, eg fec-encoders='fec,0=\"rtpst2022-1-fecenc\\ rows\\=5\\ columns\\=5\";'", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "application/x-rtp-fec-encoders;", + "mutable": "null", + "readable": true, + "type": "GstStructure", + "writable": true + }, "ignore-pt": { "blurb": "Do not demultiplex based on PT values", "conditionally-available": false, @@ -18291,6 +18308,105 @@ } }, "rank": "none" + }, + "rtpst2022-1-fecenc": { + "author": "Mathieu Duponchelle ", + "description": "performs FEC as described by SMPTE 2022-1", + "hierarchy": [ + "GstRTPST_2022_1_FecEnc", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "SMPTE 2022-1 FEC encoding", + "long-name": "SMPTE 2022-1 FEC encoder", + "pad-templates": { + "fec_%%u": { + "caps": "application/x-rtp:\n", + "direction": "src", + "presence": "sometimes" + }, + "sink": { + "caps": "application/x-rtp:\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "application/x-rtp:\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "columns": { + "blurb": "Number of columns to apply row FEC on, 0=disabled", + "conditionally-available": false, + "construct": true, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "255", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint", + "writable": true + }, + "enable-column-fec": { + "blurb": "Whether the encoder should compute and send column FEC", + "conditionally-available": false, + "construct": true, + "construct-only": false, + "controllable": false, + "default": "true", + "mutable": "playing", + "readable": true, + "type": "gboolean", + "writable": true + }, + "enable-row-fec": { + "blurb": "Whether the encoder should compute and send row FEC", + "conditionally-available": false, + "construct": true, + "construct-only": false, + "controllable": false, + "default": "true", + "mutable": "playing", + "readable": true, + "type": "gboolean", + "writable": true + }, + "pt": { + "blurb": "The payload type of FEC packets", + "conditionally-available": false, + "construct": true, + "construct-only": false, + "controllable": false, + "default": "96", + "max": "255", + "min": "96", + "mutable": "ready", + "readable": true, + "type": "gint", + "writable": true + }, + "rows": { + "blurb": "Number of rows to apply column FEC on, 0=disabled", + "conditionally-available": false, + "construct": true, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "255", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint", + "writable": true + } + }, + "rank": "none" } }, "filename": "gstrtpmanager", diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 444a66de58..6b558c0a41 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -182,6 +182,23 @@ GST_STATIC_PAD_TEMPLATE ("recv_fec_sink_%u_%u", GST_STATIC_CAPS ("application/x-rtp") ); +/** + * GstRtpBin!send_fec_src_%u_%u: + * + * Src template for sending Forward Error Correction packets, + * in the form send_fec_src__ + * + * See #GstRTPST_2022_1_FecEnc for example usage + * + * Since: 1.20 + */ +static GstStaticPadTemplate rtpbin_send_fec_src_template = +GST_STATIC_PAD_TEMPLATE ("send_fec_src_%u_%u", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtp") + ); + static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template = GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u", GST_PAD_SINK, @@ -362,6 +379,7 @@ enum PROP_MAX_TS_OFFSET_ADJUSTMENT, PROP_MAX_TS_OFFSET, PROP_FEC_DECODERS, + PROP_FEC_ENCODERS, }; #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type()) @@ -397,6 +415,7 @@ static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session); static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session); +static void remove_send_fec (GstRtpBin * rtpbin, GstRtpBinSession * session); static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void free_client (GstRtpBinClient * client, GstRtpBin * bin); static void free_stream (GstRtpBinStream * stream, GstRtpBin * bin); @@ -509,6 +528,8 @@ struct _GstRtpBinSession GSList *recv_fec_sinks; GSList *recv_fec_sink_ghosts; GstElement *fec_decoder; + + GSList *send_fec_src_ghosts; }; /* Manages the RTP streams that come from one client and should therefore be @@ -881,6 +902,7 @@ free_session (GstRtpBinSession * sess, GstRtpBin * bin) remove_recv_rtcp (bin, sess); remove_recv_fec (bin, sess); remove_send_rtp (bin, sess); + remove_send_fec (bin, sess); remove_rtcp (bin, sess); gst_bin_remove (GST_BIN_CAST (bin), sess->session); @@ -2688,6 +2710,24 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) "fec-decoders='fec,0=\"rtpst2022-1-fecdec\\ size-time\\=1000000000\";'", GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstRtpBin:fec-encoders: + * + * Used to provide a factory used to build the FEC encoder for a + * given session, as a command line alternative to + * #GstRtpBin::request-fec-encoder. + * + * Expects a GstStructure in the form session_id (gint) -> factory (string) + * + * Since: 1.20 + */ + g_object_class_install_property (gobject_class, PROP_FEC_ENCODERS, + g_param_spec_boxed ("fec-encoders", "Fec Encoders", + "GstStructure mapping from session index to FEC encoder " + "factory, eg " + "fec-encoders='fec,0=\"rtpst2022-1-fecenc\\ rows\\=5\\ columns\\=5\";'", + GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state); gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad); @@ -2710,6 +2750,8 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) &rtpbin_send_rtcp_src_template); gst_element_class_add_static_pad_template (gstelement_class, &rtpbin_send_rtp_src_template); + gst_element_class_add_static_pad_template (gstelement_class, + &rtpbin_send_fec_src_template); gst_element_class_set_static_metadata (gstelement_class, "RTP Bin", "Filter/Network/RTP", @@ -2778,6 +2820,8 @@ gst_rtp_bin_init (GstRtpBin * rtpbin) "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL); rtpbin->fec_decoders = gst_structure_new_empty ("application/x-rtp-fec-decoders"); + rtpbin->fec_encoders = + gst_structure_new_empty ("application/x-rtp-fec-encoders"); g_free (cname); } @@ -2811,6 +2855,9 @@ gst_rtp_bin_finalize (GObject * object) if (rtpbin->fec_decoders) gst_structure_free (rtpbin->fec_decoders); + if (rtpbin->fec_encoders) + gst_structure_free (rtpbin->fec_encoders); + g_mutex_clear (&rtpbin->priv->bin_lock); g_mutex_clear (&rtpbin->priv->dyn_lock); @@ -2862,6 +2909,25 @@ gst_rtp_bin_set_fec_decoders_struct (GstRtpBin * bin, GST_RTP_BIN_UNLOCK (bin); } +static void +gst_rtp_bin_set_fec_encoders_struct (GstRtpBin * bin, + const GstStructure * encoders) +{ + if (encoders == NULL) + return; + + GST_RTP_BIN_LOCK (bin); + + GST_OBJECT_LOCK (bin); + if (bin->fec_encoders) + gst_structure_free (bin->fec_encoders); + bin->fec_encoders = gst_structure_copy (encoders); + + GST_OBJECT_UNLOCK (bin); + + GST_RTP_BIN_UNLOCK (bin); +} + static GstStructure * gst_rtp_bin_get_sdes_struct (GstRtpBin * bin) { @@ -2886,6 +2952,18 @@ gst_rtp_bin_get_fec_decoders_struct (GstRtpBin * bin) return result; } +static GstStructure * +gst_rtp_bin_get_fec_encoders_struct (GstRtpBin * bin) +{ + GstStructure *result; + + GST_OBJECT_LOCK (bin); + result = gst_structure_copy (bin->fec_encoders); + GST_OBJECT_UNLOCK (bin); + + return result; +} + static void gst_rtp_bin_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) @@ -3052,6 +3130,9 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id, case PROP_FEC_DECODERS: gst_rtp_bin_set_fec_decoders_struct (rtpbin, g_value_get_boxed (value)); break; + case PROP_FEC_ENCODERS: + gst_rtp_bin_set_fec_encoders_struct (rtpbin, g_value_get_boxed (value)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -3149,6 +3230,9 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id, case PROP_FEC_DECODERS: g_value_take_boxed (value, gst_rtp_bin_get_fec_decoders_struct (rtpbin)); break; + case PROP_FEC_ENCODERS: + g_value_take_boxed (value, gst_rtp_bin_get_fec_encoders_struct (rtpbin)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -4564,6 +4648,88 @@ setup_aux_sender (GstRtpBin * rtpbin, GstRtpBinSession * session, return res == GST_ITERATOR_DONE; } +static void +fec_encoder_pad_added_cb (GstElement * encoder, GstPad * pad, + GstRtpBinSession * session) +{ + GstElementClass *klass; + gchar *gname; + GstPadTemplate *templ; + guint fec_idx; + GstPad *ghost; + + if (sscanf (GST_PAD_NAME (pad), "fec_%u", &fec_idx) != 1) { + GST_WARNING_OBJECT (session->bin, + "FEC encoder added pad with name not matching fec_%%u (%s)", + GST_PAD_NAME (pad)); + goto done; + } + + GST_INFO_OBJECT (session->bin, "FEC encoder for session %u exposed new pad", + session->id); + + GST_RTP_BIN_LOCK (session->bin); + klass = GST_ELEMENT_GET_CLASS (session->bin); + gname = g_strdup_printf ("send_fec_src_%u_%u", session->id, fec_idx); + templ = gst_element_class_get_pad_template (klass, "send_fec_src_%u_%u"); + ghost = gst_ghost_pad_new_from_template (gname, pad, templ); + session->send_fec_src_ghosts = + g_slist_prepend (session->send_fec_src_ghosts, ghost); + gst_pad_set_active (ghost, TRUE); + gst_pad_sticky_events_foreach (pad, copy_sticky_events, ghost); + gst_element_add_pad (GST_ELEMENT (session->bin), ghost); + g_free (gname); + GST_RTP_BIN_UNLOCK (session->bin); + +done: + return; +} + +static GstElement * +request_fec_encoder (GstRtpBin * rtpbin, GstRtpBinSession * session, + guint sessid) +{ + GstElement *ret = NULL; + const gchar *factory; + gchar *sess_id_str; + + sess_id_str = g_strdup_printf ("%u", sessid); + factory = gst_structure_get_string (rtpbin->fec_encoders, sess_id_str); + g_free (sess_id_str); + + /* First try the property */ + if (factory) { + GError *err = NULL; + + ret = + gst_parse_bin_from_description_full (factory, TRUE, NULL, + GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS, + &err); + if (!ret) { + GST_ERROR_OBJECT (rtpbin, "Failed to build encoder from factory: %s", + err->message); + goto done; + } + + bin_manage_element (session->bin, ret); + session->elements = g_slist_prepend (session->elements, ret); + GST_INFO_OBJECT (rtpbin, "Built FEC encoder: %" GST_PTR_FORMAT + " for session %u", ret, sessid); + } + + /* Fallback to the signal */ + if (!ret) + ret = session_request_element (session, SIGNAL_REQUEST_FEC_ENCODER); + + if (ret) { + g_signal_connect (ret, "pad-added", G_CALLBACK (fec_encoder_pad_added_cb), + session); + } + +done: + return ret; +} + /* Create a pad for sending RTP for the session in @name. Must be called with * RTP_BIN_LOCK. */ @@ -4599,7 +4765,7 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->send_rtp_sink != NULL) goto existing_session; - encoder = session_request_element (session, SIGNAL_REQUEST_FEC_ENCODER); + encoder = request_fec_encoder (rtpbin, session, sessid); if (encoder) { GST_DEBUG_OBJECT (rtpbin, "Linking FEC encoder"); @@ -4751,6 +4917,21 @@ remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session) } } +static void +remove_send_fec (GstRtpBin * rtpbin, GstRtpBinSession * session) +{ + GSList *tmp; + + for (tmp = session->send_fec_src_ghosts; tmp; tmp = tmp->next) { + GstPad *ghost = GST_PAD (tmp->data); + gst_pad_set_active (ghost, FALSE); + gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), ghost); + } + + g_slist_free (session->send_fec_src_ghosts); + session->send_fec_src_ghosts = NULL; +} + /* Create a pad for sending RTCP for the session in @name. Must be called with * RTP_BIN_LOCK. */ diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index 58de860a4a..1185c40e84 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -91,6 +91,9 @@ struct _GstRtpBin { /* the default FEC decoder factories for sessions */ GstStructure *fec_decoders; + /* the default FEC encoder factories for sessions */ + GstStructure *fec_encoders; + /*< private >*/ GstRtpBinPrivate *priv; }; diff --git a/gst/rtpmanager/gstrtpmanager.c b/gst/rtpmanager/gstrtpmanager.c index 91b6b653ac..35f131b4d5 100644 --- a/gst/rtpmanager/gstrtpmanager.c +++ b/gst/rtpmanager/gstrtpmanager.c @@ -33,6 +33,7 @@ #include "gstrtpmux.h" #include "gstrtpfunnel.h" #include "gstrtpst2022-1-fecdec.h" +#include "gstrtpst2022-1-fecenc.h" static gboolean plugin_init (GstPlugin * plugin) @@ -79,6 +80,10 @@ plugin_init (GstPlugin * plugin) GST_TYPE_RTPST_2022_1_FECDEC)) return FALSE; + if (!gst_element_register (plugin, "rtpst2022-1-fecenc", GST_RANK_NONE, + GST_TYPE_RTPST_2022_1_FECENC)) + return FALSE; + return TRUE; } diff --git a/gst/rtpmanager/gstrtpst2022-1-fecenc.c b/gst/rtpmanager/gstrtpst2022-1-fecenc.c new file mode 100644 index 0000000000..a93f4186af --- /dev/null +++ b/gst/rtpmanager/gstrtpst2022-1-fecenc.c @@ -0,0 +1,766 @@ +/* GStreamer + * Copyright (C) <2020> Mathieu Duponchelle + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-rtpst2022-1-fecenc + * @see_also: #element-rtpst2022-1-fecdec + * + * This element takes as input a media stream and up to two FEC + * streams as described in SMPTE 2022-1: Forward Error Correction + * for Real-Time Video/Audio Transport Over IP Networks, and makes + * use of the FEC packets to recover media packets that may have + * gotten lost. + * + * ## sender / receiver example + * + * ``` shell + * gst-launch-1.0 \ + * rtpbin name=rtp fec-encoders='fec,0="rtpst2022-1-fecenc\ rows\=5\ columns\=5";' \ + * uridecodebin uri=file:///path/to/video/file ! x264enc key-int-max=60 tune=zerolatency ! \ + * queue ! mpegtsmux ! rtpmp2tpay ssrc=0 ! rtp.send_rtp_sink_0 \ + * rtp.send_rtp_src_0 ! udpsink host=127.0.0.1 port=5000 \ + * rtp.send_fec_src_0_0 ! udpsink host=127.0.0.1 port=5002 async=false \ + * rtp.send_fec_src_0_1 ! udpsink host=127.0.0.1 port=5004 async=false + * ``` + * + * ``` shell + * gst-launch-1.0 \ + * rtpbin latency=500 fec-decoders='fec,0="rtpst2022-1-fecdec\ size-time\=1000000000";' name=rtp \ + * udpsrc address=127.0.0.1 port=5002 caps="application/x-rtp, payload=96" ! queue ! rtp.recv_fec_sink_0_0 \ + * udpsrc address=127.0.0.1 port=5004 caps="application/x-rtp, payload=96" ! queue ! rtp.recv_fec_sink_0_1 \ + * udpsrc address=127.0.0.1 port=5000 caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=mp2t, payload=33" ! \ + * queue ! netsim drop-probability=0.05 ! rtp.recv_rtp_sink_0 \ + * rtp. ! decodebin ! videoconvert ! queue ! autovideosink + * ``` + * + * With the above command line, as the media packet size is constant, + * the fec overhead can be approximated to the number of fec packets + * per 2-d matrix of media packet, here 10 fec packets for each 25 + * media packets. + * + * Increasing the number of rows and columns will decrease the overhead, + * but obviously increase the likelihood of recovery failure for lost + * packets on the receiver side. + * + * Since: 1.20 + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include + +#include "gstrtpst2022-1-fecenc.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rtpst_2022_1_fecenc_debug); +#define GST_CAT_DEFAULT gst_rtpst_2022_1_fecenc_debug + +enum +{ + PROP_0, + PROP_COLUMNS, + PROP_ROWS, + PROP_PT, + PROP_ENABLE_COLUMN, + PROP_ENABLE_ROW, +}; + +#define DEFAULT_ROWS 0 +#define DEFAULT_COLUMNS 0 +#define DEFAULT_PT 96 +#define DEFAULT_ENABLE_COLUMN TRUE +#define DEFAULT_ENABLE_ROW TRUE + +typedef struct +{ + guint16 target_media_seq; /* The media seqnum we want to send that packet alongside */ + guint16 seq_base; /* Only used for logging purposes */ + GstBuffer *buffer; +} Item; + +typedef struct +{ + guint8 *xored_payload; + guint32 xored_timestamp; + guint8 xored_pt; + guint16 xored_payload_len; + + guint16 seq_base; + + guint16 payload_len; + guint n_packets; +} FecPacket; + +struct _GstRTPST_2022_1_FecEncClass +{ + GstElementClass class; +}; + +struct _GstRTPST_2022_1_FecEnc +{ + GstElement element; + + GstPad *srcpad; + GstPad *sinkpad; + + /* These pads do not participate in the flow return of the element, + * which should continue working even if the sending of FEC packets + * fails + */ + GstPad *row_fec_srcpad; + GstPad *column_fec_srcpad; + + /* The following fields are only accessed on state change or from the + * streaming thread, and only settable in state < PAUSED */ + + /* N columns */ + guint l; + /* N rows */ + guint d; + + /* Whether we have pushed initial events on the column FEC source pad */ + gboolean column_events_pushed; + + /* The current row FEC packet */ + FecPacket *row; + /* Tracks the row seqnum */ + guint16 row_seq; + /* Whether we have pushed initial events on the row FEC source pad */ + gboolean row_events_pushed; + + /* These two fields are used to enforce input seqnum consecutiveness, + * and to determine when column FEC packets should be pushed */ + gboolean last_media_seqnum_set; + guint16 last_media_seqnum; + + /* This field is used to timestamp our FEC packets, we just piggy back */ + guint32 last_media_timestamp; + + /* The payload type of the FEC packets */ + gint pt; + + /* The following fields can be changed while PLAYING, and are + * protected with the OBJECT_LOCK + */ + /* Tracks the property, can be changed while PLAYING */ + gboolean enable_row; + /* Tracks the property, can be changed while PLAYING */ + gboolean enable_column; + + /* Array of FecPackets, with size enc->l */ + GPtrArray *columns; + /* Index of the current column in the array above */ + guint current_column; + /* Tracks the column seqnum */ + guint16 column_seq; + /* Column FEC packets must be delayed to make them more resilient + * to loss bursts, we store them here */ + GQueue queued_column_packets; +}; + +#define RTP_CAPS "application/x-rtp" + +static GstStaticPadTemplate fec_src_template = +GST_STATIC_PAD_TEMPLATE ("fec_%u", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS (RTP_CAPS)); + +static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS (RTP_CAPS)); + +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS (RTP_CAPS)); + +#define gst_rtpst_2022_1_fecenc_parent_class parent_class +G_DEFINE_TYPE (GstRTPST_2022_1_FecEnc, gst_rtpst_2022_1_fecenc, + GST_TYPE_ELEMENT); + +static void +free_item (Item * item) +{ + if (item->buffer) + gst_buffer_unref (item->buffer); + + g_free (item); +} + +static void +free_fec_packet (FecPacket * packet) +{ + if (packet->xored_payload) + g_free (packet->xored_payload); + g_free (packet); +} + +static void +_xor_mem (guint8 * restrict dst, const guint8 * restrict src, gsize length) +{ + guint i; + + for (i = 0; i < (length / sizeof (guint64)); ++i) { +#if G_BYTE_ORDER == G_LITTLE_ENDIAN + GST_WRITE_UINT64_LE (dst, + GST_READ_UINT64_LE (dst) ^ GST_READ_UINT64_LE (src)); +#else + GST_WRITE_UINT64_BE (dst, + GST_READ_UINT64_BE (dst) ^ GST_READ_UINT64_BE (src)); +#endif + dst += sizeof (guint64); + src += sizeof (guint64); + } + for (i = 0; i < (length % sizeof (guint64)); ++i) + dst[i] ^= src[i]; +} + +static void +fec_packet_update (FecPacket * fec, GstRTPBuffer * rtp) +{ + if (fec->n_packets == 0) { + fec->seq_base = gst_rtp_buffer_get_seq (rtp); + fec->payload_len = gst_rtp_buffer_get_payload_len (rtp); + fec->xored_payload_len = gst_rtp_buffer_get_payload_len (rtp); + fec->xored_pt = gst_rtp_buffer_get_payload_type (rtp); + fec->xored_timestamp = gst_rtp_buffer_get_timestamp (rtp); + fec->xored_payload = g_malloc (sizeof (guint8) * fec->payload_len); + memcpy (fec->xored_payload, gst_rtp_buffer_get_payload (rtp), + fec->payload_len); + } else { + if (fec->payload_len < gst_rtp_buffer_get_payload_len (rtp)) { + fec->payload_len = gst_rtp_buffer_get_payload_len (rtp); + fec->xored_payload = + g_realloc (fec->xored_payload, sizeof (guint8) * fec->payload_len); + } + + fec->xored_payload_len ^= gst_rtp_buffer_get_payload_len (rtp); + fec->xored_pt ^= gst_rtp_buffer_get_payload_type (rtp); + fec->xored_timestamp ^= gst_rtp_buffer_get_timestamp (rtp); + _xor_mem (fec->xored_payload, gst_rtp_buffer_get_payload (rtp), + gst_rtp_buffer_get_payload_len (rtp)); + } + + fec->n_packets += 1; +} + +static void +push_initial_events (GstRTPST_2022_1_FecEnc * enc, GstPad * pad, + const gchar * id) +{ + gchar *stream_id; + GstCaps *caps; + GstSegment segment; + + stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT (enc), id); + gst_pad_push_event (pad, gst_event_new_stream_start (stream_id)); + g_free (stream_id); + + caps = gst_caps_new_simple ("application/x-rtp", + "payload", G_TYPE_UINT, enc->pt, "ssrc", G_TYPE_UINT, 0, NULL); + gst_pad_push_event (pad, gst_event_new_caps (caps)); + gst_caps_unref (caps); + + gst_segment_init (&segment, GST_FORMAT_TIME); + gst_pad_push_event (pad, gst_event_new_segment (&segment)); +} + +static void +queue_fec_packet (GstRTPST_2022_1_FecEnc * enc, FecPacket * fec, gboolean row) +{ + GstBuffer *buffer = gst_rtp_buffer_new_allocate (fec->payload_len + 16, 0, 0); + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + GstBitWriter bits; + guint8 *data; + + gst_rtp_buffer_map (buffer, GST_MAP_WRITE, &rtp); + data = gst_rtp_buffer_get_payload (&rtp); + memset (data, 0x00, 16); + + gst_bit_writer_init_with_data (&bits, data, 17, FALSE); + + gst_bit_writer_put_bits_uint16 (&bits, fec->seq_base, 16); /* SNBase low bits */ + gst_bit_writer_put_bits_uint16 (&bits, fec->xored_payload_len, 16); /* Length Recovery */ + gst_bit_writer_put_bits_uint8 (&bits, 1, 1); /* E */ + gst_bit_writer_put_bits_uint8 (&bits, fec->xored_pt, 7); /* PT recovery */ + gst_bit_writer_put_bits_uint32 (&bits, 0, 24); /* Mask */ + gst_bit_writer_put_bits_uint32 (&bits, fec->xored_timestamp, 32); /* TS recovery */ + gst_bit_writer_put_bits_uint8 (&bits, 0, 1); /* N */ + gst_bit_writer_put_bits_uint8 (&bits, row ? 1 : 0, 1); /* D */ + gst_bit_writer_put_bits_uint8 (&bits, 0, 3); /* type */ + gst_bit_writer_put_bits_uint8 (&bits, 0, 3); /* index */ + gst_bit_writer_put_bits_uint8 (&bits, row ? 1 : enc->l, 8); /* Offset */ + gst_bit_writer_put_bits_uint8 (&bits, fec->n_packets, 8); /* NA */ + gst_bit_writer_put_bits_uint8 (&bits, 0, 8); /* SNBase ext bits */ + + memcpy (data + 16, fec->xored_payload, fec->payload_len); + + gst_bit_writer_reset (&bits); + + gst_rtp_buffer_set_payload_type (&rtp, enc->pt); + gst_rtp_buffer_set_seq (&rtp, row ? enc->row_seq++ : enc->column_seq++); + + /* We're sending it out immediately */ + if (row) + gst_rtp_buffer_set_timestamp (&rtp, enc->last_media_timestamp); + + gst_rtp_buffer_unmap (&rtp); + + /* We can send row FEC packets immediately, column packets need + * delaying by L <= delay < L * D + */ + if (row) { + GstFlowReturn ret; + + GST_LOG_OBJECT (enc, + "Pushing row FEC packet, seq base: %u, media seqnum: %u", + fec->seq_base, enc->last_media_seqnum); + + /* Safe to unlock here */ + GST_OBJECT_UNLOCK (enc); + ret = gst_pad_push (enc->row_fec_srcpad, buffer); + GST_OBJECT_LOCK (enc); + + if (ret != GST_FLOW_OK && ret != GST_FLOW_FLUSHING) + GST_WARNING_OBJECT (enc->row_fec_srcpad, + "Failed to push row FEC packet: %s", gst_flow_get_name (ret)); + } else { + Item *item = g_malloc0 (sizeof (Item)); + + item->buffer = buffer; + item->seq_base = fec->seq_base; + /* Let's get cute and linearize */ + item->target_media_seq = + enc->last_media_seqnum + enc->l - enc->current_column + + enc->d * enc->current_column; + + g_queue_push_tail (&enc->queued_column_packets, item); + } +} + +static GstFlowReturn +gst_rtpst_2022_1_fecenc_sink_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer) +{ + GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (parent); + GstFlowReturn ret = GST_FLOW_OK; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + + if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) { + GST_ERROR_OBJECT (enc, "Chained buffer isn't valid RTP"); + goto error; + } + + if (gst_rtp_buffer_get_ssrc (&rtp) != 0) { + GST_ERROR_OBJECT (enc, "Chained buffer must have SSRC == 0"); + goto error; + } + + if (enc->last_media_seqnum_set + && enc->last_media_seqnum + 1 != gst_rtp_buffer_get_seq (&rtp)) { + GST_ERROR_OBJECT (enc, "consecutive sequence numbers are required"); + goto error; + } + + if (!enc->row_events_pushed) { + push_initial_events (enc, enc->row_fec_srcpad, "row-fec"); + enc->row_events_pushed = TRUE; + } + + if (!enc->column_events_pushed) { + push_initial_events (enc, enc->column_fec_srcpad, "column-fec"); + enc->column_events_pushed = TRUE; + } + + enc->last_media_timestamp = gst_rtp_buffer_get_timestamp (&rtp); + enc->last_media_seqnum = gst_rtp_buffer_get_seq (&rtp); + enc->last_media_seqnum_set = TRUE; + + GST_OBJECT_LOCK (enc); + if (enc->enable_row && enc->l) { + g_assert (enc->row->n_packets < enc->l); + fec_packet_update (enc->row, &rtp); + if (enc->row->n_packets == enc->l) { + queue_fec_packet (enc, enc->row, TRUE); + g_free (enc->row->xored_payload); + memset (enc->row, 0x00, sizeof (FecPacket)); + } + } + + if (enc->enable_column && enc->l && enc->d) { + FecPacket *column = g_ptr_array_index (enc->columns, enc->current_column); + + fec_packet_update (column, &rtp); + if (column->n_packets == enc->d) { + queue_fec_packet (enc, column, FALSE); + g_free (column->xored_payload); + memset (column, 0x00, sizeof (FecPacket)); + } + + enc->current_column++; + enc->current_column %= enc->l; + } + + gst_rtp_buffer_unmap (&rtp); + + if (g_queue_get_length (&enc->queued_column_packets) > 0) { + Item *item = g_queue_peek_head (&enc->queued_column_packets); + + if (item->target_media_seq == enc->last_media_seqnum) { + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + + g_queue_pop_head (&enc->queued_column_packets); + GST_LOG_OBJECT (enc, + "Pushing column FEC packet, seq base: %u, media seqnum: %u", + item->seq_base, enc->last_media_seqnum); + gst_rtp_buffer_map (item->buffer, GST_MAP_WRITE, &rtp); + gst_rtp_buffer_set_timestamp (&rtp, enc->last_media_timestamp); + gst_rtp_buffer_unmap (&rtp); + GST_OBJECT_UNLOCK (enc); + ret = + gst_pad_push (enc->column_fec_srcpad, gst_buffer_ref (item->buffer)); + GST_OBJECT_LOCK (enc); + + if (ret != GST_FLOW_OK && ret != GST_FLOW_FLUSHING) + GST_WARNING_OBJECT (enc->column_fec_srcpad, + "Failed to push column FEC packet: %s", gst_flow_get_name (ret)); + + free_item (item); + } + } + GST_OBJECT_UNLOCK (enc); + + ret = gst_pad_push (enc->srcpad, buffer); + +done: + return ret; + +error: + if (rtp.buffer) + gst_rtp_buffer_unmap (&rtp); + gst_buffer_unref (buffer); + ret = GST_FLOW_ERROR; + goto done; +} + +static GstIterator * +gst_rtpst_2022_1_fecenc_iterate_linked_pads (GstPad * pad, GstObject * parent) +{ + GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (parent); + GstPad *otherpad = NULL; + GstIterator *it = NULL; + GValue val = { 0, }; + + if (pad == enc->srcpad) + otherpad = enc->sinkpad; + else if (pad == enc->sinkpad) + otherpad = enc->srcpad; + + if (otherpad) { + g_value_init (&val, GST_TYPE_PAD); + g_value_set_object (&val, otherpad); + it = gst_iterator_new_single (GST_TYPE_PAD, &val); + g_value_unset (&val); + } + + return it; +} + +static void +gst_rtpst_2022_1_fecenc_reset (GstRTPST_2022_1_FecEnc * enc, gboolean allocate) +{ + if (enc->row) { + free_fec_packet (enc->row); + enc->row = NULL; + } + + if (enc->columns) { + g_ptr_array_unref (enc->columns); + enc->columns = NULL; + } + + if (enc->row_fec_srcpad) { + gst_element_remove_pad (GST_ELEMENT (enc), enc->row_fec_srcpad); + enc->row_fec_srcpad = NULL; + } + + if (enc->column_fec_srcpad) { + gst_element_remove_pad (GST_ELEMENT (enc), enc->column_fec_srcpad); + enc->column_fec_srcpad = NULL; + } + + g_queue_clear_full (&enc->queued_column_packets, (GDestroyNotify) free_item); + + if (allocate) { + guint i; + + enc->row = g_malloc0 (sizeof (FecPacket)); + enc->columns = + g_ptr_array_new_full (enc->l, (GDestroyNotify) free_fec_packet); + + for (i = 0; i < enc->l; i++) { + g_ptr_array_add (enc->columns, g_malloc0 (sizeof (FecPacket))); + } + + g_queue_init (&enc->queued_column_packets); + + enc->column_fec_srcpad = + gst_pad_new_from_static_template (&fec_src_template, "fec_0"); + gst_pad_set_active (enc->column_fec_srcpad, TRUE); + gst_pad_set_iterate_internal_links_function (enc->column_fec_srcpad, + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_iterate_linked_pads)); + gst_element_add_pad (GST_ELEMENT (enc), enc->column_fec_srcpad); + + enc->row_fec_srcpad = + gst_pad_new_from_static_template (&fec_src_template, "fec_1"); + gst_pad_set_active (enc->row_fec_srcpad, TRUE); + gst_pad_set_iterate_internal_links_function (enc->row_fec_srcpad, + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_iterate_linked_pads)); + gst_element_add_pad (GST_ELEMENT (enc), enc->row_fec_srcpad); + + gst_element_no_more_pads (GST_ELEMENT (enc)); + } + + enc->current_column = 0; + enc->last_media_seqnum_set = FALSE; +} + +static GstStateChangeReturn +gst_rtpst_2022_1_fecenc_change_state (GstElement * element, + GstStateChange transition) +{ + GstStateChangeReturn ret; + GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (element); + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + gst_rtpst_2022_1_fecenc_reset (enc, TRUE); + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rtpst_2022_1_fecenc_reset (enc, FALSE); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + return ret; +} + +static void +gst_rtpst_2022_1_fecenc_finalize (GObject * object) +{ + GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (object); + + gst_rtpst_2022_1_fecenc_reset (enc, FALSE); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_rtpst_2022_1_fecenc_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (object); + + if (GST_STATE (enc) > GST_STATE_READY) { + GST_ERROR_OBJECT (enc, + "rtpst2022-1-fecenc properties can't be changed in PLAYING or PAUSED state"); + return; + } + + switch (prop_id) { + case PROP_COLUMNS: + enc->l = g_value_get_uint (value); + break; + case PROP_ROWS: + enc->d = g_value_get_uint (value); + break; + case PROP_PT: + enc->pt = g_value_get_int (value); + break; + case PROP_ENABLE_COLUMN: + GST_OBJECT_LOCK (enc); + enc->enable_column = g_value_get_boolean (value); + if (!enc->enable_column) { + guint i; + + if (enc->columns) { + for (i = 0; i < enc->l; i++) { + FecPacket *column = g_ptr_array_index (enc->columns, i); + g_free (column->xored_payload); + memset (column, 0x00, sizeof (FecPacket)); + } + } + enc->current_column = 0; + enc->column_seq = 0; + g_queue_clear_full (&enc->queued_column_packets, + (GDestroyNotify) free_item); + } + GST_OBJECT_UNLOCK (enc); + break; + case PROP_ENABLE_ROW: + GST_OBJECT_LOCK (enc); + enc->enable_row = g_value_get_boolean (value); + GST_OBJECT_UNLOCK (enc); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtpst_2022_1_fecenc_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (object); + + switch (prop_id) { + case PROP_COLUMNS: + g_value_set_uint (value, enc->l); + break; + case PROP_ROWS: + g_value_set_uint (value, enc->d); + break; + case PROP_PT: + g_value_set_int (value, enc->pt); + break; + case PROP_ENABLE_COLUMN: + GST_OBJECT_LOCK (enc); + g_value_set_boolean (value, enc->enable_column); + GST_OBJECT_UNLOCK (enc); + break; + case PROP_ENABLE_ROW: + GST_OBJECT_LOCK (enc); + g_value_set_boolean (value, enc->enable_row); + GST_OBJECT_UNLOCK (enc); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static gboolean +gst_2d_fec_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (parent); + gboolean ret; + + if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP) + gst_rtpst_2022_1_fecenc_reset (enc, TRUE); + + ret = gst_pad_event_default (pad, parent, event); + + return ret; +} + +static void +gst_rtpst_2022_1_fecenc_class_init (GstRTPST_2022_1_FecEncClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + + gobject_class->set_property = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_set_property); + gobject_class->get_property = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_get_property); + gobject_class->finalize = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_finalize); + + g_object_class_install_property (gobject_class, PROP_COLUMNS, + g_param_spec_uint ("columns", "Columns", + "Number of columns to apply row FEC on, 0=disabled", 0, + 255, DEFAULT_COLUMNS, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); + + g_object_class_install_property (gobject_class, PROP_ROWS, + g_param_spec_uint ("rows", "Rows", + "Number of rows to apply column FEC on, 0=disabled", 0, + 255, DEFAULT_ROWS, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); + + g_object_class_install_property (gobject_class, PROP_PT, + g_param_spec_int ("pt", "Payload Type", + "The payload type of FEC packets", 96, + 255, DEFAULT_PT, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); + + g_object_class_install_property (gobject_class, PROP_ENABLE_COLUMN, + g_param_spec_boolean ("enable-column-fec", "Enable Column FEC", + "Whether the encoder should compute and send column FEC", + DEFAULT_ENABLE_COLUMN, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_PLAYING)); + + g_object_class_install_property (gobject_class, PROP_ENABLE_ROW, + g_param_spec_boolean ("enable-row-fec", "Enable Row FEC", + "Whether the encoder should compute and send row FEC", + DEFAULT_ENABLE_ROW, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_PLAYING)); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_change_state); + + gst_element_class_set_static_metadata (gstelement_class, + "SMPTE 2022-1 FEC encoder", "SMPTE 2022-1 FEC encoding", + "performs FEC as described by SMPTE 2022-1", + "Mathieu Duponchelle "); + + gst_element_class_add_static_pad_template (gstelement_class, &sink_template); + gst_element_class_add_static_pad_template (gstelement_class, + &fec_src_template); + gst_element_class_add_static_pad_template (gstelement_class, &src_template); + + GST_DEBUG_CATEGORY_INIT (gst_rtpst_2022_1_fecenc_debug, + "rtpst2022-1-fecenc", 0, "SMPTE 2022-1 FEC encoder element"); +} + +static void +gst_rtpst_2022_1_fecenc_init (GstRTPST_2022_1_FecEnc * enc) +{ + enc->srcpad = gst_pad_new_from_static_template (&src_template, "src"); + gst_pad_use_fixed_caps (enc->srcpad); + GST_PAD_SET_PROXY_CAPS (enc->srcpad); + gst_pad_set_iterate_internal_links_function (enc->srcpad, + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_iterate_linked_pads)); + gst_element_add_pad (GST_ELEMENT (enc), enc->srcpad); + + enc->sinkpad = gst_pad_new_from_static_template (&sink_template, "sink"); + GST_PAD_SET_PROXY_CAPS (enc->sinkpad); + gst_pad_set_chain_function (enc->sinkpad, gst_rtpst_2022_1_fecenc_sink_chain); + gst_pad_set_event_function (enc->sinkpad, + GST_DEBUG_FUNCPTR (gst_2d_fec_sink_event)); + gst_pad_set_iterate_internal_links_function (enc->sinkpad, + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_iterate_linked_pads)); + gst_element_add_pad (GST_ELEMENT (enc), enc->sinkpad); + + enc->d = 0; + enc->l = 0; +} diff --git a/gst/rtpmanager/gstrtpst2022-1-fecenc.h b/gst/rtpmanager/gstrtpst2022-1-fecenc.h new file mode 100644 index 0000000000..cfc8702718 --- /dev/null +++ b/gst/rtpmanager/gstrtpst2022-1-fecenc.h @@ -0,0 +1,37 @@ +/* GStreamer + * Copyright (C) <2020> Mathieu Duponchelle + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_RTPST_2022_1_FECENC_H__ +#define __GST_RTPST_2022_1_FECENC_H__ + +#include + +G_BEGIN_DECLS + +typedef struct _GstRTPST_2022_1_FecEncClass GstRTPST_2022_1_FecEncClass; +typedef struct _GstRTPST_2022_1_FecEnc GstRTPST_2022_1_FecEnc; + +#define GST_TYPE_RTPST_2022_1_FECENC (gst_rtpst_2022_1_fecenc_get_type()) +#define GST_RTPST_2022_1_FECENC_CAST(obj) ((GstRTPST_2022_1_FecEnc *)(obj)) + +GType gst_rtpst_2022_1_fecenc_get_type (void); + +G_END_DECLS + +#endif /* __GST_RTPST_2022_1_FECENC_H__ */ diff --git a/gst/rtpmanager/meson.build b/gst/rtpmanager/meson.build index 5cb6084ccb..334b10a210 100644 --- a/gst/rtpmanager/meson.build +++ b/gst/rtpmanager/meson.build @@ -17,7 +17,8 @@ rtpmanager_sources = [ 'rtptwcc.c', 'gstrtpsession.c', 'gstrtpfunnel.c', - 'gstrtpst2022-1-fecdec.c' + 'gstrtpst2022-1-fecdec.c', + 'gstrtpst2022-1-fecenc.c' ] gstrtpmanager = library('gstrtpmanager', diff --git a/tests/check/elements/rtpst2022-1-fecenc.c b/tests/check/elements/rtpst2022-1-fecenc.c new file mode 100644 index 0000000000..9516b64585 --- /dev/null +++ b/tests/check/elements/rtpst2022-1-fecenc.c @@ -0,0 +1,229 @@ +/* GStreamer + * Copyright (C) <2020> Mathieu Duponchelle + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include +#include +#include + +typedef struct +{ + guint16 seq; + guint16 len; + guint8 E; + guint8 pt; + guint32 mask; + guint32 timestamp; + guint8 N; + guint8 D; + guint8 type; + guint8 index; + guint8 offset; + guint8 NA; + guint8 seq_ext; + guint8 *payload; + guint payload_len; +} Rtp2DFecHeader; + +static void +parse_header (Rtp2DFecHeader * fec, guint8 * data, guint len) +{ + GstBitReader bits; + + fail_unless (len >= 16); + + gst_bit_reader_init (&bits, data, len); + + fec->seq = gst_bit_reader_get_bits_uint16_unchecked (&bits, 16); + fec->len = gst_bit_reader_get_bits_uint16_unchecked (&bits, 16); + fec->E = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1); + fec->pt = gst_bit_reader_get_bits_uint8_unchecked (&bits, 7); + fec->mask = gst_bit_reader_get_bits_uint32_unchecked (&bits, 24); + fec->timestamp = gst_bit_reader_get_bits_uint32_unchecked (&bits, 32); + fec->N = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1); + fec->D = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1); + fec->type = gst_bit_reader_get_bits_uint8_unchecked (&bits, 3); + fec->index = gst_bit_reader_get_bits_uint8_unchecked (&bits, 3); + fec->offset = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8); + fec->NA = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8); + fec->seq_ext = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8); + fec->payload = data + 16; + fec->payload_len = len - 16; +} + +static GstBuffer * +make_media_sample (guint16 seq, guint32 ts, guint8 * payload, guint payload_len) +{ + GstBuffer *ret; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + guint8 *data; + + ret = gst_rtp_buffer_new_allocate (payload_len, 0, 0); + + gst_rtp_buffer_map (ret, GST_MAP_WRITE, &rtp); + gst_rtp_buffer_set_payload_type (&rtp, 33); + gst_rtp_buffer_set_seq (&rtp, seq); + gst_rtp_buffer_set_timestamp (&rtp, ts); + data = gst_rtp_buffer_get_payload (&rtp); + memcpy (data, payload, payload_len); + gst_rtp_buffer_unmap (&rtp); + + return ret; +} + +static void +pull_and_check (GstHarness * h, guint n_packets, guint16 seq, + guint length_recovery, guint8 pt_recovery, guint32 ts_recovery, + gboolean row, guint8 offset, guint8 NA, guint8 * payload, guint payload_len) +{ + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + GstBuffer *buffer; + Rtp2DFecHeader fec; + + fail_unless_equals_int (gst_harness_buffers_in_queue (h), n_packets); + buffer = gst_harness_pull (h); + fail_unless (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)); + + parse_header (&fec, gst_rtp_buffer_get_payload (&rtp), + gst_rtp_buffer_get_payload_len (&rtp)); + + fail_unless_equals_int (fec.seq, seq); + fail_unless_equals_int (fec.pt, pt_recovery); + fail_unless_equals_int (fec.timestamp, ts_recovery); + fail_unless_equals_int (fec.D, row ? 1 : 0); + fail_unless_equals_int (fec.offset, offset); + fail_unless_equals_int (fec.NA, NA); + fail_unless_equals_int (fec.payload_len, payload_len); + fail_unless_equals_int (memcmp (fec.payload, payload, fec.payload_len), 0); + + gst_rtp_buffer_unmap (&rtp); + gst_buffer_unref (buffer); +} + +GST_START_TEST (test_row) +{ + GstHarness *h, *h_fec_1; + guint8 payload; + GstElement *enc = gst_element_factory_make ("rtpst2022-1-fecenc", NULL); + + g_object_set (enc, "columns", 3, "enable-column-fec", FALSE, NULL); + h = gst_harness_new_with_element (enc, "sink", "src"); + h_fec_1 = gst_harness_new_with_element (h->element, NULL, "fec_1"); + + gst_harness_set_src_caps_str (h, "application/x-rtp"); + + payload = 0x37; + gst_harness_push (h, make_media_sample (0, 0, &payload, 1)); + payload = 0x28; + gst_harness_push (h, make_media_sample (1, 0, &payload, 1)); + payload = 0xff; + gst_harness_push (h, make_media_sample (2, 0, &payload, 1)); + + payload = 0x37 ^ 0x28 ^ 0xff; + pull_and_check (h_fec_1, 1, 0, 1, 33, 0, TRUE, 1, 3, &payload, 1); + + gst_object_unref (enc); + gst_harness_teardown (h); + gst_harness_teardown (h_fec_1); +} + +GST_END_TEST; + +GST_START_TEST (test_columns) +{ + GstHarness *h, *h_fec_0; + guint8 payload; + GstElement *enc = gst_element_factory_make ("rtpst2022-1-fecenc", NULL); + + g_object_set (enc, "columns", 3, "rows", 3, "enable-row-fec", FALSE, NULL); + h = gst_harness_new_with_element (enc, "sink", "src"); + h_fec_0 = gst_harness_new_with_element (h->element, NULL, "fec_0"); + + gst_harness_set_src_caps_str (h, "application/x-rtp"); + + payload = 0x37; + gst_harness_push (h, make_media_sample (0, 0, &payload, 1)); + payload = 0x28; + gst_harness_push (h, make_media_sample (1, 0, &payload, 1)); + payload = 0xff; + gst_harness_push (h, make_media_sample (2, 0, &payload, 1)); + payload = 0xde; + gst_harness_push (h, make_media_sample (3, 0, &payload, 1)); + payload = 0xad; + gst_harness_push (h, make_media_sample (4, 0, &payload, 1)); + payload = 0xbe; + gst_harness_push (h, make_media_sample (5, 0, &payload, 1)); + payload = 0xef; + gst_harness_push (h, make_media_sample (6, 0, &payload, 1)); + payload = 0x58; + gst_harness_push (h, make_media_sample (7, 0, &payload, 1)); + payload = 0x92; + gst_harness_push (h, make_media_sample (8, 0, &payload, 1)); + + /* Let's check distribution of the column FEC over the repair window + * We should receive column FEC packets upon pushing buffers with + * seqnums 9, 12 and 15 + */ + + /* At this point no column FEC should have been put out */ + fail_unless_equals_int (gst_harness_buffers_in_queue (h_fec_0), 0); + + /* Now push the first buffer in the second 3 x 3 grid, it's at + * this point we expect to receive our first column FEC packet + */ + gst_harness_push (h, make_media_sample (9, 0, &payload, 1)); + payload = 0x37 ^ 0xde ^ 0xef; + pull_and_check (h_fec_0, 1, 0, 1, 33, 0, FALSE, 3, 3, &payload, 1); + + gst_harness_push (h, make_media_sample (10, 0, &payload, 1)); + gst_harness_push (h, make_media_sample (11, 0, &payload, 1)); + fail_unless_equals_int (gst_harness_buffers_in_queue (h_fec_0), 0); + gst_harness_push (h, make_media_sample (12, 0, &payload, 1)); + payload = 0x28 ^ 0xad ^ 0x58; + pull_and_check (h_fec_0, 1, 1, 1, 33, 0, FALSE, 3, 3, &payload, 1); + + gst_harness_push (h, make_media_sample (13, 0, &payload, 1)); + gst_harness_push (h, make_media_sample (14, 0, &payload, 1)); + fail_unless_equals_int (gst_harness_buffers_in_queue (h_fec_0), 0); + gst_harness_push (h, make_media_sample (15, 0, &payload, 1)); + payload = 0xff ^ 0xbe ^ 0x92; + pull_and_check (h_fec_0, 1, 2, 1, 33, 0, FALSE, 3, 3, &payload, 1); + + gst_object_unref (enc); + gst_harness_teardown (h); + gst_harness_teardown (h_fec_0); +} + +GST_END_TEST; + +static Suite * +st2022_1_dec_suite (void) +{ + Suite *s = suite_create ("rtpst2022-1-fecdec"); + TCase *tc_chain = tcase_create ("general"); + + suite_add_tcase (s, tc_chain); + + tcase_add_test (tc_chain, test_row); + tcase_add_test (tc_chain, test_columns); + + return s; +} + +GST_CHECK_MAIN (st2022_1_dec) diff --git a/tests/check/meson.build b/tests/check/meson.build index 6f8dd9cdb7..2244a03674 100644 --- a/tests/check/meson.build +++ b/tests/check/meson.build @@ -89,6 +89,7 @@ good_tests = [ [ 'elements/rtpssrcdemux' ], [ 'elements/rtp-payloading' ], [ 'elements/rtpst2022-1-fecdec' ], + [ 'elements/rtpst2022-1-fecenc' ], [ 'elements/spectrum', false, [gstfft_dep] ], [ 'elements/shapewipe' ], [ 'elements/udpsink' ],