mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-10 17:35:59 +00:00
ristsink: Implement bonding support
This commit is contained in:
parent
ca36d70538
commit
ffedd7ce2d
1 changed files with 439 additions and 119 deletions
|
@ -33,7 +33,21 @@
|
|||
*
|
||||
* ## Example launch line
|
||||
* |[
|
||||
* gst-launch-1.0 udpsrc ! tsparse set-timestamp=1 ! rtpmp2pay ! ristsink address=10.0.0.1 port=5004
|
||||
* gst-launch-1.0 udpsrc ! tsparse set-timestamp=1 ! rtpmp2pay ! \
|
||||
* ristsink address=10.0.0.1 port=5004
|
||||
* ]|
|
||||
*
|
||||
* Additionally, this element supports bonding, which concist of using multiple
|
||||
* links in order to transmit the streams. The address of each bonds is
|
||||
* configured through "bonding-addresses" property. When set, this will replace
|
||||
* the value that might have been set on "address" and "port". Each bonds will be
|
||||
* mapped to it's down RTP session. RTX request are only replied on the link
|
||||
* the NACK was received from.
|
||||
*
|
||||
* ## Exmaple launch line for bonding
|
||||
* |[
|
||||
* gst-launch-1.0 udpsrc ! tsparse set-timestamp=1 ! rtpmp2pay ! \
|
||||
* ristsink bonding-addresses="10.0.0.1:5004,11.0.0.1:5006"
|
||||
* ]|
|
||||
*/
|
||||
|
||||
|
@ -44,6 +58,9 @@
|
|||
#include <gio/gio.h>
|
||||
#include <gst/rtp/rtp.h>
|
||||
|
||||
/* for strtol() */
|
||||
#include <stdlib.h>
|
||||
|
||||
#include "gstrist.h"
|
||||
|
||||
GST_DEBUG_CATEGORY_STATIC (gst_rist_sink_debug);
|
||||
|
@ -61,7 +78,8 @@ enum
|
|||
PROP_CNAME,
|
||||
PROP_MULTICAST_LOOPBACK,
|
||||
PROP_MULTICAST_IFACE,
|
||||
PROP_MULTICAST_TTL
|
||||
PROP_MULTICAST_TTL,
|
||||
PROP_BONDING_ADDRESSES
|
||||
};
|
||||
|
||||
static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink",
|
||||
|
@ -69,26 +87,47 @@ static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink",
|
|||
GST_PAD_ALWAYS,
|
||||
GST_STATIC_CAPS ("application/x-rtp"));
|
||||
|
||||
typedef struct
|
||||
{
|
||||
guint session;
|
||||
gchar *address;
|
||||
gchar *multicast_iface;
|
||||
guint port;
|
||||
GstElement *rtcp_src;
|
||||
GstElement *rtp_sink;
|
||||
GstElement *rtcp_sink;
|
||||
GstElement *rtx_send;
|
||||
GstElement *rtx_queue;
|
||||
guint32 rtcp_ssrc;
|
||||
} RistSenderBond;
|
||||
|
||||
struct _GstRistSink
|
||||
{
|
||||
GstBin parent;
|
||||
|
||||
/* Elements contained in the pipeline */
|
||||
/* Common elements in the pipeline */
|
||||
GstElement *rtpbin;
|
||||
GstElement *rtp_sink;
|
||||
GstElement *rtcp_src;
|
||||
GstElement *rtcp_sink;
|
||||
GstElement *ssrc_filter;
|
||||
GstPad *sinkpad;
|
||||
|
||||
/* RTX Elements */
|
||||
GstElement *rtxbin;
|
||||
GstElement *rtx_send;
|
||||
GstElement *rtx_tee;
|
||||
|
||||
/* Common properties, protected by bonds_lock */
|
||||
gint multicast_ttl;
|
||||
gboolean multicast_loopback;
|
||||
GstClockTime min_rtcp_interval;
|
||||
gdouble max_rtcp_bandwidth;
|
||||
|
||||
/* Bonds */
|
||||
GPtrArray *bonds;
|
||||
/* this is needed as setting sibling properties will try to take the object
|
||||
* lock. Thus, any properties that affects the bonds will be protected with
|
||||
* that lock instead of the object lock. */
|
||||
GMutex bonds_lock;
|
||||
|
||||
/* For stats */
|
||||
guint stats_interval;
|
||||
guint32 rtp_ssrc;
|
||||
guint32 rtcp_ssrc;
|
||||
GstClockID stats_cid;
|
||||
|
||||
/* This is set whenever there is a pipeline construction failure, and used
|
||||
|
@ -100,6 +139,94 @@ struct _GstRistSink
|
|||
G_DEFINE_TYPE_WITH_CODE (GstRistSink, gst_rist_sink, GST_TYPE_BIN,
|
||||
GST_DEBUG_CATEGORY_INIT (gst_rist_sink_debug, "ristsink", 0, "RIST Sink"));
|
||||
|
||||
static RistSenderBond *
|
||||
gst_rist_sink_add_bond (GstRistSink * sink)
|
||||
{
|
||||
RistSenderBond *bond = g_slice_new0 (RistSenderBond);
|
||||
GstPad *pad, *gpad;
|
||||
gchar name[32];
|
||||
|
||||
bond->session = sink->bonds->len;
|
||||
bond->address = g_strdup ("localhost");
|
||||
|
||||
g_snprintf (name, 32, "rist_rtp_udpsink%u", bond->session);
|
||||
bond->rtp_sink = gst_element_factory_make ("udpsink", name);
|
||||
if (!bond->rtp_sink) {
|
||||
g_slice_free (RistSenderBond, bond);
|
||||
sink->missing_plugin = "udp";
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* these are all from UDP plugin, so they cannot fail */
|
||||
g_snprintf (name, 32, "rist_rtcp_udpsrc%u", bond->session);
|
||||
bond->rtcp_src = gst_element_factory_make ("udpsrc", name);
|
||||
g_snprintf (name, 32, "rist_rtcp_udpsink%u", bond->session);
|
||||
bond->rtcp_sink = gst_element_factory_make ("udpsink", name);
|
||||
g_object_set (bond->rtcp_sink, "async", FALSE, NULL);
|
||||
|
||||
gst_bin_add_many (GST_BIN (sink), bond->rtp_sink, bond->rtcp_src,
|
||||
bond->rtcp_sink, NULL);
|
||||
gst_element_set_locked_state (bond->rtcp_src, TRUE);
|
||||
gst_element_set_locked_state (bond->rtcp_sink, TRUE);
|
||||
|
||||
g_snprintf (name, 32, "rist_rtx_queue%u", bond->session);
|
||||
bond->rtx_queue = gst_element_factory_make ("queue", name);
|
||||
gst_bin_add (GST_BIN (sink->rtxbin), bond->rtx_queue);
|
||||
|
||||
g_snprintf (name, 32, "rist_rtx_send%u", bond->session);
|
||||
bond->rtx_send = gst_element_factory_make ("ristrtxsend", name);
|
||||
if (!bond->rtx_send) {
|
||||
sink->missing_plugin = "rtpmanager";
|
||||
g_slice_free (RistSenderBond, bond);
|
||||
return NULL;
|
||||
}
|
||||
gst_bin_add (GST_BIN (sink->rtxbin), bond->rtx_send);
|
||||
|
||||
gst_element_link (bond->rtx_queue, bond->rtx_send);
|
||||
|
||||
pad = gst_element_get_static_pad (bond->rtx_send, "src");
|
||||
g_snprintf (name, 32, "src_%u", bond->session);
|
||||
gpad = gst_ghost_pad_new (name, pad);
|
||||
gst_object_unref (pad);
|
||||
gst_element_add_pad (sink->rtxbin, gpad);
|
||||
|
||||
g_snprintf (name, 32, "src_%u", bond->session);
|
||||
pad = gst_element_get_request_pad (sink->rtx_tee, name);
|
||||
gst_element_link_pads (sink->rtx_tee, name, bond->rtx_queue, "sink");
|
||||
gst_object_unref (pad);
|
||||
|
||||
g_object_set (bond->rtx_send, "max-size-packets", 0, NULL);
|
||||
|
||||
g_snprintf (name, 32, "send_rtp_sink_%u", bond->session);
|
||||
if (bond->session == 0) {
|
||||
gst_element_link_pads (sink->ssrc_filter, "src", sink->rtpbin, name);
|
||||
} else {
|
||||
GstPad *pad;
|
||||
|
||||
/* to make a sender, we need to create an unused pad on rtpbin, which will
|
||||
* require an unused pad on the rtxbin */
|
||||
g_snprintf (name, 32, "sink_%u", bond->session);
|
||||
pad = gst_ghost_pad_new_no_target (name, GST_PAD_SINK);
|
||||
gst_element_add_pad (sink->rtxbin, pad);
|
||||
|
||||
g_snprintf (name, 32, "send_rtp_sink_%u", bond->session);
|
||||
pad = gst_element_get_request_pad (sink->rtpbin, name);
|
||||
gst_object_unref (pad);
|
||||
}
|
||||
|
||||
g_snprintf (name, 32, "send_rtp_src_%u", bond->session);
|
||||
gst_element_link_pads (sink->rtpbin, name, bond->rtp_sink, "sink");
|
||||
|
||||
g_snprintf (name, 32, "recv_rtcp_sink_%u", bond->session);
|
||||
gst_element_link_pads (bond->rtcp_src, "src", sink->rtpbin, name);
|
||||
|
||||
g_snprintf (name, 32, "send_rtcp_src_%u", bond->session);
|
||||
gst_element_link_pads (sink->rtpbin, name, bond->rtcp_sink, "sink");
|
||||
|
||||
g_ptr_array_add (sink->bonds, bond);
|
||||
return bond;
|
||||
}
|
||||
|
||||
static GstCaps *
|
||||
gst_rist_sink_request_pt_map (GstRistSrc * sink, GstElement * session, guint pt)
|
||||
{
|
||||
|
@ -125,9 +252,6 @@ static GstElement *
|
|||
gst_rist_sink_request_aux_sender (GstRistSink * sink, guint session_id,
|
||||
GstElement * rtpbin)
|
||||
{
|
||||
if (session_id != 0)
|
||||
return NULL;
|
||||
|
||||
return gst_object_ref (sink->rtxbin);
|
||||
}
|
||||
|
||||
|
@ -198,11 +322,14 @@ static void
|
|||
gst_rist_sink_on_new_receiver_ssrc (GstRistSink * sink, guint session_id,
|
||||
guint ssrc, GstElement * rtpbin)
|
||||
{
|
||||
RistSenderBond *bond;
|
||||
|
||||
if (session_id != 0)
|
||||
return;
|
||||
|
||||
GST_INFO_OBJECT (sink, "Got RTCP remote SSRC %u", ssrc);
|
||||
sink->rtcp_ssrc = ssrc;
|
||||
bond = g_ptr_array_index (sink->bonds, session_id);
|
||||
bond->rtcp_ssrc = ssrc;
|
||||
}
|
||||
|
||||
static GstPadProbeReturn
|
||||
|
@ -281,10 +408,13 @@ gst_rist_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
|
|||
static void
|
||||
gst_rist_sink_init (GstRistSink * sink)
|
||||
{
|
||||
GstPad *ssrc_filter_sinkpad;
|
||||
GstPad *ssrc_filter_sinkpad, *rtx_tee_sinkpad, *rtxbin_gpad;
|
||||
GstCaps *ssrc_caps;
|
||||
GstPad *pad, *gpad;
|
||||
GstStructure *sdes = NULL;
|
||||
RistSenderBond *bond;
|
||||
|
||||
g_mutex_init (&sink->bonds_lock);
|
||||
sink->bonds = g_ptr_array_new ();
|
||||
|
||||
/* Construct the RIST RTP sender pipeline.
|
||||
*
|
||||
|
@ -321,43 +451,24 @@ gst_rist_sink_init (GstRistSink * sink)
|
|||
|
||||
sink->rtxbin = gst_bin_new ("rist_send_rtxbin");
|
||||
g_object_ref_sink (sink->rtxbin);
|
||||
sink->rtx_send = gst_element_factory_make ("ristrtxsend", "rist_rtx_send");
|
||||
gst_bin_add (GST_BIN (sink->rtxbin), sink->rtx_send);
|
||||
g_object_set (sink->rtx_send, "max-size-packets", 0, NULL);
|
||||
|
||||
pad = gst_element_get_static_pad (sink->rtx_send, "sink");
|
||||
gpad = gst_ghost_pad_new ("sink_0", pad);
|
||||
gst_object_unref (pad);
|
||||
gst_element_add_pad (sink->rtxbin, gpad);
|
||||
|
||||
pad = gst_element_get_static_pad (sink->rtx_send, "src");
|
||||
gpad = gst_ghost_pad_new ("src_0", pad);
|
||||
gst_object_unref (pad);
|
||||
gst_element_add_pad (sink->rtxbin, gpad);
|
||||
|
||||
sink->rtp_sink = gst_element_factory_make ("udpsink", "rist_rtp_udpsink");
|
||||
sink->rtcp_src = gst_element_factory_make ("udpsrc", "rist_rtcp_udpsrc");
|
||||
sink->rtcp_sink = gst_element_factory_make ("udpsink", "rist_rtcp_udpsink");
|
||||
if (!sink->rtp_sink || !sink->rtcp_src || !sink->rtcp_sink) {
|
||||
g_clear_object (&sink->rtp_sink);
|
||||
g_clear_object (&sink->rtcp_src);
|
||||
g_clear_object (&sink->rtcp_sink);
|
||||
sink->missing_plugin = "udp";
|
||||
goto missing_plugin;
|
||||
}
|
||||
gst_bin_add_many (GST_BIN (sink), sink->rtp_sink, sink->rtcp_src,
|
||||
sink->rtcp_sink, NULL);
|
||||
gst_element_set_locked_state (sink->rtcp_src, TRUE);
|
||||
gst_element_set_locked_state (sink->rtcp_sink, TRUE);
|
||||
|
||||
sink->ssrc_filter = gst_element_factory_make ("capsfilter",
|
||||
"rist_ssrc_filter");
|
||||
if (!sink->ssrc_filter) {
|
||||
sink->rtx_tee = gst_element_factory_make ("tee", "rist_rtx_tee");
|
||||
if (!sink->rtx_tee) {
|
||||
sink->missing_plugin = "coreelements";
|
||||
goto missing_plugin;
|
||||
}
|
||||
gst_bin_add (GST_BIN (sink->rtxbin), sink->rtx_tee);
|
||||
|
||||
rtx_tee_sinkpad = gst_element_get_static_pad (sink->rtx_tee, "sink");
|
||||
rtxbin_gpad = gst_ghost_pad_new ("sink_0", rtx_tee_sinkpad);
|
||||
gst_object_unref (rtx_tee_sinkpad);
|
||||
gst_element_add_pad (sink->rtxbin, rtxbin_gpad);
|
||||
|
||||
sink->ssrc_filter = gst_element_factory_make ("capsfilter",
|
||||
"rist_ssrc_filter");
|
||||
gst_bin_add (GST_BIN (sink), sink->ssrc_filter);
|
||||
|
||||
/* RIST RTP SSRC should have LSB set to 0 */
|
||||
sink->rtp_ssrc = g_random_int () & ~1;
|
||||
ssrc_caps = gst_caps_new_simple ("application/x-rtp",
|
||||
"ssrc", G_TYPE_UINT, sink->rtp_ssrc, NULL);
|
||||
|
@ -365,14 +476,6 @@ gst_rist_sink_init (GstRistSink * sink)
|
|||
gst_structure_new_empty ("application/x-rtp"));
|
||||
g_object_set (sink->ssrc_filter, "caps", ssrc_caps, NULL);
|
||||
gst_caps_unref (ssrc_caps);
|
||||
gst_element_link_pads (sink->ssrc_filter, "src", sink->rtpbin,
|
||||
"send_rtp_sink_0");
|
||||
gst_element_link_pads (sink->rtpbin, "send_rtp_src_0", sink->rtp_sink,
|
||||
"sink");
|
||||
gst_element_link_pads (sink->rtcp_src, "src", sink->rtpbin,
|
||||
"recv_rtcp_sink_0");
|
||||
gst_element_link_pads (sink->rtpbin, "send_rtcp_src_0", sink->rtcp_sink,
|
||||
"sink");
|
||||
|
||||
ssrc_filter_sinkpad = gst_element_get_static_pad (sink->ssrc_filter, "sink");
|
||||
sink->sinkpad = gst_ghost_pad_new_from_template ("sink", ssrc_filter_sinkpad,
|
||||
|
@ -384,6 +487,10 @@ gst_rist_sink_init (GstRistSink * sink)
|
|||
gst_pad_add_probe (sink->sinkpad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
|
||||
gst_rist_sink_fix_collision, sink, NULL);
|
||||
|
||||
bond = gst_rist_sink_add_bond (sink);
|
||||
if (!bond)
|
||||
goto missing_plugin;
|
||||
|
||||
return;
|
||||
|
||||
missing_plugin:
|
||||
|
@ -396,32 +503,22 @@ missing_plugin:
|
|||
}
|
||||
}
|
||||
|
||||
static GstStateChangeReturn
|
||||
gst_rist_sink_start (GstRistSink * sink)
|
||||
static gboolean
|
||||
gst_rist_sink_setup_rtcp_socket (GstRistSink * sink, RistSenderBond * bond)
|
||||
{
|
||||
GSocket *socket = NULL;
|
||||
GInetAddress *iaddr = NULL;
|
||||
gchar *remote_addr = NULL;
|
||||
guint remote_port;
|
||||
guint port = bond->port + 1;
|
||||
GError *error = NULL;
|
||||
|
||||
if (sink->construct_failed) {
|
||||
GST_ELEMENT_ERROR (sink, CORE, MISSING_PLUGIN,
|
||||
("Your GStreamer installation is missing plugin '%s'",
|
||||
sink->missing_plugin), (NULL));
|
||||
return GST_STATE_CHANGE_FAILURE;
|
||||
}
|
||||
|
||||
g_object_get (sink->rtcp_sink, "host", &remote_addr, "port", &remote_port,
|
||||
NULL);
|
||||
|
||||
iaddr = g_inet_address_new_from_string (remote_addr);
|
||||
iaddr = g_inet_address_new_from_string (bond->address);
|
||||
if (!iaddr) {
|
||||
GList *results;
|
||||
GResolver *resolver = NULL;
|
||||
|
||||
resolver = g_resolver_get_default ();
|
||||
results = g_resolver_lookup_by_name (resolver, remote_addr, NULL, &error);
|
||||
results = g_resolver_lookup_by_name (resolver, bond->address, NULL, &error);
|
||||
|
||||
if (!results) {
|
||||
g_object_unref (resolver);
|
||||
|
@ -430,16 +527,13 @@ gst_rist_sink_start (GstRistSink * sink)
|
|||
|
||||
iaddr = G_INET_ADDRESS (g_object_ref (results->data));
|
||||
|
||||
g_free (remote_addr);
|
||||
remote_addr = g_inet_address_to_string (iaddr);
|
||||
|
||||
g_resolver_free_addresses (results);
|
||||
g_object_unref (resolver);
|
||||
}
|
||||
remote_addr = g_inet_address_to_string (iaddr);
|
||||
|
||||
if (g_inet_address_get_is_multicast (iaddr)) {
|
||||
g_object_set (sink->rtcp_src, "address", remote_addr, "port", remote_port,
|
||||
NULL);
|
||||
g_object_set (bond->rtcp_src, "address", remote_addr, "port", port, NULL);
|
||||
} else {
|
||||
const gchar *any_addr;
|
||||
|
||||
|
@ -448,21 +542,21 @@ gst_rist_sink_start (GstRistSink * sink)
|
|||
else
|
||||
any_addr = "0.0.0.0";
|
||||
|
||||
g_object_set (sink->rtcp_src, "address", any_addr, "port", 0, NULL);
|
||||
g_object_set (bond->rtcp_src, "address", any_addr, "port", 0, NULL);
|
||||
}
|
||||
g_object_unref (iaddr);
|
||||
|
||||
gst_element_set_locked_state (sink->rtcp_src, FALSE);
|
||||
gst_element_sync_state_with_parent (sink->rtcp_src);
|
||||
gst_element_set_locked_state (bond->rtcp_src, FALSE);
|
||||
gst_element_sync_state_with_parent (bond->rtcp_src);
|
||||
|
||||
/* share the socket created by the sink */
|
||||
g_object_get (sink->rtcp_src, "used-socket", &socket, NULL);
|
||||
g_object_set (sink->rtcp_sink, "socket", socket, "auto-multicast", FALSE,
|
||||
g_object_get (bond->rtcp_src, "used-socket", &socket, NULL);
|
||||
g_object_set (bond->rtcp_sink, "socket", socket, "auto-multicast", FALSE,
|
||||
"close-socket", FALSE, NULL);
|
||||
g_object_unref (socket);
|
||||
|
||||
gst_element_set_locked_state (sink->rtcp_sink, FALSE);
|
||||
gst_element_sync_state_with_parent (sink->rtcp_sink);
|
||||
gst_element_set_locked_state (bond->rtcp_sink, FALSE);
|
||||
gst_element_sync_state_with_parent (bond->rtcp_sink);
|
||||
|
||||
return GST_STATE_CHANGE_SUCCESS;
|
||||
|
||||
|
@ -475,6 +569,35 @@ dns_resolve_failed:
|
|||
return GST_STATE_CHANGE_FAILURE;
|
||||
}
|
||||
|
||||
static GstStateChangeReturn
|
||||
gst_rist_sink_start (GstRistSink * sink)
|
||||
{
|
||||
gint i;
|
||||
|
||||
if (sink->construct_failed) {
|
||||
GST_ELEMENT_ERROR (sink, CORE, MISSING_PLUGIN,
|
||||
("Your GStreamer installation is missing plugin '%s'",
|
||||
sink->missing_plugin), (NULL));
|
||||
return GST_STATE_CHANGE_FAILURE;
|
||||
}
|
||||
|
||||
for (i = 0; i < sink->bonds->len; i++) {
|
||||
RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
|
||||
GObject *session = NULL;
|
||||
|
||||
g_signal_emit_by_name (sink->rtpbin, "get-session", i, &session);
|
||||
g_object_set (session, "rtcp-min-interval", sink->min_rtcp_interval,
|
||||
"rtcp-fraction", sink->max_rtcp_bandwidth, NULL);
|
||||
g_object_unref (session);
|
||||
|
||||
if (!gst_rist_sink_setup_rtcp_socket (sink, bond))
|
||||
return GST_STATE_CHANGE_FAILURE;
|
||||
}
|
||||
|
||||
return GST_STATE_CHANGE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static GstStructure *
|
||||
gst_rist_sink_create_stats (GstRistSink * sink)
|
||||
{
|
||||
|
@ -482,7 +605,9 @@ gst_rist_sink_create_stats (GstRistSink * sink)
|
|||
GstStructure *sstats = NULL, *ret;
|
||||
guint64 pkt_sent = 0, rtx_sent = 0, rtt;
|
||||
guint rb_rtt = 0;
|
||||
RistSenderBond *bond;
|
||||
|
||||
bond = g_ptr_array_index (sink->bonds, 0);
|
||||
ret = gst_structure_new_empty ("rist/x-sender-stats");
|
||||
|
||||
g_signal_emit_by_name (sink->rtpbin, "get-internal-session", 0, &session);
|
||||
|
@ -498,7 +623,7 @@ gst_rist_sink_create_stats (GstRistSink * sink)
|
|||
g_clear_object (&source);
|
||||
}
|
||||
|
||||
g_signal_emit_by_name (session, "get-source-by-ssrc", sink->rtcp_ssrc,
|
||||
g_signal_emit_by_name (session, "get-source-by-ssrc", bond->rtcp_ssrc,
|
||||
&source);
|
||||
if (source) {
|
||||
g_object_get (source, "stats", &sstats, NULL);
|
||||
|
@ -508,7 +633,7 @@ gst_rist_sink_create_stats (GstRistSink * sink)
|
|||
}
|
||||
g_object_unref (session);
|
||||
|
||||
g_object_get (sink->rtx_send, "num-rtx-packets", &rtx_sent, NULL);
|
||||
g_object_get (bond->rtx_send, "num-rtx-packets", &rtx_sent, NULL);
|
||||
|
||||
/* rb_rtt is in Q16 in NTP time */
|
||||
rtt = gst_util_uint64_scale (rb_rtt, GST_SECOND, 65536);
|
||||
|
@ -594,42 +719,204 @@ gst_rist_sink_change_state (GstElement * element, GstStateChange transition)
|
|||
return ret;
|
||||
}
|
||||
|
||||
/* called with bonds lock */
|
||||
static void
|
||||
gst_rist_sink_update_bond_address (GstRistSink * sink, RistSenderBond * bond,
|
||||
const gchar * address, guint port, const gchar * multicast_iface)
|
||||
{
|
||||
g_free (bond->address);
|
||||
g_free (bond->multicast_iface);
|
||||
bond->address = g_strdup (address);
|
||||
bond->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
|
||||
bond->port = port;
|
||||
|
||||
g_object_set (G_OBJECT (bond->rtp_sink), "host", address, "port", port,
|
||||
"multicast-iface", bond->multicast_iface, NULL);
|
||||
g_object_set (G_OBJECT (bond->rtcp_sink), "host", address,
|
||||
"port", port + 1, "multicast-iface", bond->multicast_iface, NULL);
|
||||
|
||||
/* TODO add runtime support
|
||||
* - add blocking the pad probe
|
||||
* - update RTCP socket
|
||||
* - cycle elements through NULL state
|
||||
*/
|
||||
}
|
||||
|
||||
/* called with bonds lock */
|
||||
static gchar *
|
||||
gst_rist_sink_get_bonds (GstRistSink * sink)
|
||||
{
|
||||
GString *bonds = g_string_new ("");
|
||||
gint i;
|
||||
|
||||
for (i = 0; i < sink->bonds->len; i++) {
|
||||
RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
|
||||
if (bonds->len > 0)
|
||||
g_string_append_c (bonds, ':');
|
||||
|
||||
g_string_append_printf (bonds, "%s:%u", bond->address, bond->port);
|
||||
|
||||
if (bond->multicast_iface)
|
||||
g_string_append_printf (bonds, "/%s", bond->multicast_iface);
|
||||
}
|
||||
|
||||
return g_string_free (bonds, FALSE);
|
||||
}
|
||||
|
||||
struct RistAddress
|
||||
{
|
||||
gchar *address;
|
||||
char *multicast_iface;
|
||||
guint port;
|
||||
};
|
||||
|
||||
/* called with bonds lock */
|
||||
static void
|
||||
gst_rist_sink_set_bonds (GstRistSink * sink, const gchar * bonds)
|
||||
{
|
||||
GStrv tokens = NULL;
|
||||
struct RistAddress *addrs;
|
||||
gint i;
|
||||
|
||||
if (bonds == NULL)
|
||||
goto missing_address;
|
||||
|
||||
tokens = g_strsplit (bonds, ",", 0);
|
||||
if (tokens[0] == NULL)
|
||||
goto missing_address;
|
||||
|
||||
addrs = g_new0 (struct RistAddress, g_strv_length (tokens));
|
||||
|
||||
/* parse the address list */
|
||||
for (i = 0; tokens[i]; i++) {
|
||||
gchar *address = tokens[i];
|
||||
char *port_ptr, *iface_ptr, *endptr;
|
||||
guint port;
|
||||
|
||||
port_ptr = g_utf8_strrchr (address, -1, ':');
|
||||
iface_ptr = g_utf8_strrchr (address, -1, '/');
|
||||
|
||||
if (!port_ptr)
|
||||
goto bad_parameter;
|
||||
if (!g_ascii_isdigit (port_ptr[1]))
|
||||
goto bad_parameter;
|
||||
|
||||
if (iface_ptr) {
|
||||
if (iface_ptr < port_ptr)
|
||||
goto bad_parameter;
|
||||
iface_ptr[0] = '\0';
|
||||
}
|
||||
|
||||
port = strtol (port_ptr + 1, &endptr, 0);
|
||||
if (endptr[0] != '\0')
|
||||
goto bad_parameter;
|
||||
|
||||
/* port must be a multiple of 2 between 2 and 65534 */
|
||||
if (port < 2 || (port & 1) || port > G_MAXUINT16)
|
||||
goto invalid_port;
|
||||
|
||||
port_ptr[0] = '\0';
|
||||
addrs[i].port = port;
|
||||
addrs[i].address = g_strstrip (address);
|
||||
if (iface_ptr)
|
||||
addrs[i].multicast_iface = g_strstrip (iface_ptr + 1);
|
||||
}
|
||||
|
||||
/* configure the bonds */
|
||||
for (i = 0; tokens[i]; i++) {
|
||||
RistSenderBond *bond = NULL;
|
||||
|
||||
if (i < sink->bonds->len)
|
||||
bond = g_ptr_array_index (sink->bonds, i);
|
||||
else
|
||||
bond = gst_rist_sink_add_bond (sink);
|
||||
|
||||
gst_rist_sink_update_bond_address (sink, bond, addrs[i].address,
|
||||
addrs[i].port, addrs[i].multicast_iface);
|
||||
}
|
||||
|
||||
g_strfreev (tokens);
|
||||
return;
|
||||
|
||||
missing_address:
|
||||
g_warning ("'bonding-addresses' cannot be empty");
|
||||
g_strfreev (tokens);
|
||||
return;
|
||||
|
||||
bad_parameter:
|
||||
g_warning ("Failed to parse address '%s", tokens[i]);
|
||||
g_strfreev (tokens);
|
||||
g_free (addrs);
|
||||
return;
|
||||
|
||||
invalid_port:
|
||||
g_warning ("RIST port must valid UDP port and a multiple of 2.");
|
||||
g_strfreev (tokens);
|
||||
g_free (addrs);
|
||||
return;
|
||||
}
|
||||
|
||||
static void
|
||||
gst_rist_sink_set_multicast_loopback (GstRistSink * sink, gboolean loop)
|
||||
{
|
||||
gint i;
|
||||
|
||||
sink->multicast_loopback = loop;
|
||||
for (i = 0; i < sink->bonds->len; i++) {
|
||||
RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
|
||||
g_object_set (G_OBJECT (bond->rtp_sink), "loop", loop, NULL);
|
||||
g_object_set (G_OBJECT (bond->rtcp_sink), "loop", loop, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
/* called with bonds lock */
|
||||
static void
|
||||
gst_rist_sink_set_multicast_ttl (GstRistSink * sink, gint ttl)
|
||||
{
|
||||
gint i;
|
||||
|
||||
sink->multicast_ttl = ttl;
|
||||
for (i = 0; i < sink->bonds->len; i++) {
|
||||
RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
|
||||
g_object_set (G_OBJECT (bond->rtp_sink), "ttl-mc", ttl, NULL);
|
||||
g_object_set (G_OBJECT (bond->rtcp_sink), "ttl-mc", ttl, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
gst_rist_sink_get_property (GObject * object, guint prop_id,
|
||||
GValue * value, GParamSpec * pspec)
|
||||
{
|
||||
GstRistSink *sink = GST_RIST_SINK (object);
|
||||
GstElement *session = NULL;
|
||||
GstClockTime interval;
|
||||
GstStructure *sdes;
|
||||
RistSenderBond *bond;
|
||||
|
||||
if (sink->construct_failed)
|
||||
return;
|
||||
|
||||
g_mutex_lock (&sink->bonds_lock);
|
||||
|
||||
bond = g_ptr_array_index (sink->bonds, 0);
|
||||
|
||||
switch (prop_id) {
|
||||
case PROP_ADDRESS:
|
||||
g_object_get_property (G_OBJECT (sink->rtp_sink), "host", value);
|
||||
g_value_set_string (value, bond->address);
|
||||
break;
|
||||
|
||||
case PROP_PORT:
|
||||
g_object_get_property (G_OBJECT (sink->rtp_sink), "port", value);
|
||||
g_value_set_uint (value, bond->port);
|
||||
break;
|
||||
|
||||
case PROP_SENDER_BUFFER:
|
||||
g_object_get_property (G_OBJECT (sink->rtx_send), "max-size-time", value);
|
||||
g_object_get_property (G_OBJECT (bond->rtx_send), "max-size-time", value);
|
||||
break;
|
||||
|
||||
case PROP_MIN_RTCP_INTERVAL:
|
||||
g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session);
|
||||
g_object_get (session, "rtcp-min-interval", &interval, NULL);
|
||||
g_value_set_uint (value, (guint) (interval / GST_MSECOND));
|
||||
g_object_unref (session);
|
||||
g_value_set_uint (value, (guint) (sink->min_rtcp_interval / GST_MSECOND));
|
||||
break;
|
||||
|
||||
case PROP_MAX_RTCP_BANDWIDTH:
|
||||
g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session);
|
||||
g_object_get_property (G_OBJECT (session), "rtcp-fraction", value);
|
||||
g_object_unref (session);
|
||||
g_value_set_double (value, sink->max_rtcp_bandwidth);
|
||||
break;
|
||||
|
||||
case PROP_STATS_UPDATE_INTERVAL:
|
||||
|
@ -647,22 +934,27 @@ gst_rist_sink_get_property (GObject * object, guint prop_id,
|
|||
break;
|
||||
|
||||
case PROP_MULTICAST_LOOPBACK:
|
||||
g_object_get_property (G_OBJECT (sink->rtp_sink), "loop", value);
|
||||
g_value_set_boolean (value, sink->multicast_loopback);
|
||||
break;
|
||||
|
||||
case PROP_MULTICAST_IFACE:
|
||||
g_object_get_property (G_OBJECT (sink->rtp_sink),
|
||||
"multicast-iface", value);
|
||||
g_value_set_string (value, bond->multicast_iface);
|
||||
break;
|
||||
|
||||
case PROP_MULTICAST_TTL:
|
||||
g_object_get_property (G_OBJECT (sink->rtp_sink), "ttl-mc", value);
|
||||
g_value_set_int (value, sink->multicast_ttl);
|
||||
break;
|
||||
|
||||
case PROP_BONDING_ADDRESSES:
|
||||
g_value_take_string (value, gst_rist_sink_get_bonds (sink));
|
||||
break;
|
||||
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
break;
|
||||
}
|
||||
|
||||
g_mutex_unlock (&sink->bonds_lock);
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -670,16 +962,22 @@ gst_rist_sink_set_property (GObject * object, guint prop_id,
|
|||
const GValue * value, GParamSpec * pspec)
|
||||
{
|
||||
GstRistSink *sink = GST_RIST_SINK (object);
|
||||
GstElement *session = NULL;
|
||||
GstStructure *sdes;
|
||||
RistSenderBond *bond;
|
||||
|
||||
if (sink->construct_failed)
|
||||
return;
|
||||
|
||||
g_mutex_lock (&sink->bonds_lock);
|
||||
|
||||
bond = g_ptr_array_index (sink->bonds, 0);
|
||||
|
||||
switch (prop_id) {
|
||||
case PROP_ADDRESS:
|
||||
g_object_set_property (G_OBJECT (sink->rtp_sink), "host", value);
|
||||
g_object_set_property (G_OBJECT (sink->rtcp_sink), "host", value);
|
||||
g_free (bond->address);
|
||||
bond->address = g_value_dup_string (value);
|
||||
g_object_set_property (G_OBJECT (bond->rtp_sink), "host", value);
|
||||
g_object_set_property (G_OBJECT (bond->rtcp_sink), "host", value);
|
||||
break;
|
||||
|
||||
case PROP_PORT:{
|
||||
|
@ -693,27 +991,23 @@ gst_rist_sink_set_property (GObject * object, guint prop_id,
|
|||
return;
|
||||
}
|
||||
|
||||
g_object_set (sink->rtp_sink, "port", port, NULL);
|
||||
g_object_set (sink->rtcp_sink, "port", port + 1, NULL);
|
||||
bond->port = port;
|
||||
g_object_set (bond->rtp_sink, "port", port, NULL);
|
||||
g_object_set (bond->rtcp_sink, "port", port + 1, NULL);
|
||||
break;
|
||||
}
|
||||
|
||||
case PROP_SENDER_BUFFER:
|
||||
g_object_set (sink->rtx_send,
|
||||
g_object_set (bond->rtx_send,
|
||||
"max-size-time", g_value_get_uint (value), NULL);
|
||||
break;
|
||||
|
||||
case PROP_MIN_RTCP_INTERVAL:
|
||||
g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session);
|
||||
g_object_set (session, "rtcp-min-interval",
|
||||
g_value_get_uint (value) * GST_MSECOND, NULL);
|
||||
g_object_unref (session);
|
||||
sink->min_rtcp_interval = g_value_get_uint (value) * GST_MSECOND;
|
||||
break;
|
||||
|
||||
case PROP_MAX_RTCP_BANDWIDTH:
|
||||
g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session);
|
||||
g_object_set (session, "rtcp-fraction", g_value_get_double (value), NULL);
|
||||
g_object_unref (session);
|
||||
sink->max_rtcp_bandwidth = g_value_get_double (value);
|
||||
break;
|
||||
|
||||
case PROP_STATS_UPDATE_INTERVAL:
|
||||
|
@ -728,34 +1022,55 @@ gst_rist_sink_set_property (GObject * object, guint prop_id,
|
|||
break;
|
||||
|
||||
case PROP_MULTICAST_LOOPBACK:
|
||||
g_object_set_property (G_OBJECT (sink->rtp_sink), "loop", value);
|
||||
g_object_set_property (G_OBJECT (sink->rtcp_sink), "loop", value);
|
||||
gst_rist_sink_set_multicast_loopback (sink, g_value_get_boolean (value));
|
||||
break;
|
||||
|
||||
case PROP_MULTICAST_IFACE:
|
||||
g_object_set_property (G_OBJECT (sink->rtp_sink),
|
||||
g_free (bond->multicast_iface);
|
||||
bond->multicast_iface = g_value_dup_string (value);
|
||||
g_object_set_property (G_OBJECT (bond->rtp_sink),
|
||||
"multicast-iface", value);
|
||||
g_object_set_property (G_OBJECT (sink->rtcp_sink),
|
||||
g_object_set_property (G_OBJECT (bond->rtcp_sink),
|
||||
"multicast-iface", value);
|
||||
break;
|
||||
|
||||
case PROP_MULTICAST_TTL:
|
||||
g_object_set_property (G_OBJECT (sink->rtp_sink), "ttl-mc", value);
|
||||
g_object_set_property (G_OBJECT (sink->rtcp_sink), "ttl-mc", value);
|
||||
gst_rist_sink_set_multicast_ttl (sink, g_value_get_int (value));
|
||||
break;
|
||||
|
||||
case PROP_BONDING_ADDRESSES:
|
||||
gst_rist_sink_set_bonds (sink, g_value_get_string (value));
|
||||
break;
|
||||
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
break;
|
||||
}
|
||||
|
||||
g_mutex_unlock (&sink->bonds_lock);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_rist_sink_finalize (GObject * object)
|
||||
{
|
||||
GstRistSink *sink = GST_RIST_SINK (object);
|
||||
gint i;
|
||||
|
||||
g_mutex_lock (&sink->bonds_lock);
|
||||
|
||||
for (i = 0; i < sink->bonds->len; i++) {
|
||||
RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
|
||||
g_free (bond->address);
|
||||
g_free (bond->multicast_iface);
|
||||
g_slice_free (RistSenderBond, bond);
|
||||
}
|
||||
g_ptr_array_free (sink->bonds, TRUE);
|
||||
|
||||
g_clear_object (&sink->rtxbin);
|
||||
|
||||
g_mutex_unlock (&sink->bonds_lock);
|
||||
g_mutex_clear (&sink->bonds_lock);
|
||||
|
||||
G_OBJECT_CLASS (gst_rist_sink_parent_class)->finalize (object);
|
||||
}
|
||||
|
||||
|
@ -836,4 +1151,9 @@ gst_rist_sink_class_init (GstRistSinkClass * klass)
|
|||
g_param_spec_int ("multicast-ttl", "Multicast TTL",
|
||||
"The multicast time-to-live parameter.", 0, 255, 1,
|
||||
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
|
||||
|
||||
g_object_class_install_property (object_class, PROP_BONDING_ADDRESSES,
|
||||
g_param_spec_string ("bonding-addresses", "Bonding Addresses",
|
||||
"Comma (,) seperated list of <address>:<port> to send to. ", NULL,
|
||||
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue