udpsrc: Add support for IGMPv3 SSM

Adding "multicast-source" property to support Source Specific Muliticast
RFC 4604. The source can be multiple address with '+' (for positive
filter) or '-' (negative filter) prefix, or URI query can be used.
Note that negative filter is not implemented yet and it will be
ignored

Example:
gst-launch-1.0 uridecodebin \
  uri=udp://{ADDRESS}:PORT?multicast-source=+SOURCE0+SOURCE1

Inspired by:
https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2620

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3485>
This commit is contained in:
Seungha Yang 2022-11-22 23:06:02 +09:00 committed by GStreamer Marge Bot
parent 5797fa09af
commit 3374f2f44d
8 changed files with 349 additions and 91 deletions

View file

@ -24707,6 +24707,18 @@
"type": "gchararray",
"writable": true
},
"multicast-source": {
"blurb": "List of source to receive the stream with '+' (positive filter) or '-' (negative filter, ignored for now) prefix (e.g., \"+SOURCE0+SOURCE1+SOURCE2\"). Alternatively, user can use URI query with the key value \"multicast-source\"",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"port": {
"blurb": "The port to receive the packets from, 0=allocate",
"conditionally-available": false,

View file

@ -23,12 +23,15 @@
#include "gstudpelements.h"
GST_DEBUG_CATEGORY (gst_udp_debug);
static gboolean
plugin_init (GstPlugin * plugin)
{
gboolean ret = FALSE;
GST_DEBUG_CATEGORY_INIT (gst_udp_debug, "udp", 0, "udp");
ret |= GST_ELEMENT_REGISTER (udpsink, plugin);
ret |= GST_ELEMENT_REGISTER (multiudpsink, plugin);
ret |= GST_ELEMENT_REGISTER (dynudpsink, plugin);

View file

@ -28,81 +28,109 @@
#include "gstudpnetutils.h"
GST_DEBUG_CATEGORY_EXTERN (gst_udp_debug);
#define GST_CAT_DEFAULT gst_udp_debug
gboolean
gst_udp_parse_uri (const gchar * uristr, gchar ** host, guint16 * port)
gst_udp_parse_uri (const gchar * uristr, gchar ** host, guint16 * port,
GPtrArray * source_list)
{
gchar *protocol, *location_start;
gchar *location, *location_end;
gchar *colptr;
GstUri *uri;
const gchar *protocol;
uri = gst_uri_from_string (uristr);
if (!uri) {
GST_ERROR ("Invalid URI string %s", uristr);
return FALSE;
}
/* consider no protocol to be udp:// */
protocol = gst_uri_get_protocol (uristr);
if (!protocol)
goto no_protocol;
if (strcmp (protocol, "udp") != 0)
goto wrong_protocol;
g_free (protocol);
location_start = gst_uri_get_location (uristr);
if (!location_start)
return FALSE;
GST_DEBUG ("got location '%s'", location_start);
/* VLC compatibility, strip everything before the @ sign. VLC uses that as the
* remote address. */
location = g_strstr_len (location_start, -1, "@");
if (location == NULL)
location = location_start;
else
location += 1;
if (location[0] == '[') {
GST_DEBUG ("parse IPV6 address '%s'", location);
location_end = strchr (location, ']');
if (location_end == NULL)
goto wrong_address;
*host = g_strndup (location + 1, location_end - location - 1);
colptr = strrchr (location_end, ':');
} else {
GST_DEBUG ("parse IPV4 address '%s'", location);
colptr = strrchr (location, ':');
if (colptr != NULL) {
*host = g_strndup (location, colptr - location);
} else {
*host = g_strdup (location);
}
}
GST_DEBUG ("host set to '%s'", *host);
if (colptr != NULL) {
*port = g_ascii_strtoll (colptr + 1, NULL, 10);
} else {
*port = 0;
}
g_free (location_start);
return TRUE;
/* ERRORS */
no_protocol:
{
protocol = gst_uri_get_scheme (uri);
if (!protocol) {
GST_ERROR ("error parsing uri %s: no protocol", uristr);
return FALSE;
}
wrong_protocol:
{
goto error;
} else if (g_strcmp0 (protocol, "udp")) {
GST_ERROR ("error parsing uri %s: wrong protocol (%s != udp)", uristr,
protocol);
g_free (protocol);
return FALSE;
goto error;
}
wrong_address:
{
GST_ERROR ("error parsing uri %s", uristr);
g_free (location);
return FALSE;
*host = g_strdup (gst_uri_get_host (uri));
if (*host == NULL) {
GST_ERROR ("Unknown host");
goto error;
}
GST_DEBUG ("host set to '%s'", *host);
*port = gst_uri_get_port (uri);
if (source_list) {
const gchar *source = gst_uri_get_query_value (uri, "multicast-source");
if (source)
gst_udp_parse_multicast_source (source, source_list);
}
gst_uri_unref (uri);
return TRUE;
error:
gst_uri_unref (uri);
return FALSE;
}
static gboolean
gst_udp_source_filter_equal_func (gconstpointer a, gconstpointer b)
{
return g_strcmp0 ((const gchar *) a, (const gchar *) b) == 0;
}
gboolean
gst_udp_parse_multicast_source (const gchar * multicast_source,
GPtrArray * source_list)
{
gchar **list;
guint i;
gboolean found = FALSE;
if (!multicast_source || !source_list)
return FALSE;
GST_DEBUG ("Parsing multicast source \"%s\"", multicast_source);
list = g_strsplit_set (multicast_source, "+-", 0);
for (i = 0; list[i] != NULL; i++) {
gchar *prefix;
gboolean is_positive = FALSE;
if (*list[i] == '\0')
continue;
prefix = g_strrstr (multicast_source, list[i]);
g_assert (prefix);
/* Begin without '+' or '-' prefix, assume it's positive filter */
if (prefix == multicast_source) {
GST_WARNING ("%s without prefix, assuming that it's positive filter",
list[i]);
is_positive = TRUE;
} else if (*(prefix - 1) == '+') {
is_positive = TRUE;
}
if (is_positive &&
!g_ptr_array_find_with_equal_func (source_list, list[i],
gst_udp_source_filter_equal_func, NULL)) {
GST_DEBUG ("Add multicast-source %s", list[i]);
/* Moves ownership to array */
g_ptr_array_add (source_list, g_strdup (list[i]));
found = TRUE;
}
}
g_strfreev (list);
return found;
}

View file

@ -23,7 +23,13 @@
#ifndef __GST_UDP_NET_UTILS_H__
#define __GST_UDP_NET_UTILS_H__
gboolean gst_udp_parse_uri (const gchar *uristr, gchar **host, guint16 *port);
gboolean gst_udp_parse_uri (const gchar *uristr,
gchar **host,
guint16 *port,
GPtrArray * source_list);
gboolean gst_udp_parse_multicast_source (const gchar * multicast_source,
GPtrArray * source_list);
#endif /* __GST_UDP_NET_UTILS_H__*/

View file

@ -133,7 +133,7 @@ gst_udpsink_set_uri (GstUDPSink * sink, const gchar * uri, GError ** error)
gst_multiudpsink_remove (GST_MULTIUDPSINK (sink), sink->host, sink->port);
if (!gst_udp_parse_uri (uri, &host, &port))
if (!gst_udp_parse_uri (uri, &host, &port, NULL))
goto wrong_uri;
g_free (sink->host);

View file

@ -571,6 +571,7 @@ static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
#define UDP_DEFAULT_LOOP TRUE
#define UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS TRUE
#define UDP_DEFAULT_MTU (1492)
#define UDP_DEFAULT_MULTICAST_SOURCE NULL
enum
{
@ -594,6 +595,7 @@ enum
PROP_RETRIEVE_SENDER_ADDRESS,
PROP_MTU,
PROP_SOCKET_TIMESTAMP,
PROP_MULTICAST_SOURCE,
};
static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
@ -780,6 +782,22 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass)
GST_SOCKET_TIMESTAMP_MODE, GST_SOCKET_TIMESTAMP_MODE_REALTIME,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstUDPSrc:multicast-source:
*
* List of source to receive the stream from. (IGMPv3 SSM RFC 4604)
*
* Since: 1.24
*/
g_object_class_install_property (gobject_class, PROP_MULTICAST_SOURCE,
g_param_spec_string ("multicast-source", "Multicast source",
"List of source to receive the stream with \'+\' (positive filter) or"
" \'-\' (negative filter, ignored for now) prefix "
"(e.g., \"+SOURCE0+SOURCE1+SOURCE2\"). Alternatively, user can use "
"URI query with the key value \"multicast-source\"",
UDP_DEFAULT_MULTICAST_SOURCE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_static_pad_template (gstelement_class, &src_template);
gst_element_class_set_static_metadata (gstelement_class,
@ -822,6 +840,8 @@ gst_udpsrc_init (GstUDPSrc * udpsrc)
udpsrc->loop = UDP_DEFAULT_LOOP;
udpsrc->retrieve_sender_address = UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS;
udpsrc->mtu = UDP_DEFAULT_MTU;
udpsrc->source_list =
g_ptr_array_new_with_free_func ((GDestroyNotify) g_free);
/* configure basesrc to be a live source */
gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
@ -864,6 +884,9 @@ gst_udpsrc_finalize (GObject * object)
gst_memory_unref (udpsrc->extra_mem);
udpsrc->extra_mem = NULL;
g_ptr_array_unref (udpsrc->source_list);
g_free (udpsrc->multicast_source);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -1240,9 +1263,16 @@ gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri, GError ** error)
{
gchar *address;
guint16 port;
gboolean source_updated = FALSE;
gchar *new_source = NULL;
if (!gst_udp_parse_uri (uri, &address, &port))
GST_OBJECT_LOCK (src);
g_ptr_array_set_size (src->source_list, 0);
if (!gst_udp_parse_uri (uri, &address, &port, src->source_list)) {
GST_OBJECT_UNLOCK (src);
goto wrong_uri;
}
if (port == (guint16) - 1)
port = UDP_DEFAULT_PORT;
@ -1251,8 +1281,33 @@ gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri, GError ** error)
src->address = address;
src->port = port;
if (src->source_list->len > 0) {
GString *str = g_string_new (NULL);
guint i;
/* FIXME: gst_udp_parse_uri() will handle only positive filters for now */
for (i = 0; i < src->source_list->len; i++) {
gchar *s = g_ptr_array_index (src->source_list, i);
g_string_append_c (str, '+');
g_string_append (str, s);
}
new_source = g_string_free (str, FALSE);
}
if (g_strcmp0 (src->multicast_source, new_source) != 0)
source_updated = TRUE;
g_free (src->multicast_source);
src->multicast_source = new_source;
g_free (src->uri);
src->uri = g_strdup (uri);
GST_OBJECT_UNLOCK (src);
if (source_updated)
g_object_notify (G_OBJECT (src), "multicast-source");
return TRUE;
@ -1375,6 +1430,17 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
case PROP_SOCKET_TIMESTAMP:
udpsrc->socket_timestamp_mode = g_value_get_enum (value);
break;
case PROP_MULTICAST_SOURCE:
GST_OBJECT_LOCK (udpsrc);
g_free (udpsrc->multicast_source);
udpsrc->multicast_source = g_value_dup_string (value);
g_ptr_array_set_size (udpsrc->source_list, 0);
if (udpsrc->multicast_source) {
gst_udp_parse_multicast_source (udpsrc->multicast_source,
udpsrc->source_list);
}
GST_OBJECT_UNLOCK (udpsrc);
break;
default:
break;
}
@ -1441,6 +1507,11 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_SOCKET_TIMESTAMP:
g_value_set_enum (value, udpsrc->socket_timestamp_mode);
break;
case PROP_MULTICAST_SOURCE:
GST_OBJECT_LOCK (udpsrc);
g_value_set_string (value, udpsrc->multicast_source);
GST_OBJECT_UNLOCK (udpsrc);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -1519,6 +1590,11 @@ gst_udpsrc_open (GstUDPSrc * src)
GSocketAddress *bind_saddr;
GError *err = NULL;
if (src->source_addrs) {
g_list_free_full (src->source_addrs, (GDestroyNotify) g_object_unref);
src->source_addrs = NULL;
}
gst_udpsrc_create_cancellable (src);
if (src->socket == NULL) {
@ -1658,19 +1734,48 @@ gst_udpsrc_open (GstUDPSrc * src)
&&
g_inet_address_get_is_multicast (g_inet_socket_address_get_address
(src->addr))) {
guint i;
GList *iter;
for (i = 0; i < src->source_list->len; i++) {
gchar *source_addr_str = g_ptr_array_index (src->source_list, i);
GInetAddress *source_addr = gst_udpsrc_resolve (src, source_addr_str);
if (!source_addr) {
GST_WARNING_OBJECT (src, "Couldn't resolve address %s",
source_addr_str);
} else {
GST_DEBUG_OBJECT (src, "Adding multicast-source %s", source_addr_str);
src->source_addrs = g_list_append (src->source_addrs, source_addr);
}
}
if (src->multi_iface) {
GStrv multi_ifaces = g_strsplit (src->multi_iface, ",", -1);
gchar **ifaces = multi_ifaces;
while (*ifaces) {
g_strstrip (*ifaces);
GST_DEBUG_OBJECT (src, "joining multicast group %s interface %s",
src->address, *ifaces);
if (!g_socket_join_multicast_group (src->used_socket,
g_inet_socket_address_get_address (src->addr),
FALSE, *ifaces, &err)) {
g_strfreev (multi_ifaces);
goto membership;
if (!src->source_addrs) {
if (!g_socket_join_multicast_group (src->used_socket,
g_inet_socket_address_get_address (src->addr),
FALSE, *ifaces, &err)) {
g_strfreev (multi_ifaces);
goto membership;
}
} else {
for (iter = src->source_addrs; iter; iter = g_list_next (iter)) {
GInetAddress *source_addr = (GInetAddress *) iter->data;
if (!g_socket_join_multicast_group_ssm (src->used_socket,
g_inet_socket_address_get_address (src->addr),
source_addr, *ifaces, &err)) {
g_strfreev (multi_ifaces);
goto membership;
}
}
}
ifaces++;
@ -1678,9 +1783,23 @@ gst_udpsrc_open (GstUDPSrc * src)
g_strfreev (multi_ifaces);
} else {
GST_DEBUG_OBJECT (src, "joining multicast group %s", src->address);
if (!g_socket_join_multicast_group (src->used_socket,
g_inet_socket_address_get_address (src->addr), FALSE, NULL, &err))
goto membership;
if (!src->source_addrs) {
if (!g_socket_join_multicast_group (src->used_socket,
g_inet_socket_address_get_address (src->addr), FALSE, NULL,
&err)) {
goto membership;
}
} else {
for (iter = src->source_addrs; iter; iter = g_list_next (iter)) {
GInetAddress *source_addr = (GInetAddress *) iter->data;
if (!g_socket_join_multicast_group_ssm (src->used_socket,
g_inet_socket_address_get_address (src->addr),
source_addr, NULL, &err)) {
goto membership;
}
}
}
}
if (g_inet_address_get_family (g_inet_socket_address_get_address
@ -1857,6 +1976,7 @@ gst_udpsrc_close (GstUDPSrc * src)
g_inet_address_get_is_multicast (g_inet_socket_address_get_address
(src->addr))) {
GError *err = NULL;
GList *iter;
if (src->multi_iface) {
GStrv multi_ifaces = g_strsplit (src->multi_iface, ",", -1);
@ -1865,25 +1985,55 @@ gst_udpsrc_close (GstUDPSrc * src)
g_strstrip (*ifaces);
GST_DEBUG_OBJECT (src, "leaving multicast group %s interface %s",
src->address, *ifaces);
if (!g_socket_leave_multicast_group (src->used_socket,
g_inet_socket_address_get_address (src->addr),
FALSE, *ifaces, &err)) {
GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
err->message);
g_clear_error (&err);
if (!src->source_addrs) {
if (!g_socket_leave_multicast_group (src->used_socket,
g_inet_socket_address_get_address (src->addr),
FALSE, *ifaces, &err)) {
GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
err->message);
g_clear_error (&err);
}
} else {
for (iter = src->source_addrs; iter; iter = g_list_next (iter)) {
GInetAddress *source_addr = (GInetAddress *) iter->data;
if (!g_socket_leave_multicast_group_ssm (src->used_socket,
g_inet_socket_address_get_address (src->addr),
source_addr, *ifaces, &err)) {
GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
err->message);
g_clear_error (&err);
}
}
}
ifaces++;
}
g_strfreev (multi_ifaces);
} else {
GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->address);
if (!g_socket_leave_multicast_group (src->used_socket,
g_inet_socket_address_get_address (src->addr), FALSE,
NULL, &err)) {
GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
err->message);
g_clear_error (&err);
if (!src->source_addrs) {
if (!g_socket_leave_multicast_group (src->used_socket,
g_inet_socket_address_get_address (src->addr), FALSE,
NULL, &err)) {
GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
err->message);
g_clear_error (&err);
}
} else {
for (iter = src->source_addrs; iter; iter = g_list_next (iter)) {
GInetAddress *source_addr = (GInetAddress *) iter->data;
if (!g_socket_leave_multicast_group_ssm (src->used_socket,
g_inet_socket_address_get_address (src->addr),
source_addr, NULL, &err)) {
GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
err->message);
g_clear_error (&err);
}
}
}
}
}
@ -1896,6 +2046,11 @@ gst_udpsrc_close (GstUDPSrc * src)
}
}
if (src->source_addrs) {
g_list_free_full (src->source_addrs, (GDestroyNotify) g_object_unref);
src->source_addrs = NULL;
}
g_object_unref (src->used_socket);
src->used_socket = NULL;
g_object_unref (src->addr);

View file

@ -65,6 +65,7 @@ struct _GstUDPSrc {
/* our sockets */
GSocket *used_socket; /* hot */
GInetSocketAddress *addr; /* hot */
GList *source_addrs;
GCancellable *cancellable; /* hot */
@ -83,6 +84,7 @@ struct _GstUDPSrc {
gboolean reuse;
gboolean loop;
GstSocketTimestampMode socket_timestamp_mode;
gchar *multicast_source;
/* stats */
guint max_size;
@ -97,6 +99,7 @@ struct _GstUDPSrc {
GstMemory *extra_mem;
gchar *uri;
GPtrArray *source_list;
};
struct _GstUDPSrcClass {

View file

@ -247,6 +247,55 @@ send_failure:
GST_END_TEST;
static void
on_multicast_source_updated (GObject * src, GParamSpec * pspec, guint * count)
{
*count += 1;
}
GST_START_TEST (test_udpsrc_multicast_source)
{
GstElement *src;
guint count = 0;
gchar *multicast_source = NULL;
src = gst_check_setup_element ("udpsrc");
g_signal_connect (G_OBJECT (src), "notify::multicast-source",
(GCallback) on_multicast_source_updated, &count);
/* Set uri without multicast-source */
g_object_set (src, "uri", "udp://127.0.0.1:5004", NULL);
fail_unless_equals_int (count, 0);
g_object_get (src, "multicast-source", &multicast_source, NULL);
fail_unless (multicast_source == NULL);
/* Sets source filter explicitly */
g_object_set (src, "multicast-source", "+127.0.0.2+127.0.0.3", NULL);
fail_unless_equals_int (count, 1);
g_object_get (src, "multicast-source", &multicast_source, NULL);
fail_unless_equals_string (multicast_source, "+127.0.0.2+127.0.0.3");
g_clear_pointer (&multicast_source, g_free);
/* Uri with source filters */
g_object_set (src, "uri", "udp://127.0.0.1:5004?multicast-source=+127.0.0.2",
NULL);
fail_unless_equals_int (count, 2);
g_object_get (src, "multicast-source", &multicast_source, NULL);
fail_unless_equals_string (multicast_source, "+127.0.0.2");
g_clear_pointer (&multicast_source, g_free);
/* New uri will reset source filters */
g_object_set (src, "uri", "udp://127.0.0.1:5004", NULL);
fail_unless_equals_int (count, 3);
g_object_get (src, "multicast-source", &multicast_source, NULL);
fail_unless (multicast_source == NULL);
gst_object_unref (src);
}
GST_END_TEST;
static Suite *
udpsrc_suite (void)
{
@ -256,6 +305,8 @@ udpsrc_suite (void)
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_udpsrc_empty_packet);
tcase_add_test (tc_chain, test_udpsrc);
tcase_add_test (tc_chain, test_udpsrc_multicast_source);
return s;
}