From f0d04b39ddb23e1f27272971a3dc8ad3f89921bc Mon Sep 17 00:00:00 2001 From: Nicolas Dufresne Date: Mon, 21 Jan 2019 11:44:10 -0500 Subject: [PATCH] rist: Add a plugin implenting RIST TR-06-1 Simple Profile RIST TR-06-1 is a specification for video streaming made by the VSF group. It is using a subset of RTP specification to which some modification has been made to improve RTX behaviour and avoid any need for signaling. The plugin implement ristrtxsend / ristrtxreceive element which are the RIST specific equivalent of rtprtxsend/rtprtxreceive and ristsink / ristsrc which implement rist transmitter and receiver. The RIST protocol is meant to be used in unidirectional way. Typically, MPEG TS over RTP is used. Currently we support unicast and multicast streaming according to the specification. This patch does not include any bonding support yet. The ristsrc element introduce rist:// URI handling in parallel to it's property configuration interface. --- configure.ac | 2 + gst/meson.build | 4 +- gst/rist/Makefile.am | 23 + gst/rist/gstrist.h | 59 ++ gst/rist/gstristplugin.c | 50 ++ gst/rist/gstristrtxreceive.c | 302 ++++++++++ gst/rist/gstristrtxsend.c | 772 +++++++++++++++++++++++++ gst/rist/gstristsink.c | 838 ++++++++++++++++++++++++++++ gst/rist/gstristsrc.c | 1021 ++++++++++++++++++++++++++++++++++ gst/rist/meson.build | 17 + meson_options.txt | 1 + 11 files changed, 3087 insertions(+), 2 deletions(-) create mode 100644 gst/rist/Makefile.am create mode 100644 gst/rist/gstrist.h create mode 100644 gst/rist/gstristplugin.c create mode 100644 gst/rist/gstristrtxreceive.c create mode 100644 gst/rist/gstristrtxsend.c create mode 100644 gst/rist/gstristsink.c create mode 100644 gst/rist/gstristsrc.c create mode 100644 gst/rist/meson.build diff --git a/configure.ac b/configure.ac index 5114e36fc9..cb72ee8537 100644 --- a/configure.ac +++ b/configure.ac @@ -478,6 +478,7 @@ AG_GST_CHECK_PLUGIN(pnm) AG_GST_CHECK_PLUGIN(proxy) AG_GST_CHECK_PLUGIN(rawparse) AG_GST_CHECK_PLUGIN(removesilence) +AG_GST_CHECK_PLUGIN(rist) AG_GST_CHECK_PLUGIN(sdp) AG_GST_CHECK_PLUGIN(segmentclip) AG_GST_CHECK_PLUGIN(siren) @@ -2556,6 +2557,7 @@ gst/pnm/Makefile gst/proxy/Makefile gst/rawparse/Makefile gst/removesilence/Makefile +gst/rist/Makefile gst/sdp/Makefile gst/segmentclip/Makefile gst/siren/Makefile diff --git a/gst/meson.build b/gst/meson.build index 8c8349802d..f6b306c1ed 100644 --- a/gst/meson.build +++ b/gst/meson.build @@ -8,8 +8,8 @@ foreach plugin : ['accurip', 'adpcmdec', 'adpcmenc', 'aiff', 'asfmux', 'ivfparse', 'ivtc', 'jp2kdecimator', 'jpegformat', 'librfb', 'midi', 'mpegdemux', 'mpegpsmux', 'mpegtsdemux', 'mpegtsmux', 'mxf', 'netsim', 'onvif', 'pcapparse', 'pnm', 'proxy', - 'rawparse', 'removesilence', 'sdp', 'segmentclip', 'siren', - 'smooth', 'speed', 'subenc', 'timecode', + 'rawparse', 'removesilence', 'rist', 'sdp', 'segmentclip', + 'siren', 'smooth', 'speed', 'subenc', 'timecode', 'videofilters', 'videoframe_audiolevel', 'videoparsers', 'videosignal', 'vmnc', 'y4m', 'yadif'] if not get_option(plugin).disabled() diff --git a/gst/rist/Makefile.am b/gst/rist/Makefile.am new file mode 100644 index 0000000000..d198f3f8e6 --- /dev/null +++ b/gst/rist/Makefile.am @@ -0,0 +1,23 @@ +plugin_LTLIBRARIES = libgstrist.la + +libgstrist_la_SOURCES = \ + gstristsrc.c \ + gstristsink.c \ + gstristrtxsend.c \ + gstristrtxreceive.c \ + gstristplugin.c + +noinst_HEADERS = \ + gstrist.h + +libgstrist_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) \ + $(GST_CFLAGS) \ + $(GIO_CFLAGS) +libgstrist_la_LIBADD = $(GST_PLUGINS_BASE_LIBS) \ + $(GST_BASE_LIBS) \ + -lgstrtp-@GST_API_VERSION@ \ + $(GST_NET_LIBS) \ + $(GST_LIBS) \ + $(GIO_LIBS) +libgstrist_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) +libgstrist_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS) diff --git a/gst/rist/gstrist.h b/gst/rist/gstrist.h new file mode 100644 index 0000000000..b4bcb6f049 --- /dev/null +++ b/gst/rist/gstrist.h @@ -0,0 +1,59 @@ +/* GStreamer RIST plugin + * Copyright (C) 2019 Net Insight AB + * Author: Nicolas Dufresne + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include + +#ifndef __GST_RIST_H__ +#define __GST_RIST_H__ + +#define GST_TYPE_RIST_RTX_RECEIVE (gst_rist_rtx_receive_get_type()) +#define GST_RIST_RTX_RECEIVE(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RIST_RTX_RECEIVE, GstRistRtxReceive)) +typedef struct _GstRistRtxReceive GstRistRtxReceive; +typedef struct { + GstElementClass parent_class; +} GstRistRtxReceiveClass; +GType gst_rist_rtx_receive_get_type (void); + +#define GST_TYPE_RIST_RTX_SEND (gst_rist_rtx_send_get_type()) +#define GST_RIST_RTX_SEND(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RIST_RTX_SEND, GstRistRtxSend)) +typedef struct _GstRistRtxSend GstRistRtxSend; +typedef struct { + GstElementClass parent_class; +} GstRistRtxSendClass; +GType gst_rist_rtx_send_get_type (void); + +#define GST_TYPE_RIST_SRC (gst_rist_src_get_type()) +#define GST_RIST_SRC(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RIST_SRC,GstRistSrc)) +typedef struct _GstRistSrc GstRistSrc; +typedef struct { + GstBinClass parent; +} GstRistSrcClass; +GType gst_rist_src_get_type (void); + + +#define GST_TYPE_RIST_SINK (gst_rist_sink_get_type()) +#define GST_RIST_SINK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RIST_SINK,GstRistSink)) +typedef struct _GstRistSink GstRistSink; +typedef struct { + GstBinClass parent; +} GstRistSinkClass; +GType gst_rist_sink_get_type (void); + +#endif diff --git a/gst/rist/gstristplugin.c b/gst/rist/gstristplugin.c new file mode 100644 index 0000000000..cb6dcd361e --- /dev/null +++ b/gst/rist/gstristplugin.c @@ -0,0 +1,50 @@ +/* GStreamer RIST plugin + * Copyright (C) 2019 Net Insight AB + * Author: Nicolas Dufresne + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstrist.h" + +static gboolean +plugin_init (GstPlugin * plugin) +{ + if (!gst_element_register (plugin, "ristsrc", GST_RANK_PRIMARY, + GST_TYPE_RIST_SRC)) + return FALSE; + if (!gst_element_register (plugin, "ristsink", GST_RANK_PRIMARY, + GST_TYPE_RIST_SINK)) + return FALSE; + if (!gst_element_register (plugin, "ristrtxsend", GST_RANK_NONE, + GST_TYPE_RIST_RTX_SEND)) + return FALSE; + if (!gst_element_register (plugin, "ristrtxreceive", GST_RANK_NONE, + GST_TYPE_RIST_RTX_RECEIVE)) + return FALSE; + + return TRUE; +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + rist, + "Source and Sink for RIST TR-06-1 streaming specification", + plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/gst/rist/gstristrtxreceive.c b/gst/rist/gstristrtxreceive.c new file mode 100644 index 0000000000..fe8a6d5fa7 --- /dev/null +++ b/gst/rist/gstristrtxreceive.c @@ -0,0 +1,302 @@ +/* RTP Retransmission receiver element for GStreamer + * + * gstrtprtxreceive.c: + * + * Copyright (C) 2013-2019 Collabora Ltd. + * @author Julien Isorce + * Nicolas Dufresne + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-ristrtxreceive + * @title: ristrtxreceive + * @see_also: ristrtxsend + * + * This element translates RIST RTX packets into its original form with the + * %GST_RTP_BUFFER_FLAG_RETRANSMISSION flag set. This element is intented to + * be used by ristsrc element. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include + +#include "gstrist.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rist_rtx_receive_debug); +#define GST_CAT_DEFAULT gst_rist_rtx_receive_debug + +enum +{ + PROP_0, + PROP_NUM_RTX_REQUESTS, + PROP_NUM_RTX_PACKETS, + PROP_RIST +}; + +static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp") + ); + +struct _GstRistRtxReceive +{ + GstElement element; + + /* pad */ + GstPad *sinkpad; + GstPad *srcpad; + + /* statistics */ + guint num_rtx_requests; + guint num_rtx_packets; + + GstClockTime last_time; +}; + +static gboolean gst_rist_rtx_receive_src_event (GstPad * pad, + GstObject * parent, GstEvent * event); +static GstFlowReturn gst_rist_rtx_receive_chain (GstPad * pad, + GstObject * parent, GstBuffer * buffer); +static GstStateChangeReturn gst_rist_rtx_receive_change_state (GstElement * + element, GstStateChange transition); +static void gst_rist_rtx_receive_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +G_DEFINE_TYPE_WITH_CODE (GstRistRtxReceive, gst_rist_rtx_receive, + GST_TYPE_ELEMENT, GST_DEBUG_CATEGORY_INIT (gst_rist_rtx_receive_debug, + "ristrtxreceive", 0, "RIST retransmission receiver")); + +static void +gst_rist_rtx_receive_class_init (GstRistRtxReceiveClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + + gobject_class->get_property = gst_rist_rtx_receive_get_property; + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS, + g_param_spec_uint ("num-rtx-requests", "Num RTX Requests", + "Number of retransmission events received", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS, + g_param_spec_uint ("num-rtx-packets", "Num RTX Packets", + " Number of retransmission packets received", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + gst_element_class_add_static_pad_template (gstelement_class, &src_factory); + gst_element_class_add_static_pad_template (gstelement_class, &sink_factory); + + gst_element_class_set_static_metadata (gstelement_class, + "RIST Retransmission receiver", "Codec", + "Receive retransmitted RIST packets according to VSF TR-06-1", + "Nicolas Dufresne "); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rist_rtx_receive_change_state); +} + +static void +gst_rist_rtx_receive_reset (GstRistRtxReceive * rtx) +{ + GST_OBJECT_LOCK (rtx); + rtx->num_rtx_requests = 0; + rtx->num_rtx_packets = 0; + GST_OBJECT_UNLOCK (rtx); +} + +static void +gst_rist_rtx_receive_init (GstRistRtxReceive * rtx) +{ + GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx); + + rtx->srcpad = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "src"), "src"); + GST_PAD_SET_PROXY_CAPS (rtx->srcpad); + GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad); + gst_pad_set_event_function (rtx->srcpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_receive_src_event)); + gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad); + + rtx->sinkpad = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "sink"), "sink"); + GST_PAD_SET_PROXY_CAPS (rtx->sinkpad); + GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad); + gst_pad_set_chain_function (rtx->sinkpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_receive_chain)); + gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad); +} + +static gboolean +gst_rist_rtx_receive_src_event (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + GstRistRtxReceive *rtx = GST_RIST_RTX_RECEIVE (parent); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CUSTOM_UPSTREAM: + { + const GstStructure *s = gst_event_get_structure (event); + + /* This event usually comes from the downstream gstrtpjitterbuffer */ + if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) { +#ifndef GST_DISABLE_GST_DEBUG + guint seqnum = 0; + guint ssrc = 0; + + /* retrieve seqnum of the packet that need to be retransmitted */ + if (!gst_structure_get_uint (s, "seqnum", &seqnum)) + seqnum = -1; + + /* retrieve ssrc of the packet that need to be retransmitted + * it's useful when reconstructing the original packet from the rtx packet */ + if (!gst_structure_get_uint (s, "ssrc", &ssrc)) + ssrc = -1; + + GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X", + seqnum, ssrc); +#endif + + GST_OBJECT_LOCK (rtx); + /* increase number of seen requests for our statistics */ + ++rtx->num_rtx_requests; + GST_OBJECT_UNLOCK (rtx); + } + break; + } + default: + break; + } + + return gst_pad_event_default (pad, parent, event); +} + +static GstFlowReturn +gst_rist_rtx_receive_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer) +{ + GstRistRtxReceive *rtx = GST_RIST_RTX_RECEIVE (parent); + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + guint32 ssrc = 0; + guint16 seqnum = 0; + gboolean is_rtx; + + /* map current rtp packet to parse its header */ + if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) + goto invalid_buffer; + + ssrc = gst_rtp_buffer_get_ssrc (&rtp); + seqnum = gst_rtp_buffer_get_seq (&rtp); + + /* check if we have a retransmission packet (this information comes from SDP) */ + GST_OBJECT_LOCK (rtx); + + /* RIST sets SSRC LSB to 1 to indicate an RTC packet */ + is_rtx = ssrc & 0x1; + rtx->last_time = GST_BUFFER_PTS (buffer); + + if (is_rtx) + /* increase our statistic */ + ++rtx->num_rtx_packets; + + GST_OBJECT_UNLOCK (rtx); + + /* create the retransmission packet */ + if (is_rtx) { + GST_DEBUG_OBJECT (rtx, + "Recovered packet from RIST RTX seqnum:%u ssrc: %u", + gst_rtp_buffer_get_seq (&rtp), gst_rtp_buffer_get_ssrc (&rtp)); + gst_rtp_buffer_set_ssrc (&rtp, ssrc & 0xFFFFFFFE); + GST_BUFFER_FLAG_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION); + } + + gst_rtp_buffer_unmap (&rtp); + + GST_TRACE_OBJECT (rtx, "pushing packet seqnum:%u from master stream " + "ssrc: %X", seqnum, ssrc); + return gst_pad_push (rtx->srcpad, buffer); + +invalid_buffer: + { + GST_ELEMENT_WARNING (rtx, STREAM, DECODE, (NULL), + ("Received invalid RTP payload, dropping")); + gst_buffer_unref (buffer); + return GST_FLOW_OK; + } +} + +static void +gst_rist_rtx_receive_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstRistRtxReceive *rtx = GST_RIST_RTX_RECEIVE (object); + + switch (prop_id) { + case PROP_NUM_RTX_REQUESTS: + GST_OBJECT_LOCK (rtx); + g_value_set_uint (value, rtx->num_rtx_requests); + GST_OBJECT_UNLOCK (rtx); + break; + case PROP_NUM_RTX_PACKETS: + GST_OBJECT_LOCK (rtx); + g_value_set_uint (value, rtx->num_rtx_packets); + GST_OBJECT_UNLOCK (rtx); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_rist_rtx_receive_change_state (GstElement * element, + GstStateChange transition) +{ + GstRistRtxReceive *rtx = GST_RIST_RTX_RECEIVE (element); + GstStateChangeReturn ret; + + ret = + GST_ELEMENT_CLASS (gst_rist_rtx_receive_parent_class)->change_state + (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rist_rtx_receive_reset (rtx); + break; + default: + break; + } + + return ret; +} diff --git a/gst/rist/gstristrtxsend.c b/gst/rist/gstristrtxsend.c new file mode 100644 index 0000000000..927fb1cf06 --- /dev/null +++ b/gst/rist/gstristrtxsend.c @@ -0,0 +1,772 @@ +/* RIST Retransmission sender element for GStreamer + * + * gsristprtxsend.c: + * + * Copyright (C) 2013-2019 Collabora Ltd. + * @author Julien Isorce + * Nicoas Dufresne + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-ristrtxsend + * @title: ristrtxsend + * @see_also: ristrtxreceive + * + * This elements replies to custom events 'GstRTPRetransmissionRequest' and + * when available sends in RIST form the lost packet. This element is intented + * to be used by ristsink element. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include + +#include "gstrist.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rist_rtx_send_debug); +#define GST_CAT_DEFAULT gst_rist_rtx_send_debug + +#define DEFAULT_MAX_SIZE_TIME 0 +#define DEFAULT_MAX_SIZE_PACKETS 100 + +enum +{ + PROP_0, + PROP_MAX_SIZE_TIME, + PROP_MAX_SIZE_PACKETS, + PROP_NUM_RTX_REQUESTS, + PROP_NUM_RTX_PACKETS, +}; + +static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp") + ); + +static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp, " "clock-rate = (int) [1, MAX]") + ); + +struct _GstRistRtxSend +{ + GstElement element; + + /* pad */ + GstPad *sinkpad; + GstPad *srcpad; + + /* rtp packets that will be pushed out */ + GstDataQueue *queue; + + /* ssrc -> SSRCRtxData */ + GHashTable *ssrc_data; + /* rtx ssrc -> master ssrc */ + GHashTable *rtx_ssrcs; + + /* buffering control properties */ + guint max_size_time; + guint max_size_packets; + + /* statistics */ + guint num_rtx_requests; + guint num_rtx_packets; +}; + +static gboolean gst_rist_rtx_send_queue_check_full (GstDataQueue * queue, + guint visible, guint bytes, guint64 time, gpointer checkdata); + +static gboolean gst_rist_rtx_send_src_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean gst_rist_rtx_send_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static GstFlowReturn gst_rist_rtx_send_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer); +static GstFlowReturn gst_rist_rtx_send_chain_list (GstPad * pad, + GstObject * parent, GstBufferList * list); + +static void gst_rist_rtx_send_src_loop (GstRistRtxSend * rtx); +static gboolean gst_rist_rtx_send_activate_mode (GstPad * pad, + GstObject * parent, GstPadMode mode, gboolean active); + +static GstStateChangeReturn gst_rist_rtx_send_change_state (GstElement * + element, GstStateChange transition); + +static void gst_rist_rtx_send_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_rist_rtx_send_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static void gst_rist_rtx_send_finalize (GObject * object); + +G_DEFINE_TYPE_WITH_CODE (GstRistRtxSend, gst_rist_rtx_send, GST_TYPE_ELEMENT, + GST_DEBUG_CATEGORY_INIT (gst_rist_rtx_send_debug, "ristrtxsend", 0, + "RIST retransmission sender")); + +typedef struct +{ + guint16 seqnum; + guint32 timestamp; + GstBuffer *buffer; +} BufferQueueItem; + +static void +buffer_queue_item_free (BufferQueueItem * item) +{ + gst_buffer_unref (item->buffer); + g_slice_free (BufferQueueItem, item); +} + +typedef struct +{ + guint32 rtx_ssrc; + guint16 seqnum_base, next_seqnum; + gint clock_rate; + + /* history of rtp packets */ + GSequence *queue; +} SSRCRtxData; + +static SSRCRtxData * +ssrc_rtx_data_new (guint32 rtx_ssrc) +{ + SSRCRtxData *data = g_slice_new0 (SSRCRtxData); + + data->rtx_ssrc = rtx_ssrc; + data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16); + data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free); + + return data; +} + +static void +ssrc_rtx_data_free (SSRCRtxData * data) +{ + g_sequence_free (data->queue); + g_slice_free (SSRCRtxData, data); +} + +static void +gst_rist_rtx_send_class_init (GstRistRtxSendClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + + gobject_class->get_property = gst_rist_rtx_send_get_property; + gobject_class->set_property = gst_rist_rtx_send_set_property; + gobject_class->finalize = gst_rist_rtx_send_finalize; + + g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, + g_param_spec_uint ("max-size-time", "Max Size Time", + "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT, + DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS, + g_param_spec_uint ("max-size-packets", "Max Size Packets", + "Amount of packets to queue (0 = unlimited)", 0, G_MAXINT16, + DEFAULT_MAX_SIZE_PACKETS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS, + g_param_spec_uint ("num-rtx-requests", "Num RTX Requests", + "Number of retransmission events received", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS, + g_param_spec_uint ("num-rtx-packets", "Num RTX Packets", + " Number of retransmission packets sent", 0, G_MAXUINT, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + gst_element_class_add_static_pad_template (gstelement_class, &src_factory); + gst_element_class_add_static_pad_template (gstelement_class, &sink_factory); + + gst_element_class_set_static_metadata (gstelement_class, + "RIST Retransmission Sender", "Codec", + "Retransmit RTP packets when needed, according to VSF TR-06-1", + "Nicolas Dufresne "); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rist_rtx_send_change_state); +} + +static void +gst_rist_rtx_send_reset (GstRistRtxSend * rtx) +{ + GST_OBJECT_LOCK (rtx); + gst_data_queue_flush (rtx->queue); + g_hash_table_remove_all (rtx->ssrc_data); + g_hash_table_remove_all (rtx->rtx_ssrcs); + rtx->num_rtx_requests = 0; + rtx->num_rtx_packets = 0; + GST_OBJECT_UNLOCK (rtx); +} + +static void +gst_rist_rtx_send_finalize (GObject * object) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (object); + + g_hash_table_unref (rtx->ssrc_data); + g_hash_table_unref (rtx->rtx_ssrcs); + g_object_unref (rtx->queue); + + G_OBJECT_CLASS (gst_rist_rtx_send_parent_class)->finalize (object); +} + +static void +gst_rist_rtx_send_init (GstRistRtxSend * rtx) +{ + GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx); + + rtx->srcpad = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "src"), "src"); + GST_PAD_SET_PROXY_CAPS (rtx->srcpad); + GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad); + gst_pad_set_event_function (rtx->srcpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_send_src_event)); + gst_pad_set_activatemode_function (rtx->srcpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_send_activate_mode)); + gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad); + + rtx->sinkpad = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "sink"), "sink"); + GST_PAD_SET_PROXY_CAPS (rtx->sinkpad); + GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad); + gst_pad_set_event_function (rtx->sinkpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_send_sink_event)); + gst_pad_set_chain_function (rtx->sinkpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_send_chain)); + gst_pad_set_chain_list_function (rtx->sinkpad, + GST_DEBUG_FUNCPTR (gst_rist_rtx_send_chain_list)); + gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad); + + rtx->queue = gst_data_queue_new (gst_rist_rtx_send_queue_check_full, NULL, + NULL, rtx); + rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal, + NULL, (GDestroyNotify) ssrc_rtx_data_free); + rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal); + + rtx->max_size_time = DEFAULT_MAX_SIZE_TIME; + rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS; +} + +static void +gst_rist_rtx_send_set_flushing (GstRistRtxSend * rtx, gboolean flush) +{ + GST_OBJECT_LOCK (rtx); + gst_data_queue_set_flushing (rtx->queue, flush); + gst_data_queue_flush (rtx->queue); + GST_OBJECT_UNLOCK (rtx); +} + +static gboolean +gst_rist_rtx_send_queue_check_full (GstDataQueue * queue, + guint visible, guint bytes, guint64 time, gpointer checkdata) +{ + return FALSE; +} + +static void +gst_rtp_rtx_data_queue_item_free (gpointer item) +{ + GstDataQueueItem *data = item; + if (data->object) + gst_mini_object_unref (data->object); + g_slice_free (GstDataQueueItem, data); +} + +static gboolean +gst_rist_rtx_send_push_out (GstRistRtxSend * rtx, gpointer object) +{ + GstDataQueueItem *data; + gboolean success; + + data = g_slice_new0 (GstDataQueueItem); + data->object = GST_MINI_OBJECT (object); + data->size = 1; + data->duration = 1; + data->visible = TRUE; + data->destroy = gst_rtp_rtx_data_queue_item_free; + + success = gst_data_queue_push (rtx->queue, data); + + if (!success) + data->destroy (data); + + return success; +} + +static SSRCRtxData * +gst_rist_rtx_send_get_ssrc_data (GstRistRtxSend * rtx, guint32 ssrc) +{ + SSRCRtxData *data; + guint32 rtx_ssrc = 0; + + data = g_hash_table_lookup (rtx->ssrc_data, GUINT_TO_POINTER (ssrc)); + if (!data) { + /* See 5.3.2 Retransmitted Packets, orignal packet have SSRC LSB set to + * 0, while RTX packet have LSB set to 1 */ + rtx_ssrc = ssrc + 1; + data = ssrc_rtx_data_new (rtx_ssrc); + g_hash_table_insert (rtx->ssrc_data, GUINT_TO_POINTER (ssrc), data); + g_hash_table_insert (rtx->rtx_ssrcs, GUINT_TO_POINTER (rtx_ssrc), + GUINT_TO_POINTER (ssrc)); + } + + return data; +} + +/* + * see RIST TR-06-1 5.3.2 Retransmitted Packets + * + * RIST simply resend the packet verbatim, with SSRC+1, the defaults SSRC always + * have the LSB set to 0, so we can differentiate the retransmission and the + * normal packet. + */ +static GstBuffer * +gst_rtp_rist_buffer_new (GstRistRtxSend * rtx, GstBuffer * buffer, guint32 ssrc) +{ + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + + buffer = gst_buffer_copy_deep (buffer); + gst_rtp_buffer_map (buffer, GST_MAP_WRITE, &rtp); + gst_rtp_buffer_set_ssrc (&rtp, ssrc + 1); + gst_rtp_buffer_unmap (&rtp); + + return buffer; +} + +static gint +buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b, + gpointer user_data) +{ + /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want, + * it returns negative when seqnum1 > seqnum2 and we want negative + * when b > a, i.e. a is smaller, so it comes first in the sequence */ + return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum); +} + +static gboolean +gst_rist_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CUSTOM_UPSTREAM: + { + const GstStructure *s = gst_event_get_structure (event); + + /* This event usually comes from the downstream gstrtpsession */ + if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) { + guint seqnum = 0; + guint ssrc = 0; + GstBuffer *rtx_buf = NULL; + + /* retrieve seqnum of the packet that need to be retransmitted */ + if (!gst_structure_get_uint (s, "seqnum", &seqnum)) + seqnum = -1; + + /* retrieve ssrc of the packet that need to be retransmitted */ + if (!gst_structure_get_uint (s, "ssrc", &ssrc)) + ssrc = -1; + + GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X", + seqnum, ssrc); + + GST_OBJECT_LOCK (rtx); + /* check if request is for us */ + if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) { + SSRCRtxData *data; + GSequenceIter *iter; + BufferQueueItem search_item; + + /* update statistics */ + ++rtx->num_rtx_requests; + + data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc); + + search_item.seqnum = seqnum; + iter = g_sequence_lookup (data->queue, &search_item, + (GCompareDataFunc) buffer_queue_items_cmp, NULL); + if (iter) { + BufferQueueItem *item = g_sequence_get (iter); + GST_LOG_OBJECT (rtx, "found %u", item->seqnum); + rtx_buf = gst_rtp_rist_buffer_new (rtx, item->buffer, ssrc); + } +#ifndef GST_DISABLE_DEBUG + else { + BufferQueueItem *item = NULL; + + iter = g_sequence_get_begin_iter (data->queue); + if (!g_sequence_iter_is_end (iter)) + item = g_sequence_get (iter); + + if (item && seqnum < item->seqnum) { + GST_DEBUG_OBJECT (rtx, "requested seqnum %u has already been " + "removed from the rtx queue; the first available is %u", + seqnum, item->seqnum); + } else { + GST_WARNING_OBJECT (rtx, "requested seqnum %u has not been " + "transmitted yet in the original stream; either the remote end " + "is not configured correctly, or the source is too slow", + seqnum); + } +#endif + } + } + GST_OBJECT_UNLOCK (rtx); + + if (rtx_buf) + gst_rist_rtx_send_push_out (rtx, rtx_buf); + + gst_event_unref (event); + return TRUE; + } + break; + } + default: + break; + } + + return gst_pad_event_default (pad, parent, event); +} + +static gboolean +gst_rist_rtx_send_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + gst_pad_push_event (rtx->srcpad, event); + gst_rist_rtx_send_set_flushing (rtx, TRUE); + gst_pad_pause_task (rtx->srcpad); + return TRUE; + case GST_EVENT_FLUSH_STOP: + gst_pad_push_event (rtx->srcpad, event); + gst_rist_rtx_send_set_flushing (rtx, FALSE); + gst_pad_start_task (rtx->srcpad, + (GstTaskFunction) gst_rist_rtx_send_src_loop, rtx, NULL); + return TRUE; + case GST_EVENT_EOS: + GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it"); + gst_rist_rtx_send_push_out (rtx, event); + return TRUE; + case GST_EVENT_CAPS: + { + GstCaps *caps; + GstStructure *s; + guint ssrc; + gint payload; + SSRCRtxData *data; + + gst_event_parse_caps (event, &caps); + + s = gst_caps_get_structure (caps, 0); + if (!gst_structure_get_uint (s, "ssrc", &ssrc)) + ssrc = -1; + if (!gst_structure_get_int (s, "payload", &payload)) + payload = -1; + + if (payload == -1) + GST_WARNING_OBJECT (rtx, "No payload in caps"); + + GST_OBJECT_LOCK (rtx); + data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc); + + GST_DEBUG_OBJECT (rtx, + "got caps for payload: %d->%d, ssrc: %u : %" GST_PTR_FORMAT, + payload, ssrc, data->rtx_ssrc, caps); + + gst_structure_get_int (s, "clock-rate", &data->clock_rate); + + /* The session might need to know the RTX ssrc */ + caps = gst_caps_copy (caps); + gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc, + "rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL); + + GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u", + data->clock_rate, ssrc); + GST_OBJECT_UNLOCK (rtx); + + gst_event_unref (event); + event = gst_event_new_caps (caps); + gst_caps_unref (caps); + break; + } + default: + break; + } + + return gst_pad_event_default (pad, parent, event); +} + +/* like rtp_jitter_buffer_get_ts_diff() */ +static guint32 +gst_rist_rtx_send_get_ts_diff (SSRCRtxData * data) +{ + guint64 high_ts, low_ts; + BufferQueueItem *high_buf, *low_buf; + guint32 result; + + high_buf = + g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter + (data->queue))); + low_buf = g_sequence_get (g_sequence_get_begin_iter (data->queue)); + + if (!high_buf || !low_buf || high_buf == low_buf) + return 0; + + high_ts = high_buf->timestamp; + low_ts = low_buf->timestamp; + + /* it needs to work if ts wraps */ + if (high_ts >= low_ts) { + result = (guint32) (high_ts - low_ts); + } else { + result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts); + } + + /* return value in ms instead of clock ticks */ + return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate); +} + +/* Must be called with lock */ +static void +process_buffer (GstRistRtxSend * rtx, GstBuffer * buffer) +{ + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + BufferQueueItem *item; + SSRCRtxData *data; + guint16 seqnum; + guint32 ssrc, rtptime; + + /* read the information we want from the buffer */ + gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp); + seqnum = gst_rtp_buffer_get_seq (&rtp); + ssrc = gst_rtp_buffer_get_ssrc (&rtp); + rtptime = gst_rtp_buffer_get_timestamp (&rtp); + gst_rtp_buffer_unmap (&rtp); + + GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum, + ssrc); + + data = gst_rist_rtx_send_get_ssrc_data (rtx, ssrc); + + /* add current rtp buffer to queue history */ + item = g_slice_new0 (BufferQueueItem); + item->seqnum = seqnum; + item->timestamp = rtptime; + item->buffer = gst_buffer_ref (buffer); + g_sequence_append (data->queue, item); + + /* remove oldest packets from history if they are too many */ + if (rtx->max_size_packets) { + while (g_sequence_get_length (data->queue) > rtx->max_size_packets) + g_sequence_remove (g_sequence_get_begin_iter (data->queue)); + } + if (rtx->max_size_time) { + while (gst_rist_rtx_send_get_ts_diff (data) > rtx->max_size_time) + g_sequence_remove (g_sequence_get_begin_iter (data->queue)); + } +} + +static GstFlowReturn +gst_rist_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent); + GstFlowReturn ret; + + GST_OBJECT_LOCK (rtx); + process_buffer (rtx, buffer); + GST_OBJECT_UNLOCK (rtx); + ret = gst_pad_push (rtx->srcpad, buffer); + + return ret; +} + +static gboolean +process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data) +{ + process_buffer (user_data, *buffer); + return TRUE; +} + +static GstFlowReturn +gst_rist_rtx_send_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * list) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent); + GstFlowReturn ret; + + GST_OBJECT_LOCK (rtx); + gst_buffer_list_foreach (list, process_buffer_from_list, rtx); + GST_OBJECT_UNLOCK (rtx); + + ret = gst_pad_push_list (rtx->srcpad, list); + + return ret; +} + +static void +gst_rist_rtx_send_src_loop (GstRistRtxSend * rtx) +{ + GstDataQueueItem *data; + + if (gst_data_queue_pop (rtx->queue, &data)) { + GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object); + + if (G_LIKELY (GST_IS_BUFFER (data->object))) { + GST_OBJECT_LOCK (rtx); + /* Update statistics just before pushing. */ + rtx->num_rtx_packets++; + GST_OBJECT_UNLOCK (rtx); + + gst_pad_push (rtx->srcpad, GST_BUFFER (data->object)); + } else if (GST_IS_EVENT (data->object)) { + gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object)); + + /* after EOS, we should not send any more buffers, + * even if there are more requests coming in */ + if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) { + gst_rist_rtx_send_set_flushing (rtx, TRUE); + } + } else { + g_assert_not_reached (); + } + + data->object = NULL; /* we no longer own that object */ + data->destroy (data); + } else { + GST_LOG_OBJECT (rtx, "flushing"); + gst_pad_pause_task (rtx->srcpad); + } +} + +static gboolean +gst_rist_rtx_send_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (parent); + gboolean ret = FALSE; + + switch (mode) { + case GST_PAD_MODE_PUSH: + if (active) { + gst_rist_rtx_send_set_flushing (rtx, FALSE); + ret = gst_pad_start_task (rtx->srcpad, + (GstTaskFunction) gst_rist_rtx_send_src_loop, rtx, NULL); + } else { + gst_rist_rtx_send_set_flushing (rtx, TRUE); + ret = gst_pad_stop_task (rtx->srcpad); + } + GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret); + break; + default: + break; + } + return ret; +} + +static void +gst_rist_rtx_send_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (object); + + switch (prop_id) { + case PROP_MAX_SIZE_TIME: + GST_OBJECT_LOCK (rtx); + g_value_set_uint (value, rtx->max_size_time); + GST_OBJECT_UNLOCK (rtx); + break; + case PROP_MAX_SIZE_PACKETS: + GST_OBJECT_LOCK (rtx); + g_value_set_uint (value, rtx->max_size_packets); + GST_OBJECT_UNLOCK (rtx); + break; + case PROP_NUM_RTX_REQUESTS: + GST_OBJECT_LOCK (rtx); + g_value_set_uint (value, rtx->num_rtx_requests); + GST_OBJECT_UNLOCK (rtx); + break; + case PROP_NUM_RTX_PACKETS: + GST_OBJECT_LOCK (rtx); + g_value_set_uint (value, rtx->num_rtx_packets); + GST_OBJECT_UNLOCK (rtx); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rist_rtx_send_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (object); + + switch (prop_id) { + case PROP_MAX_SIZE_TIME: + GST_OBJECT_LOCK (rtx); + rtx->max_size_time = g_value_get_uint (value); + GST_OBJECT_UNLOCK (rtx); + break; + case PROP_MAX_SIZE_PACKETS: + GST_OBJECT_LOCK (rtx); + rtx->max_size_packets = g_value_get_uint (value); + GST_OBJECT_UNLOCK (rtx); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_rist_rtx_send_change_state (GstElement * element, GstStateChange transition) +{ + GstStateChangeReturn ret; + GstRistRtxSend *rtx = GST_RIST_RTX_SEND (element); + + ret = + GST_ELEMENT_CLASS (gst_rist_rtx_send_parent_class)->change_state (element, + transition); + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rist_rtx_send_reset (rtx); + break; + default: + break; + } + + return ret; +} diff --git a/gst/rist/gstristsink.c b/gst/rist/gstristsink.c new file mode 100644 index 0000000000..0bad78dacd --- /dev/null +++ b/gst/rist/gstristsink.c @@ -0,0 +1,838 @@ +/* GStreamer RIST plugin + * Copyright (C) 2019 Net Insight AB + * Author: Nicolas Dufresne + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-ristsink + * @title: ristsink + * @see_also: ristsrc + * + * This element implements RIST TR-06-1 Simple Profile transmitter. It + * currently supports any registered RTP payload types such as MPEG TS. The + * stream passed to this element must be RTP payloaded already. Even though + * RTP SSRC collision is rare in unidirectional streaming, this element expect + * the upstream elements to obey to collision events and change the SSRC in + * use. Collision will ocure when tranmitting and receiving over multicast on + * the same host. + * + * ## Example launch line + * |[ + * gst-launch-1.0 udpsrc ! tsparse set-timestamp=1 ! rtpmp2pay ! ristsink address=10.0.0.1 port=5004 + * ]| + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include + +#include "gstrist.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rist_sink_debug); +#define GST_CAT_DEFAULT gst_rist_sink_debug + +enum +{ + PROP_ADDRESS = 1, + PROP_PORT, + PROP_SENDER_BUFFER, + PROP_MIN_RTCP_INTERVAL, + PROP_MAX_RTCP_BANDWIDTH, + PROP_STATS_UPDATE_INTERVAL, + PROP_STATS, + PROP_CNAME, + PROP_MULTICAST_LOOPBACK, + PROP_MULTICAST_IFACE, + PROP_MULTICAST_TTL +}; + +static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp")); + +struct _GstRistSink +{ + GstBin parent; + + /* Elements contained in the pipeline */ + GstElement *rtpbin; + GstElement *rtp_sink; + GstElement *rtcp_src; + GstElement *rtcp_sink; + GstElement *ssrc_filter; + GstPad *sinkpad; + + /* RTX Elements */ + GstElement *rtxbin; + GstElement *rtx_send; + + /* For stats */ + guint stats_interval; + guint32 rtp_ssrc; + guint32 rtcp_ssrc; + GstClockID stats_cid; + + /* This is set whenever there is a pipeline construction failure, and used + * to fail state changes later */ + gboolean construct_failed; + const gchar *missing_plugin; +}; + +G_DEFINE_TYPE_WITH_CODE (GstRistSink, gst_rist_sink, GST_TYPE_BIN, + GST_DEBUG_CATEGORY_INIT (gst_rist_sink_debug, "ristsink", 0, "RIST Sink")); + +static GstCaps * +gst_rist_sink_request_pt_map (GstRistSrc * sink, GstElement * session, guint pt) +{ + const GstRTPPayloadInfo *pt_info; + GstCaps *ret; + + pt_info = gst_rtp_payload_info_for_pt (pt); + if (!pt_info || !pt_info->clock_rate) + return NULL; + + ret = gst_caps_new_simple ("application/x-rtp", + "media", G_TYPE_STRING, pt_info->media, + "encoding_name", G_TYPE_STRING, pt_info->encoding_name, + "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL); + + /* FIXME add sprop-parameter-set if any */ + g_warn_if_fail (pt_info->encoding_parameters == NULL); + + return ret; +} + +static GstElement * +gst_rist_sink_request_aux_sender (GstRistSink * sink, guint session_id, + GstElement * rtpbin) +{ + if (session_id != 0) + return NULL; + + return gst_object_ref (sink->rtxbin); +} + +static void +on_app_rtcp (GObject * session, guint32 subtype, guint32 ssrc, + const gchar * name, GstBuffer * data, GstElement * rtpsession) +{ + if (g_str_equal (name, "RIST")) { + GstEvent *event; + GstPad *send_rtp_sink; + GstMapInfo map; + gint i; + + send_rtp_sink = gst_element_get_static_pad (rtpsession, "send_rtp_sink"); + if (send_rtp_sink) { + gst_buffer_map (data, &map, GST_MAP_READ); + + for (i = 0; i < map.size; i += sizeof (guint32)) { + guint32 dword = GST_READ_UINT32_BE (map.data + i); + guint16 seqnum = dword >> 16; + guint16 num = dword & 0x0000FFFF; + guint16 j; + + GST_DEBUG ("got RIST nack packet, #%u %u", seqnum, num); + + /* num is inclusive, i.e. it can be 0, which means exactly 1 seqnum */ + for (j = 0; j <= num; j++) { + event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("GstRTPRetransmissionRequest", + "seqnum", G_TYPE_UINT, (guint) seqnum + j, + "ssrc", G_TYPE_UINT, (guint) ssrc, NULL)); + gst_pad_push_event (send_rtp_sink, event); + } + } + + gst_buffer_unmap (data, &map); + gst_object_unref (send_rtp_sink); + } + } +} + +static void +gst_rist_sink_on_new_sender_ssrc (GstRistSink * sink, guint session_id, + guint ssrc, GstElement * rtpbin) +{ + GObject *gstsession = NULL; + GObject *session = NULL; + GObject *source = NULL; + + if (session_id != 0) + return; + + g_signal_emit_by_name (rtpbin, "get-session", session_id, &gstsession); + g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session); + g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source); + + if (ssrc & 1) + g_object_set (source, "disable-rtcp", TRUE, NULL); + else + g_signal_connect (session, "on-app-rtcp", (GCallback) on_app_rtcp, + gstsession); + + g_object_unref (source); + g_object_unref (session); +} + +static void +gst_rist_sink_on_new_receiver_ssrc (GstRistSink * sink, guint session_id, + guint ssrc, GstElement * rtpbin) +{ + if (session_id != 0) + return; + + GST_INFO_OBJECT (sink, "Got RTCP remote SSRC %u", ssrc); + sink->rtcp_ssrc = ssrc; +} + +static GstPadProbeReturn +gst_rist_sink_fix_collision (GstPad * pad, GstPadProbeInfo * info, + gpointer user_data) +{ + GstEvent *event = info->data; + const GstStructure *cs; + GstStructure *s; + guint ssrc; + + /* We simply ignore collisions */ + if (GST_EVENT_TYPE (event) != GST_EVENT_CUSTOM_UPSTREAM) + return GST_PAD_PROBE_OK; + + cs = gst_event_get_structure (event); + if (!gst_structure_has_name (cs, "GstRTPCollision")) + return GST_PAD_PROBE_OK; + + gst_structure_get_uint (cs, "suggested-ssrc", &ssrc); + if ((ssrc & 1) == 0) + return GST_PAD_PROBE_OK; + + event = info->data = gst_event_make_writable (event); + /* we can drop the const qualifier as we ensured writability */ + s = (GstStructure *) gst_event_get_structure (event); + gst_structure_set (s, "suggested-ssrc", G_TYPE_UINT, ssrc - 1, NULL); + + return GST_PAD_PROBE_OK; +} + +static gboolean +gst_rist_sink_set_caps (GstRistSink * sink, GstCaps * caps) +{ + const GstStructure *s = gst_caps_get_structure (caps, 0); + + if (!gst_structure_get_uint (s, "ssrc", &sink->rtp_ssrc)) { + GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, ("No 'ssrc' field in caps."), + (NULL)); + return FALSE; + } + + if (sink->rtp_ssrc & 1) { + GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, + ("Invalid RIST SSRC, LSB must be zero."), (NULL)); + return FALSE; + } + + return TRUE; +} + +static gboolean +gst_rist_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstRistSink *sink = GST_RIST_SINK (parent); + GstCaps *caps; + gboolean ret = TRUE; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CAPS: + gst_event_parse_caps (event, &caps); + ret = gst_rist_sink_set_caps (sink, caps); + break; + default: + break; + } + + if (ret) + ret = gst_pad_event_default (pad, parent, event); + else + gst_event_unref (event); + + return ret; +} + +static void +gst_rist_sink_init (GstRistSink * sink) +{ + GstPad *ssrc_filter_sinkpad; + GstCaps *ssrc_caps; + GstPad *pad, *gpad; + GstStructure *sdes = NULL; + + /* Construct the RIST RTP sender pipeline. + * + * capsfilter*-> [send_rtp_sink_%u] -------- [send_rtp_src_%u] -> udpsink + * | rtpbin | + * udpsrc -> [recv_rtcp_sink_%u] -------- [send_rtcp_src_%u] -> * udpsink + * + * * To select RIST compatible SSRC + */ + sink->rtpbin = gst_element_factory_make ("rtpbin", "rist_send_rtbpin"); + if (!sink->rtpbin) { + sink->missing_plugin = "rtpmanager"; + goto missing_plugin; + } + + /* RIST specification says the SDES should only contain the CNAME */ + g_object_get (sink->rtpbin, "sdes", &sdes, NULL); + gst_structure_remove_field (sdes, "tool"); + + gst_bin_add (GST_BIN (sink), sink->rtpbin); + g_object_set (sink->rtpbin, "do-retransmission", TRUE, + "rtp-profile", 3 /* AVFP */ , + "sdes", sdes, NULL); + gst_structure_free (sdes); + + g_signal_connect_swapped (sink->rtpbin, "request-pt-map", + G_CALLBACK (gst_rist_sink_request_pt_map), sink); + g_signal_connect_swapped (sink->rtpbin, "request-aux-sender", + G_CALLBACK (gst_rist_sink_request_aux_sender), sink); + g_signal_connect_swapped (sink->rtpbin, "on-new-sender-ssrc", + G_CALLBACK (gst_rist_sink_on_new_sender_ssrc), sink); + g_signal_connect_swapped (sink->rtpbin, "on-new-ssrc", + G_CALLBACK (gst_rist_sink_on_new_receiver_ssrc), sink); + + sink->rtxbin = gst_bin_new ("rist_send_rtxbin"); + g_object_ref_sink (sink->rtxbin); + sink->rtx_send = gst_element_factory_make ("ristrtxsend", "rist_rtx_send"); + gst_bin_add (GST_BIN (sink->rtxbin), sink->rtx_send); + g_object_set (sink->rtx_send, "max-size-packets", 0, NULL); + + pad = gst_element_get_static_pad (sink->rtx_send, "sink"); + gpad = gst_ghost_pad_new ("sink_0", pad); + gst_object_unref (pad); + gst_element_add_pad (sink->rtxbin, gpad); + + pad = gst_element_get_static_pad (sink->rtx_send, "src"); + gpad = gst_ghost_pad_new ("src_0", pad); + gst_object_unref (pad); + gst_element_add_pad (sink->rtxbin, gpad); + + sink->rtp_sink = gst_element_factory_make ("udpsink", "rist_rtp_udpsink"); + sink->rtcp_src = gst_element_factory_make ("udpsrc", "rist_rtcp_udpsrc"); + sink->rtcp_sink = gst_element_factory_make ("udpsink", "rist_rtcp_udpsink"); + if (!sink->rtp_sink || !sink->rtcp_src || !sink->rtcp_sink) { + g_clear_object (&sink->rtp_sink); + g_clear_object (&sink->rtcp_src); + g_clear_object (&sink->rtcp_sink); + sink->missing_plugin = "udp"; + goto missing_plugin; + } + gst_bin_add_many (GST_BIN (sink), sink->rtp_sink, sink->rtcp_src, + sink->rtcp_sink, NULL); + gst_element_set_locked_state (sink->rtcp_src, TRUE); + gst_element_set_locked_state (sink->rtcp_sink, TRUE); + + sink->ssrc_filter = gst_element_factory_make ("capsfilter", + "rist_ssrc_filter"); + if (!sink->ssrc_filter) { + sink->missing_plugin = "coreelements"; + goto missing_plugin; + } + gst_bin_add (GST_BIN (sink), sink->ssrc_filter); + + sink->rtp_ssrc = g_random_int () & ~1; + ssrc_caps = gst_caps_new_simple ("application/x-rtp", + "ssrc", G_TYPE_UINT, sink->rtp_ssrc, NULL); + gst_caps_append_structure (ssrc_caps, + gst_structure_new_empty ("application/x-rtp")); + g_object_set (sink->ssrc_filter, "caps", ssrc_caps, NULL); + gst_caps_unref (ssrc_caps); + gst_element_link_pads (sink->ssrc_filter, "src", sink->rtpbin, + "send_rtp_sink_0"); + gst_element_link_pads (sink->rtpbin, "send_rtp_src_0", sink->rtp_sink, + "sink"); + gst_element_link_pads (sink->rtcp_src, "src", sink->rtpbin, + "recv_rtcp_sink_0"); + gst_element_link_pads (sink->rtpbin, "send_rtcp_src_0", sink->rtcp_sink, + "sink"); + + ssrc_filter_sinkpad = gst_element_get_static_pad (sink->ssrc_filter, "sink"); + sink->sinkpad = gst_ghost_pad_new_from_template ("sink", ssrc_filter_sinkpad, + gst_static_pad_template_get (&sink_templ)); + gst_pad_set_event_function (sink->sinkpad, gst_rist_sink_event); + gst_element_add_pad (GST_ELEMENT (sink), sink->sinkpad); + gst_object_unref (ssrc_filter_sinkpad); + + gst_pad_add_probe (sink->sinkpad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, + gst_rist_sink_fix_collision, sink, NULL); + + return; + +missing_plugin: + { + GST_ERROR_OBJECT (sink, "'%s' plugin is missing.", sink->missing_plugin); + sink->construct_failed = TRUE; + /* Just make our element valid, so we fail cleanly */ + gst_element_add_pad (GST_ELEMENT (sink), + gst_pad_new_from_static_template (&sink_templ, "sink")); + } +} + +static GstStateChangeReturn +gst_rist_sink_start (GstRistSink * sink) +{ + GSocket *socket = NULL; + GInetAddress *iaddr = NULL; + gchar *remote_addr = NULL; + guint remote_port; + GError *error = NULL; + + if (sink->construct_failed) { + GST_ELEMENT_ERROR (sink, CORE, MISSING_PLUGIN, + ("Your GStreamer installation is missing plugin '%s'", + sink->missing_plugin), (NULL)); + return GST_STATE_CHANGE_FAILURE; + } + + g_object_get (sink->rtcp_sink, "host", &remote_addr, "port", &remote_port, + NULL); + + iaddr = g_inet_address_new_from_string (remote_addr); + if (!iaddr) { + GList *results; + GResolver *resolver = NULL; + + resolver = g_resolver_get_default (); + results = g_resolver_lookup_by_name (resolver, remote_addr, NULL, &error); + + if (!results) { + g_object_unref (resolver); + goto dns_resolve_failed; + } + + iaddr = G_INET_ADDRESS (g_object_ref (results->data)); + + g_free (remote_addr); + remote_addr = g_inet_address_to_string (iaddr); + + g_resolver_free_addresses (results); + g_object_unref (resolver); + } + + if (g_inet_address_get_is_multicast (iaddr)) { + g_object_set (sink->rtcp_src, "address", remote_addr, "port", remote_port, + NULL); + } else { + const gchar *any_addr; + + if (g_inet_address_get_family (iaddr) == G_SOCKET_FAMILY_IPV6) + any_addr = "::"; + else + any_addr = "0.0.0.0"; + + g_object_set (sink->rtcp_src, "address", any_addr, "port", 0, NULL); + } + g_object_unref (iaddr); + + gst_element_set_locked_state (sink->rtcp_src, FALSE); + gst_element_sync_state_with_parent (sink->rtcp_src); + + /* share the socket created by the sink */ + g_object_get (sink->rtcp_src, "used-socket", &socket, NULL); + g_object_set (sink->rtcp_sink, "socket", socket, "auto-multicast", FALSE, + "close-socket", FALSE, NULL); + g_object_unref (socket); + + gst_element_set_locked_state (sink->rtcp_sink, FALSE); + gst_element_sync_state_with_parent (sink->rtcp_sink); + + return GST_STATE_CHANGE_SUCCESS; + +dns_resolve_failed: + GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, + ("Could not resolve hostname '%s'", remote_addr), + ("DNS resolver reported: %s", error->message)); + g_free (remote_addr); + g_error_free (error); + return GST_STATE_CHANGE_FAILURE; +} + +static GstStructure * +gst_rist_sink_create_stats (GstRistSink * sink) +{ + GObject *session = NULL, *source = NULL; + GstStructure *sstats = NULL, *ret; + guint64 pkt_sent = 0, rtx_sent = 0, rtt; + guint rb_rtt = 0; + + ret = gst_structure_new_empty ("rist/x-sender-stats"); + + g_signal_emit_by_name (sink->rtpbin, "get-internal-session", 0, &session); + if (!session) + return ret; + + g_signal_emit_by_name (session, "get-source-by-ssrc", sink->rtp_ssrc, + &source); + if (source) { + g_object_get (source, "stats", &sstats, NULL); + gst_structure_get_uint64 (sstats, "packets-sent", &pkt_sent); + gst_structure_free (sstats); + g_clear_object (&source); + } + + g_signal_emit_by_name (session, "get-source-by-ssrc", sink->rtcp_ssrc, + &source); + if (source) { + g_object_get (source, "stats", &sstats, NULL); + gst_structure_get_uint (sstats, "rb-round-trip", &rb_rtt); + gst_structure_free (sstats); + g_clear_object (&source); + } + g_object_unref (session); + + g_object_get (sink->rtx_send, "num-rtx-packets", &rtx_sent, NULL); + + /* rb_rtt is in Q16 in NTP time */ + rtt = gst_util_uint64_scale (rb_rtt, GST_SECOND, 65536); + + gst_structure_set (ret, "sent-original-packets", G_TYPE_UINT64, pkt_sent, + "sent-retransmitted-packets", G_TYPE_UINT64, rtx_sent, + "round-trip-time", G_TYPE_UINT64, rtt, NULL); + + return ret; +} + +static gboolean +gst_rist_sink_dump_stats (GstClock * clock, GstClockTime time, GstClockID id, + gpointer user_data) +{ + GstRistSink *sink = GST_RIST_SINK (user_data); + GstStructure *stats = gst_rist_sink_create_stats (sink); + + gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (sink), stats); + + gst_structure_free (stats); + return TRUE; +} + +static void +gst_rist_sink_enable_stats_interval (GstRistSink * sink) +{ + GstClock *clock; + GstClockTime start, interval; + + if (sink->stats_interval == 0) + return; + + interval = sink->stats_interval * GST_MSECOND; + clock = gst_system_clock_obtain (); + start = gst_clock_get_time (clock) + interval; + + sink->stats_cid = gst_clock_new_periodic_id (clock, start, interval); + gst_clock_id_wait_async (sink->stats_cid, gst_rist_sink_dump_stats, + gst_object_ref (sink), (GDestroyNotify) gst_object_unref); + + gst_object_unref (clock); +} + +static void +gst_rist_sink_disable_stats_interval (GstRistSink * sink) +{ + if (sink->stats_cid) { + gst_clock_id_unschedule (sink->stats_cid); + gst_clock_id_unref (sink->stats_cid); + sink->stats_cid = NULL; + } +} + +static GstStateChangeReturn +gst_rist_sink_change_state (GstElement * element, GstStateChange transition) +{ + GstRistSink *sink = GST_RIST_SINK (element); + GstStateChangeReturn ret; + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rist_sink_disable_stats_interval (sink); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (gst_rist_sink_parent_class)->change_state (element, + transition); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + ret = gst_rist_sink_start (sink); + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + gst_rist_sink_enable_stats_interval (sink); + break; + default: + break; + } + + return ret; +} + +static void +gst_rist_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRistSink *sink = GST_RIST_SINK (object); + GstElement *session = NULL; + GstClockTime interval; + GstStructure *sdes; + + if (sink->construct_failed) + return; + + switch (prop_id) { + case PROP_ADDRESS: + g_object_get_property (G_OBJECT (sink->rtp_sink), "host", value); + break; + + case PROP_PORT: + g_object_get_property (G_OBJECT (sink->rtp_sink), "port", value); + break; + + case PROP_SENDER_BUFFER: + g_object_get_property (G_OBJECT (sink->rtx_send), "max-size-time", value); + break; + + case PROP_MIN_RTCP_INTERVAL: + g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session); + g_object_get (session, "rtcp-min-interval", &interval, NULL); + g_value_set_uint (value, (guint) (interval / GST_MSECOND)); + g_object_unref (session); + break; + + case PROP_MAX_RTCP_BANDWIDTH: + g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session); + g_object_get_property (G_OBJECT (session), "rtcp-fraction", value); + g_object_unref (session); + break; + + case PROP_STATS_UPDATE_INTERVAL: + g_value_set_uint (value, sink->stats_interval); + break; + + case PROP_STATS: + g_value_take_boxed (value, gst_rist_sink_create_stats (sink)); + break; + + case PROP_CNAME: + g_object_get (sink->rtpbin, "sdes", &sdes, NULL); + g_value_set_string (value, gst_structure_get_string (sdes, "cname")); + gst_structure_free (sdes); + break; + + case PROP_MULTICAST_LOOPBACK: + g_object_get_property (G_OBJECT (sink->rtp_sink), "loop", value); + break; + + case PROP_MULTICAST_IFACE: + g_object_get_property (G_OBJECT (sink->rtp_sink), + "multicast-iface", value); + break; + + case PROP_MULTICAST_TTL: + g_object_get_property (G_OBJECT (sink->rtp_sink), "ttl-mc", value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rist_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRistSink *sink = GST_RIST_SINK (object); + GstElement *session = NULL; + GstStructure *sdes; + + if (sink->construct_failed) + return; + + switch (prop_id) { + case PROP_ADDRESS: + g_object_set_property (G_OBJECT (sink->rtp_sink), "host", value); + g_object_set_property (G_OBJECT (sink->rtcp_sink), "host", value); + break; + + case PROP_PORT:{ + guint port = g_value_get_uint (value); + + /* According to 5.1.1, RTCP receiver port most be event number and RTCP + * port should be the RTP port + 1 */ + + if (port & 0x1) { + g_warning ("Invalid RIST port %u, should be an even number.", port); + return; + } + + g_object_set (sink->rtp_sink, "port", port, NULL); + g_object_set (sink->rtcp_sink, "port", port + 1, NULL); + break; + } + + case PROP_SENDER_BUFFER: + g_object_set (sink->rtx_send, + "max-size-time", g_value_get_uint (value), NULL); + break; + + case PROP_MIN_RTCP_INTERVAL: + g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session); + g_object_set (session, "rtcp-min-interval", + g_value_get_uint (value) * GST_MSECOND, NULL); + g_object_unref (session); + break; + + case PROP_MAX_RTCP_BANDWIDTH: + g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session); + g_object_set (session, "rtcp-fraction", g_value_get_double (value), NULL); + g_object_unref (session); + break; + + case PROP_STATS_UPDATE_INTERVAL: + sink->stats_interval = g_value_get_uint (value); + break; + + case PROP_CNAME: + g_object_get (sink->rtpbin, "sdes", &sdes, NULL); + gst_structure_set_value (sdes, "cname", value); + g_object_set (sink->rtpbin, "sdes", sdes, NULL); + gst_structure_free (sdes); + break; + + case PROP_MULTICAST_LOOPBACK: + g_object_set_property (G_OBJECT (sink->rtp_sink), "loop", value); + g_object_set_property (G_OBJECT (sink->rtcp_sink), "loop", value); + break; + + case PROP_MULTICAST_IFACE: + g_object_set_property (G_OBJECT (sink->rtp_sink), + "multicast-iface", value); + g_object_set_property (G_OBJECT (sink->rtcp_sink), + "multicast-iface", value); + break; + + case PROP_MULTICAST_TTL: + g_object_set_property (G_OBJECT (sink->rtp_sink), "ttl-mc", value); + g_object_set_property (G_OBJECT (sink->rtcp_sink), "ttl-mc", value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rist_sink_finalize (GObject * object) +{ + GstRistSink *sink = GST_RIST_SINK (object); + g_clear_object (&sink->rtxbin); + + G_OBJECT_CLASS (gst_rist_sink_parent_class)->finalize (object); +} + +static void +gst_rist_sink_class_init (GstRistSinkClass * klass) +{ + GstElementClass *element_class = (GstElementClass *) klass; + GObjectClass *object_class = (GObjectClass *) klass; + + gst_element_class_set_metadata (element_class, + "RIST Sink", "Source/Network", + "Sink that implements RIST TR-06-1 streaming specification", + "Nicolas Dufresne change_state = gst_rist_sink_change_state; + + object_class->get_property = gst_rist_sink_get_property; + object_class->set_property = gst_rist_sink_set_property; + object_class->finalize = gst_rist_sink_finalize; + + g_object_class_install_property (object_class, PROP_ADDRESS, + g_param_spec_string ("address", "Address", + "Address to send packets to (can be IPv4 or IPv6).", "0.0.0.0", + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_PORT, + g_param_spec_uint ("port", "Port", "The port RTP packets will be sent, " + "RTCP port is derived from it, this port must be an even number.", + 2, 65534, 5004, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_SENDER_BUFFER, + g_param_spec_uint ("sender-buffer", "Sender Buffer", + "Size of the retransmission queue in ms", 0, G_MAXUINT, 1200, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MIN_RTCP_INTERVAL, + g_param_spec_uint ("min-rtcp-interval", "Minimum RTCP Intercal", + "The minimum interval in ms between two regular successive RTCP " + "packets.", 0, 100, 100, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MAX_RTCP_BANDWIDTH, + g_param_spec_double ("max-rtcp-bandwidth", "Maximum RTCP Bandwidth", + "The maximum bandwidth used for RTCP in fraction of RTP bandwdith", + 0.0, 0.05, 0.05, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_STATS_UPDATE_INTERVAL, + g_param_spec_uint ("stats-update-interval", "Statistics Update Interval", + "The interval between 'stats' update notification (0 disabled)", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_STATS, + g_param_spec_boxed ("stats", "Statistics", + "Statistic in a GstStructure named 'rist/x-sender-stats'", + GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_CNAME, + g_param_spec_string ("cname", "CName", + "Set the CNAME in the SDES block of the sender report.", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_MULTICAST_LOOPBACK, + g_param_spec_boolean ("multicast-loopback", "Multicast Loopback", + "When enabled, the packet will be received locally.", FALSE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MULTICAST_IFACE, + g_param_spec_string ("multicast-iface", "multicast-iface", + "The multicast interface to use to send packets.", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_MULTICAST_TTL, + g_param_spec_int ("multicast-ttl", "Multicast TTL", + "The multicast time-to-live parameter.", 0, 255, 1, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); +} diff --git a/gst/rist/gstristsrc.c b/gst/rist/gstristsrc.c new file mode 100644 index 0000000000..9e8d403f6f --- /dev/null +++ b/gst/rist/gstristsrc.c @@ -0,0 +1,1021 @@ +/* GStreamer RIST plugin + * Copyright (C) 2019 Net Insight AB + * Author: Nicolas Dufresne + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-ristsrc + * @title: ristsrc + * @see_also: ristsink + * + * This element implements RIST TR-06-1 Simple Profile receiver. The stream + * produced by this element will be RTP payloaded. This element also implement + * the URI scheme `rist://` allowing to render RIST streams in GStreamer based + * media players. The RIST uri handler also allow setting propertied through + * the URI query. + * + * ## Example launch line + * |[ + * gst-launch-1.0 ristsrc address=0.0.0.0 port=5004 ! rtpmp2depay ! udpsink + * gst-play-1.0 "rist://0.0.0.0:5004?receiver-buffer=700" + * ]| + */ + + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include + +/* for setsockopt() */ +#ifndef G_OS_WIN32 +#include +#include +#endif + +#include "gstrist.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rist_src_debug); +#define GST_CAT_DEFAULT gst_rist_src_debug + +enum +{ + PROP_ADDRESS = 1, + PROP_PORT, + PROP_RECEIVER_BUFFER, + PROP_REORDER_SECTION, + PROP_MAX_RTX_RETRIES, + PROP_MIN_RTCP_INTERVAL, + PROP_MAX_RTCP_BANDWIDTH, + PROP_STATS_UPDATE_INTERVAL, + PROP_STATS, + PROP_CNAME, + PROP_MULTICAST_LOOPBACK, + PROP_MULTICAST_IFACE, + PROP_MULTICAST_TTL +}; + +static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp")); + +struct _GstRistSrc +{ + GstBin parent; + + GstUri *uri; + + /* Elements contained in the pipeline, the rtp/rtcp_src are 'udpsrc' */ + GstElement *rtpbin; + GstElement *rtp_src; + GstElement *rtcp_src; + GstElement *rtcp_sink; + gulong rtcp_recv_probe; + gulong rtcp_send_probe; + GSocketAddress *rtcp_send_addr; + GstPad *srcpad; + gint multicast_ttl; + + /* RTX Elements */ + GstElement *rtxbin; + GstElement *rtx_receive; + + /* For property handling */ + guint reorder_section; + guint max_rtx_retries; + + /* For stats */ + guint stats_interval; + guint32 rtp_ssrc; + GstClockID stats_cid; + GstElement *jitterbuffer; + + /* This is set whenever there is a pipeline construction failure, and used + * to fail state changes later */ + gboolean construct_failed; + const gchar *missing_plugin; +}; + +static void gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data); + +G_DEFINE_TYPE_WITH_CODE (GstRistSrc, gst_rist_src, GST_TYPE_BIN, + G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rist_src_uri_init); + GST_DEBUG_CATEGORY_INIT (gst_rist_src_debug, "ristsrc", 0, "RIST Source")); + +static void +gst_rist_src_pad_added (GstRistSrc * src, GstPad * new_pad, GstElement * rtpbin) +{ + GST_TRACE_OBJECT (src, "New pad '%s'.", GST_PAD_NAME (new_pad)); + + if (g_str_has_prefix (GST_PAD_NAME (new_pad), "recv_rtp_src_0_")) { + GST_DEBUG_OBJECT (src, "Using new pad '%s' as ghost pad target.", + GST_PAD_NAME (new_pad)); + gst_ghost_pad_set_target (GST_GHOST_PAD (src->srcpad), new_pad); + } +} + +static GstCaps * +gst_rist_src_request_pt_map (GstRistSrc * src, GstElement * session, guint pt) +{ + const GstRTPPayloadInfo *pt_info; + GstCaps *ret; + + pt_info = gst_rtp_payload_info_for_pt (pt); + if (!pt_info || !pt_info->clock_rate) + return NULL; + + ret = gst_caps_new_simple ("application/x-rtp", + "media", G_TYPE_STRING, pt_info->media, + "encoding_name", G_TYPE_STRING, pt_info->encoding_name, + "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL); + + /* FIXME add sprop-parameter-set if any */ + g_warn_if_fail (pt_info->encoding_parameters == NULL); + + return ret; +} + +static GstElement * +gst_rist_src_request_aux_receiver (GstRistSrc * src, guint session_id, + GstElement * rtpbin) +{ + if (session_id != 0) + return NULL; + + return gst_object_ref (src->rtxbin); +} + +/* Overrides the nack creation. Right now we don't send mixed NACKS type, we + * simply send a set of range NACK if it takes less space, or allow adding + * more seqnum. */ +static guint +gst_rist_src_on_sending_nacks (GObject * session, guint sender_ssrc, + guint media_ssrc, GArray * nacks, GstBuffer * buffer, gpointer user_data) +{ + GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT; + GstRTCPPacket packet; + guint8 *app_data; + guint nacked_seqnums = 0; + guint range_size = 0; + guint n_rg_nacks = 0; + guint n_fb_nacks = 0; + guint16 seqnum; + guint i; + gint diff; + + /* We'll assume that range will be best, and find how many generic NACK + * would have been created. If this number ends up being smaller, we will + * just remove the APP packet and return 0, leaving it to RTPSession to + * create the generic NACK.*/ + + gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp); + if (!gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_APP, &packet)) + /* exit because the packet is full, will put next request in a + * further packet */ + goto done; + + gst_rtcp_packet_app_set_ssrc (&packet, media_ssrc); + gst_rtcp_packet_app_set_name (&packet, "RIST"); + + if (!gst_rtcp_packet_app_set_data_length (&packet, 1)) { + gst_rtcp_packet_remove (&packet); + GST_WARNING ("no range nacks fit in the packet"); + goto done; + } + + app_data = gst_rtcp_packet_app_get_data (&packet); + for (i = 0; i < nacks->len; i = nacked_seqnums) { + guint j; + seqnum = g_array_index (nacks, guint16, i); + + if (!gst_rtcp_packet_app_set_data_length (&packet, n_rg_nacks + 1)) + break; + + n_rg_nacks++; + nacked_seqnums++; + + for (j = i + 1; j < nacks->len; j++) { + guint16 next_seqnum = g_array_index (nacks, guint16, j); + diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum); + GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, next_seqnum, diff); + if (diff > (j - i)) + break; + + nacked_seqnums++; + } + + range_size = j - i - 1; + GST_WRITE_UINT32_BE (app_data, seqnum << 16 | range_size); + app_data += 4; + } + + /* count how many FB NACK it would take to wrap nacked_seqnums */ + seqnum = g_array_index (nacks, guint16, 0); + n_fb_nacks = 1; + for (i = 1; i < nacked_seqnums; i++) { + guint16 next_seqnum = g_array_index (nacks, guint16, i); + diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum); + if (diff > 16) { + n_fb_nacks++; + seqnum = next_seqnum; + } + } + + if (n_fb_nacks <= n_rg_nacks) { + GST_DEBUG ("Not sending %u range nacks, as %u FB nacks will be smaller", + n_rg_nacks, n_fb_nacks); + gst_rtcp_packet_remove (&packet); + nacked_seqnums = 0; + goto done; + } + + GST_DEBUG ("Sent %u seqnums into %u Range NACKs", nacked_seqnums, n_rg_nacks); + +done: + gst_rtcp_buffer_unmap (&rtcp); + return nacked_seqnums; +} + +static void +gst_rist_src_on_new_ssrc (GstRistSrc * src, guint session_id, guint ssrc, + GstElement * rtpbin) +{ + GObject *session = NULL; + GObject *source = NULL; + + if (session_id != 0) + return; + + g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session); + g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source); + + if (ssrc & 1) + g_object_set (source, "disable-rtcp", TRUE, "probation", 0, NULL); + else + g_signal_connect (session, "on-sending-nacks", + (GCallback) gst_rist_src_on_sending_nacks, NULL); + + g_object_unref (source); + g_object_unref (session); +} + +static void +gst_rist_src_new_jitterbuffer (GstRistSrc * src, GstElement * jitterbuffer, + guint session, guint ssrc, GstElement * rtpbin) +{ + GST_OBJECT_LOCK (src); + g_object_set (jitterbuffer, "rtx-delay", src->reorder_section, + "rtx-max-retries", src->max_rtx_retries, NULL); + + if ((ssrc & 1) == 0) { + GST_INFO_OBJECT (src, "Saving jitterbuffer for session %u ssrc %u", + session, ssrc); + g_clear_object (&src->jitterbuffer); + src->jitterbuffer = gst_object_ref (jitterbuffer); + src->rtp_ssrc = ssrc; + } + + GST_OBJECT_UNLOCK (src); +} + +static void +gst_rist_src_init (GstRistSrc * src) +{ + GstPad *pad, *gpad; + GstStructure *sdes = NULL; + + /* Construct the RIST RTP receiver pipeline. + * + * udpsrc -> [recv_rtp_sink_%u] -------- [recv_rtp_src_%u_%u_%u] + * | rtpbin | + * udpsrc -> [recv_rtcp_sink_%u] -------- [send_rtcp_src_%u] -> udpsink + * + * This pipeline is fixed for now, note that optionally an FEC stream could + * be added later. + */ + src->srcpad = gst_ghost_pad_new_no_target_from_template ("src", + gst_static_pad_template_get (&src_templ)); + gst_element_add_pad (GST_ELEMENT (src), src->srcpad); + + src->rtpbin = gst_element_factory_make ("rtpbin", "rist_recv_rtbpin"); + if (!src->rtpbin) { + src->missing_plugin = "rtpmanager"; + goto missing_plugin; + } + + /* RIST specification says the SDES should only contain the CNAME */ + g_object_get (src->rtpbin, "sdes", &sdes, NULL); + gst_structure_remove_field (sdes, "tool"); + + gst_bin_add (GST_BIN (src), src->rtpbin); + g_object_set (src->rtpbin, "do-retransmission", TRUE, + "rtp-profile", 3 /* AVPF */ , + "sdes", sdes, NULL); + + gst_structure_free (sdes); + + g_signal_connect_swapped (src->rtpbin, "request-pt-map", + G_CALLBACK (gst_rist_src_request_pt_map), src); + g_signal_connect_swapped (src->rtpbin, "request-aux-receiver", + G_CALLBACK (gst_rist_src_request_aux_receiver), src); + + src->rtxbin = gst_bin_new ("rist_recv_rtxbin"); + g_object_ref_sink (src->rtxbin); + src->rtx_receive = gst_element_factory_make ("ristrtxreceive", + "rist_rtx_receive"); + gst_bin_add (GST_BIN (src->rtxbin), src->rtx_receive); + + pad = gst_element_get_static_pad (src->rtx_receive, "sink"); + gpad = gst_ghost_pad_new ("sink_0", pad); + gst_object_unref (pad); + gst_element_add_pad (src->rtxbin, gpad); + + pad = gst_element_get_static_pad (src->rtx_receive, "src"); + gpad = gst_ghost_pad_new ("src_0", pad); + gst_object_unref (pad); + gst_element_add_pad (src->rtxbin, gpad); + + src->rtp_src = gst_element_factory_make ("udpsrc", "rist_rtp_udpsrc"); + src->rtcp_src = gst_element_factory_make ("udpsrc", "rist_rtcp_udpsrc"); + src->rtcp_sink = + gst_element_factory_make ("dynudpsink", "rist_rtcp_dynudpsink"); + if (!src->rtp_src || !src->rtcp_src || !src->rtcp_sink) { + g_clear_object (&src->rtp_src); + g_clear_object (&src->rtcp_src); + g_clear_object (&src->rtcp_sink); + src->missing_plugin = "udp"; + goto missing_plugin; + } + gst_bin_add_many (GST_BIN (src), src->rtp_src, src->rtcp_src, + src->rtcp_sink, NULL); + g_object_set (src->rtcp_sink, "sync", FALSE, "async", FALSE, NULL); + /* delay udpsink startup, we will give it the socket from the RTCP udpsrc, + * but socket can only be set in NULL state */ + gst_element_set_locked_state (src->rtcp_sink, TRUE); + + gst_element_link_pads (src->rtp_src, "src", src->rtpbin, "recv_rtp_sink_0"); + gst_element_link_pads (src->rtcp_src, "src", src->rtpbin, "recv_rtcp_sink_0"); + gst_element_link_pads (src->rtpbin, "send_rtcp_src_0", + src->rtcp_sink, "sink"); + + g_signal_connect_swapped (src->rtpbin, "pad-added", + G_CALLBACK (gst_rist_src_pad_added), src); + g_signal_connect_swapped (src->rtpbin, "on-new-ssrc", + G_CALLBACK (gst_rist_src_on_new_ssrc), src); + g_signal_connect_swapped (src->rtpbin, "new-jitterbuffer", + G_CALLBACK (gst_rist_src_new_jitterbuffer), src); + + return; + +missing_plugin: + { + GST_ERROR_OBJECT (src, "'%s' plugin is missing.", src->missing_plugin); + src->construct_failed = TRUE; + } +} + +static GstPadProbeReturn +gst_rist_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info, + gpointer user_data) +{ + GstRistSrc *src = GST_RIST_SRC (user_data); + GstBuffer *buffer; + GstNetAddressMeta *meta; + + if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) { + GstBufferList *buffer_list = info->data; + buffer = gst_buffer_list_get (buffer_list, 0); + } else { + buffer = info->data; + } + + meta = gst_buffer_get_net_address_meta (buffer); + + GST_OBJECT_LOCK (src); + g_clear_object (&src->rtcp_send_addr); + src->rtcp_send_addr = g_object_ref (meta->addr); + GST_OBJECT_UNLOCK (src); + + return GST_PAD_PROBE_OK; +} + +static inline void +gst_rist_src_attach_net_address_meta (GstRistSrc * src, GstBuffer * buffer) +{ + GST_OBJECT_LOCK (src); + if (src->rtcp_send_addr) + gst_buffer_add_net_address_meta (buffer, src->rtcp_send_addr); + GST_OBJECT_UNLOCK (src); +} + +static GstPadProbeReturn +gst_rist_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info, + gpointer user_data) +{ + GstRistSrc *src = GST_RIST_SRC (user_data); + + if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) { + GstBufferList *buffer_list = info->data; + GstBuffer *buffer; + gint i; + + info->data = buffer_list = gst_buffer_list_make_writable (buffer_list); + for (i = 0; i < gst_buffer_list_length (buffer_list); i++) { + buffer = gst_buffer_list_get (buffer_list, i); + gst_rist_src_attach_net_address_meta (src, buffer); + } + } else { + GstBuffer *buffer = info->data; + info->data = buffer = gst_buffer_make_writable (buffer); + gst_rist_src_attach_net_address_meta (src, buffer); + } + + return GST_PAD_PROBE_OK; +} + +static GstStateChangeReturn +gst_rist_src_start (GstRistSrc * src) +{ + GstPad *pad; + GSocket *socket = NULL; + gchar *address; + guint rtcp_port; + GInetAddress *iaddr; + + if (src->construct_failed) { + GST_ELEMENT_ERROR (src, CORE, MISSING_PLUGIN, + ("Your GStreamer installation is missing plugin '%s'", + src->missing_plugin), (NULL)); + return GST_STATE_CHANGE_FAILURE; + } + + g_object_get (src->rtcp_src, "used-socket", &socket, + "address", &address, "port", &rtcp_port, NULL); + + iaddr = g_inet_address_new_from_string (address); + g_free (address); + + if (g_inet_address_get_is_multicast (iaddr)) { + /* mc-ttl is not supported by dynudpsink */ + g_socket_set_multicast_ttl (socket, src->multicast_ttl); + /* In multicast, send RTCP to the multicast group */ + src->rtcp_send_addr = g_inet_socket_address_new (iaddr, rtcp_port); + } else { + /* In unicast, send RTCP to the detected sender address */ + pad = gst_element_get_static_pad (src->rtcp_src, "src"); + src->rtcp_recv_probe = gst_pad_add_probe (pad, + GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, + gst_rist_src_on_recv_rtcp, src, NULL); + gst_object_unref (pad); + } + g_object_unref (iaddr); + + pad = gst_element_get_static_pad (src->rtcp_sink, "sink"); + src->rtcp_send_probe = gst_pad_add_probe (pad, + GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, + gst_rist_src_on_send_rtcp, src, NULL); + gst_object_unref (pad); + + /* share the socket created by the source */ + g_object_set (src->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL); + g_object_unref (socket); + + gst_element_set_locked_state (src->rtcp_sink, FALSE); + gst_element_sync_state_with_parent (src->rtcp_sink); + + return GST_STATE_CHANGE_SUCCESS; +} + +static GstStructure * +gst_rist_src_create_stats (GstRistSrc * src) +{ + GObject *session = NULL, *source = NULL; + GstStructure *stats = NULL, *ret; + guint64 dropped = 0, received = 0, recovered = 0, lost = 0; + guint64 duplicates = 0, rtx_sent = 0, rtt = 0; + + ret = gst_structure_new_empty ("rist/x-receiver-stats"); + + g_signal_emit_by_name (src->rtpbin, "get-internal-session", 0, &session); + if (!session) + return ret; + + g_signal_emit_by_name (session, "get-source-by-ssrc", src->rtp_ssrc, &source); + if (source) { + gint packets_lost; + g_object_get (source, "stats", &stats, NULL); + gst_structure_get_int (stats, "packets-lost", &packets_lost); + gst_structure_free (stats); + g_clear_object (&source); + dropped = MAX (packets_lost, 0); + } + g_object_unref (session); + + if (src->jitterbuffer) { + g_object_get (src->jitterbuffer, "stats", &stats, NULL); + gst_structure_get (stats, "num-pushed", G_TYPE_UINT64, &received, + "num-lost", G_TYPE_UINT64, &lost, + "rtx-count", G_TYPE_UINT64, &rtx_sent, + "num-duplicates", G_TYPE_UINT64, &duplicates, + "rtx-success-count", G_TYPE_UINT64, &recovered, + "rtx-rtt", G_TYPE_UINT64, &rtt, NULL); + gst_structure_free (stats); + } + + gst_structure_set (ret, "dropped", G_TYPE_UINT64, dropped, + "received", G_TYPE_UINT64, received, + "recovered", G_TYPE_UINT64, recovered, + "permanently-lost", G_TYPE_UINT64, lost, + "duplicates", G_TYPE_UINT64, duplicates, + "retransmission-requests-sent", G_TYPE_UINT64, rtx_sent, + "rtx-roundtrip-time", G_TYPE_UINT64, rtt, NULL); + + return ret; +} + +static gboolean +gst_rist_src_dump_stats (GstClock * clock, GstClockTime time, GstClockID id, + gpointer user_data) +{ + GstRistSrc *src = GST_RIST_SRC (user_data); + GstStructure *stats = gst_rist_src_create_stats (src); + + gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (src), stats); + + gst_structure_free (stats); + return TRUE; +} + +static void +gst_rist_src_enable_stats_interval (GstRistSrc * src) +{ + GstClock *clock; + GstClockTime start, interval; + + if (src->stats_interval == 0) + return; + + interval = src->stats_interval * GST_MSECOND; + clock = gst_system_clock_obtain (); + start = gst_clock_get_time (clock) + interval; + + src->stats_cid = gst_clock_new_periodic_id (clock, start, interval); + gst_clock_id_wait_async (src->stats_cid, gst_rist_src_dump_stats, + gst_object_ref (src), (GDestroyNotify) gst_object_unref); + + gst_object_unref (clock); +} + +static void +gst_rist_src_disable_stats_interval (GstRistSrc * src) +{ + if (src->stats_cid) { + gst_clock_id_unschedule (src->stats_cid); + gst_clock_id_unref (src->stats_cid); + src->stats_cid = NULL; + } +} + +static void +gst_rist_src_stop (GstRistSrc * src) +{ + GstPad *pad; + + if (src->rtcp_recv_probe) { + pad = gst_element_get_static_pad (src->rtcp_src, "src"); + gst_pad_remove_probe (pad, src->rtcp_recv_probe); + src->rtcp_recv_probe = 0; + gst_object_unref (pad); + } + + pad = gst_element_get_static_pad (src->rtcp_sink, "sink"); + gst_pad_remove_probe (pad, src->rtcp_send_probe); + src->rtcp_send_probe = 0; + gst_object_unref (pad); +} + +static GstStateChangeReturn +gst_rist_src_change_state (GstElement * element, GstStateChange transition) +{ + GstRistSrc *src = GST_RIST_SRC (element); + GstStateChangeReturn ret; + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rist_src_disable_stats_interval (src); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (gst_rist_src_parent_class)->change_state (element, + transition); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + gst_rist_src_start (src); + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + gst_rist_src_enable_stats_interval (src); + break; + case GST_STATE_CHANGE_READY_TO_NULL: + gst_rist_src_stop (src); + break; + default: + break; + } + + return ret; +} + +static void +gst_rist_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRistSrc *src = GST_RIST_SRC (object); + GstElement *session = NULL; + GstClockTime interval; + GstStructure *sdes; + + if (src->construct_failed) + return; + + switch (prop_id) { + case PROP_ADDRESS: + g_object_get_property (G_OBJECT (src->rtp_src), "address", value); + break; + + case PROP_PORT: + g_object_get_property (G_OBJECT (src->rtp_src), "port", value); + break; + + case PROP_RECEIVER_BUFFER: + g_object_get_property (G_OBJECT (src->rtpbin), "latency", value); + break; + + case PROP_REORDER_SECTION: + GST_OBJECT_LOCK (src); + g_value_set_uint (value, src->reorder_section); + GST_OBJECT_UNLOCK (src); + break; + + case PROP_MAX_RTX_RETRIES: + GST_OBJECT_LOCK (src); + g_value_set_uint (value, src->max_rtx_retries); + GST_OBJECT_UNLOCK (src); + break; + + case PROP_MIN_RTCP_INTERVAL: + g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session); + g_object_get (session, "rtcp-min-interval", &interval, NULL); + g_value_set_uint (value, (guint) (interval / GST_MSECOND)); + g_object_unref (session); + break; + + case PROP_MAX_RTCP_BANDWIDTH: + g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session); + g_object_get_property (G_OBJECT (session), "rtcp-fraction", value); + g_object_unref (session); + break; + + case PROP_STATS_UPDATE_INTERVAL: + g_value_set_uint (value, src->stats_interval); + break; + + case PROP_STATS: + g_value_take_boxed (value, gst_rist_src_create_stats (src)); + break; + + case PROP_CNAME: + g_object_get (src->rtpbin, "sdes", &sdes, NULL); + g_value_set_string (value, gst_structure_get_string (sdes, "cname")); + gst_structure_free (sdes); + break; + + case PROP_MULTICAST_LOOPBACK: + g_object_get_property (G_OBJECT (src->rtp_src), "loop", value); + break; + + case PROP_MULTICAST_IFACE: + g_object_get_property (G_OBJECT (src->rtp_src), "multicast-iface", value); + break; + + case PROP_MULTICAST_TTL: + g_value_set_int (value, src->multicast_ttl); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rist_src_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRistSrc *src = GST_RIST_SRC (object); + GstElement *session = NULL; + GstStructure *sdes; + + if (src->construct_failed) + return; + + switch (prop_id) { + case PROP_ADDRESS: + g_object_set_property (G_OBJECT (src->rtp_src), "address", value); + g_object_set_property (G_OBJECT (src->rtcp_src), "address", value); + break; + + case PROP_PORT:{ + guint port = g_value_get_uint (value); + + /* According to 5.1.1, RTCP receiver port most be event number and RTCP + * port should be the RTP port + 1 */ + + if (port & 0x1) { + g_warning ("Invalid RIST port %u, should be an even number.", port); + return; + } + + g_object_set (src->rtp_src, "port", port, NULL); + g_object_set (src->rtcp_src, "port", port + 1, NULL); + break; + } + + case PROP_RECEIVER_BUFFER: + g_object_set (src->rtpbin, "latency", g_value_get_uint (value), NULL); + break; + + case PROP_REORDER_SECTION: + GST_OBJECT_LOCK (src); + src->reorder_section = g_value_get_uint (value); + GST_OBJECT_UNLOCK (src); + break; + + case PROP_MAX_RTX_RETRIES: + GST_OBJECT_LOCK (src); + src->max_rtx_retries = g_value_get_uint (value); + GST_OBJECT_UNLOCK (src); + break; + + case PROP_MIN_RTCP_INTERVAL: + g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session); + g_object_set (session, "rtcp-min-interval", + g_value_get_uint (value) * GST_MSECOND, NULL); + g_object_unref (session); + break; + + case PROP_MAX_RTCP_BANDWIDTH: + g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session); + g_object_set (session, "rtcp-fraction", g_value_get_double (value), NULL); + g_object_unref (session); + break; + + case PROP_STATS_UPDATE_INTERVAL: + src->stats_interval = g_value_get_uint (value); + break; + + case PROP_CNAME: + g_object_get (src->rtpbin, "sdes", &sdes, NULL); + gst_structure_set_value (sdes, "cname", value); + g_object_set (src->rtpbin, "sdes", sdes, NULL); + gst_structure_free (sdes); + break; + + case PROP_MULTICAST_LOOPBACK: + g_object_set_property (G_OBJECT (src->rtp_src), "loop", value); + g_object_set_property (G_OBJECT (src->rtcp_src), "loop", value); + break; + + case PROP_MULTICAST_IFACE: + g_object_set_property (G_OBJECT (src->rtp_src), "multicast-iface", value); + g_object_set_property (G_OBJECT (src->rtcp_src), + "multicast-iface", value); + break; + + case PROP_MULTICAST_TTL: + src->multicast_ttl = g_value_get_int (value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rist_src_finalize (GObject * object) +{ + GstRistSrc *src = GST_RIST_SRC (object); + + if (src->jitterbuffer) + gst_object_unref (src->jitterbuffer); + + gst_object_unref (src->rtxbin); + + G_OBJECT_CLASS (gst_rist_src_parent_class)->finalize (object); +} + +static void +gst_rist_src_class_init (GstRistSrcClass * klass) +{ + GstElementClass *element_class = (GstElementClass *) klass; + GObjectClass *object_class = (GObjectClass *) klass; + + gst_element_class_set_metadata (element_class, + "RIST Source", "Source/Network", + "Source that implements RIST TR-06-1 streaming specification", + "Nicolas Dufresne change_state = gst_rist_src_change_state; + + object_class->get_property = gst_rist_src_get_property; + object_class->set_property = gst_rist_src_set_property; + object_class->finalize = gst_rist_src_finalize; + + g_object_class_install_property (object_class, PROP_ADDRESS, + g_param_spec_string ("address", "Address", + "Address to receive packets from (can be IPv4 or IPv6).", "0.0.0.0", + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_PORT, + g_param_spec_uint ("port", "Port", "The port to listen for RTP packets, " + "RTCP port is derived from it, this port must be an even number.", + 2, 65534, 5004, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_RECEIVER_BUFFER, + g_param_spec_uint ("receiver-buffer", "Receiver Buffer", + "Buffering duration in ms", 0, G_MAXUINT, 1000, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_REORDER_SECTION, + g_param_spec_uint ("reorder-section", "Recorder Section", + "Time to wait before sending retransmission request in ms.", + 0, G_MAXUINT, 70, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MAX_RTX_RETRIES, + g_param_spec_uint ("max-rtx-retries", "Maximum Retransmission Retries", + "The maximum number of retransmission requests for a lost packet.", + 0, G_MAXUINT, 7, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MIN_RTCP_INTERVAL, + g_param_spec_uint ("min-rtcp-interval", "Minimum RTCP Intercal", + "The minimum interval in ms between two successive RTCP packets", + 0, 100, 100, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MAX_RTCP_BANDWIDTH, + g_param_spec_double ("max-rtcp-bandwidth", "Maximum RTCP Bandwidth", + "The maximum bandwidth used for RTCP in fraction of RTP bandwdith", + 0.0, 0.05, 0.05, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_STATS_UPDATE_INTERVAL, + g_param_spec_uint ("stats-update-interval", "Statistics Update Interval", + "The interval between 'stats' update notification (0 disabled)", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_STATS, + g_param_spec_boxed ("stats", "Statistics", + "Statistic in a GstStructure named 'rist/x-receiver-stats'", + GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_CNAME, + g_param_spec_string ("cname", "CName", + "Set the CNAME in the SDES block of the receiver report.", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_MULTICAST_LOOPBACK, + g_param_spec_boolean ("multicast-loopback", "Multicast Loopback", + "When enabled, the packet will be received locally.", FALSE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_MULTICAST_IFACE, + g_param_spec_string ("multicast-iface", "multicast-iface", + "The multicast interface to use to send packets.", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_MULTICAST_TTL, + g_param_spec_int ("multicast-ttl", "Multicast TTL", + "The multicast time-to-live parameter.", 0, 255, 1, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); +} + +static GstURIType +gst_rist_src_uri_get_type (GType type) +{ + return GST_URI_SRC; +} + +static const gchar *const * +gst_rist_src_uri_get_protocols (GType type) +{ + static const char *protocols[] = { "rist", NULL }; + return protocols; +} + +static gchar * +gst_rist_src_uri_get_uri (GstURIHandler * handler) +{ + GstRistSrc *src = GST_RIST_SRC (handler); + gchar *uri = NULL; + + GST_OBJECT_LOCK (src); + if (src->uri) + uri = gst_uri_to_string (src->uri); + GST_OBJECT_UNLOCK (src); + + return uri; +} + +static void +gst_rist_src_uri_query_foreach (const gchar * key, const gchar * value, + GObject * src) +{ + if (g_str_equal (key, "async-handling")) { + GST_WARNING_OBJECT (src, "Setting '%s' property from URI is not allowed.", + key); + return; + } + + GST_DEBUG_OBJECT (src, "Setting property '%s' to '%s'", key, value); + gst_util_set_object_arg (src, key, value); +} + +static gboolean +gst_rist_src_uri_set_uri (GstURIHandler * handler, const gchar * uri, + GError ** error) +{ + GstRistSrc *src = GST_RIST_SRC (handler); + GstUri *gsturi; + GHashTable *query_table; + + if (GST_STATE (src) >= GST_STATE_PAUSED) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE, + "Changing the URI on ristsrc when it is running is not supported"); + GST_ERROR_OBJECT (src, "%s", (*error)->message); + return FALSE; + } + + if (!(gsturi = gst_uri_from_string (uri))) { + g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI, + "Could not parse URI"); + GST_ERROR_OBJECT (src, "%s", (*error)->message); + gst_uri_unref (gsturi); + return FALSE; + } + + GST_OBJECT_LOCK (src); + if (src->uri) + gst_uri_unref (src->uri); + src->uri = gst_uri_ref (gsturi); + GST_OBJECT_UNLOCK (src); + + g_object_set (src, "address", gst_uri_get_host (gsturi), NULL); + if (gst_uri_get_port (gsturi)) + g_object_set (src, "port", gst_uri_get_port (gsturi), NULL); + + query_table = gst_uri_get_query_table (gsturi); + if (query_table) + g_hash_table_foreach (query_table, + (GHFunc) gst_rist_src_uri_query_foreach, src); + + gst_uri_unref (gsturi); + return TRUE; +} + +static void +gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data) +{ + GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface; + + iface->get_type = gst_rist_src_uri_get_type; + iface->get_protocols = gst_rist_src_uri_get_protocols; + iface->get_uri = gst_rist_src_uri_get_uri; + iface->set_uri = gst_rist_src_uri_set_uri; +} diff --git a/gst/rist/meson.build b/gst/rist/meson.build new file mode 100644 index 0000000000..9b8cf98979 --- /dev/null +++ b/gst/rist/meson.build @@ -0,0 +1,17 @@ +rist_sources = [ + 'gstristrtxsend.c', + 'gstristrtxreceive.c', + 'gstristsrc.c', + 'gstristsink.c', + 'gstristplugin.c', +] + +gstrist = library('gstrist', + rist_sources, + c_args : gst_plugins_bad_args, + include_directories : [configinc], + dependencies : [gstrtp_dep, gstnet_dep, gio_dep], + install : true, + install_dir : plugins_install_dir, +) +pkgconfig.generate(gstrist, install_dir : plugins_pkgconfig_install_dir) diff --git a/meson_options.txt b/meson_options.txt index 7439513504..4d067f1e81 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -48,6 +48,7 @@ option('pnm', type : 'feature', value : 'auto') option('proxy', type : 'feature', value : 'auto') option('rawparse', type : 'feature', value : 'auto') option('removesilence', type : 'feature', value : 'auto') +option('rist', type : 'feature', value : 'auto') option('sdp', type : 'feature', value : 'auto') option('segmentclip', type : 'feature', value : 'auto') option('siren', type : 'feature', value : 'auto')