/* GStreamer RIST plugin * Copyright (C) 2019 Net Insight AB * Author: Nicolas Dufresne * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ /** * SECTION:element-ristsrc * @title: ristsrc * @see_also: ristsink * * This element implements RIST TR-06-1 Simple Profile receiver. The stream * produced by this element will be RTP payloaded. This element also implements * the URI scheme `rist://` allowing to render RIST streams in GStreamer based * media players. The RIST URI handler also allows setting properties through * the URI query. * * It also implements part of the RIST TR-06-2 Main Profile receiver. The * tunneling, multiplexing and encryption parts of the specification are not * included. This element will accept the RIST RTP header extension and restore * the null MPEG-TS packets if the extension is included. It will not currently * use the sequence number extension when sending RTCP NACK requests. * * ## Example gst-launch line * |[ * gst-launch-1.0 ristsrc address=0.0.0.0 port=5004 ! rtpmp2tdepay ! udpsink * gst-play-1.0 "rist://0.0.0.0:5004?receiver-buffer=700" * ]| * * Additionally, this element supports link bonding, which means it * can receive the same stream from multiple addresses. Each address * will be mapped to its own RTP session. In order to enable bonding * support, one need to configure the list of addresses through * "bonding-addresses" properties. * * ## Example gst-launch line for bonding * |[ * gst-launch-1.0 ristsrc bonding-addresses="10.0.0.1:5004,11.0.0.1:5006" ! rtpmp2tdepay ! udpsink * gst-play-1.0 "rist://0.0.0.0:5004?bonding-addresses=10.0.0.1:5004,11.0.0.1:5006" * ]| */ /* using GValueArray, which has not replacement */ #define GLIB_DISABLE_DEPRECATION_WARNINGS #ifdef HAVE_CONFIG_H #include "config.h" #endif #include #include #include /* for strtol() */ #include #ifdef HAVE_SYS_SOCKET_H #include #endif #include "gstrist.h" GST_DEBUG_CATEGORY_STATIC (gst_rist_src_debug); #define GST_CAT_DEFAULT gst_rist_src_debug enum { PROP_ADDRESS = 1, PROP_PORT, PROP_RECEIVER_BUFFER, PROP_REORDER_SECTION, PROP_MAX_RTX_RETRIES, PROP_MIN_RTCP_INTERVAL, PROP_MAX_RTCP_BANDWIDTH, PROP_STATS_UPDATE_INTERVAL, PROP_STATS, PROP_CNAME, PROP_MULTICAST_LOOPBACK, PROP_MULTICAST_IFACE, PROP_MULTICAST_TTL, PROP_BONDING_ADDRESSES }; static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-rtp")); typedef struct { guint session; gchar *address; gchar *multicast_iface; guint port; GstElement *rtcp_src; GstElement *rtp_src; GstElement *rtcp_sink; GstElement *rtx_receive; gulong rtcp_recv_probe; gulong rtcp_send_probe; GSocketAddress *rtcp_send_addr; } RistReceiverBond; struct _GstRistSrc { GstBin parent; GstUri *uri; /* Common elements in the pipeline */ GstElement *rtpbin; GstPad *srcpad; GstElement *rtxbin; GstElement *rtx_funnel; GstElement *rtpdeext; /* Common properties, protected by bonds_lock */ guint reorder_section; guint max_rtx_retries; GstClockTime min_rtcp_interval; gdouble max_rtcp_bandwidth; gint multicast_loopback; gint multicast_ttl; /* Bonds */ GPtrArray *bonds; /* this is needed as setting sibling properties will try to take the object * lock. Thus, any properties that affects the bonds will be protected with * that lock instead of the object lock. */ GMutex bonds_lock; /* For stats */ guint stats_interval; guint32 rtp_ssrc; GstClockID stats_cid; GstElement *jitterbuffer; /* This is set whenever there is a pipeline construction failure, and used * to fail state changes later */ gboolean construct_failed; const gchar *missing_plugin; }; static void gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data); G_DEFINE_TYPE_WITH_CODE (GstRistSrc, gst_rist_src, GST_TYPE_BIN, G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rist_src_uri_init); GST_DEBUG_CATEGORY_INIT (gst_rist_src_debug, "ristsrc", 0, "RIST Source")); GST_ELEMENT_REGISTER_DEFINE (ristsrc, "ristsrc", GST_RANK_PRIMARY, GST_TYPE_RIST_SRC); /* called with bonds lock */ static RistReceiverBond * gst_rist_src_add_bond (GstRistSrc * src) { RistReceiverBond *bond = g_slice_new0 (RistReceiverBond); GstPad *pad, *gpad; gchar name[32]; bond->session = src->bonds->len; bond->address = g_strdup ("0.0.0.0"); g_snprintf (name, 32, "rist_rtx_receive%u", bond->session); bond->rtx_receive = gst_element_factory_make ("ristrtxreceive", name); gst_bin_add (GST_BIN (src->rtxbin), bond->rtx_receive); g_snprintf (name, 32, "sink_%u", bond->session); gst_element_link_pads (bond->rtx_receive, "src", src->rtx_funnel, name); g_snprintf (name, 32, "sink_%u", bond->session); pad = gst_element_get_static_pad (bond->rtx_receive, "sink"); gpad = gst_ghost_pad_new (name, pad); gst_object_unref (pad); gst_element_add_pad (src->rtxbin, gpad); g_snprintf (name, 32, "rist_rtp_udpsrc%u", bond->session); bond->rtp_src = gst_element_factory_make ("udpsrc", name); g_snprintf (name, 32, "rist_rtcp_udpsrc%u", bond->session); bond->rtcp_src = gst_element_factory_make ("udpsrc", name); g_snprintf (name, 32, "rist_rtcp_dynudpsink%u", bond->session); bond->rtcp_sink = gst_element_factory_make ("dynudpsink", name); if (!bond->rtp_src || !bond->rtcp_src || !bond->rtcp_sink) { g_clear_object (&bond->rtp_src); g_clear_object (&bond->rtcp_src); g_clear_object (&bond->rtcp_sink); g_slice_free (RistReceiverBond, bond); src->missing_plugin = "udp"; return NULL; } gst_bin_add_many (GST_BIN (src), bond->rtp_src, bond->rtcp_src, bond->rtcp_sink, NULL); g_object_set (bond->rtcp_sink, "sync", FALSE, "async", FALSE, NULL); gst_element_set_locked_state (bond->rtcp_sink, TRUE); g_snprintf (name, 32, "recv_rtp_sink_%u", bond->session); gst_element_link_pads (bond->rtp_src, "src", src->rtpbin, name); g_snprintf (name, 32, "recv_rtcp_sink_%u", bond->session); gst_element_link_pads (bond->rtcp_src, "src", src->rtpbin, name); g_snprintf (name, 32, "send_rtcp_src_%u", bond->session); gst_element_link_pads (src->rtpbin, name, bond->rtcp_sink, "sink"); g_ptr_array_add (src->bonds, bond); return bond; } static void gst_rist_src_pad_added (GstRistSrc * src, GstPad * new_pad, GstElement * rtpbin) { GST_TRACE_OBJECT (src, "New pad '%s'.", GST_PAD_NAME (new_pad)); if (g_str_has_prefix (GST_PAD_NAME (new_pad), "recv_rtp_src_0_")) { GST_DEBUG_OBJECT (src, "Using new pad '%s' as ghost pad target.", GST_PAD_NAME (new_pad)); gst_ghost_pad_set_target (GST_GHOST_PAD (src->srcpad), new_pad); } } static GstCaps * gst_rist_src_request_pt_map (GstRistSrc * src, guint session_id, guint pt, GstElement * rtpbin) { const GstRTPPayloadInfo *pt_info; GstCaps *ret; pt_info = gst_rtp_payload_info_for_pt (pt); if (!pt_info || !pt_info->clock_rate) return NULL; ret = gst_caps_new_simple ("application/x-rtp", "media", G_TYPE_STRING, pt_info->media, "encoding_name", G_TYPE_STRING, pt_info->encoding_name, "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL); /* FIXME add sprop-parameter-set if any */ g_warn_if_fail (pt_info->encoding_parameters == NULL); return ret; } static GstElement * gst_rist_src_request_aux_receiver (GstRistSrc * src, guint session_id, GstElement * rtpbin) { return gst_object_ref (src->rtxbin); } /* Overrides the nack creation. Right now we don't send mixed NACKS type, we * simply send a set of range NACK if it takes less space, or allow adding * more seqnum. */ static guint gst_rist_src_on_sending_nacks (GObject * session, guint sender_ssrc, guint media_ssrc, GArray * nacks, GstBuffer * buffer, gpointer user_data) { GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT; GstRTCPPacket packet; guint8 *app_data; guint nacked_seqnums = 0; guint range_size = 0; guint n_rg_nacks = 0; guint n_fb_nacks = 0; guint16 seqnum; guint i; gint diff; /* We'll assume that range will be best, and find how many generic NACK * would have been created. If this number ends up being smaller, we will * just remove the APP packet and return 0, leaving it to RTPSession to * create the generic NACK.*/ gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp); if (!gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_APP, &packet)) /* exit because the packet is full, will put next request in a * further packet */ goto done; gst_rtcp_packet_app_set_ssrc (&packet, media_ssrc); gst_rtcp_packet_app_set_name (&packet, "RIST"); if (!gst_rtcp_packet_app_set_data_length (&packet, 1)) { gst_rtcp_packet_remove (&packet); GST_WARNING ("no range nacks fit in the packet"); goto done; } app_data = gst_rtcp_packet_app_get_data (&packet); for (i = 0; i < nacks->len; i = nacked_seqnums) { guint j; seqnum = g_array_index (nacks, guint16, i); if (!gst_rtcp_packet_app_set_data_length (&packet, n_rg_nacks + 1)) break; n_rg_nacks++; nacked_seqnums++; for (j = i + 1; j < nacks->len; j++) { guint16 next_seqnum = g_array_index (nacks, guint16, j); diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum); GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, next_seqnum, diff); if (diff > (j - i)) break; nacked_seqnums++; } range_size = j - i - 1; GST_WRITE_UINT32_BE (app_data, seqnum << 16 | range_size); app_data += 4; } /* count how many FB NACK it would take to wrap nacked_seqnums */ seqnum = g_array_index (nacks, guint16, 0); n_fb_nacks = 1; for (i = 1; i < nacked_seqnums; i++) { guint16 next_seqnum = g_array_index (nacks, guint16, i); diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum); if (diff > 16) { n_fb_nacks++; seqnum = next_seqnum; } } if (n_fb_nacks <= n_rg_nacks) { GST_DEBUG ("Not sending %u range nacks, as %u FB nacks will be smaller", n_rg_nacks, n_fb_nacks); gst_rtcp_packet_remove (&packet); nacked_seqnums = 0; goto done; } GST_DEBUG ("Sent %u seqnums into %u Range NACKs", nacked_seqnums, n_rg_nacks); done: gst_rtcp_buffer_unmap (&rtcp); return nacked_seqnums; } static void gst_rist_src_on_new_ssrc (GstRistSrc * src, guint session_id, guint ssrc, GstElement * rtpbin) { GObject *session = NULL; GObject *source = NULL; g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session); g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source); if (ssrc & 1) { GST_DEBUG ("Disabling RTCP and probation on RTX stream " "(SSRC %u on session %u)", ssrc, session_id); g_object_set (source, "disable-rtcp", TRUE, "probation", 0, NULL); } else { g_signal_connect (session, "on-sending-nacks", (GCallback) gst_rist_src_on_sending_nacks, NULL); } g_object_unref (source); g_object_unref (session); } static void gst_rist_src_new_jitterbuffer (GstRistSrc * src, GstElement * jitterbuffer, guint session, guint ssrc, GstElement * rtpbin) { if (session != 0) { GST_WARNING_OBJECT (rtpbin, "Unexpected jitterbuffer created."); return; } g_object_set (jitterbuffer, "rtx-delay", src->reorder_section, "rtx-max-retries", src->max_rtx_retries, NULL); if ((ssrc & 1) == 0) { GST_INFO_OBJECT (src, "Saving jitterbuffer for session %u ssrc %u", session, ssrc); g_clear_object (&src->jitterbuffer); src->jitterbuffer = gst_object_ref (jitterbuffer); src->rtp_ssrc = ssrc; } } static void gst_rist_src_init (GstRistSrc * src) { GstPad *pad, *gpad; GstStructure *sdes = NULL; RistReceiverBond *bond; g_mutex_init (&src->bonds_lock); src->bonds = g_ptr_array_new (); /* Construct the RIST RTP receiver pipeline. * * udpsrc -> [recv_rtp_sink_%u] -------- [recv_rtp_src_%u_%u_%u] * | rtpbin | * udpsrc -> [recv_rtcp_sink_%u] -------- [send_rtcp_src_%u] -> udpsink * * This pipeline is fixed for now, note that optionally an FEC stream could * be added later. */ src->srcpad = gst_ghost_pad_new_no_target_from_template ("src", gst_static_pad_template_get (&src_templ)); gst_element_add_pad (GST_ELEMENT (src), src->srcpad); src->rtpbin = gst_element_factory_make ("rtpbin", "rist_recv_rtpbin"); if (!src->rtpbin) { src->missing_plugin = "rtpmanager"; goto missing_plugin; } /* RIST specification says the SDES should only contain the CNAME */ g_object_get (src->rtpbin, "sdes", &sdes, NULL); gst_structure_remove_field (sdes, "tool"); gst_bin_add (GST_BIN (src), src->rtpbin); g_object_set (src->rtpbin, "do-retransmission", TRUE, "rtp-profile", 3 /* AVPF */ , "sdes", sdes, NULL); gst_structure_free (sdes); g_signal_connect_object (src->rtpbin, "request-pt-map", G_CALLBACK (gst_rist_src_request_pt_map), src, G_CONNECT_SWAPPED); g_signal_connect_object (src->rtpbin, "request-aux-receiver", G_CALLBACK (gst_rist_src_request_aux_receiver), src, G_CONNECT_SWAPPED); src->rtxbin = gst_bin_new ("rist_recv_rtxbin"); g_object_ref_sink (src->rtxbin); src->rtx_funnel = gst_element_factory_make ("funnel", "rist_rtx_funnel"); gst_bin_add (GST_BIN (src->rtxbin), src->rtx_funnel); src->rtpdeext = gst_element_factory_make ("ristrtpdeext", "rist_rtp_de_ext"); gst_bin_add (GST_BIN (src->rtxbin), src->rtpdeext); gst_element_link (src->rtx_funnel, src->rtpdeext); pad = gst_element_get_static_pad (src->rtpdeext, "src"); gpad = gst_ghost_pad_new ("src_0", pad); gst_object_unref (pad); gst_element_add_pad (src->rtxbin, gpad); g_signal_connect_object (src->rtpbin, "pad-added", G_CALLBACK (gst_rist_src_pad_added), src, G_CONNECT_SWAPPED); g_signal_connect_object (src->rtpbin, "on-new-ssrc", G_CALLBACK (gst_rist_src_on_new_ssrc), src, G_CONNECT_SWAPPED); g_signal_connect_object (src->rtpbin, "new-jitterbuffer", G_CALLBACK (gst_rist_src_new_jitterbuffer), src, G_CONNECT_SWAPPED); bond = gst_rist_src_add_bond (src); if (!bond) goto missing_plugin; return; missing_plugin: { GST_ERROR_OBJECT (src, "'%s' plugin is missing.", src->missing_plugin); src->construct_failed = TRUE; } } static void gst_rist_src_handle_message (GstBin * bin, GstMessage * message) { switch (GST_MESSAGE_TYPE (message)) { case GST_MESSAGE_STREAM_START: case GST_MESSAGE_EOS: /* drop stream-start & eos from our internal udp sink(s); https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1368 */ gst_message_unref (message); break; default: GST_BIN_CLASS (gst_rist_src_parent_class)->handle_message (bin, message); break; } } static GstPadProbeReturn gst_rist_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { GstRistSrc *src = GST_RIST_SRC (user_data); GstBuffer *buffer; GstNetAddressMeta *meta; GstElement *rtcp_src; RistReceiverBond *bond = NULL; gint i; rtcp_src = GST_ELEMENT (gst_pad_get_parent (pad)); g_mutex_lock (&src->bonds_lock); for (i = 0; i < src->bonds->len; i++) { RistReceiverBond *b = g_ptr_array_index (src->bonds, i); if (b->rtcp_src == rtcp_src) { bond = b; break; } } gst_object_unref (rtcp_src); if (!bond) { GST_WARNING_OBJECT (src, "Unexpected RTCP source."); g_mutex_unlock (&src->bonds_lock); return GST_PAD_PROBE_OK; } if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) { GstBufferList *buffer_list = info->data; buffer = gst_buffer_list_get (buffer_list, 0); } else { buffer = info->data; } meta = gst_buffer_get_net_address_meta (buffer); g_clear_object (&bond->rtcp_send_addr); bond->rtcp_send_addr = g_object_ref (meta->addr); g_mutex_unlock (&src->bonds_lock); return GST_PAD_PROBE_OK; } /* called with bonds lock */ static inline void gst_rist_src_attach_net_address_meta (RistReceiverBond * bond, GstBuffer * buffer) { if (bond->rtcp_send_addr) gst_buffer_add_net_address_meta (buffer, bond->rtcp_send_addr); } static GstPadProbeReturn gst_rist_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { GstRistSrc *src = GST_RIST_SRC (user_data); GstElement *rtcp_sink; RistReceiverBond *bond = NULL; gint i; rtcp_sink = GST_ELEMENT (gst_pad_get_parent (pad)); g_mutex_lock (&src->bonds_lock); for (i = 0; i < src->bonds->len; i++) { RistReceiverBond *b = g_ptr_array_index (src->bonds, i); if (b->rtcp_sink == rtcp_sink) { bond = b; break; } } gst_object_unref (rtcp_sink); if (!bond) { GST_WARNING_OBJECT (src, "Unexpected RTCP sink."); g_mutex_unlock (&src->bonds_lock); return GST_PAD_PROBE_OK; } if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) { GstBufferList *buffer_list = info->data; GstBuffer *buffer; gint i; info->data = buffer_list = gst_buffer_list_make_writable (buffer_list); for (i = 0; i < gst_buffer_list_length (buffer_list); i++) { buffer = gst_buffer_list_get (buffer_list, i); gst_rist_src_attach_net_address_meta (bond, buffer); } } else { GstBuffer *buffer = info->data; info->data = buffer = gst_buffer_make_writable (buffer); gst_rist_src_attach_net_address_meta (bond, buffer); } g_mutex_unlock (&src->bonds_lock); return GST_PAD_PROBE_OK; } static gboolean gst_rist_src_setup_rtcp_socket (GstRistSrc * src, RistReceiverBond * bond) { GstPad *pad; GSocket *socket = NULL; GInetAddress *iaddr = NULL; guint port = bond->port + 1; GError *error = NULL; g_object_get (bond->rtcp_src, "used-socket", &socket, NULL); if (!socket) return GST_STATE_CHANGE_FAILURE; iaddr = g_inet_address_new_from_string (bond->address); if (!iaddr) { GList *results; GResolver *resolver = NULL; resolver = g_resolver_get_default (); results = g_resolver_lookup_by_name (resolver, bond->address, NULL, &error); if (!results) { g_object_unref (resolver); goto dns_resolve_failed; } iaddr = G_INET_ADDRESS (g_object_ref (results->data)); g_resolver_free_addresses (results); g_object_unref (resolver); } if (g_inet_address_get_is_multicast (iaddr)) { /* mc-ttl is not supported by dynudpsink */ g_socket_set_multicast_ttl (socket, src->multicast_ttl); /* In multicast, send RTCP to the multicast group */ bond->rtcp_send_addr = g_inet_socket_address_new (iaddr, port); } else { /* In unicast, send RTCP to the detected sender address */ pad = gst_element_get_static_pad (bond->rtcp_src, "src"); bond->rtcp_recv_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, gst_rist_src_on_recv_rtcp, src, NULL); gst_object_unref (pad); } g_object_unref (iaddr); pad = gst_element_get_static_pad (bond->rtcp_sink, "sink"); bond->rtcp_send_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, gst_rist_src_on_send_rtcp, src, NULL); gst_object_unref (pad); if (bond->multicast_iface) { #ifdef SO_BINDTODEVICE if (setsockopt (g_socket_get_fd (socket), SOL_SOCKET, SO_BINDTODEVICE, bond->multicast_iface, strlen (bond->multicast_iface)) < 0) GST_WARNING_OBJECT (src, "setsockopt SO_BINDTODEVICE failed: %s", strerror (errno)); #else GST_WARNING_OBJECT (src, "Tried to set a multicast interface while" " GStreamer was compiled on a platform without SO_BINDTODEVICE"); #endif } /* share the socket created by the source */ g_object_set (bond->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL); g_object_unref (socket); gst_element_set_locked_state (bond->rtcp_sink, FALSE); gst_element_sync_state_with_parent (bond->rtcp_sink); return GST_STATE_CHANGE_SUCCESS; dns_resolve_failed: GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, ("Could not resolve hostname '%s'", GST_STR_NULL (bond->address)), ("DNS resolver reported: %s", error->message)); g_error_free (error); return GST_STATE_CHANGE_FAILURE; } static GstStateChangeReturn gst_rist_src_start (GstRistSrc * src) { gint i; if (src->construct_failed) { GST_ELEMENT_ERROR (src, CORE, MISSING_PLUGIN, ("Your GStreamer installation is missing plugin '%s'", src->missing_plugin), (NULL)); return GST_STATE_CHANGE_FAILURE; } for (i = 0; i < src->bonds->len; i++) { RistReceiverBond *bond = g_ptr_array_index (src->bonds, i); GObject *session = NULL; g_signal_emit_by_name (src->rtpbin, "get-session", i, &session); g_object_set (session, "rtcp-min-interval", src->min_rtcp_interval, "rtcp-fraction", src->max_rtcp_bandwidth, NULL); g_object_unref (session); if (!gst_rist_src_setup_rtcp_socket (src, bond)) return GST_STATE_CHANGE_FAILURE; } return GST_STATE_CHANGE_SUCCESS; } static GstStructure * gst_rist_src_create_stats (GstRistSrc * src) { GstStructure *ret; GValueArray *session_stats; guint64 total_dropped = 0, total_received = 0, recovered = 0, lost = 0; guint64 duplicates = 0, rtx_sent = 0, rtt = 0; gint i; ret = gst_structure_new_empty ("rist/x-receiver-stats"); session_stats = g_value_array_new (src->bonds->len); for (i = 0; i < src->bonds->len; i++) { GObject *session = NULL, *source = NULL; GstStructure *sstats = NULL, *stats; const gchar *rtp_from = NULL, *rtcp_from = NULL; guint64 dropped = 0, received = 0; GValue value = G_VALUE_INIT; g_signal_emit_by_name (src->rtpbin, "get-internal-session", i, &session); if (!session) continue; stats = gst_structure_new_empty ("rist/x-receiver-session-stats"); g_signal_emit_by_name (session, "get-source-by-ssrc", src->rtp_ssrc, &source); if (source) { gint packet_lost; g_object_get (source, "stats", &sstats, NULL); gst_structure_get_int (sstats, "packets-lost", &packet_lost); dropped = MAX (packet_lost, 0); gst_structure_get_uint64 (sstats, "packets-received", &received); rtp_from = gst_structure_get_string (sstats, "rtp-from"); rtcp_from = gst_structure_get_string (sstats, "rtcp-from"); } g_object_unref (session); gst_structure_set (stats, "session-id", G_TYPE_INT, i, "rtp-from", G_TYPE_STRING, rtp_from ? rtp_from : "", "rtcp-from", G_TYPE_STRING, rtcp_from ? rtcp_from : "", "dropped", G_TYPE_UINT64, MAX (dropped, 0), "received", G_TYPE_UINT64, received, NULL); if (sstats) gst_structure_free (sstats); g_clear_object (&source); g_value_init (&value, GST_TYPE_STRUCTURE); g_value_take_boxed (&value, stats); g_value_array_append (session_stats, &value); g_value_unset (&value); total_dropped += dropped; } if (src->jitterbuffer) { GstStructure *stats; g_object_get (src->jitterbuffer, "stats", &stats, NULL); gst_structure_get (stats, "num-pushed", G_TYPE_UINT64, &total_received, "num-lost", G_TYPE_UINT64, &lost, "rtx-count", G_TYPE_UINT64, &rtx_sent, "num-duplicates", G_TYPE_UINT64, &duplicates, "rtx-success-count", G_TYPE_UINT64, &recovered, "rtx-rtt", G_TYPE_UINT64, &rtt, NULL); gst_structure_free (stats); } gst_structure_set (ret, "dropped", G_TYPE_UINT64, total_dropped, "received", G_TYPE_UINT64, total_received, "recovered", G_TYPE_UINT64, recovered, "permanently-lost", G_TYPE_UINT64, lost, "duplicates", G_TYPE_UINT64, duplicates, "retransmission-requests-sent", G_TYPE_UINT64, rtx_sent, "rtx-roundtrip-time", G_TYPE_UINT64, rtt, "session-stats", G_TYPE_VALUE_ARRAY, session_stats, NULL); g_value_array_free (session_stats); return ret; } static gboolean gst_rist_src_dump_stats (GstClock * clock, GstClockTime time, GstClockID id, gpointer user_data) { GstRistSrc *src = GST_RIST_SRC (user_data); GstStructure *stats = gst_rist_src_create_stats (src); gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (src), stats); gst_structure_free (stats); return TRUE; } static void gst_rist_src_enable_stats_interval (GstRistSrc * src) { GstClock *clock; GstClockTime start, interval; if (src->stats_interval == 0) return; interval = src->stats_interval * GST_MSECOND; clock = gst_system_clock_obtain (); start = gst_clock_get_time (clock) + interval; src->stats_cid = gst_clock_new_periodic_id (clock, start, interval); gst_clock_id_wait_async (src->stats_cid, gst_rist_src_dump_stats, gst_object_ref (src), (GDestroyNotify) gst_object_unref); gst_object_unref (clock); } static void gst_rist_src_disable_stats_interval (GstRistSrc * src) { if (src->stats_cid) { gst_clock_id_unschedule (src->stats_cid); gst_clock_id_unref (src->stats_cid); src->stats_cid = NULL; } } static void gst_rist_src_stop (GstRistSrc * src) { GstPad *pad; gint i; for (i = 0; i < src->bonds->len; i++) { RistReceiverBond *bond = g_ptr_array_index (src->bonds, i); if (bond->rtcp_recv_probe) { pad = gst_element_get_static_pad (bond->rtcp_src, "src"); gst_pad_remove_probe (pad, bond->rtcp_recv_probe); bond->rtcp_recv_probe = 0; gst_object_unref (pad); } if (bond->rtcp_send_probe) { pad = gst_element_get_static_pad (bond->rtcp_sink, "sink"); gst_pad_remove_probe (pad, bond->rtcp_send_probe); bond->rtcp_send_probe = 0; gst_object_unref (pad); } } } static GstStateChangeReturn gst_rist_src_change_state (GstElement * element, GstStateChange transition) { GstRistSrc *src = GST_RIST_SRC (element); GstStateChangeReturn ret; switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: gst_rist_src_disable_stats_interval (src); break; default: break; } ret = GST_ELEMENT_CLASS (gst_rist_src_parent_class)->change_state (element, transition); switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY: gst_rist_src_start (src); break; case GST_STATE_CHANGE_READY_TO_PAUSED: gst_rist_src_enable_stats_interval (src); break; case GST_STATE_CHANGE_READY_TO_NULL: gst_rist_src_stop (src); break; default: break; } return ret; } /* called with bonds lock */ static void gst_rist_src_update_bond_address (GstRistSrc * src, RistReceiverBond * bond, const gchar * address, guint port, const gchar * multicast_iface) { g_free (bond->address); g_free (bond->multicast_iface); bond->address = g_strdup (address); bond->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL; bond->port = port; g_object_set (G_OBJECT (bond->rtp_src), "address", address, "port", port, "multicast-iface", bond->multicast_iface, NULL); g_object_set (G_OBJECT (bond->rtcp_src), "address", address, "port", port + 1, "multicast-iface", bond->multicast_iface, NULL); /* TODO add runtime support * - add blocking the pad probe * - update RTCP socket * - cycle elements through NULL state */ } /* called with bonds lock */ static gchar * gst_rist_src_get_bonds (GstRistSrc * src) { GString *bonds = g_string_new (""); gint i; for (i = 0; i < src->bonds->len; i++) { RistReceiverBond *bond = g_ptr_array_index (src->bonds, i); if (bonds->len > 0) g_string_append_c (bonds, ':'); g_string_append_printf (bonds, "%s:%u", bond->address, bond->port); if (bond->multicast_iface) g_string_append_printf (bonds, "/%s", bond->multicast_iface); } return g_string_free (bonds, FALSE); } struct RistAddress { gchar *address; char *multicast_iface; guint port; }; /* called with bonds lock */ static void gst_rist_src_set_bonds (GstRistSrc * src, const gchar * bonds) { GStrv tokens = NULL; struct RistAddress *addrs; gint i; if (bonds == NULL) goto missing_address; tokens = g_strsplit (bonds, ",", 0); if (tokens[0] == NULL) goto missing_address; addrs = g_new0 (struct RistAddress, g_strv_length (tokens)); /* parse the address list */ for (i = 0; tokens[i]; i++) { gchar *address = tokens[i]; char *port_ptr, *iface_ptr, *endptr; guint port; port_ptr = g_utf8_strrchr (address, -1, ':'); iface_ptr = g_utf8_strrchr (address, -1, '/'); if (!port_ptr) goto bad_parameter; if (!g_ascii_isdigit (port_ptr[1])) goto bad_parameter; if (iface_ptr) { if (iface_ptr < port_ptr) goto bad_parameter; iface_ptr[0] = '\0'; } port = strtol (port_ptr + 1, &endptr, 0); if (endptr[0] != '\0') goto bad_parameter; /* port must be a multiple of 2 between 2 and 65534 */ if (port < 2 || (port & 1) || port > G_MAXUINT16) goto invalid_port; port_ptr[0] = '\0'; addrs[i].port = port; addrs[i].address = g_strstrip (address); if (iface_ptr) addrs[i].multicast_iface = g_strstrip (iface_ptr + 1); } /* configure the bonds */ for (i = 0; tokens[i]; i++) { RistReceiverBond *bond = NULL; if (i < src->bonds->len) bond = g_ptr_array_index (src->bonds, i); else bond = gst_rist_src_add_bond (src); gst_rist_src_update_bond_address (src, bond, addrs[i].address, addrs[i].port, addrs[i].multicast_iface); } g_strfreev (tokens); return; missing_address: g_warning ("'bonding-addresses' cannot be empty"); g_strfreev (tokens); return; bad_parameter: g_warning ("Failed to parse address '%s", tokens[i]); g_strfreev (tokens); g_free (addrs); return; invalid_port: g_warning ("RIST port must valid UDP port and a multiple of 2."); g_strfreev (tokens); g_free (addrs); return; } static void gst_rist_src_set_multicast_loopback (GstRistSrc * src, gboolean loop) { gint i; src->multicast_loopback = loop; for (i = 0; i < src->bonds->len; i++) { RistReceiverBond *bond = g_ptr_array_index (src->bonds, i); g_object_set (G_OBJECT (bond->rtp_src), "loop", loop, NULL); g_object_set (G_OBJECT (bond->rtcp_src), "loop", loop, NULL); } } static void gst_rist_src_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { GstRistSrc *src = GST_RIST_SRC (object); GstStructure *sdes; RistReceiverBond *bond; if (src->construct_failed) return; g_mutex_lock (&src->bonds_lock); bond = g_ptr_array_index (src->bonds, 0); switch (prop_id) { case PROP_ADDRESS: g_value_set_string (value, bond->address); break; case PROP_PORT: g_value_set_uint (value, bond->port); break; case PROP_RECEIVER_BUFFER: g_object_get_property (G_OBJECT (src->rtpbin), "latency", value); break; case PROP_REORDER_SECTION: g_value_set_uint (value, src->reorder_section); break; case PROP_MAX_RTX_RETRIES: g_value_set_uint (value, src->max_rtx_retries); break; case PROP_MIN_RTCP_INTERVAL: g_value_set_uint (value, (guint) (src->min_rtcp_interval / GST_MSECOND)); break; case PROP_MAX_RTCP_BANDWIDTH: g_value_set_double (value, src->max_rtcp_bandwidth); break; case PROP_STATS_UPDATE_INTERVAL: g_value_set_uint (value, src->stats_interval); break; case PROP_STATS: g_value_take_boxed (value, gst_rist_src_create_stats (src)); break; case PROP_CNAME: g_object_get (src->rtpbin, "sdes", &sdes, NULL); g_value_set_string (value, gst_structure_get_string (sdes, "cname")); gst_structure_free (sdes); break; case PROP_MULTICAST_LOOPBACK: g_value_set_boolean (value, src->multicast_loopback); break; case PROP_MULTICAST_IFACE: g_value_set_string (value, bond->multicast_iface); break; case PROP_MULTICAST_TTL: g_value_set_int (value, src->multicast_ttl); break; case PROP_BONDING_ADDRESSES: g_value_take_string (value, gst_rist_src_get_bonds (src)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } g_mutex_unlock (&src->bonds_lock); } static void gst_rist_src_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstRistSrc *src = GST_RIST_SRC (object); GstStructure *sdes; RistReceiverBond *bond; if (src->construct_failed) return; g_mutex_lock (&src->bonds_lock); bond = g_ptr_array_index (src->bonds, 0); switch (prop_id) { case PROP_ADDRESS: g_free (bond->address); bond->address = g_value_dup_string (value); g_object_set_property (G_OBJECT (bond->rtp_src), "address", value); g_object_set_property (G_OBJECT (bond->rtcp_src), "address", value); break; case PROP_PORT:{ guint port = g_value_get_uint (value); /* According to 5.1.1, RTP receiver port most be even number and RTCP * port should be the RTP port + 1 */ if (port & 0x1) { g_warning ("Invalid RIST port %u, should be an even number.", port); return; } bond->port = port; g_object_set (bond->rtp_src, "port", port, NULL); g_object_set (bond->rtcp_src, "port", port + 1, NULL); break; } case PROP_RECEIVER_BUFFER: g_object_set (src->rtpbin, "latency", g_value_get_uint (value), NULL); break; case PROP_REORDER_SECTION: src->reorder_section = g_value_get_uint (value); break; case PROP_MAX_RTX_RETRIES: src->max_rtx_retries = g_value_get_uint (value); break; case PROP_MIN_RTCP_INTERVAL: src->min_rtcp_interval = g_value_get_uint (value) * GST_MSECOND; break; case PROP_MAX_RTCP_BANDWIDTH: src->max_rtcp_bandwidth = g_value_get_double (value); break; case PROP_STATS_UPDATE_INTERVAL: src->stats_interval = g_value_get_uint (value); break; case PROP_CNAME: g_object_get (src->rtpbin, "sdes", &sdes, NULL); gst_structure_set_value (sdes, "cname", value); g_object_set (src->rtpbin, "sdes", sdes, NULL); gst_structure_free (sdes); break; case PROP_MULTICAST_LOOPBACK: gst_rist_src_set_multicast_loopback (src, g_value_get_boolean (value)); break; case PROP_MULTICAST_IFACE: g_free (bond->multicast_iface); bond->multicast_iface = g_value_dup_string (value); g_object_set_property (G_OBJECT (bond->rtp_src), "multicast-iface", value); g_object_set_property (G_OBJECT (bond->rtcp_src), "multicast-iface", value); break; case PROP_MULTICAST_TTL: src->multicast_ttl = g_value_get_int (value); break; case PROP_BONDING_ADDRESSES: gst_rist_src_set_bonds (src, g_value_get_string (value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } g_mutex_unlock (&src->bonds_lock); } static void gst_rist_src_finalize (GObject * object) { GstRistSrc *src = GST_RIST_SRC (object); gint i; g_mutex_lock (&src->bonds_lock); for (i = 0; i < src->bonds->len; i++) { RistReceiverBond *bond = g_ptr_array_index (src->bonds, i); g_free (bond->address); g_free (bond->multicast_iface); g_slice_free (RistReceiverBond, bond); } g_ptr_array_free (src->bonds, TRUE); g_clear_object (&src->jitterbuffer); g_clear_object (&src->rtxbin); g_mutex_unlock (&src->bonds_lock); g_mutex_clear (&src->bonds_lock); G_OBJECT_CLASS (gst_rist_src_parent_class)->finalize (object); } static void gst_rist_src_class_init (GstRistSrcClass * klass) { GstBinClass *bin_class = (GstBinClass *) klass; GstElementClass *element_class = (GstElementClass *) klass; GObjectClass *object_class = (GObjectClass *) klass; gst_element_class_set_metadata (element_class, "RIST Source", "Source/Network", "Source that implements RIST TR-06-1 streaming specification", "Nicolas Dufresne handle_message = gst_rist_src_handle_message; element_class->change_state = gst_rist_src_change_state; object_class->get_property = gst_rist_src_get_property; object_class->set_property = gst_rist_src_set_property; object_class->finalize = gst_rist_src_finalize; g_object_class_install_property (object_class, PROP_ADDRESS, g_param_spec_string ("address", "Address", "Address to receive packets from (can be IPv4 or IPv6).", "0.0.0.0", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (object_class, PROP_PORT, g_param_spec_uint ("port", "Port", "The port to listen for RTP packets, " "the RTCP port is this value + 1. This port must be an even number.", 2, 65534, 5004, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); g_object_class_install_property (object_class, PROP_RECEIVER_BUFFER, g_param_spec_uint ("receiver-buffer", "Receiver Buffer", "Buffering duration (in ms)", 0, G_MAXUINT, 1000, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); g_object_class_install_property (object_class, PROP_REORDER_SECTION, g_param_spec_uint ("reorder-section", "Recorder Section", "Time to wait before sending retransmission request (in ms)", 0, G_MAXUINT, 70, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); g_object_class_install_property (object_class, PROP_MAX_RTX_RETRIES, g_param_spec_uint ("max-rtx-retries", "Maximum Retransmission Retries", "The maximum number of retransmission requests for a lost packet.", 0, G_MAXUINT, 7, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); g_object_class_install_property (object_class, PROP_MIN_RTCP_INTERVAL, g_param_spec_uint ("min-rtcp-interval", "Minimum RTCP Intercal", "The minimum interval (in ms) between two successive RTCP packets", 0, 100, 100, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); g_object_class_install_property (object_class, PROP_MAX_RTCP_BANDWIDTH, g_param_spec_double ("max-rtcp-bandwidth", "Maximum RTCP Bandwidth", "The maximum bandwidth used for RTCP as a fraction of RTP bandwdith", 0.0, 0.05, 0.05, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); g_object_class_install_property (object_class, PROP_STATS_UPDATE_INTERVAL, g_param_spec_uint ("stats-update-interval", "Statistics Update Interval", "The interval between 'stats' update notification (in ms) (0 disabled)", 0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); g_object_class_install_property (object_class, PROP_STATS, g_param_spec_boxed ("stats", "Statistics", "Statistic in a GstStructure named 'rist/x-receiver-stats'", GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (object_class, PROP_CNAME, g_param_spec_string ("cname", "CName", "Set the CNAME in the SDES block of the receiver report.", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | GST_PARAM_DOC_SHOW_DEFAULT)); g_object_class_install_property (object_class, PROP_MULTICAST_LOOPBACK, g_param_spec_boolean ("multicast-loopback", "Multicast Loopback", "When enabled, the packets will be received locally.", FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); g_object_class_install_property (object_class, PROP_MULTICAST_IFACE, g_param_spec_string ("multicast-iface", "multicast-iface", "The multicast interface to use to send packets.", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (object_class, PROP_MULTICAST_TTL, g_param_spec_int ("multicast-ttl", "Multicast TTL", "The multicast time-to-live parameter.", 0, 255, 1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); g_object_class_install_property (object_class, PROP_BONDING_ADDRESSES, g_param_spec_string ("bonding-addresses", "Bonding Addresses", "Comma (,) separated list of
: to receive from. " "Only used if 'enable-bonding' is set.", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static GstURIType gst_rist_src_uri_get_type (GType type) { return GST_URI_SRC; } static const gchar *const * gst_rist_src_uri_get_protocols (GType type) { static const char *protocols[] = { "rist", NULL }; return protocols; } static gchar * gst_rist_src_uri_get_uri (GstURIHandler * handler) { GstRistSrc *src = GST_RIST_SRC (handler); gchar *uri = NULL; GST_OBJECT_LOCK (src); if (src->uri) uri = gst_uri_to_string (src->uri); GST_OBJECT_UNLOCK (src); return uri; } static void gst_rist_src_uri_query_foreach (const gchar * key, const gchar * value, GObject * src) { if (g_str_equal (key, "async-handling")) { GST_WARNING_OBJECT (src, "Setting '%s' property from URI is not allowed.", key); return; } GST_DEBUG_OBJECT (src, "Setting property '%s' to '%s'", key, value); gst_util_set_object_arg (src, key, value); } static gboolean gst_rist_src_uri_set_uri (GstURIHandler * handler, const gchar * uri, GError ** error) { GstRistSrc *src = GST_RIST_SRC (handler); GstUri *gsturi; GHashTable *query_table; if (GST_STATE (src) >= GST_STATE_PAUSED) { g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE, "Changing the URI on ristsrc when it is running is not supported"); GST_ERROR_OBJECT (src, "%s", (*error)->message); return FALSE; } if (!(gsturi = gst_uri_from_string (uri))) { g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI, "Could not parse URI"); GST_ERROR_OBJECT (src, "%s", (*error)->message); gst_uri_unref (gsturi); return FALSE; } GST_OBJECT_LOCK (src); if (src->uri) gst_uri_unref (src->uri); src->uri = gst_uri_ref (gsturi); GST_OBJECT_UNLOCK (src); g_object_set (src, "address", gst_uri_get_host (gsturi), NULL); if (gst_uri_get_port (gsturi)) g_object_set (src, "port", gst_uri_get_port (gsturi), NULL); query_table = gst_uri_get_query_table (gsturi); if (query_table) g_hash_table_foreach (query_table, (GHFunc) gst_rist_src_uri_query_foreach, src); gst_uri_unref (gsturi); return TRUE; } static void gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data) { GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface; iface->get_type = gst_rist_src_uri_get_type; iface->get_protocols = gst_rist_src_uri_get_protocols; iface->get_uri = gst_rist_src_uri_get_uri; iface->set_uri = gst_rist_src_uri_set_uri; }