rist: Add combined bonding-method support

This patchs add support for configuring the bonding method used. There is
two method specified

 - redundant: All the RTP packets are replicated
 - combined: RTP packet are evenly distributed over each links

Additionally, an application can set the "dispatcher" property in order
to implement custom dispatching method. Whenever the "dispatcher"
property is set, "bonding-method" property will be ignored.
This commit is contained in:
Nicolas Dufresne 2019-05-07 15:58:04 -04:00 committed by Nicolas Dufresne
parent 9a443c04bc
commit 98acb3260d
6 changed files with 288 additions and 22 deletions

View file

@ -5,9 +5,11 @@ libgstrist_la_SOURCES = \
gstristsink.c \
gstristrtxsend.c \
gstristrtxreceive.c \
gstroundrobin.c \
gstristplugin.c
noinst_HEADERS = \
gstroundrobin.h \
gstrist.h
libgstrist_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) \

View file

@ -23,6 +23,7 @@
#endif
#include "gstrist.h"
#include "gstroundrobin.h"
static gboolean
plugin_init (GstPlugin * plugin)
@ -39,6 +40,9 @@ plugin_init (GstPlugin * plugin)
if (!gst_element_register (plugin, "ristrtxreceive", GST_RANK_NONE,
GST_TYPE_RIST_RTX_RECEIVE))
return FALSE;
if (!gst_element_register (plugin, "roundrobin", GST_RANK_NONE,
GST_TYPE_ROUND_ROBIN))
return FALSE;
return TRUE;
}

View file

