diff --git a/gst/rist/Makefile.am b/gst/rist/Makefile.am index d198f3f8e6..c2c152d019 100644 --- a/gst/rist/Makefile.am +++ b/gst/rist/Makefile.am @@ -5,9 +5,11 @@ libgstrist_la_SOURCES = \ gstristsink.c \ gstristrtxsend.c \ gstristrtxreceive.c \ + gstroundrobin.c \ gstristplugin.c noinst_HEADERS = \ + gstroundrobin.h \ gstrist.h libgstrist_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) \ diff --git a/gst/rist/gstristplugin.c b/gst/rist/gstristplugin.c index cb6dcd361e..be0110c0c8 100644 --- a/gst/rist/gstristplugin.c +++ b/gst/rist/gstristplugin.c @@ -23,6 +23,7 @@ #endif #include "gstrist.h" +#include "gstroundrobin.h" static gboolean plugin_init (GstPlugin * plugin) @@ -39,6 +40,9 @@ plugin_init (GstPlugin * plugin) if (!gst_element_register (plugin, "ristrtxreceive", GST_RANK_NONE, GST_TYPE_RIST_RTX_RECEIVE)) return FALSE; + if (!gst_element_register (plugin, "roundrobin", GST_RANK_NONE, + GST_TYPE_ROUND_ROBIN)) + return FALSE; return TRUE; } diff --git a/gst/rist/gstristsink.c b/gst/rist/gstristsink.c index c4289cae35..7589e4c3c3 100644 --- a/gst/rist/gstristsink.c +++ b/gst/rist/gstristsink.c @@ -37,13 +37,21 @@ * ristsink address=10.0.0.1 port=5004 * ]| * - * Additionally, this element supports bonding, which concist of using multiple - * links in order to transmit the streams. The address of each bonds is - * configured through "bonding-addresses" property. When set, this will replace + * Additionally, this element supports bonding, which consist of using multiple + * links in order to transmit the streams. The address of each link is + * configured through the "bonding-addresses" property. When set, this will replace * the value that might have been set on "address" and "port". Each bonds will be - * mapped to it's down RTP session. RTX request are only replied on the link + * mapped to its down RTP session. RTX request are only replied on the link * the NACK was received from. * + * There is currently two bonding methods in place: "broadcast" and "round-robin". + * In "broadcast" mode, all the packets are duplicated over all sessions. + * While in "round-robin" mode, packets are evenly distributed over the links. One + * can also implement its down dispatcher element and configure it using the + * "dispatcher" property. As a reference, "broadcast" mode is implemented with + * the "tee" element, while "round-robin" mode is implemented with the + * "round-robin" element. + * * ## Exmaple launch line for bonding * |[ * gst-launch-1.0 udpsrc ! tsparse set-timestamp=1 ! rtpmp2pay ! \ @@ -82,9 +90,17 @@ enum PROP_MULTICAST_LOOPBACK, PROP_MULTICAST_IFACE, PROP_MULTICAST_TTL, - PROP_BONDING_ADDRESSES + PROP_BONDING_ADDRESSES, + PROP_BONDING_METHOD, + PROP_DISPATCHER }; +typedef enum +{ + GST_RIST_BONDING_METHOD_BROADCAST, + GST_RIST_BONDING_METHOD_ROUND_ROBIN, +} GstRistBondingMethod; + static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, @@ -113,13 +129,14 @@ struct _GstRistSink GstElement *ssrc_filter; GstPad *sinkpad; GstElement *rtxbin; - GstElement *rtx_tee; + GstElement *dispatcher; /* Common properties, protected by bonds_lock */ gint multicast_ttl; gboolean multicast_loopback; GstClockTime min_rtcp_interval; gdouble max_rtcp_bandwidth; + GstRistBondingMethod bonding_method; /* Bonds */ GPtrArray *bonds; @@ -139,6 +156,26 @@ struct _GstRistSink const gchar *missing_plugin; }; +static GType +gst_rist_bonding_method_get_type (void) +{ + static gsize id = 0; + static const GEnumValue values[] = { + {GST_RIST_BONDING_METHOD_BROADCAST, + "GST_RIST_BONDING_METHOD_BROADCAST", "broadcast"}, + {GST_RIST_BONDING_METHOD_ROUND_ROBIN, + "GST_RIST_BONDING_METHOD_ROUND_ROBIN", "round-robin"}, + {0, NULL, NULL} + }; + + if (g_once_init_enter (&id)) { + GType tmp = g_enum_register_static ("GstRistBondingMethodType", values); + g_once_init_leave (&id, tmp); + } + + return (GType) id; +} + G_DEFINE_TYPE_WITH_CODE (GstRistSink, gst_rist_sink, GST_TYPE_BIN, GST_DEBUG_CATEGORY_INIT (gst_rist_sink_debug, "ristsink", 0, "RIST Sink")); @@ -193,11 +230,6 @@ gst_rist_sink_add_bond (GstRistSink * sink) gst_object_unref (pad); gst_element_add_pad (sink->rtxbin, gpad); - g_snprintf (name, 32, "src_%u", bond->session); - pad = gst_element_get_request_pad (sink->rtx_tee, name); - gst_element_link_pads (sink->rtx_tee, name, bond->rtx_queue, "sink"); - gst_object_unref (pad); - g_object_set (bond->rtx_send, "max-size-packets", 0, NULL); g_snprintf (name, 32, "send_rtp_sink_%u", bond->session); @@ -411,7 +443,7 @@ gst_rist_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) static void gst_rist_sink_init (GstRistSink * sink) { - GstPad *ssrc_filter_sinkpad, *rtx_tee_sinkpad, *rtxbin_gpad; + GstPad *ssrc_filter_sinkpad, *rtxbin_gpad; GstCaps *ssrc_caps; GstStructure *sdes = NULL; RistSenderBond *bond; @@ -455,16 +487,7 @@ gst_rist_sink_init (GstRistSink * sink) sink->rtxbin = gst_bin_new ("rist_send_rtxbin"); g_object_ref_sink (sink->rtxbin); - sink->rtx_tee = gst_element_factory_make ("tee", "rist_rtx_tee"); - if (!sink->rtx_tee) { - sink->missing_plugin = "coreelements"; - goto missing_plugin; - } - gst_bin_add (GST_BIN (sink->rtxbin), sink->rtx_tee); - - rtx_tee_sinkpad = gst_element_get_static_pad (sink->rtx_tee, "sink"); - rtxbin_gpad = gst_ghost_pad_new ("sink_0", rtx_tee_sinkpad); - gst_object_unref (rtx_tee_sinkpad); + rtxbin_gpad = gst_ghost_pad_new_no_target ("sink_0", GST_PAD_SINK); gst_element_add_pad (sink->rtxbin, rtxbin_gpad); sink->ssrc_filter = gst_element_factory_make ("capsfilter", @@ -575,8 +598,28 @@ dns_resolve_failed: static GstStateChangeReturn gst_rist_sink_start (GstRistSink * sink) { + GstPad *dispatcher_sinkpad, *rtxbin_gpad; gint i; + /* Unless a custom dispatcher was provided, use the specified bonding method + * to create one */ + if (!sink->dispatcher) { + switch (sink->bonding_method) { + case GST_RIST_BONDING_METHOD_BROADCAST: + sink->dispatcher = gst_element_factory_make ("tee", "rist_dispatcher"); + if (!sink->dispatcher) { + sink->missing_plugin = "coreelements"; + sink->construct_failed = TRUE; + } + break; + case GST_RIST_BONDING_METHOD_ROUND_ROBIN: + sink->dispatcher = gst_element_factory_make ("roundrobin", + "rist_dispatcher"); + g_assert (sink->dispatcher); + break; + } + } + if (sink->construct_failed) { GST_ELEMENT_ERROR (sink, CORE, MISSING_PLUGIN, ("Your GStreamer installation is missing plugin '%s'", @@ -584,15 +627,29 @@ gst_rist_sink_start (GstRistSink * sink) return GST_STATE_CHANGE_FAILURE; } + gst_bin_add (GST_BIN (sink->rtxbin), sink->dispatcher); + dispatcher_sinkpad = gst_element_get_static_pad (sink->dispatcher, "sink"); + rtxbin_gpad = gst_element_get_static_pad (sink->rtxbin, "sink_0"); + gst_ghost_pad_set_target (GST_GHOST_PAD (rtxbin_gpad), dispatcher_sinkpad); + gst_object_unref (dispatcher_sinkpad); + gst_object_unref (rtxbin_gpad); + for (i = 0; i < sink->bonds->len; i++) { RistSenderBond *bond = g_ptr_array_index (sink->bonds, i); GObject *session = NULL; + GstPad *pad; + gchar name[32]; g_signal_emit_by_name (sink->rtpbin, "get-session", i, &session); g_object_set (session, "rtcp-min-interval", sink->min_rtcp_interval, "rtcp-fraction", sink->max_rtcp_bandwidth, NULL); g_object_unref (session); + g_snprintf (name, 32, "src_%u", bond->session); + pad = gst_element_get_request_pad (sink->dispatcher, name); + gst_element_link_pads (sink->dispatcher, name, bond->rtx_queue, "sink"); + gst_object_unref (pad); + if (!gst_rist_sink_setup_rtcp_socket (sink, bond)) return GST_STATE_CHANGE_FAILURE; } @@ -978,6 +1035,14 @@ gst_rist_sink_get_property (GObject * object, guint prop_id, g_value_take_string (value, gst_rist_sink_get_bonds (sink)); break; + case PROP_BONDING_METHOD: + g_value_set_enum (value, sink->bonding_method); + break; + + case PROP_DISPATCHER: + g_value_set_object (value, sink->dispatcher); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1071,6 +1136,16 @@ gst_rist_sink_set_property (GObject * object, guint prop_id, gst_rist_sink_set_bonds (sink, g_value_get_string (value)); break; + case PROP_BONDING_METHOD: + sink->bonding_method = g_value_get_enum (value); + break; + + case PROP_DISPATCHER: + if (sink->dispatcher) + g_object_unref (sink->dispatcher); + sink->dispatcher = g_object_ref_sink (g_value_get_object (value)); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1185,4 +1260,19 @@ gst_rist_sink_class_init (GstRistSinkClass * klass) g_param_spec_string ("bonding-addresses", "Bonding Addresses", "Comma (,) seperated list of
: to send to. ", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (object_class, PROP_BONDING_METHOD, + g_param_spec_enum ("bonding-method", "Bonding Method", + "Defines the bonding method to use.", + gst_rist_bonding_method_get_type (), + GST_RIST_BONDING_METHOD_BROADCAST, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_DISPATCHER, + g_param_spec_object ("dispatcher", "Bonding Dispatcher", + "An element that takes care of multi-plexing bounds. When set " + "\"bonding-method\" is ignored.", + GST_TYPE_ELEMENT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); } diff --git a/gst/rist/gstroundrobin.c b/gst/rist/gstroundrobin.c new file mode 100644 index 0000000000..55e4152a36 --- /dev/null +++ b/gst/rist/gstroundrobin.c @@ -0,0 +1,135 @@ +/* GStreamer Round Robin + * Copyright (C) 2019 Net Insight AB + * Author: Nicolas Dufresne + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-roundrobin + * @title: roundrobin + * + * This is a generic element that will distribute equally incoming + * buffers over multiple src pads. This is the opposite of tee + * element, which duplicates buffers over all pads. This element + * can be used to distrute load across multiple branches when the buffer + * can be processed indepently. + */ + +#include "gstroundrobin.h" + +GST_DEBUG_CATEGORY_STATIC (gst_round_robin_debug); +#define GST_CAT_DEFAULT gst_round_robin_debug + +static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("ANY")); + +static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src_%d", + GST_PAD_SRC, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("ANY")); + +struct _GstRoundRobin +{ + GstElement parent; + gint index; +}; + +G_DEFINE_TYPE_WITH_CODE (GstRoundRobin, gst_round_robin, + GST_TYPE_ELEMENT, GST_DEBUG_CATEGORY_INIT (gst_round_robin_debug, + "roundrobin", 0, "Round Robin")); + +static GstFlowReturn +gst_round_robin_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstRoundRobin *disp = (GstRoundRobin *) parent; + GstElement *elem = (GstElement *) parent; + GstPad *src_pad = NULL; + GstFlowReturn ret; + + GST_OBJECT_LOCK (disp); + if (disp->index >= elem->numsrcpads) + disp->index = 0; + + src_pad = g_list_nth_data (elem->srcpads, disp->index); + + if (src_pad) { + gst_object_ref (src_pad); + disp->index += 1; + } + GST_OBJECT_UNLOCK (disp); + + if (!src_pad) + /* no pad, that's fine */ + return GST_FLOW_OK; + + ret = gst_pad_push (src_pad, buffer); + gst_object_unref (src_pad); + + return ret; +} + +static GstPad * +gst_round_robin_request_pad (GstElement * element, GstPadTemplate * templ, + const gchar * name, const GstCaps * caps) +{ + GstPad *pad; + + pad = gst_element_get_static_pad (element, name); + if (pad) { + gst_object_unref (pad); + return NULL; + } + + pad = gst_pad_new_from_static_template (&src_templ, name); + gst_element_add_pad (element, pad); + + return pad; +} + +static void +gst_round_robin_init (GstRoundRobin * disp) +{ + GstPad *pad; + + gst_element_create_all_pads (GST_ELEMENT (disp)); + pad = GST_PAD (GST_ELEMENT (disp)->sinkpads->data); + + GST_PAD_SET_PROXY_CAPS (pad); + GST_PAD_SET_PROXY_SCHEDULING (pad); + /* do not proxy allocation, it requires special handling like tee does */ + + gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_round_robin_chain)); +} + +static void +gst_round_robin_class_init (GstRoundRobinClass * klass) +{ + GstElementClass *element_class = (GstElementClass *) klass; + + gst_element_class_set_metadata (element_class, + "Round Robin", "Source/Network", + "A round robin dispatcher element.", + "Nicolas Dufresne request_new_pad = + GST_DEBUG_FUNCPTR (gst_round_robin_request_pad); +} diff --git a/gst/rist/gstroundrobin.h b/gst/rist/gstroundrobin.h new file mode 100644 index 0000000000..d1a918059d --- /dev/null +++ b/gst/rist/gstroundrobin.h @@ -0,0 +1,34 @@ +/* GStreamer RIST plugin + * Copyright (C) 2019 Net Insight AB + * Author: Nicolas Dufresne + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include + +#ifndef __GST_ROUND_ROBIN_H__ +#define __GST_ROUND_ROBIN_H__ + +#define GST_TYPE_ROUND_ROBIN (gst_round_robin_get_type()) +#define GST_ROUND_ROBIN(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_ROUND_ROBIN,GstRoundRobin)) +typedef struct _GstRoundRobin GstRoundRobin; +typedef struct { + GstElementClass parent; +} GstRoundRobinClass; +GType gst_round_robin_get_type (void); + +#endif diff --git a/gst/rist/meson.build b/gst/rist/meson.build index f9eacddd77..9bd5ccd646 100644 --- a/gst/rist/meson.build +++ b/gst/rist/meson.build @@ -1,4 +1,5 @@ rist_sources = [ + 'gstroundrobin.c', 'gstristrtxsend.c', 'gstristrtxreceive.c', 'gstristsrc.c',