From ffedd7ce2da8376afa80497e839af58d163a85a8 Mon Sep 17 00:00:00 2001 From: Nicolas Dufresne Date: Fri, 15 Feb 2019 17:13:02 -0500 Subject: [PATCH] ristsink: Implement bonding support --- gst/rist/gstristsink.c | 558 ++++++++++++++++++++++++++++++++--------- 1 file changed, 439 insertions(+), 119 deletions(-) diff --git a/gst/rist/gstristsink.c b/gst/rist/gstristsink.c index b078ba430b..b2dd86ac52 100644 --- a/gst/rist/gstristsink.c +++ b/gst/rist/gstristsink.c @@ -33,7 +33,21 @@ * * ## Example launch line * |[ - * gst-launch-1.0 udpsrc ! tsparse set-timestamp=1 ! rtpmp2pay ! ristsink address=10.0.0.1 port=5004 + * gst-launch-1.0 udpsrc ! tsparse set-timestamp=1 ! rtpmp2pay ! \ + * ristsink address=10.0.0.1 port=5004 + * ]| + * + * Additionally, this element supports bonding, which concist of using multiple + * links in order to transmit the streams. The address of each bonds is + * configured through "bonding-addresses" property. When set, this will replace + * the value that might have been set on "address" and "port". Each bonds will be + * mapped to it's down RTP session. RTX request are only replied on the link + * the NACK was received from. + * + * ## Exmaple launch line for bonding + * |[ + * gst-launch-1.0 udpsrc ! tsparse set-timestamp=1 ! rtpmp2pay ! \ + * ristsink bonding-addresses="10.0.0.1:5004,11.0.0.1:5006" * ]| */ @@ -44,6 +58,9 @@ #include #include +/* for strtol() */ +#include + #include "gstrist.h" GST_DEBUG_CATEGORY_STATIC (gst_rist_sink_debug); @@ -61,7 +78,8 @@ enum PROP_CNAME, PROP_MULTICAST_LOOPBACK, PROP_MULTICAST_IFACE, - PROP_MULTICAST_TTL + PROP_MULTICAST_TTL, + PROP_BONDING_ADDRESSES }; static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink", @@ -69,26 +87,47 @@ static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-rtp")); +typedef struct +{ + guint session; + gchar *address; + gchar *multicast_iface; + guint port; + GstElement *rtcp_src; + GstElement *rtp_sink; + GstElement *rtcp_sink; + GstElement *rtx_send; + GstElement *rtx_queue; + guint32 rtcp_ssrc; +} RistSenderBond; + struct _GstRistSink { GstBin parent; - /* Elements contained in the pipeline */ + /* Common elements in the pipeline */ GstElement *rtpbin; - GstElement *rtp_sink; - GstElement *rtcp_src; - GstElement *rtcp_sink; GstElement *ssrc_filter; GstPad *sinkpad; - - /* RTX Elements */ GstElement *rtxbin; - GstElement *rtx_send; + GstElement *rtx_tee; + + /* Common properties, protected by bonds_lock */ + gint multicast_ttl; + gboolean multicast_loopback; + GstClockTime min_rtcp_interval; + gdouble max_rtcp_bandwidth; + + /* Bonds */ + GPtrArray *bonds; + /* this is needed as setting sibling properties will try to take the object + * lock. Thus, any properties that affects the bonds will be protected with + * that lock instead of the object lock. */ + GMutex bonds_lock; /* For stats */ guint stats_interval; guint32 rtp_ssrc; - guint32 rtcp_ssrc; GstClockID stats_cid; /* This is set whenever there is a pipeline construction failure, and used @@ -100,6 +139,94 @@ struct _GstRistSink G_DEFINE_TYPE_WITH_CODE (GstRistSink, gst_rist_sink, GST_TYPE_BIN, GST_DEBUG_CATEGORY_INIT (gst_rist_sink_debug, "ristsink", 0, "RIST Sink")); +static RistSenderBond * +gst_rist_sink_add_bond (GstRistSink * sink) +{ + RistSenderBond *bond = g_slice_new0 (RistSenderBond); + GstPad *pad, *gpad; + gchar name[32]; + + bond->session = sink->bonds->len; + bond->address = g_strdup ("localhost"); + + g_snprintf (name, 32, "rist_rtp_udpsink%u", bond->session); + bond->rtp_sink = gst_element_factory_make ("udpsink", name); + if (!bond->rtp_sink) { + g_slice_free (RistSenderBond, bond); + sink->missing_plugin = "udp"; + return NULL; + } + + /* these are all from UDP plugin, so they cannot fail */ + g_snprintf (name, 32, "rist_rtcp_udpsrc%u", bond->session); + bond->rtcp_src = gst_element_factory_make ("udpsrc", name); + g_snprintf (name, 32, "rist_rtcp_udpsink%u", bond->session); + bond->rtcp_sink = gst_element_factory_make ("udpsink", name); + g_object_set (bond->rtcp_sink, "async", FALSE, NULL); + + gst_bin_add_many (GST_BIN (sink), bond->rtp_sink, bond->rtcp_src, + bond->rtcp_sink, NULL); + gst_element_set_locked_state (bond->rtcp_src, TRUE); + gst_element_set_locked_state (bond->rtcp_sink, TRUE); + + g_snprintf (name, 32, "rist_rtx_queue%u", bond->session); + bond->rtx_queue = gst_element_factory_make ("queue", name); + gst_bin_add (GST_BIN (sink->rtxbin), bond->rtx_queue); + + g_snprintf (name, 32, "rist_rtx_send%u", bond->session); + bond->rtx_send = gst_element_factory_make ("ristrtxsend", name); + if (!bond->rtx_send) { + sink->missing_plugin = "rtpmanager"; + g_slice_free (RistSenderBond, bond); + return NULL; + } + gst_bin_add (GST_BIN (sink->rtxbin), bond->rtx_send); + + gst_element_link (bond->rtx_queue, bond->rtx_send); + + pad = gst_element_get_static_pad (bond->rtx_send, "src"); + g_snprintf (name, 32, "src_%u", bond->session); + gpad = gst_ghost_pad_new (name, pad); + gst_object_unref (pad); + gst_element_add_pad (sink->rtxbin, gpad); + + g_snprintf (name, 32, "src_%u", bond->session); + pad = gst_element_get_request_pad (sink->rtx_tee, name); + gst_element_link_pads (sink->rtx_tee, name, bond->rtx_queue, "sink"); + gst_object_unref (pad); + + g_object_set (bond->rtx_send, "max-size-packets", 0, NULL); + + g_snprintf (name, 32, "send_rtp_sink_%u", bond->session); + if (bond->session == 0) { + gst_element_link_pads (sink->ssrc_filter, "src", sink->rtpbin, name); + } else { + GstPad *pad; + + /* to make a sender, we need to create an unused pad on rtpbin, which will + * require an unused pad on the rtxbin */ + g_snprintf (name, 32, "sink_%u", bond->session); + pad = gst_ghost_pad_new_no_target (name, GST_PAD_SINK); + gst_element_add_pad (sink->rtxbin, pad); + + g_snprintf (name, 32, "send_rtp_sink_%u", bond->session); + pad = gst_element_get_request_pad (sink->rtpbin, name); + gst_object_unref (pad); + } + + g_snprintf (name, 32, "send_rtp_src_%u", bond->session); + gst_element_link_pads (sink->rtpbin, name, bond->rtp_sink, "sink"); + + g_snprintf (name, 32, "recv_rtcp_sink_%u", bond->session); + gst_element_link_pads (bond->rtcp_src, "src", sink->rtpbin, name); + + g_snprintf (name, 32, "send_rtcp_src_%u", bond->session); + gst_element_link_pads (sink->rtpbin, name, bond->rtcp_sink, "sink"); + + g_ptr_array_add (sink->bonds, bond); + return bond; +} + static GstCaps * gst_rist_sink_request_pt_map (GstRistSrc * sink, GstElement * session, guint pt) { @@ -125,9 +252,6 @@ static GstElement * gst_rist_sink_request_aux_sender (GstRistSink * sink, guint session_id, GstElement * rtpbin) { - if (session_id != 0) - return NULL; - return gst_object_ref (sink->rtxbin); } @@ -198,11 +322,14 @@ static void gst_rist_sink_on_new_receiver_ssrc (GstRistSink * sink, guint session_id, guint ssrc, GstElement * rtpbin) { + RistSenderBond *bond; + if (session_id != 0) return; GST_INFO_OBJECT (sink, "Got RTCP remote SSRC %u", ssrc); - sink->rtcp_ssrc = ssrc; + bond = g_ptr_array_index (sink->bonds, session_id); + bond->rtcp_ssrc = ssrc; } static GstPadProbeReturn @@ -281,10 +408,13 @@ gst_rist_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) static void gst_rist_sink_init (GstRistSink * sink) { - GstPad *ssrc_filter_sinkpad; + GstPad *ssrc_filter_sinkpad, *rtx_tee_sinkpad, *rtxbin_gpad; GstCaps *ssrc_caps; - GstPad *pad, *gpad; GstStructure *sdes = NULL; + RistSenderBond *bond; + + g_mutex_init (&sink->bonds_lock); + sink->bonds = g_ptr_array_new (); /* Construct the RIST RTP sender pipeline. * @@ -321,43 +451,24 @@ gst_rist_sink_init (GstRistSink * sink) sink->rtxbin = gst_bin_new ("rist_send_rtxbin"); g_object_ref_sink (sink->rtxbin); - sink->rtx_send = gst_element_factory_make ("ristrtxsend", "rist_rtx_send"); - gst_bin_add (GST_BIN (sink->rtxbin), sink->rtx_send); - g_object_set (sink->rtx_send, "max-size-packets", 0, NULL); - pad = gst_element_get_static_pad (sink->rtx_send, "sink"); - gpad = gst_ghost_pad_new ("sink_0", pad); - gst_object_unref (pad); - gst_element_add_pad (sink->rtxbin, gpad); - - pad = gst_element_get_static_pad (sink->rtx_send, "src"); - gpad = gst_ghost_pad_new ("src_0", pad); - gst_object_unref (pad); - gst_element_add_pad (sink->rtxbin, gpad); - - sink->rtp_sink = gst_element_factory_make ("udpsink", "rist_rtp_udpsink"); - sink->rtcp_src = gst_element_factory_make ("udpsrc", "rist_rtcp_udpsrc"); - sink->rtcp_sink = gst_element_factory_make ("udpsink", "rist_rtcp_udpsink"); - if (!sink->rtp_sink || !sink->rtcp_src || !sink->rtcp_sink) { - g_clear_object (&sink->rtp_sink); - g_clear_object (&sink->rtcp_src); - g_clear_object (&sink->rtcp_sink); - sink->missing_plugin = "udp"; - goto missing_plugin; - } - gst_bin_add_many (GST_BIN (sink), sink->rtp_sink, sink->rtcp_src, - sink->rtcp_sink, NULL); - gst_element_set_locked_state (sink->rtcp_src, TRUE); - gst_element_set_locked_state (sink->rtcp_sink, TRUE); - - sink->ssrc_filter = gst_element_factory_make ("capsfilter", - "rist_ssrc_filter"); - if (!sink->ssrc_filter) { + sink->rtx_tee = gst_element_factory_make ("tee", "rist_rtx_tee"); + if (!sink->rtx_tee) { sink->missing_plugin = "coreelements"; goto missing_plugin; } + gst_bin_add (GST_BIN (sink->rtxbin), sink->rtx_tee); + + rtx_tee_sinkpad = gst_element_get_static_pad (sink->rtx_tee, "sink"); + rtxbin_gpad = gst_ghost_pad_new ("sink_0", rtx_tee_sinkpad); + gst_object_unref (rtx_tee_sinkpad); + gst_element_add_pad (sink->rtxbin, rtxbin_gpad); + + sink->ssrc_filter = gst_element_factory_make ("capsfilter", + "rist_ssrc_filter"); gst_bin_add (GST_BIN (sink), sink->ssrc_filter); + /* RIST RTP SSRC should have LSB set to 0 */ sink->rtp_ssrc = g_random_int () & ~1; ssrc_caps = gst_caps_new_simple ("application/x-rtp", "ssrc", G_TYPE_UINT, sink->rtp_ssrc, NULL); @@ -365,14 +476,6 @@ gst_rist_sink_init (GstRistSink * sink) gst_structure_new_empty ("application/x-rtp")); g_object_set (sink->ssrc_filter, "caps", ssrc_caps, NULL); gst_caps_unref (ssrc_caps); - gst_element_link_pads (sink->ssrc_filter, "src", sink->rtpbin, - "send_rtp_sink_0"); - gst_element_link_pads (sink->rtpbin, "send_rtp_src_0", sink->rtp_sink, - "sink"); - gst_element_link_pads (sink->rtcp_src, "src", sink->rtpbin, - "recv_rtcp_sink_0"); - gst_element_link_pads (sink->rtpbin, "send_rtcp_src_0", sink->rtcp_sink, - "sink"); ssrc_filter_sinkpad = gst_element_get_static_pad (sink->ssrc_filter, "sink"); sink->sinkpad = gst_ghost_pad_new_from_template ("sink", ssrc_filter_sinkpad, @@ -384,6 +487,10 @@ gst_rist_sink_init (GstRistSink * sink) gst_pad_add_probe (sink->sinkpad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, gst_rist_sink_fix_collision, sink, NULL); + bond = gst_rist_sink_add_bond (sink); + if (!bond) + goto missing_plugin; + return; missing_plugin: @@ -396,32 +503,22 @@ missing_plugin: } } -static GstStateChangeReturn -gst_rist_sink_start (GstRistSink * sink) +static gboolean +gst_rist_sink_setup_rtcp_socket (GstRistSink * sink, RistSenderBond * bond) { GSocket *socket = NULL; GInetAddress *iaddr = NULL; gchar *remote_addr = NULL; - guint remote_port; + guint port = bond->port + 1; GError *error = NULL; - if (sink->construct_failed) { - GST_ELEMENT_ERROR (sink, CORE, MISSING_PLUGIN, - ("Your GStreamer installation is missing plugin '%s'", - sink->missing_plugin), (NULL)); - return GST_STATE_CHANGE_FAILURE; - } - - g_object_get (sink->rtcp_sink, "host", &remote_addr, "port", &remote_port, - NULL); - - iaddr = g_inet_address_new_from_string (remote_addr); + iaddr = g_inet_address_new_from_string (bond->address); if (!iaddr) { GList *results; GResolver *resolver = NULL; resolver = g_resolver_get_default (); - results = g_resolver_lookup_by_name (resolver, remote_addr, NULL, &error); + results = g_resolver_lookup_by_name (resolver, bond->address, NULL, &error); if (!results) { g_object_unref (resolver); @@ -430,16 +527,13 @@ gst_rist_sink_start (GstRistSink * sink) iaddr = G_INET_ADDRESS (g_object_ref (results->data)); - g_free (remote_addr); - remote_addr = g_inet_address_to_string (iaddr); - g_resolver_free_addresses (results); g_object_unref (resolver); } + remote_addr = g_inet_address_to_string (iaddr); if (g_inet_address_get_is_multicast (iaddr)) { - g_object_set (sink->rtcp_src, "address", remote_addr, "port", remote_port, - NULL); + g_object_set (bond->rtcp_src, "address", remote_addr, "port", port, NULL); } else { const gchar *any_addr; @@ -448,21 +542,21 @@ gst_rist_sink_start (GstRistSink * sink) else any_addr = "0.0.0.0"; - g_object_set (sink->rtcp_src, "address", any_addr, "port", 0, NULL); + g_object_set (bond->rtcp_src, "address", any_addr, "port", 0, NULL); } g_object_unref (iaddr); - gst_element_set_locked_state (sink->rtcp_src, FALSE); - gst_element_sync_state_with_parent (sink->rtcp_src); + gst_element_set_locked_state (bond->rtcp_src, FALSE); + gst_element_sync_state_with_parent (bond->rtcp_src); /* share the socket created by the sink */ - g_object_get (sink->rtcp_src, "used-socket", &socket, NULL); - g_object_set (sink->rtcp_sink, "socket", socket, "auto-multicast", FALSE, + g_object_get (bond->rtcp_src, "used-socket", &socket, NULL); + g_object_set (bond->rtcp_sink, "socket", socket, "auto-multicast", FALSE, "close-socket", FALSE, NULL); g_object_unref (socket); - gst_element_set_locked_state (sink->rtcp_sink, FALSE); - gst_element_sync_state_with_parent (sink->rtcp_sink); + gst_element_set_locked_state (bond->rtcp_sink, FALSE); + gst_element_sync_state_with_parent (bond->rtcp_sink); return GST_STATE_CHANGE_SUCCESS; @@ -475,6 +569,35 @@ dns_resolve_failed: return GST_STATE_CHANGE_FAILURE; } +static GstStateChangeReturn +gst_rist_sink_start (GstRistSink * sink) +{ + gint i; + + if (sink->construct_failed) { + GST_ELEMENT_ERROR (sink, CORE, MISSING_PLUGIN, + ("Your GStreamer installation is missing plugin '%s'", + sink->missing_plugin), (NULL)); + return GST_STATE_CHANGE_FAILURE; + } + + for (i = 0; i < sink->bonds->len; i++) { + RistSenderBond *bond = g_ptr_array_index (sink->bonds, i); + GObject *session = NULL; + + g_signal_emit_by_name (sink->rtpbin, "get-session", i, &session); + g_object_set (session, "rtcp-min-interval", sink->min_rtcp_interval, + "rtcp-fraction", sink->max_rtcp_bandwidth, NULL); + g_object_unref (session); + + if (!gst_rist_sink_setup_rtcp_socket (sink, bond)) + return GST_STATE_CHANGE_FAILURE; + } + + return GST_STATE_CHANGE_SUCCESS; +} + + static GstStructure * gst_rist_sink_create_stats (GstRistSink * sink) { @@ -482,7 +605,9 @@ gst_rist_sink_create_stats (GstRistSink * sink) GstStructure *sstats = NULL, *ret; guint64 pkt_sent = 0, rtx_sent = 0, rtt; guint rb_rtt = 0; + RistSenderBond *bond; + bond = g_ptr_array_index (sink->bonds, 0); ret = gst_structure_new_empty ("rist/x-sender-stats"); g_signal_emit_by_name (sink->rtpbin, "get-internal-session", 0, &session); @@ -498,7 +623,7 @@ gst_rist_sink_create_stats (GstRistSink * sink) g_clear_object (&source); } - g_signal_emit_by_name (session, "get-source-by-ssrc", sink->rtcp_ssrc, + g_signal_emit_by_name (session, "get-source-by-ssrc", bond->rtcp_ssrc, &source); if (source) { g_object_get (source, "stats", &sstats, NULL); @@ -508,7 +633,7 @@ gst_rist_sink_create_stats (GstRistSink * sink) } g_object_unref (session); - g_object_get (sink->rtx_send, "num-rtx-packets", &rtx_sent, NULL); + g_object_get (bond->rtx_send, "num-rtx-packets", &rtx_sent, NULL); /* rb_rtt is in Q16 in NTP time */ rtt = gst_util_uint64_scale (rb_rtt, GST_SECOND, 65536); @@ -594,42 +719,204 @@ gst_rist_sink_change_state (GstElement * element, GstStateChange transition) return ret; } +/* called with bonds lock */ +static void +gst_rist_sink_update_bond_address (GstRistSink * sink, RistSenderBond * bond, + const gchar * address, guint port, const gchar * multicast_iface) +{ + g_free (bond->address); + g_free (bond->multicast_iface); + bond->address = g_strdup (address); + bond->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL; + bond->port = port; + + g_object_set (G_OBJECT (bond->rtp_sink), "host", address, "port", port, + "multicast-iface", bond->multicast_iface, NULL); + g_object_set (G_OBJECT (bond->rtcp_sink), "host", address, + "port", port + 1, "multicast-iface", bond->multicast_iface, NULL); + + /* TODO add runtime support + * - add blocking the pad probe + * - update RTCP socket + * - cycle elements through NULL state + */ +} + +/* called with bonds lock */ +static gchar * +gst_rist_sink_get_bonds (GstRistSink * sink) +{ + GString *bonds = g_string_new (""); + gint i; + + for (i = 0; i < sink->bonds->len; i++) { + RistSenderBond *bond = g_ptr_array_index (sink->bonds, i); + if (bonds->len > 0) + g_string_append_c (bonds, ':'); + + g_string_append_printf (bonds, "%s:%u", bond->address, bond->port); + + if (bond->multicast_iface) + g_string_append_printf (bonds, "/%s", bond->multicast_iface); + } + + return g_string_free (bonds, FALSE); +} + +struct RistAddress +{ + gchar *address; + char *multicast_iface; + guint port; +}; + +/* called with bonds lock */ +static void +gst_rist_sink_set_bonds (GstRistSink * sink, const gchar * bonds) +{ + GStrv tokens = NULL; + struct RistAddress *addrs; + gint i; + + if (bonds == NULL) + goto missing_address; + + tokens = g_strsplit (bonds, ",", 0); + if (tokens[0] == NULL) + goto missing_address; + + addrs = g_new0 (struct RistAddress, g_strv_length (tokens)); + + /* parse the address list */ + for (i = 0; tokens[i]; i++) { + gchar *address = tokens[i]; + char *port_ptr, *iface_ptr, *endptr; + guint port; + + port_ptr = g_utf8_strrchr (address, -1, ':'); + iface_ptr = g_utf8_strrchr (address, -1, '/'); + + if (!port_ptr) + goto bad_parameter; + if (!g_ascii_isdigit (port_ptr[1])) + goto bad_parameter; + + if (iface_ptr) { + if (iface_ptr < port_ptr) + goto bad_parameter; + iface_ptr[0] = '\0'; + } + + port = strtol (port_ptr + 1, &endptr, 0); + if (endptr[0] != '\0') + goto bad_parameter; + + /* port must be a multiple of 2 between 2 and 65534 */ + if (port < 2 || (port & 1) || port > G_MAXUINT16) + goto invalid_port; + + port_ptr[0] = '\0'; + addrs[i].port = port; + addrs[i].address = g_strstrip (address); + if (iface_ptr) + addrs[i].multicast_iface = g_strstrip (iface_ptr + 1); + } + + /* configure the bonds */ + for (i = 0; tokens[i]; i++) { + RistSenderBond *bond = NULL; + + if (i < sink->bonds->len) + bond = g_ptr_array_index (sink->bonds, i); + else + bond = gst_rist_sink_add_bond (sink); + + gst_rist_sink_update_bond_address (sink, bond, addrs[i].address, + addrs[i].port, addrs[i].multicast_iface); + } + + g_strfreev (tokens); + return; + +missing_address: + g_warning ("'bonding-addresses' cannot be empty"); + g_strfreev (tokens); + return; + +bad_parameter: + g_warning ("Failed to parse address '%s", tokens[i]); + g_strfreev (tokens); + g_free (addrs); + return; + +invalid_port: + g_warning ("RIST port must valid UDP port and a multiple of 2."); + g_strfreev (tokens); + g_free (addrs); + return; +} + +static void +gst_rist_sink_set_multicast_loopback (GstRistSink * sink, gboolean loop) +{ + gint i; + + sink->multicast_loopback = loop; + for (i = 0; i < sink->bonds->len; i++) { + RistSenderBond *bond = g_ptr_array_index (sink->bonds, i); + g_object_set (G_OBJECT (bond->rtp_sink), "loop", loop, NULL); + g_object_set (G_OBJECT (bond->rtcp_sink), "loop", loop, NULL); + } +} + +/* called with bonds lock */ +static void +gst_rist_sink_set_multicast_ttl (GstRistSink * sink, gint ttl) +{ + gint i; + + sink->multicast_ttl = ttl; + for (i = 0; i < sink->bonds->len; i++) { + RistSenderBond *bond = g_ptr_array_index (sink->bonds, i); + g_object_set (G_OBJECT (bond->rtp_sink), "ttl-mc", ttl, NULL); + g_object_set (G_OBJECT (bond->rtcp_sink), "ttl-mc", ttl, NULL); + } +} + static void gst_rist_sink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { GstRistSink *sink = GST_RIST_SINK (object); - GstElement *session = NULL; - GstClockTime interval; GstStructure *sdes; + RistSenderBond *bond; if (sink->construct_failed) return; + g_mutex_lock (&sink->bonds_lock); + + bond = g_ptr_array_index (sink->bonds, 0); + switch (prop_id) { case PROP_ADDRESS: - g_object_get_property (G_OBJECT (sink->rtp_sink), "host", value); + g_value_set_string (value, bond->address); break; case PROP_PORT: - g_object_get_property (G_OBJECT (sink->rtp_sink), "port", value); + g_value_set_uint (value, bond->port); break; case PROP_SENDER_BUFFER: - g_object_get_property (G_OBJECT (sink->rtx_send), "max-size-time", value); + g_object_get_property (G_OBJECT (bond->rtx_send), "max-size-time", value); break; case PROP_MIN_RTCP_INTERVAL: - g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session); - g_object_get (session, "rtcp-min-interval", &interval, NULL); - g_value_set_uint (value, (guint) (interval / GST_MSECOND)); - g_object_unref (session); + g_value_set_uint (value, (guint) (sink->min_rtcp_interval / GST_MSECOND)); break; case PROP_MAX_RTCP_BANDWIDTH: - g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session); - g_object_get_property (G_OBJECT (session), "rtcp-fraction", value); - g_object_unref (session); + g_value_set_double (value, sink->max_rtcp_bandwidth); break; case PROP_STATS_UPDATE_INTERVAL: @@ -647,22 +934,27 @@ gst_rist_sink_get_property (GObject * object, guint prop_id, break; case PROP_MULTICAST_LOOPBACK: - g_object_get_property (G_OBJECT (sink->rtp_sink), "loop", value); + g_value_set_boolean (value, sink->multicast_loopback); break; case PROP_MULTICAST_IFACE: - g_object_get_property (G_OBJECT (sink->rtp_sink), - "multicast-iface", value); + g_value_set_string (value, bond->multicast_iface); break; case PROP_MULTICAST_TTL: - g_object_get_property (G_OBJECT (sink->rtp_sink), "ttl-mc", value); + g_value_set_int (value, sink->multicast_ttl); + break; + + case PROP_BONDING_ADDRESSES: + g_value_take_string (value, gst_rist_sink_get_bonds (sink)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } + + g_mutex_unlock (&sink->bonds_lock); } static void @@ -670,16 +962,22 @@ gst_rist_sink_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstRistSink *sink = GST_RIST_SINK (object); - GstElement *session = NULL; GstStructure *sdes; + RistSenderBond *bond; if (sink->construct_failed) return; + g_mutex_lock (&sink->bonds_lock); + + bond = g_ptr_array_index (sink->bonds, 0); + switch (prop_id) { case PROP_ADDRESS: - g_object_set_property (G_OBJECT (sink->rtp_sink), "host", value); - g_object_set_property (G_OBJECT (sink->rtcp_sink), "host", value); + g_free (bond->address); + bond->address = g_value_dup_string (value); + g_object_set_property (G_OBJECT (bond->rtp_sink), "host", value); + g_object_set_property (G_OBJECT (bond->rtcp_sink), "host", value); break; case PROP_PORT:{ @@ -693,27 +991,23 @@ gst_rist_sink_set_property (GObject * object, guint prop_id, return; } - g_object_set (sink->rtp_sink, "port", port, NULL); - g_object_set (sink->rtcp_sink, "port", port + 1, NULL); + bond->port = port; + g_object_set (bond->rtp_sink, "port", port, NULL); + g_object_set (bond->rtcp_sink, "port", port + 1, NULL); break; } case PROP_SENDER_BUFFER: - g_object_set (sink->rtx_send, + g_object_set (bond->rtx_send, "max-size-time", g_value_get_uint (value), NULL); break; case PROP_MIN_RTCP_INTERVAL: - g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session); - g_object_set (session, "rtcp-min-interval", - g_value_get_uint (value) * GST_MSECOND, NULL); - g_object_unref (session); + sink->min_rtcp_interval = g_value_get_uint (value) * GST_MSECOND; break; case PROP_MAX_RTCP_BANDWIDTH: - g_signal_emit_by_name (sink->rtpbin, "get-session", 0, &session); - g_object_set (session, "rtcp-fraction", g_value_get_double (value), NULL); - g_object_unref (session); + sink->max_rtcp_bandwidth = g_value_get_double (value); break; case PROP_STATS_UPDATE_INTERVAL: @@ -728,34 +1022,55 @@ gst_rist_sink_set_property (GObject * object, guint prop_id, break; case PROP_MULTICAST_LOOPBACK: - g_object_set_property (G_OBJECT (sink->rtp_sink), "loop", value); - g_object_set_property (G_OBJECT (sink->rtcp_sink), "loop", value); + gst_rist_sink_set_multicast_loopback (sink, g_value_get_boolean (value)); break; case PROP_MULTICAST_IFACE: - g_object_set_property (G_OBJECT (sink->rtp_sink), + g_free (bond->multicast_iface); + bond->multicast_iface = g_value_dup_string (value); + g_object_set_property (G_OBJECT (bond->rtp_sink), "multicast-iface", value); - g_object_set_property (G_OBJECT (sink->rtcp_sink), + g_object_set_property (G_OBJECT (bond->rtcp_sink), "multicast-iface", value); break; case PROP_MULTICAST_TTL: - g_object_set_property (G_OBJECT (sink->rtp_sink), "ttl-mc", value); - g_object_set_property (G_OBJECT (sink->rtcp_sink), "ttl-mc", value); + gst_rist_sink_set_multicast_ttl (sink, g_value_get_int (value)); + break; + + case PROP_BONDING_ADDRESSES: + gst_rist_sink_set_bonds (sink, g_value_get_string (value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } + + g_mutex_unlock (&sink->bonds_lock); } static void gst_rist_sink_finalize (GObject * object) { GstRistSink *sink = GST_RIST_SINK (object); + gint i; + + g_mutex_lock (&sink->bonds_lock); + + for (i = 0; i < sink->bonds->len; i++) { + RistSenderBond *bond = g_ptr_array_index (sink->bonds, i); + g_free (bond->address); + g_free (bond->multicast_iface); + g_slice_free (RistSenderBond, bond); + } + g_ptr_array_free (sink->bonds, TRUE); + g_clear_object (&sink->rtxbin); + g_mutex_unlock (&sink->bonds_lock); + g_mutex_clear (&sink->bonds_lock); + G_OBJECT_CLASS (gst_rist_sink_parent_class)->finalize (object); } @@ -836,4 +1151,9 @@ gst_rist_sink_class_init (GstRistSinkClass * klass) g_param_spec_int ("multicast-ttl", "Multicast TTL", "The multicast time-to-live parameter.", 0, 255, 1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_BONDING_ADDRESSES, + g_param_spec_string ("bonding-addresses", "Bonding Addresses", + "Comma (,) seperated list of
: to send to. ", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); }