@ -37,13 +37,21 @@
* ristsink address=10.0.0.1 port=5004
* ]|
*
* Additionally, this element supports bonding, which concist of using multiple
* links in order to transmit the streams. The address of each bonds is
* configured through "bonding-addresses" property. When set, this will replace
* Additionally, this element supports bonding, which consist of using multiple
* links in order to transmit the streams. The address of each link is
* configured through the "bonding-addresses" property. When set, this will replace
* the value that might have been set on "address" and "port". Each bonds will be
* mapped to it's down RTP session. RTX request are only replied on the link
* mapped to its down RTP session. RTX request are only replied on the link
* the NACK was received from.
*
* There is currently two bonding methods in place: "broadcast" and "round-robin".
* In "broadcast" mode, all the packets are duplicated over all sessions.
* While in "round-robin" mode, packets are evenly distributed over the links. One
* can also implement its down dispatcher element and configure it using the
* "dispatcher" property. As a reference, "broadcast" mode is implemented with
* the "tee" element, while "round-robin" mode is implemented with the
* "round-robin" element.
*
* ## Exmaple launch line for bonding
* |[
* gst-launch-1.0 udpsrc ! tsparse set-timestamp=1 ! rtpmp2pay ! \
@ -82,9 +90,17 @@ enum
PROP_MULTICAST_LOOPBACK,
PROP_MULTICAST_IFACE,
PROP_MULTICAST_TTL,
PROP_BONDING_ADDRESSES
PROP_BONDING_ADDRESSES,
PROP_BONDING_METHOD,
PROP_DISPATCHER
};
typedef enum
{
GST_RIST_BONDING_METHOD_BROADCAST,
GST_RIST_BONDING_METHOD_ROUND_ROBIN,
} GstRistBondingMethod;
static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
@ -113,13 +129,14 @@ struct _GstRistSink
GstElement *ssrc_filter;
GstPad *sinkpad;
GstElement *rtxbin;
GstElement *rtx_tee;
GstElement *dispatcher;
/* Common properties, protected by bonds_lock */
gint multicast_ttl;
gboolean multicast_loopback;
GstClockTime min_rtcp_interval;
gdouble max_rtcp_bandwidth;
GstRistBondingMethod bonding_method;
/* Bonds */
GPtrArray *bonds;
@ -139,6 +156,26 @@ struct _GstRistSink
const gchar *missing_plugin;
};
static GType
gst_rist_bonding_method_get_type (void)
{
static gsize id = 0;
static const GEnumValue values[] = {
{GST_RIST_BONDING_METHOD_BROADCAST,
"GST_RIST_BONDING_METHOD_BROADCAST", "broadcast"},
{GST_RIST_BONDING_METHOD_ROUND_ROBIN,
"GST_RIST_BONDING_METHOD_ROUND_ROBIN", "round-robin"},
{0, NULL, NULL}
};
if (g_once_init_enter (&id)) {
GType tmp = g_enum_register_static ("GstRistBondingMethodType", values);
g_once_init_leave (&id, tmp);
}
return (GType) id;
}
G_DEFINE_TYPE_WITH_CODE (GstRistSink, gst_rist_sink, GST_TYPE_BIN,
GST_DEBUG_CATEGORY_INIT (gst_rist_sink_debug, "ristsink", 0, "RIST Sink"));
@ -193,11 +230,6 @@ gst_rist_sink_add_bond (GstRistSink * sink)
gst_object_unref (pad);
gst_element_add_pad (sink->rtxbin, gpad);
g_snprintf (name, 32, "src_%u", bond->session);
pad = gst_element_get_request_pad (sink->rtx_tee, name);
gst_element_link_pads (sink->rtx_tee, name, bond->rtx_queue, "sink");
gst_object_unref (pad);
g_object_set (bond->rtx_send, "max-size-packets", 0, NULL);
g_snprintf (name, 32, "send_rtp_sink_%u", bond->session);
@ -411,7 +443,7 @@ gst_rist_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
static void
gst_rist_sink_init (GstRistSink * sink)
{
GstPad *ssrc_filter_sinkpad, *rtx_tee_sinkpad, *rtxbin_gpad;
GstPad *ssrc_filter_sinkpad, *rtxbin_gpad;
GstCaps *ssrc_caps;
GstStructure *sdes = NULL;
RistSenderBond *bond;
@ -455,16 +487,7 @@ gst_rist_sink_init (GstRistSink * sink)
sink->rtxbin = gst_bin_new ("rist_send_rtxbin");
g_object_ref_sink (sink->rtxbin);
sink->rtx_tee = gst_element_factory_make ("tee", "rist_rtx_tee");
if (!sink->rtx_tee) {
sink->missing_plugin = "coreelements";
goto missing_plugin;
}
gst_bin_add (GST_BIN (sink->rtxbin), sink->rtx_tee);
rtx_tee_sinkpad = gst_element_get_static_pad (sink->rtx_tee, "sink");
rtxbin_gpad = gst_ghost_pad_new ("sink_0", rtx_tee_sinkpad);
gst_object_unref (rtx_tee_sinkpad);
rtxbin_gpad = gst_ghost_pad_new_no_target ("sink_0", GST_PAD_SINK);
gst_element_add_pad (sink->rtxbin, rtxbin_gpad);
sink->ssrc_filter = gst_element_factory_make ("capsfilter",
@ -575,8 +598,28 @@ dns_resolve_failed:
static GstStateChangeReturn
gst_rist_sink_start (GstRistSink * sink)
{
GstPad *dispatcher_sinkpad, *rtxbin_gpad;
gint i;
/* Unless a custom dispatcher was provided, use the specified bonding method
* to create one */
if (!sink->dispatcher) {
switch (sink->bonding_method) {
case GST_RIST_BONDING_METHOD_BROADCAST:
sink->dispatcher = gst_element_factory_make ("tee", "rist_dispatcher");
if (!sink->dispatcher) {
sink->missing_plugin = "coreelements";
sink->construct_failed = TRUE;
}
break;
case GST_RIST_BONDING_METHOD_ROUND_ROBIN:
sink->dispatcher = gst_element_factory_make ("roundrobin",
"rist_dispatcher");
g_assert (sink->dispatcher);
break;
}
}
if (sink->construct_failed) {
GST_ELEMENT_ERROR (sink, CORE, MISSING_PLUGIN,
("Your GStreamer installation is missing plugin '%s'",
@ -584,15 +627,29 @@ gst_rist_sink_start (GstRistSink * sink)
return GST_STATE_CHANGE_FAILURE;
}
gst_bin_add (GST_BIN (sink->rtxbin), sink->dispatcher);
dispatcher_sinkpad = gst_element_get_static_pad (sink->dispatcher, "sink");
rtxbin_gpad = gst_element_get_static_pad (sink->rtxbin, "sink_0");
gst_ghost_pad_set_target (GST_GHOST_PAD (rtxbin_gpad), dispatcher_sinkpad);
gst_object_unref (dispatcher_sinkpad);
gst_object_unref (rtxbin_gpad);
for (i = 0; i < sink->bonds->len; i++) {
RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
GObject *session = NULL;
GstPad *pad;
gchar name[32];
g_signal_emit_by_name (sink->rtpbin, "get-session", i, &session);
g_object_set (session, "rtcp-min-interval", sink->min_rtcp_interval,
"rtcp-fraction", sink->max_rtcp_bandwidth, NULL);
g_object_unref (session);
g_snprintf (name, 32, "src_%u", bond->session);
pad = gst_element_get_request_pad (sink->dispatcher, name);
gst_element_link_pads (sink->dispatcher, name, bond->rtx_queue, "sink");
gst_object_unref (pad);
if (!gst_rist_sink_setup_rtcp_socket (sink, bond))
return GST_STATE_CHANGE_FAILURE;
}
@ -978,6 +1035,14 @@ gst_rist_sink_get_property (GObject * object, guint prop_id,
g_value_take_string (value, gst_rist_sink_get_bonds (sink));
break;
case PROP_BONDING_METHOD:
g_value_set_enum (value, sink->bonding_method);
break;
case PROP_DISPATCHER:
g_value_set_object (value, sink->dispatcher);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -1071,6 +1136,16 @@ gst_rist_sink_set_property (GObject * object, guint prop_id,
gst_rist_sink_set_bonds (sink, g_value_get_string (value));
break;
case PROP_BONDING_METHOD:
sink->bonding_method = g_value_get_enum (value);
break;
case PROP_DISPATCHER:
if (sink->dispatcher)
g_object_unref (sink->dispatcher);
sink->dispatcher = g_object_ref_sink (g_value_get_object (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -1185,4 +1260,19 @@ gst_rist_sink_class_init (GstRistSinkClass * klass)
g_param_spec_string ("bonding-addresses", "Bonding Addresses",
"Comma (,) seperated list of <address>:<port> to send to. ", NULL,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (object_class, PROP_BONDING_METHOD,
g_param_spec_enum ("bonding-method", "Bonding Method",
"Defines the bonding method to use.",
gst_rist_bonding_method_get_type (),
GST_RIST_BONDING_METHOD_BROADCAST,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
g_object_class_install_property (object_class, PROP_DISPATCHER,
g_param_spec_object ("dispatcher", "Bonding Dispatcher",
"An element that takes care of multi-plexing bounds. When set "
"\"bonding-method\" is ignored.",
GST_TYPE_ELEMENT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY));
}

135
gst/rist/gstroundrobin.c Normal file
View file

@ -0,0 +1,135 @@
/* GStreamer Round Robin
* Copyright (C) 2019 Net Insight AB
* Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-roundrobin
* @title: roundrobin
*
* This is a generic element that will distribute equally incoming
* buffers over multiple src pads. This is the opposite of tee
* element, which duplicates buffers over all pads. This element
* can be used to distrute load across multiple branches when the buffer
* can be processed indepently.
*/
#include "gstroundrobin.h"
GST_DEBUG_CATEGORY_STATIC (gst_round_robin_debug);
#define GST_CAT_DEFAULT gst_round_robin_debug
static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS ("ANY"));
static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src_%d",
GST_PAD_SRC,
GST_PAD_REQUEST,
GST_STATIC_CAPS ("ANY"));
struct _GstRoundRobin
{
GstElement parent;
gint index;
};
G_DEFINE_TYPE_WITH_CODE (GstRoundRobin, gst_round_robin,
GST_TYPE_ELEMENT, GST_DEBUG_CATEGORY_INIT (gst_round_robin_debug,
"roundrobin", 0, "Round Robin"));
static GstFlowReturn
gst_round_robin_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstRoundRobin *disp = (GstRoundRobin *) parent;
GstElement *elem = (GstElement *) parent;
GstPad *src_pad = NULL;
GstFlowReturn ret;
GST_OBJECT_LOCK (disp);
if (disp->index >= elem->numsrcpads)
disp->index = 0;
src_pad = g_list_nth_data (elem->srcpads, disp->index);
if (src_pad) {
gst_object_ref (src_pad);
disp->index += 1;
}
GST_OBJECT_UNLOCK (disp);
if (!src_pad)
/* no pad, that's fine */
return GST_FLOW_OK;
ret = gst_pad_push (src_pad, buffer);
gst_object_unref (src_pad);
return ret;
}
static GstPad *
gst_round_robin_request_pad (GstElement * element, GstPadTemplate * templ,
const gchar * name, const GstCaps * caps)
{
GstPad *pad;
pad = gst_element_get_static_pad (element, name);
if (pad) {
gst_object_unref (pad);
return NULL;
}
pad = gst_pad_new_from_static_template (&src_templ, name);
gst_element_add_pad (element, pad);
return pad;
}
static void
gst_round_robin_init (GstRoundRobin * disp)
{
GstPad *pad;
gst_element_create_all_pads (GST_ELEMENT (disp));
pad = GST_PAD (GST_ELEMENT (disp)->sinkpads->data);
GST_PAD_SET_PROXY_CAPS (pad);
GST_PAD_SET_PROXY_SCHEDULING (pad);
/* do not proxy allocation, it requires special handling like tee does */
gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_round_robin_chain));
}
static void
gst_round_robin_class_init (GstRoundRobinClass * klass)
{
GstElementClass *element_class = (GstElementClass *) klass;
gst_element_class_set_metadata (element_class,
"Round Robin", "Source/Network",
"A round robin dispatcher element.",
"Nicolas Dufresne <nicolas.dufresne@collabora.com");
gst_element_class_add_static_pad_template (element_class, &sink_templ);
gst_element_class_add_static_pad_template (element_class, &src_templ);
element_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_round_robin_request_pad);
}

34
gst/rist/gstroundrobin.h Normal file
View file

@ -0,0 +1,34 @@
/* GStreamer RIST plugin
* Copyright (C) 2019 Net Insight AB
* Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <gst/gst.h>
#ifndef __GST_ROUND_ROBIN_H__
#define __GST_ROUND_ROBIN_H__
#define GST_TYPE_ROUND_ROBIN (gst_round_robin_get_type())
#define GST_ROUND_ROBIN(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_ROUND_ROBIN,GstRoundRobin))
typedef struct _GstRoundRobin GstRoundRobin;
typedef struct {
GstElementClass parent;
} GstRoundRobinClass;
GType gst_round_robin_get_type (void);
#endif

View file

@ -1,4 +1,5 @@
rist_sources = [
'gstroundrobin.c',
'gstristrtxsend.c',
'gstristrtxreceive.c',
'gstristsrc.c',