mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-02-02 20:42:30 +00:00
ristsrc: Add bonding support
This add support for receiving and aggregating the same stream over multiple addresses.
This commit is contained in:
parent
ffedd7ce2d
commit
e914abd402
1 changed files with 466 additions and 150 deletions
|
@ -34,6 +34,17 @@
|
|||
* gst-launch-1.0 ristsrc address=0.0.0.0 port=5004 ! rtpmp2depay ! udpsink
|
||||
* gst-play-1.0 "rist://0.0.0.0:5004?receiver-buffer=700"
|
||||
* ]|
|
||||
*
|
||||
* Additionally, this element supports bonding, which means it can receive the
|
||||
* same stream from multiple addresses. Each address will be mapped to it's
|
||||
* own RTP session. In order to enable bonding support, one need to configure
|
||||
* the list of addresses through "bonding-addresses" properties.
|
||||
*
|
||||
* ## Example launch line for bonding
|
||||
* |[
|
||||
* gst-launch-1.0 ristsrc bonding-addresses="10.0.0.1:5004,11.0.0.1:5006" ! rtpmp2depay ! udpsink
|
||||
* gst-play-1.0 "rist://0.0.0.0:5004?bonding-addresses=10.0.0.1:5004,11.0.0.1:5006"
|
||||
* ]|
|
||||
*/
|
||||
|
||||
|
||||
|
@ -45,6 +56,9 @@
|
|||
#include <gst/net/net.h>
|
||||
#include <gst/rtp/rtp.h>
|
||||
|
||||
/* for strtol() */
|
||||
#include <stdlib.h>
|
||||
|
||||
/* for setsockopt() */
|
||||
#ifndef G_OS_WIN32
|
||||
#include <sys/types.h>
|
||||
|
@ -70,7 +84,8 @@ enum
|
|||
PROP_CNAME,
|
||||
PROP_MULTICAST_LOOPBACK,
|
||||
PROP_MULTICAST_IFACE,
|
||||
PROP_MULTICAST_TTL
|
||||
PROP_MULTICAST_TTL,
|
||||
PROP_BONDING_ADDRESSES
|
||||
};
|
||||
|
||||
static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src",
|
||||
|
@ -78,30 +93,50 @@ static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src",
|
|||
GST_PAD_ALWAYS,
|
||||
GST_STATIC_CAPS ("application/x-rtp"));
|
||||
|
||||
|
||||
typedef struct
|
||||
{
|
||||
guint session;
|
||||
gchar *address;
|
||||
gchar *multicast_iface;
|
||||
guint port;
|
||||
|
||||
GstElement *rtcp_src;
|
||||
GstElement *rtp_src;
|
||||
GstElement *rtcp_sink;
|
||||
GstElement *rtx_receive;
|
||||
gulong rtcp_recv_probe;
|
||||
gulong rtcp_send_probe;
|
||||
GSocketAddress *rtcp_send_addr;
|
||||
|
||||
} RistReceiverBond;
|
||||
|
||||
struct _GstRistSrc
|
||||
{
|
||||
GstBin parent;
|
||||
|
||||
GstUri *uri;
|
||||
|
||||
/* Elements contained in the pipeline, the rtp/rtcp_src are 'udpsrc' */
|
||||
/* Common elements in the pipeline */
|
||||
GstElement *rtpbin;
|
||||
GstElement *rtp_src;
|
||||
GstElement *rtcp_src;
|
||||
GstElement *rtcp_sink;
|
||||
gulong rtcp_recv_probe;
|
||||
gulong rtcp_send_probe;
|
||||
GSocketAddress *rtcp_send_addr;
|
||||
GstPad *srcpad;
|
||||
gint multicast_ttl;
|
||||
|
||||
/* RTX Elements */
|
||||
GstElement *rtxbin;
|
||||
GstElement *rtx_receive;
|
||||
GstElement *rtx_funnel;
|
||||
|
||||
/* For property handling */
|
||||
/* Common properties, protected by bonds_lock */
|
||||
guint reorder_section;
|
||||
guint max_rtx_retries;
|
||||
GstClockTime min_rtcp_interval;
|
||||
gdouble max_rtcp_bandwidth;
|
||||
gint multicast_loopback;
|
||||
gint multicast_ttl;
|
||||
|
||||
/* Bonds */
|
||||
GPtrArray *bonds;
|
||||
/* this is needed as setting sibling properties will try to take the object
|
||||
* lock. Thus, any properties that affects the bonds will be protected with
|
||||
* that lock instead of the object lock. */
|
||||
GMutex bonds_lock;
|
||||
|
||||
/* For stats */
|
||||
guint stats_interval;
|
||||
|
@ -121,6 +156,60 @@ G_DEFINE_TYPE_WITH_CODE (GstRistSrc, gst_rist_src, GST_TYPE_BIN,
|
|||
G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rist_src_uri_init);
|
||||
GST_DEBUG_CATEGORY_INIT (gst_rist_src_debug, "ristsrc", 0, "RIST Source"));
|
||||
|
||||
/* called with bonds lock */
|
||||
static RistReceiverBond *
|
||||
gst_rist_src_add_bond (GstRistSrc * src)
|
||||
{
|
||||
RistReceiverBond *bond = g_slice_new0 (RistReceiverBond);
|
||||
GstPad *pad, *gpad;
|
||||
gchar name[32];
|
||||
|
||||
bond->session = src->bonds->len;
|
||||
bond->address = g_strdup ("0.0.0.0");
|
||||
|
||||
g_snprintf (name, 32, "rist_rtx_receive%u", bond->session);
|
||||
bond->rtx_receive = gst_element_factory_make ("ristrtxreceive", name);
|
||||
gst_bin_add (GST_BIN (src->rtxbin), bond->rtx_receive);
|
||||
|
||||
g_snprintf (name, 32, "sink_%u", bond->session);
|
||||
gst_element_link_pads (bond->rtx_receive, "src", src->rtx_funnel, name);
|
||||
|
||||
g_snprintf (name, 32, "sink_%u", bond->session);
|
||||
pad = gst_element_get_static_pad (bond->rtx_receive, "sink");
|
||||
gpad = gst_ghost_pad_new (name, pad);
|
||||
gst_object_unref (pad);
|
||||
gst_element_add_pad (src->rtxbin, gpad);
|
||||
|
||||
g_snprintf (name, 32, "rist_rtp_udpsrc%u", bond->session);
|
||||
bond->rtp_src = gst_element_factory_make ("udpsrc", name);
|
||||
g_snprintf (name, 32, "rist_rtcp_udpsrc%u", bond->session);
|
||||
bond->rtcp_src = gst_element_factory_make ("udpsrc", name);
|
||||
g_snprintf (name, 32, "rist_rtcp_dynudpsink%u", bond->session);
|
||||
bond->rtcp_sink = gst_element_factory_make ("dynudpsink", name);
|
||||
if (!bond->rtp_src || !bond->rtcp_src || !bond->rtcp_sink) {
|
||||
g_clear_object (&bond->rtp_src);
|
||||
g_clear_object (&bond->rtcp_src);
|
||||
g_clear_object (&bond->rtcp_sink);
|
||||
g_slice_free (RistReceiverBond, bond);
|
||||
src->missing_plugin = "udp";
|
||||
return NULL;
|
||||
}
|
||||
gst_bin_add_many (GST_BIN (src), bond->rtp_src, bond->rtcp_src,
|
||||
bond->rtcp_sink, NULL);
|
||||
g_object_set (bond->rtcp_sink, "sync", FALSE, "async", FALSE, NULL);
|
||||
gst_element_set_locked_state (bond->rtcp_sink, TRUE);
|
||||
|
||||
g_snprintf (name, 32, "recv_rtp_sink_%u", bond->session);
|
||||
gst_element_link_pads (bond->rtp_src, "src", src->rtpbin, name);
|
||||
g_snprintf (name, 32, "recv_rtcp_sink_%u", bond->session);
|
||||
gst_element_link_pads (bond->rtcp_src, "src", src->rtpbin, name);
|
||||
g_snprintf (name, 32, "send_rtcp_src_%u", bond->session);
|
||||
gst_element_link_pads (src->rtpbin, name, bond->rtcp_sink, "sink");
|
||||
|
||||
g_ptr_array_add (src->bonds, bond);
|
||||
return bond;
|
||||
}
|
||||
|
||||
static void
|
||||
gst_rist_src_pad_added (GstRistSrc * src, GstPad * new_pad, GstElement * rtpbin)
|
||||
{
|
||||
|
@ -158,9 +247,6 @@ static GstElement *
|
|||
gst_rist_src_request_aux_receiver (GstRistSrc * src, guint session_id,
|
||||
GstElement * rtpbin)
|
||||
{
|
||||
if (session_id != 0)
|
||||
return NULL;
|
||||
|
||||
return gst_object_ref (src->rtxbin);
|
||||
}
|
||||
|
||||
|
@ -262,17 +348,17 @@ gst_rist_src_on_new_ssrc (GstRistSrc * src, guint session_id, guint ssrc,
|
|||
GObject *session = NULL;
|
||||
GObject *source = NULL;
|
||||
|
||||
if (session_id != 0)
|
||||
return;
|
||||
|
||||
g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session);
|
||||
g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source);
|
||||
|
||||
if (ssrc & 1)
|
||||
if (ssrc & 1) {
|
||||
GST_DEBUG ("Disabling RTCP and probation on RTX stream "
|
||||
"(SSRC %u on session %u)", ssrc, session_id);
|
||||
g_object_set (source, "disable-rtcp", TRUE, "probation", 0, NULL);
|
||||
else
|
||||
} else {
|
||||
g_signal_connect (session, "on-sending-nacks",
|
||||
(GCallback) gst_rist_src_on_sending_nacks, NULL);
|
||||
}
|
||||
|
||||
g_object_unref (source);
|
||||
g_object_unref (session);
|
||||
|
@ -282,19 +368,22 @@ static void
|
|||
gst_rist_src_new_jitterbuffer (GstRistSrc * src, GstElement * jitterbuffer,
|
||||
guint session, guint ssrc, GstElement * rtpbin)
|
||||
{
|
||||
GST_OBJECT_LOCK (src);
|
||||
if (session != 0) {
|
||||
GST_WARNING_OBJECT (rtpbin, "Unexpected jitterbuffer created.");
|
||||
return;
|
||||
}
|
||||
|
||||
g_object_set (jitterbuffer, "rtx-delay", src->reorder_section,
|
||||
"rtx-max-retries", src->max_rtx_retries, NULL);
|
||||
|
||||
if ((ssrc & 1) == 0) {
|
||||
GST_INFO_OBJECT (src, "Saving jitterbuffer for session %u ssrc %u",
|
||||
session, ssrc);
|
||||
|
||||
g_clear_object (&src->jitterbuffer);
|
||||
src->jitterbuffer = gst_object_ref (jitterbuffer);
|
||||
src->rtp_ssrc = ssrc;
|
||||
}
|
||||
|
||||
GST_OBJECT_UNLOCK (src);
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -302,6 +391,10 @@ gst_rist_src_init (GstRistSrc * src)
|
|||
{
|
||||
GstPad *pad, *gpad;
|
||||
GstStructure *sdes = NULL;
|
||||
RistReceiverBond *bond;
|
||||
|
||||
g_mutex_init (&src->bonds_lock);
|
||||
src->bonds = g_ptr_array_new ();
|
||||
|
||||
/* Construct the RIST RTP receiver pipeline.
|
||||
*
|
||||
|
@ -330,7 +423,6 @@ gst_rist_src_init (GstRistSrc * src)
|
|||
g_object_set (src->rtpbin, "do-retransmission", TRUE,
|
||||
"rtp-profile", 3 /* AVPF */ ,
|
||||
"sdes", sdes, NULL);
|
||||
|
||||
gst_structure_free (sdes);
|
||||
|
||||
g_signal_connect_swapped (src->rtpbin, "request-pt-map",
|
||||
|
@ -340,43 +432,15 @@ gst_rist_src_init (GstRistSrc * src)
|
|||
|
||||
src->rtxbin = gst_bin_new ("rist_recv_rtxbin");
|
||||
g_object_ref_sink (src->rtxbin);
|
||||
src->rtx_receive = gst_element_factory_make ("ristrtxreceive",
|
||||
"rist_rtx_receive");
|
||||
gst_bin_add (GST_BIN (src->rtxbin), src->rtx_receive);
|
||||
|
||||
pad = gst_element_get_static_pad (src->rtx_receive, "sink");
|
||||
gpad = gst_ghost_pad_new ("sink_0", pad);
|
||||
gst_object_unref (pad);
|
||||
gst_element_add_pad (src->rtxbin, gpad);
|
||||
src->rtx_funnel = gst_element_factory_make ("funnel", "rist_rtx_funnel");
|
||||
gst_bin_add (GST_BIN (src->rtxbin), src->rtx_funnel);
|
||||
|
||||
pad = gst_element_get_static_pad (src->rtx_receive, "src");
|
||||
pad = gst_element_get_static_pad (src->rtx_funnel, "src");
|
||||
gpad = gst_ghost_pad_new ("src_0", pad);
|
||||
gst_object_unref (pad);
|
||||
gst_element_add_pad (src->rtxbin, gpad);
|
||||
|
||||
src->rtp_src = gst_element_factory_make ("udpsrc", "rist_rtp_udpsrc");
|
||||
src->rtcp_src = gst_element_factory_make ("udpsrc", "rist_rtcp_udpsrc");
|
||||
src->rtcp_sink =
|
||||
gst_element_factory_make ("dynudpsink", "rist_rtcp_dynudpsink");
|
||||
if (!src->rtp_src || !src->rtcp_src || !src->rtcp_sink) {
|
||||
g_clear_object (&src->rtp_src);
|
||||
g_clear_object (&src->rtcp_src);
|
||||
g_clear_object (&src->rtcp_sink);
|
||||
src->missing_plugin = "udp";
|
||||
goto missing_plugin;
|
||||
}
|
||||
gst_bin_add_many (GST_BIN (src), src->rtp_src, src->rtcp_src,
|
||||
src->rtcp_sink, NULL);
|
||||
g_object_set (src->rtcp_sink, "sync", FALSE, "async", FALSE, NULL);
|
||||
/* delay udpsink startup, we will give it the socket from the RTCP udpsrc,
|
||||
* but socket can only be set in NULL state */
|
||||
gst_element_set_locked_state (src->rtcp_sink, TRUE);
|
||||
|
||||
gst_element_link_pads (src->rtp_src, "src", src->rtpbin, "recv_rtp_sink_0");
|
||||
gst_element_link_pads (src->rtcp_src, "src", src->rtpbin, "recv_rtcp_sink_0");
|
||||
gst_element_link_pads (src->rtpbin, "send_rtcp_src_0",
|
||||
src->rtcp_sink, "sink");
|
||||
|
||||
g_signal_connect_swapped (src->rtpbin, "pad-added",
|
||||
G_CALLBACK (gst_rist_src_pad_added), src);
|
||||
g_signal_connect_swapped (src->rtpbin, "on-new-ssrc",
|
||||
|
@ -384,6 +448,10 @@ gst_rist_src_init (GstRistSrc * src)
|
|||
g_signal_connect_swapped (src->rtpbin, "new-jitterbuffer",
|
||||
G_CALLBACK (gst_rist_src_new_jitterbuffer), src);
|
||||
|
||||
bond = gst_rist_src_add_bond (src);
|
||||
if (!bond)
|
||||
goto missing_plugin;
|
||||
|
||||
return;
|
||||
|
||||
missing_plugin:
|
||||
|
@ -400,6 +468,28 @@ gst_rist_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info,
|
|||
GstRistSrc *src = GST_RIST_SRC (user_data);
|
||||
GstBuffer *buffer;
|
||||
GstNetAddressMeta *meta;
|
||||
GstElement *rtcp_src;
|
||||
RistReceiverBond *bond = NULL;
|
||||
gint i;
|
||||
|
||||
rtcp_src = GST_ELEMENT (gst_pad_get_parent (pad));
|
||||
|
||||
g_mutex_lock (&src->bonds_lock);
|
||||
|
||||
for (i = 0; i < src->bonds->len; i++) {
|
||||
RistReceiverBond *b = g_ptr_array_index (src->bonds, i);
|
||||
if (b->rtcp_src == rtcp_src) {
|
||||
bond = b;
|
||||
break;
|
||||
}
|
||||
}
|
||||
gst_object_unref (rtcp_src);
|
||||
|
||||
if (!bond) {
|
||||
GST_WARNING_OBJECT (src, "Unexpected RTCP source.");
|
||||
g_mutex_unlock (&src->bonds_lock);
|
||||
return GST_PAD_PROBE_OK;
|
||||
}
|
||||
|
||||
if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) {
|
||||
GstBufferList *buffer_list = info->data;
|
||||
|
@ -410,21 +500,21 @@ gst_rist_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info,
|
|||
|
||||
meta = gst_buffer_get_net_address_meta (buffer);
|
||||
|
||||
GST_OBJECT_LOCK (src);
|
||||
g_clear_object (&src->rtcp_send_addr);
|
||||
src->rtcp_send_addr = g_object_ref (meta->addr);
|
||||
GST_OBJECT_UNLOCK (src);
|
||||
g_clear_object (&bond->rtcp_send_addr);
|
||||
bond->rtcp_send_addr = g_object_ref (meta->addr);
|
||||
|
||||
g_mutex_unlock (&src->bonds_lock);
|
||||
|
||||
return GST_PAD_PROBE_OK;
|
||||
}
|
||||
|
||||
/* called with bonds lock */
|
||||
static inline void
|
||||
gst_rist_src_attach_net_address_meta (GstRistSrc * src, GstBuffer * buffer)
|
||||
gst_rist_src_attach_net_address_meta (RistReceiverBond * bond,
|
||||
GstBuffer * buffer)
|
||||
{
|
||||
GST_OBJECT_LOCK (src);
|
||||
if (src->rtcp_send_addr)
|
||||
gst_buffer_add_net_address_meta (buffer, src->rtcp_send_addr);
|
||||
GST_OBJECT_UNLOCK (src);
|
||||
if (bond->rtcp_send_addr)
|
||||
gst_buffer_add_net_address_meta (buffer, bond->rtcp_send_addr);
|
||||
}
|
||||
|
||||
static GstPadProbeReturn
|
||||
|
@ -432,6 +522,28 @@ gst_rist_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info,
|
|||
gpointer user_data)
|
||||
{
|
||||
GstRistSrc *src = GST_RIST_SRC (user_data);
|
||||
GstElement *rtcp_sink;
|
||||
RistReceiverBond *bond = NULL;
|
||||
gint i;
|
||||
|
||||
rtcp_sink = GST_ELEMENT (gst_pad_get_parent (pad));
|
||||
|
||||
g_mutex_lock (&src->bonds_lock);
|
||||
|
||||
for (i = 0; i < src->bonds->len; i++) {
|
||||
RistReceiverBond *b = g_ptr_array_index (src->bonds, i);
|
||||
if (b->rtcp_sink == rtcp_sink) {
|
||||
bond = b;
|
||||
break;
|
||||
}
|
||||
}
|
||||
gst_object_unref (rtcp_sink);
|
||||
|
||||
if (!bond) {
|
||||
GST_WARNING_OBJECT (src, "Unexpected RTCP sink.");
|
||||
g_mutex_unlock (&src->bonds_lock);
|
||||
return GST_PAD_PROBE_OK;
|
||||
}
|
||||
|
||||
if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) {
|
||||
GstBufferList *buffer_list = info->data;
|
||||
|
@ -441,25 +553,68 @@ gst_rist_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info,
|
|||
info->data = buffer_list = gst_buffer_list_make_writable (buffer_list);
|
||||
for (i = 0; i < gst_buffer_list_length (buffer_list); i++) {
|
||||
buffer = gst_buffer_list_get (buffer_list, i);
|
||||
gst_rist_src_attach_net_address_meta (src, buffer);
|
||||
gst_rist_src_attach_net_address_meta (bond, buffer);
|
||||
}
|
||||
} else {
|
||||
GstBuffer *buffer = info->data;
|
||||
info->data = buffer = gst_buffer_make_writable (buffer);
|
||||
gst_rist_src_attach_net_address_meta (src, buffer);
|
||||
gst_rist_src_attach_net_address_meta (bond, buffer);
|
||||
}
|
||||
|
||||
g_mutex_unlock (&src->bonds_lock);
|
||||
|
||||
return GST_PAD_PROBE_OK;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_rist_src_setup_rtcp_socket (GstRistSrc * src, RistReceiverBond * bond)
|
||||
{
|
||||
GstPad *pad;
|
||||
GSocket *socket = NULL;
|
||||
GInetAddress *iaddr = NULL;
|
||||
guint port = bond->port + 1;
|
||||
|
||||
g_object_get (bond->rtcp_src, "used-socket", &socket, NULL);
|
||||
if (!socket)
|
||||
return GST_STATE_CHANGE_FAILURE;
|
||||
|
||||
iaddr = g_inet_address_new_from_string (bond->address);
|
||||
|
||||
if (g_inet_address_get_is_multicast (iaddr)) {
|
||||
/* mc-ttl is not supported by dynudpsink */
|
||||
g_socket_set_multicast_ttl (socket, src->multicast_ttl);
|
||||
/* In multicast, send RTCP to the multicast group */
|
||||
bond->rtcp_send_addr = g_inet_socket_address_new (iaddr, port);
|
||||
} else {
|
||||
/* In unicast, send RTCP to the detected sender address */
|
||||
pad = gst_element_get_static_pad (bond->rtcp_src, "src");
|
||||
bond->rtcp_recv_probe = gst_pad_add_probe (pad,
|
||||
GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
|
||||
gst_rist_src_on_recv_rtcp, src, NULL);
|
||||
gst_object_unref (pad);
|
||||
}
|
||||
g_object_unref (iaddr);
|
||||
|
||||
pad = gst_element_get_static_pad (bond->rtcp_sink, "sink");
|
||||
bond->rtcp_send_probe = gst_pad_add_probe (pad,
|
||||
GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
|
||||
gst_rist_src_on_send_rtcp, src, NULL);
|
||||
gst_object_unref (pad);
|
||||
|
||||
/* share the socket created by the source */
|
||||
g_object_set (bond->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL);
|
||||
g_object_unref (socket);
|
||||
|
||||
gst_element_set_locked_state (bond->rtcp_sink, FALSE);
|
||||
gst_element_sync_state_with_parent (bond->rtcp_sink);
|
||||
|
||||
return GST_STATE_CHANGE_SUCCESS;
|
||||
}
|
||||
|
||||
static GstStateChangeReturn
|
||||
gst_rist_src_start (GstRistSrc * src)
|
||||
{
|
||||
GstPad *pad;
|
||||
GSocket *socket = NULL;
|
||||
gchar *address;
|
||||
guint rtcp_port;
|
||||
GInetAddress *iaddr;
|
||||
gint i;
|
||||
|
||||
if (src->construct_failed) {
|
||||
GST_ELEMENT_ERROR (src, CORE, MISSING_PLUGIN,
|
||||
|
@ -468,39 +623,18 @@ gst_rist_src_start (GstRistSrc * src)
|
|||
return GST_STATE_CHANGE_FAILURE;
|
||||
}
|
||||
|
||||
g_object_get (src->rtcp_src, "used-socket", &socket,
|
||||
"address", &address, "port", &rtcp_port, NULL);
|
||||
for (i = 0; i < src->bonds->len; i++) {
|
||||
RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
|
||||
GObject *session = NULL;
|
||||
|
||||
iaddr = g_inet_address_new_from_string (address);
|
||||
g_free (address);
|
||||
g_signal_emit_by_name (src->rtpbin, "get-session", i, &session);
|
||||
g_object_set (session, "rtcp-min-interval", src->min_rtcp_interval,
|
||||
"rtcp-fraction", src->max_rtcp_bandwidth, NULL);
|
||||
g_object_unref (session);
|
||||
|
||||
if (g_inet_address_get_is_multicast (iaddr)) {
|
||||
/* mc-ttl is not supported by dynudpsink */
|
||||
g_socket_set_multicast_ttl (socket, src->multicast_ttl);
|
||||
/* In multicast, send RTCP to the multicast group */
|
||||
src->rtcp_send_addr = g_inet_socket_address_new (iaddr, rtcp_port);
|
||||
} else {
|
||||
/* In unicast, send RTCP to the detected sender address */
|
||||
pad = gst_element_get_static_pad (src->rtcp_src, "src");
|
||||
src->rtcp_recv_probe = gst_pad_add_probe (pad,
|
||||
GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
|
||||
gst_rist_src_on_recv_rtcp, src, NULL);
|
||||
gst_object_unref (pad);
|
||||
if (!gst_rist_src_setup_rtcp_socket (src, bond))
|
||||
return GST_STATE_CHANGE_FAILURE;
|
||||
}
|
||||
g_object_unref (iaddr);
|
||||
|
||||
pad = gst_element_get_static_pad (src->rtcp_sink, "sink");
|
||||
src->rtcp_send_probe = gst_pad_add_probe (pad,
|
||||
GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
|
||||
gst_rist_src_on_send_rtcp, src, NULL);
|
||||
gst_object_unref (pad);
|
||||
|
||||
/* share the socket created by the source */
|
||||
g_object_set (src->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL);
|
||||
g_object_unref (socket);
|
||||
|
||||
gst_element_set_locked_state (src->rtcp_sink, FALSE);
|
||||
gst_element_sync_state_with_parent (src->rtcp_sink);
|
||||
|
||||
return GST_STATE_CHANGE_SUCCESS;
|
||||
}
|
||||
|
@ -599,18 +733,25 @@ static void
|
|||
gst_rist_src_stop (GstRistSrc * src)
|
||||
{
|
||||
GstPad *pad;
|
||||
gint i;
|
||||
|
||||
if (src->rtcp_recv_probe) {
|
||||
pad = gst_element_get_static_pad (src->rtcp_src, "src");
|
||||
gst_pad_remove_probe (pad, src->rtcp_recv_probe);
|
||||
src->rtcp_recv_probe = 0;
|
||||
gst_object_unref (pad);
|
||||
for (i = 0; i < src->bonds->len; i++) {
|
||||
RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
|
||||
|
||||
if (bond->rtcp_recv_probe) {
|
||||
pad = gst_element_get_static_pad (bond->rtcp_src, "src");
|
||||
gst_pad_remove_probe (pad, bond->rtcp_recv_probe);
|
||||
bond->rtcp_recv_probe = 0;
|
||||
gst_object_unref (pad);
|
||||
}
|
||||
|
||||
if (bond->rtcp_send_probe) {
|
||||
pad = gst_element_get_static_pad (bond->rtcp_sink, "sink");
|
||||
gst_pad_remove_probe (pad, bond->rtcp_send_probe);
|
||||
bond->rtcp_send_probe = 0;
|
||||
gst_object_unref (pad);
|
||||
}
|
||||
}
|
||||
|
||||
pad = gst_element_get_static_pad (src->rtcp_sink, "sink");
|
||||
gst_pad_remove_probe (pad, src->rtcp_send_probe);
|
||||
src->rtcp_send_probe = 0;
|
||||
gst_object_unref (pad);
|
||||
}
|
||||
|
||||
static GstStateChangeReturn
|
||||
|
@ -647,25 +788,178 @@ gst_rist_src_change_state (GstElement * element, GstStateChange transition)
|
|||
return ret;
|
||||
}
|
||||
|
||||
/* called with bonds lock */
|
||||
static void
|
||||
gst_rist_src_update_bond_address (GstRistSrc * src, RistReceiverBond * bond,
|
||||
const gchar * address, guint port, const gchar * multicast_iface)
|
||||
{
|
||||
g_free (bond->address);
|
||||
g_free (bond->multicast_iface);
|
||||
bond->address = g_strdup (address);
|
||||
bond->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
|
||||
bond->port = port;
|
||||
|
||||
g_object_set (G_OBJECT (bond->rtp_src), "address", address, "port", port,
|
||||
"multicast-iface", bond->multicast_iface, NULL);
|
||||
g_object_set (G_OBJECT (bond->rtcp_src), "address", address,
|
||||
"port", port + 1, "multicast-iface", bond->multicast_iface, NULL);
|
||||
|
||||
/* TODO add runtime support
|
||||
* - add blocking the pad probe
|
||||
* - update RTCP socket
|
||||
* - cycle elements through NULL state
|
||||
*/
|
||||
}
|
||||
|
||||
/* called with bonds lock */
|
||||
static gchar *
|
||||
gst_rist_src_get_bonds (GstRistSrc * src)
|
||||
{
|
||||
GString *bonds = g_string_new ("");
|
||||
gint i;
|
||||
|
||||
for (i = 0; i < src->bonds->len; i++) {
|
||||
RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
|
||||
if (bonds->len > 0)
|
||||
g_string_append_c (bonds, ':');
|
||||
|
||||
g_string_append_printf (bonds, "%s:%u", bond->address, bond->port);
|
||||
|
||||
if (bond->multicast_iface)
|
||||
g_string_append_printf (bonds, "/%s", bond->multicast_iface);
|
||||
}
|
||||
|
||||
return g_string_free (bonds, FALSE);
|
||||
}
|
||||
|
||||
struct RistAddress
|
||||
{
|
||||
gchar *address;
|
||||
char *multicast_iface;
|
||||
guint port;
|
||||
};
|
||||
|
||||
/* called with bonds lock */
|
||||
static void
|
||||
gst_rist_src_set_bonds (GstRistSrc * src, const gchar * bonds)
|
||||
{
|
||||
GStrv tokens = NULL;
|
||||
struct RistAddress *addrs;
|
||||
gint i;
|
||||
|
||||
if (bonds == NULL)
|
||||
goto missing_address;
|
||||
|
||||
tokens = g_strsplit (bonds, ",", 0);
|
||||
if (tokens[0] == NULL)
|
||||
goto missing_address;
|
||||
|
||||
addrs = g_new0 (struct RistAddress, g_strv_length (tokens));
|
||||
|
||||
/* parse the address list */
|
||||
for (i = 0; tokens[i]; i++) {
|
||||
gchar *address = tokens[i];
|
||||
char *port_ptr, *iface_ptr, *endptr;
|
||||
guint port;
|
||||
|
||||
port_ptr = g_utf8_strrchr (address, -1, ':');
|
||||
iface_ptr = g_utf8_strrchr (address, -1, '/');
|
||||
|
||||
if (!port_ptr)
|
||||
goto bad_parameter;
|
||||
if (!g_ascii_isdigit (port_ptr[1]))
|
||||
goto bad_parameter;
|
||||
|
||||
if (iface_ptr) {
|
||||
if (iface_ptr < port_ptr)
|
||||
goto bad_parameter;
|
||||
iface_ptr[0] = '\0';
|
||||
}
|
||||
|
||||
port = strtol (port_ptr + 1, &endptr, 0);
|
||||
if (endptr[0] != '\0')
|
||||
goto bad_parameter;
|
||||
|
||||
/* port must be a multiple of 2 between 2 and 65534 */
|
||||
if (port < 2 || (port & 1) || port > G_MAXUINT16)
|
||||
goto invalid_port;
|
||||
|
||||
port_ptr[0] = '\0';
|
||||
addrs[i].port = port;
|
||||
addrs[i].address = g_strstrip (address);
|
||||
if (iface_ptr)
|
||||
addrs[i].multicast_iface = g_strstrip (iface_ptr + 1);
|
||||
}
|
||||
|
||||
/* configure the bonds */
|
||||
for (i = 0; tokens[i]; i++) {
|
||||
RistReceiverBond *bond = NULL;
|
||||
|
||||
if (i < src->bonds->len)
|
||||
bond = g_ptr_array_index (src->bonds, i);
|
||||
else
|
||||
bond = gst_rist_src_add_bond (src);
|
||||
|
||||
gst_rist_src_update_bond_address (src, bond, addrs[i].address,
|
||||
addrs[i].port, addrs[i].multicast_iface);
|
||||
}
|
||||
|
||||
g_strfreev (tokens);
|
||||
return;
|
||||
|
||||
missing_address:
|
||||
g_warning ("'bonding-addresses' cannot be empty");
|
||||
g_strfreev (tokens);
|
||||
return;
|
||||
|
||||
bad_parameter:
|
||||
g_warning ("Failed to parse address '%s", tokens[i]);
|
||||
g_strfreev (tokens);
|
||||
g_free (addrs);
|
||||
return;
|
||||
|
||||
invalid_port:
|
||||
g_warning ("RIST port must valid UDP port and a multiple of 2.");
|
||||
g_strfreev (tokens);
|
||||
g_free (addrs);
|
||||
return;
|
||||
}
|
||||
|
||||
static void
|
||||
gst_rist_src_set_multicast_loopback (GstRistSrc * src, gboolean loop)
|
||||
{
|
||||
gint i;
|
||||
|
||||
src->multicast_loopback = loop;
|
||||
for (i = 0; i < src->bonds->len; i++) {
|
||||
RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
|
||||
g_object_set (G_OBJECT (bond->rtp_src), "loop", loop, NULL);
|
||||
g_object_set (G_OBJECT (bond->rtcp_src), "loop", loop, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
gst_rist_src_get_property (GObject * object, guint prop_id,
|
||||
GValue * value, GParamSpec * pspec)
|
||||
{
|
||||
GstRistSrc *src = GST_RIST_SRC (object);
|
||||
GstElement *session = NULL;
|
||||
GstClockTime interval;
|
||||
GstStructure *sdes;
|
||||
RistReceiverBond *bond;
|
||||
|
||||
if (src->construct_failed)
|
||||
return;
|
||||
|
||||
g_mutex_lock (&src->bonds_lock);
|
||||
|
||||
bond = g_ptr_array_index (src->bonds, 0);
|
||||
|
||||
switch (prop_id) {
|
||||
case PROP_ADDRESS:
|
||||
g_object_get_property (G_OBJECT (src->rtp_src), "address", value);
|
||||
g_value_set_string (value, bond->address);
|
||||
break;
|
||||
|
||||
case PROP_PORT:
|
||||
g_object_get_property (G_OBJECT (src->rtp_src), "port", value);
|
||||
g_value_set_uint (value, bond->port);
|
||||
break;
|
||||
|
||||
case PROP_RECEIVER_BUFFER:
|
||||
|
@ -673,28 +967,19 @@ gst_rist_src_get_property (GObject * object, guint prop_id,
|
|||
break;
|
||||
|
||||
case PROP_REORDER_SECTION:
|
||||
GST_OBJECT_LOCK (src);
|
||||
g_value_set_uint (value, src->reorder_section);
|
||||
GST_OBJECT_UNLOCK (src);
|
||||
break;
|
||||
|
||||
case PROP_MAX_RTX_RETRIES:
|
||||
GST_OBJECT_LOCK (src);
|
||||
g_value_set_uint (value, src->max_rtx_retries);
|
||||
GST_OBJECT_UNLOCK (src);
|
||||
break;
|
||||
|
||||
case PROP_MIN_RTCP_INTERVAL:
|
||||
g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session);
|
||||
g_object_get (session, "rtcp-min-interval", &interval, NULL);
|
||||
g_value_set_uint (value, (guint) (interval / GST_MSECOND));
|
||||
g_object_unref (session);
|
||||
g_value_set_uint (value, (guint) (src->min_rtcp_interval / GST_MSECOND));
|
||||
break;
|
||||
|
||||
case PROP_MAX_RTCP_BANDWIDTH:
|
||||
g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session);
|
||||
g_object_get_property (G_OBJECT (session), "rtcp-fraction", value);
|
||||
g_object_unref (session);
|
||||
g_value_set_double (value, src->max_rtcp_bandwidth);
|
||||
break;
|
||||
|
||||
case PROP_STATS_UPDATE_INTERVAL:
|
||||
|
@ -712,21 +997,27 @@ gst_rist_src_get_property (GObject * object, guint prop_id,
|
|||
break;
|
||||
|
||||
case PROP_MULTICAST_LOOPBACK:
|
||||
g_object_get_property (G_OBJECT (src->rtp_src), "loop", value);
|
||||
g_value_set_boolean (value, src->multicast_loopback);
|
||||
break;
|
||||
|
||||
case PROP_MULTICAST_IFACE:
|
||||
g_object_get_property (G_OBJECT (src->rtp_src), "multicast-iface", value);
|
||||
g_value_set_string (value, bond->multicast_iface);
|
||||
break;
|
||||
|
||||
case PROP_MULTICAST_TTL:
|
||||
g_value_set_int (value, src->multicast_ttl);
|
||||
break;
|
||||
|
||||
case PROP_BONDING_ADDRESSES:
|
||||
g_value_take_string (value, gst_rist_src_get_bonds (src));
|
||||
break;
|
||||
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
break;
|
||||
}
|
||||
|
||||
g_mutex_unlock (&src->bonds_lock);
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -734,16 +1025,22 @@ gst_rist_src_set_property (GObject * object, guint prop_id,
|
|||
const GValue * value, GParamSpec * pspec)
|
||||
{
|
||||
GstRistSrc *src = GST_RIST_SRC (object);
|
||||
GstElement *session = NULL;
|
||||
GstStructure *sdes;
|
||||
RistReceiverBond *bond;
|
||||
|
||||
if (src->construct_failed)
|
||||
return;
|
||||
|
||||
g_mutex_lock (&src->bonds_lock);
|
||||
|
||||
bond = g_ptr_array_index (src->bonds, 0);
|
||||
|
||||
switch (prop_id) {
|
||||
case PROP_ADDRESS:
|
||||
g_object_set_property (G_OBJECT (src->rtp_src), "address", value);
|
||||
g_object_set_property (G_OBJECT (src->rtcp_src), "address", value);
|
||||
g_free (bond->address);
|
||||
bond->address = g_value_dup_string (value);
|
||||
g_object_set_property (G_OBJECT (bond->rtp_src), "address", value);
|
||||
g_object_set_property (G_OBJECT (bond->rtcp_src), "address", value);
|
||||
break;
|
||||
|
||||
case PROP_PORT:{
|
||||
|
@ -757,8 +1054,9 @@ gst_rist_src_set_property (GObject * object, guint prop_id,
|
|||
return;
|
||||
}
|
||||
|
||||
g_object_set (src->rtp_src, "port", port, NULL);
|
||||
g_object_set (src->rtcp_src, "port", port + 1, NULL);
|
||||
bond->port = port;
|
||||
g_object_set (bond->rtp_src, "port", port, NULL);
|
||||
g_object_set (bond->rtcp_src, "port", port + 1, NULL);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -767,28 +1065,19 @@ gst_rist_src_set_property (GObject * object, guint prop_id,
|
|||
break;
|
||||
|
||||
case PROP_REORDER_SECTION:
|
||||
GST_OBJECT_LOCK (src);
|
||||
src->reorder_section = g_value_get_uint (value);
|
||||
GST_OBJECT_UNLOCK (src);
|
||||
break;
|
||||
|
||||
case PROP_MAX_RTX_RETRIES:
|
||||
GST_OBJECT_LOCK (src);
|
||||
src->max_rtx_retries = g_value_get_uint (value);
|
||||
GST_OBJECT_UNLOCK (src);
|
||||
break;
|
||||
|
||||
case PROP_MIN_RTCP_INTERVAL:
|
||||
g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session);
|
||||
g_object_set (session, "rtcp-min-interval",
|
||||
g_value_get_uint (value) * GST_MSECOND, NULL);
|
||||
g_object_unref (session);
|
||||
src->min_rtcp_interval = g_value_get_uint (value) * GST_MSECOND;
|
||||
break;
|
||||
|
||||
case PROP_MAX_RTCP_BANDWIDTH:
|
||||
g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session);
|
||||
g_object_set (session, "rtcp-fraction", g_value_get_double (value), NULL);
|
||||
g_object_unref (session);
|
||||
src->max_rtcp_bandwidth = g_value_get_double (value);
|
||||
break;
|
||||
|
||||
case PROP_STATS_UPDATE_INTERVAL:
|
||||
|
@ -803,13 +1092,15 @@ gst_rist_src_set_property (GObject * object, guint prop_id,
|
|||
break;
|
||||
|
||||
case PROP_MULTICAST_LOOPBACK:
|
||||
g_object_set_property (G_OBJECT (src->rtp_src), "loop", value);
|
||||
g_object_set_property (G_OBJECT (src->rtcp_src), "loop", value);
|
||||
gst_rist_src_set_multicast_loopback (src, g_value_get_boolean (value));
|
||||
break;
|
||||
|
||||
case PROP_MULTICAST_IFACE:
|
||||
g_object_set_property (G_OBJECT (src->rtp_src), "multicast-iface", value);
|
||||
g_object_set_property (G_OBJECT (src->rtcp_src),
|
||||
g_free (bond->multicast_iface);
|
||||
bond->multicast_iface = g_value_dup_string (value);
|
||||
g_object_set_property (G_OBJECT (bond->rtp_src),
|
||||
"multicast-iface", value);
|
||||
g_object_set_property (G_OBJECT (bond->rtcp_src),
|
||||
"multicast-iface", value);
|
||||
break;
|
||||
|
||||
|
@ -817,22 +1108,41 @@ gst_rist_src_set_property (GObject * object, guint prop_id,
|
|||
src->multicast_ttl = g_value_get_int (value);
|
||||
break;
|
||||
|
||||
case PROP_BONDING_ADDRESSES:
|
||||
gst_rist_src_set_bonds (src, g_value_get_string (value));
|
||||
break;
|
||||
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
break;
|
||||
}
|
||||
|
||||
g_mutex_unlock (&src->bonds_lock);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_rist_src_finalize (GObject * object)
|
||||
{
|
||||
GstRistSrc *src = GST_RIST_SRC (object);
|
||||
gint i;
|
||||
|
||||
g_mutex_lock (&src->bonds_lock);
|
||||
|
||||
for (i = 0; i < src->bonds->len; i++) {
|
||||
RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
|
||||
g_free (bond->address);
|
||||
g_free (bond->multicast_iface);
|
||||
g_slice_free (RistReceiverBond, bond);
|
||||
}
|
||||
g_ptr_array_free (src->bonds, TRUE);
|
||||
|
||||
if (src->jitterbuffer)
|
||||
gst_object_unref (src->jitterbuffer);
|
||||
|
||||
gst_object_unref (src->rtxbin);
|
||||
|
||||
g_mutex_unlock (&src->bonds_lock);
|
||||
g_mutex_clear (&src->bonds_lock);
|
||||
|
||||
G_OBJECT_CLASS (gst_rist_src_parent_class)->finalize (object);
|
||||
}
|
||||
|
||||
|
@ -925,6 +1235,12 @@ gst_rist_src_class_init (GstRistSrcClass * klass)
|
|||
g_param_spec_int ("multicast-ttl", "Multicast TTL",
|
||||
"The multicast time-to-live parameter.", 0, 255, 1,
|
||||
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
|
||||
|
||||
g_object_class_install_property (object_class, PROP_BONDING_ADDRESSES,
|
||||
g_param_spec_string ("bonding-addresses", "Bonding Addresses",
|
||||
"Comma (,) seperated list of <address>:<port> to receive from. "
|
||||
"Only used if 'enale-bonding' is set.", NULL,
|
||||
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
||||
}
|
||||
|
||||
static GstURIType
|
||||
|
|
Loading…
Reference in a new issue