diff --git a/Cargo.lock b/Cargo.lock index a9f734e6..9dabf278 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2203,6 +2203,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getifaddrs" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5145f1db081f9c1bd68384b4602d701efbd1b1fb8fa1a4938bd8eebfaa34ea76" +dependencies = [ + "bitflags 2.6.0", + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -3066,6 +3077,7 @@ dependencies = [ "concurrent-queue", "flume", "futures", + "getifaddrs", "gio", "gst-plugin-version-helper", "gstreamer", @@ -3074,6 +3086,7 @@ dependencies = [ "gstreamer-check", "gstreamer-net", "gstreamer-rtp", + "libc", "pin-project-lite", "pkg-config", "polling", @@ -3083,6 +3096,7 @@ dependencies = [ "socket2", "waker-fn", "winapi", + "windows-sys 0.59.0", ] [[package]] diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index b5944592..edc6b65c 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -14220,6 +14220,18 @@ "type": "gboolean", "writable": true }, + "multicast-iface": { + "blurb": "The network interface on which to join the multicast group. (Supports only single interface)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, "qos-dscp": { "blurb": "Quality of Service, differentiated services code point (-1 default)", "conditionally-available": false, @@ -14468,6 +14480,18 @@ "type": "guint", "writable": true }, + "multicast-iface": { + "blurb": "The network interface on which to join the multicast group. This allows multiple interfaces\n separated by comma. (\"eth0,eth1\")", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, "port": { "blurb": "Port to listen on", "conditionally-available": false, diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index fda531ac..9bff3dfe 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -26,6 +26,9 @@ rustix = { version = "0.38.2", default-features = false, features = ["std", "fs" slab = "0.4.7" socket2 = {features = ["all"], version = "0.5"} waker-fn = "1.1" +getifaddrs = "0.1" +libc = "0.2" +windows-sys = "0.59.0" # Used by examples clap = { version = "4", features = ["derive"], optional = true } diff --git a/generic/threadshare/src/lib.rs b/generic/threadshare/src/lib.rs index 35f2e5a8..808fb29b 100644 --- a/generic/threadshare/src/lib.rs +++ b/generic/threadshare/src/lib.rs @@ -28,6 +28,8 @@ mod tcpclientsrc; mod udpsink; mod udpsrc; +pub mod net; + use gst::glib; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { diff --git a/generic/threadshare/src/net.rs b/generic/threadshare/src/net.rs new file mode 100644 index 00000000..d9b4a28f --- /dev/null +++ b/generic/threadshare/src/net.rs @@ -0,0 +1,371 @@ +// GStreamer +// +// Copyright (C) 2015-2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use getifaddrs::Interface; + +#[cfg(unix)] +pub mod imp { + use super::*; + + use std::{io, mem, net::UdpSocket, os::unix::io::AsRawFd}; + + use libc::{ + in_addr, ip_mreqn, setsockopt, IPPROTO_IP, IP_ADD_MEMBERSHIP, IP_DROP_MEMBERSHIP, + IP_MULTICAST_IF, + }; + + #[cfg(target_os = "macos")] + use libc::ip_mreq; + + #[cfg(any(target_os = "solaris", target_os = "illumos", target_os = "macos"))] + use std::net::IpAddr; + use std::net::Ipv4Addr; + + /// Join multicast address for a given interface. + pub fn join_multicast_v4( + socket: &UdpSocket, + addr: &Ipv4Addr, + iface: &Interface, + ) -> Result<(), io::Error> { + multicast_group_operation_v4(socket, addr, iface, true) + } + + /// Leave multicast address for a given interface. + pub fn leave_multicast_v4( + socket: &UdpSocket, + addr: &Ipv4Addr, + iface: &Interface, + ) -> Result<(), io::Error> { + multicast_group_operation_v4(socket, addr, iface, false) + } + + fn multicast_group_operation_v4( + socket: &UdpSocket, + addr: &Ipv4Addr, + iface: &Interface, + join: bool, + ) -> Result<(), io::Error> { + let index = iface.index.unwrap_or(0); + + #[cfg(not(any(target_os = "solaris", target_os = "illumos", target_os = "macos")))] + { + let group_op: i32 = if join { + IP_ADD_MEMBERSHIP + } else { + IP_DROP_MEMBERSHIP + }; + + let mreqn = ip_mreqn { + imr_multiaddr: in_addr { + s_addr: u32::from_ne_bytes(addr.octets()), + }, + imr_address: in_addr { + s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()), + }, + imr_ifindex: index as _, + }; + + // SAFETY: Requires a valid ip_mreq or ip_mreqn struct to be passed together + // with its size for checking which of the two it is. On errors a negative + // integer is returned. + unsafe { + if setsockopt( + socket.as_raw_fd(), + IPPROTO_IP, + group_op, + &mreqn as *const _ as *const _, + mem::size_of_val(&mreqn) as _, + ) < 0 + { + return Err(io::Error::last_os_error()); + } + } + + #[cfg(not(any(target_os = "openbsd", target_os = "dragonfly", target_os = "netbsd")))] + { + let mreqn = ip_mreqn { + imr_multiaddr: in_addr { + s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()), + }, + imr_address: in_addr { + s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()), + }, + imr_ifindex: index as _, + }; + + // SAFETY: Requires a valid ip_mreq or ip_mreqn struct to be passed together + // with its size for checking which of the two it is. On errors a negative + // integer is returned. + unsafe { + if setsockopt( + socket.as_raw_fd(), + IPPROTO_IP, + IP_MULTICAST_IF, + &mreqn as *const _ as *const _, + mem::size_of_val(&mreqn) as _, + ) < 0 + { + return Err(io::Error::last_os_error()); + } + } + } + #[cfg(any(target_os = "openbsd", target_os = "dragonfly"))] + { + let addr = in_addr { + s_addr: u32::from_ne_bytes(ip_addr.octets()), + }; + + // SAFETY: Requires a valid in_addr struct to be passed together with its size for + // checking which of the two it is. On errors a negative integer is returned. + unsafe { + if setsockopt( + socket.as_raw_fd(), + IPPROTO_IP, + IP_MULTICAST_IF, + &addr as *const _ as *const _, + mem::size_of_val(&addr) as _, + ) < 0 + { + return Err(io::Error::last_os_error()); + } + } + } + #[cfg(target_os = "netbsd")] + { + let idx = (index as u32).to_be(); + + // SAFETY: Requires a valid in_addr struct or interface index in network byte order + // to be passed together with its size for checking which of the two it is. On + // errors a negative integer is returned. + unsafe { + if setsockopt( + socket.as_raw_fd(), + IPPROTO_IP, + IP_MULTICAST_IF, + &idx as *const _ as *const _, + mem::size_of_val(&idx) as _, + ) < 0 + { + return Err(io::Error::last_os_error()); + } + } + } + + Ok(()) + } + + #[cfg(any(target_os = "solaris", target_os = "illumos"))] + { + let ip_addr = match iface.address { + IpAddr::V4(ipv4_addr) => ipv4_addr, + IpAddr::V6(_) => return Err(io::Error::other("Interface address is IPv6")), + }; + + if join { + socket.join_multicast_v4(addr, &ip_addr).with_context(|| { + format!( + "Failed joining multicast group for interface {} at address {}", + iface.name, ip_addr + ) + })?; + } else { + socket.leave_multicast_v4(addr, &ip_addr).with_context(|| { + format!( + "Failed leave multicast group for interface {} at address {}", + iface.name, ip_addr + ) + })?; + } + + // SAFETY: Requires a valid in_addr struct to be passed together with its size for + // checking which of the two it is. On errors a negative integer is returned. + unsafe { + if setsockopt( + socket.as_raw_fd(), + IPPROTO_IP, + IP_MULTICAST_IF, + &addr as *const _ as *const _, + mem::size_of_val(&addr) as _, + ) < 0 + { + return Err(io::Error::last_os_error()); + } + } + + Ok(()) + } + + #[cfg(target_os = "macos")] + { + let ip_addr = match iface.address { + IpAddr::V4(ipv4_addr) => ipv4_addr, + IpAddr::V6(_) => return Err(io::Error::other("Interface address is IPv6")), + }; + + let mreq = ip_mreq { + imr_multiaddr: in_addr { + s_addr: u32::from_ne_bytes(addr.octets()), + }, + imr_address: in_addr { + s_addr: u32::from_ne_bytes(ip_addr.octets()), + }, + }; + + let mreqn = ip_mreqn { + imr_multiaddr: in_addr { + s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()), + }, + imr_address: in_addr { + s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()), + }, + imr_ifindex: index as _, + }; + + let group_op: i32 = if join { + IP_ADD_MEMBERSHIP + } else { + IP_DROP_MEMBERSHIP + }; + + // SAFETY: Requires a valid ip_mreq struct to be passed together with its size for checking + // validity. On errors a negative integer is returned. + unsafe { + if setsockopt( + socket.as_raw_fd(), + IPPROTO_IP, + group_op, + &mreq as *const _ as *const _, + mem::size_of_val(&mreq) as _, + ) < 0 + { + return Err(io::Error::last_os_error()); + } + } + + // SAFETY: Requires a valid ip_mreqn struct to be passed together + // with its size for checking which of the two it is. On errors a negative + // integer is returned. + unsafe { + if setsockopt( + socket.as_raw_fd(), + IPPROTO_IP, + IP_MULTICAST_IF, + &mreqn as *const _ as *const _, + mem::size_of_val(&mreqn) as _, + ) < 0 + { + return Err(io::Error::last_os_error()); + } + } + + Ok(()) + } + } +} + +#[cfg(windows)] +pub mod imp { + use super::*; + + use std::{ + io, mem, + net::{Ipv4Addr, UdpSocket}, + os::windows::io::AsRawSocket, + }; + + use windows_sys::Win32::Networking::WinSock::{ + setsockopt, WSAGetLastError, IN_ADDR, IN_ADDR_0, IPPROTO_IP, IP_ADD_MEMBERSHIP, + IP_DROP_MEMBERSHIP, IP_MREQ, IP_MULTICAST_IF, + }; + + /// Join multicast address for a given interface. + pub fn join_multicast_v4( + socket: &UdpSocket, + addr: &Ipv4Addr, + iface: &Interface, + ) -> Result<(), io::Error> { + multicast_group_operation_v4(socket, addr, iface, IP_ADD_MEMBERSHIP) + // let ip_addr = Ipv4Addr::new(0, 0, 0, iface.index.unwrap() as u8); + // socket.join_multicast_v4(addr, &ip_addr).unwrap(); + // return Ok(()); + } + + /// Leave multicast address for a given interface. + pub fn leave_multicast_v4( + socket: &UdpSocket, + addr: &Ipv4Addr, + iface: &Interface, + ) -> Result<(), io::Error> { + multicast_group_operation_v4(socket, addr, iface, IP_DROP_MEMBERSHIP) + // let ip_addr = Ipv4Addr::new(0, 0, 0, iface.index.unwrap() as u8); + // socket.leave_multicast_v4(addr, &ip_addr).unwrap(); + // return Ok(()); + } + + fn multicast_group_operation_v4( + socket: &UdpSocket, + addr: &Ipv4Addr, + iface: &Interface, + group_op: i32, + ) -> Result<(), io::Error> { + let index = iface.index.unwrap_or(0); + + let mreq = IP_MREQ { + imr_multiaddr: IN_ADDR { + S_un: IN_ADDR_0 { + S_addr: u32::from_ne_bytes(addr.octets()), + }, + }, + imr_interface: IN_ADDR { + S_un: IN_ADDR_0 { + S_addr: u32::from_ne_bytes(Ipv4Addr::new(0, 0, 0, index as u8).octets()), + }, + }, + }; + + // SAFETY: Requires a valid ip_mreq struct to be passed together with its size for checking + // validity. On errors a negative integer is returned. + unsafe { + if setsockopt( + socket.as_raw_socket() as usize, + IPPROTO_IP, + group_op, + &mreq as *const _ as *const _, + mem::size_of_val(&mreq) as _, + ) < 0 + { + return Err(io::Error::from_raw_os_error(WSAGetLastError())); + } + } + + let ip_addr = IN_ADDR { + S_un: IN_ADDR_0 { + S_addr: u32::from_ne_bytes(Ipv4Addr::new(0, 0, 0, index as u8).octets()), + }, + }; + + // SAFETY: Requires a valid IN_ADDR struct to be passed together with its size for checking + // which of the two it is. On errors a negative integer is returned. + unsafe { + if setsockopt( + socket.as_raw_socket() as usize, + IPPROTO_IP, + IP_MULTICAST_IF, + &ip_addr as *const _ as *const _, + mem::size_of_val(&ip_addr) as _, + ) < 0 + { + return Err(io::Error::last_os_error()); + } + } + + Ok(()) + } +} diff --git a/generic/threadshare/src/socket.rs b/generic/threadshare/src/socket.rs index c79320ef..9c0e62c9 100644 --- a/generic/threadshare/src/socket.rs +++ b/generic/threadshare/src/socket.rs @@ -99,6 +99,10 @@ impl Socket { self.clock = clock; self.base_time = base_time; } + + pub fn get(&self) -> &T { + &self.reader + } } #[derive(Debug)] diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index 97fb498b..dae1a2c0 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -28,13 +28,14 @@ use gst::{element_error, error_msg}; use std::sync::LazyLock; +use crate::net; use crate::runtime::executor::block_on_or_add_sub_task; use crate::runtime::prelude::*; use crate::runtime::{self, Async, Context, PadSink}; use crate::socket::{wrap_socket, GioSocketWrapper}; use std::collections::BTreeSet; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; +use std::net::{IpAddr, SocketAddr, UdpSocket}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -57,6 +58,7 @@ const DEFAULT_QOS_DSCP: i32 = -1; const DEFAULT_CLIENTS: &str = ""; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; +const DEFAULT_MULTICAST_IFACE: Option<&str> = None; #[derive(Debug, Clone, Copy)] struct SocketConf { @@ -92,6 +94,7 @@ struct Settings { qos_dscp: i32, context: String, context_wait: Duration, + multicast_iface: Option, } impl Default for Settings { @@ -110,6 +113,7 @@ impl Default for Settings { qos_dscp: DEFAULT_QOS_DSCP, context: DEFAULT_CONTEXT.into(), context_wait: DEFAULT_CONTEXT_WAIT, + multicast_iface: DEFAULT_MULTICAST_IFACE.map(Into::into), } } } @@ -128,7 +132,7 @@ struct UdpSinkPadHandler(Arc>); impl UdpSinkPadHandler { fn prepare( &self, - _imp: &UdpSink, + imp: &UdpSink, socket: Option>, socket_v6: Option>, settings: &Settings, @@ -141,6 +145,58 @@ impl UdpSinkPadHandler { inner.socket = socket; inner.socket_v6 = socket_v6; + if let Some(multicast_iface) = &settings.multicast_iface { + gst::debug!( + CAT, + imp = imp, + "searching for interface: {}", + multicast_iface + ); + + // The 'InterfaceFilter::name' only checks for the 'name' field , it does not check + // whether the given name with the interface 'description' (Friendly Name) on Windows + + // So we first get all the interfaces and then apply filter + // for name and description (Friendly Name) of each interface. + + let ifaces = getifaddrs::getifaddrs().map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to find interface {}: {}", multicast_iface, err] + ) + })?; + + let iface_filter = ifaces.filter(|i| { + let ip_ver = if i.address.is_ipv4() { "IPv4" } else { "IPv6" }; + + if &i.name == multicast_iface { + gst::debug!( + CAT, + imp = imp, + "Found interface: {}, version: {ip_ver}", + i.name, + ); + true + } else { + #[cfg(windows)] + if &i.description == multicast_iface { + gst::debug!( + CAT, + imp = imp, + "Found interface: {}, version: {ip_ver}", + i.description, + ); + return true; + } + + gst::trace!(CAT, imp = imp, "skipping interface {}", i.name); + false + } + }); + + inner.multicast_ifaces = iface_filter.collect(); + } + for addr in inner.clients.iter() { inner.configure_client(addr)?; } @@ -365,6 +421,7 @@ struct UdpSinkPadHandlerInner { clients: BTreeSet, socket_conf: SocketConf, segment: Option, + multicast_ifaces: Vec, } impl Default for UdpSinkPadHandlerInner { @@ -381,6 +438,7 @@ impl Default for UdpSinkPadHandlerInner { )]), socket_conf: Default::default(), segment: None, + multicast_ifaces: Vec::::new(), } } } @@ -391,62 +449,103 @@ impl UdpSinkPadHandlerInner { if client.ip().is_multicast() { match client.ip() { IpAddr::V4(addr) => { - if let Some(socket) = self.socket.as_ref() { - if self.socket_conf.auto_multicast { + let Some(socket) = self.socket.as_ref() else { + return Ok(()); + }; + + if self.socket_conf.auto_multicast { + for iface in &self.multicast_ifaces { + if !iface.address.is_ipv4() { + gst::debug!( + CAT, + "Skipping the IPv6 version of the interface {}", + iface.name + ); + continue; + } + + gst::debug!(CAT, "interface {} joining the multicast", iface.name); + net::imp::join_multicast_v4(socket.as_ref(), &addr, iface).map_err( + |err| { + error_msg!( + gst::ResourceError::OpenWrite, + [ + "Failed to join multicast group on iface {} for {:?}: {}", + iface.name, + client, + err + ] + ) + }, + )?; + } + } + + if self.socket_conf.multicast_loop { + socket.as_ref().set_multicast_loop_v4(true).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast loop for {:?}: {}", client, err] + ) + })?; + } + + socket + .as_ref() + .set_multicast_ttl_v4(self.socket_conf.ttl_mc) + .map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast ttl for {:?}: {}", client, err] + ) + })?; + } + IpAddr::V6(addr) => { + let Some(socket) = self.socket.as_ref() else { + return Err(error_msg!( + gst::ResourceError::OpenWrite, + ["Socket not available"] + )); + }; + + if self.socket_conf.auto_multicast { + for iface in &self.multicast_ifaces { + if !iface.address.is_ipv6() { + gst::debug!( + CAT, + "Skipping the IPv4 version of the interface {}", + iface.name + ); + continue; + } + + gst::debug!(CAT, "interface {} joining the multicast", iface.name); socket .as_ref() - .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) + .join_multicast_v6(&addr, iface.index.unwrap_or(0)) .map_err(|err| { error_msg!( gst::ResourceError::OpenWrite, [ - "Failed to join multicast group for {:?}: {}", + "Failed to join multicast group on iface {} for {:?}: {}", + iface.name, client, err ] ) })?; } - if self.socket_conf.multicast_loop { - socket.as_ref().set_multicast_loop_v4(true).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set multicast loop for {:?}: {}", client, err] - ) - })?; - } + } - socket - .as_ref() - .set_multicast_ttl_v4(self.socket_conf.ttl_mc) - .map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set multicast ttl for {:?}: {}", client, err] - ) - })?; - } - } - IpAddr::V6(addr) => { - if let Some(socket) = self.socket_v6.as_ref() { - if self.socket_conf.auto_multicast { - socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to join multicast group for {:?}: {}", client, err] - ) - })?; - } - if self.socket_conf.multicast_loop { - socket.as_ref().set_multicast_loop_v6(true).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set multicast loop for {:?}: {}", client, err] - ) - })?; - } - /* FIXME no API for set_multicast_ttl_v6 ? */ + if self.socket_conf.multicast_loop { + socket.as_ref().set_multicast_loop_v6(true).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast loop for {:?}: {}", client, err] + ) + })?; } + /* FIXME no API for set_multicast_ttl_v6 ? */ } } } else { @@ -487,12 +586,27 @@ impl UdpSinkPadHandlerInner { if client.ip().is_multicast() { match client.ip() { IpAddr::V4(addr) => { - if let Some(socket) = self.socket.as_ref() { - if self.socket_conf.auto_multicast { - socket - .as_ref() - .leave_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) - .map_err(|err| { + let Some(socket) = self.socket.as_ref() else { + return Err(error_msg!( + gst::ResourceError::OpenWrite, + ["Socket not available"] + )); + }; + + if self.socket_conf.auto_multicast { + for iface in &self.multicast_ifaces { + if !iface.address.is_ipv4() { + gst::debug!( + CAT, + "Skipping the IPv6 version of the interface {}", + iface.name + ); + continue; + } + + gst::debug!(CAT, "interface {} leaving the multicast", iface.name); + net::imp::leave_multicast_v4(socket.as_ref(), &addr, iface).map_err( + |err| { error_msg!( gst::ResourceError::OpenWrite, [ @@ -501,16 +615,34 @@ impl UdpSinkPadHandlerInner { err ] ) - })?; + }, + )?; } } } IpAddr::V6(addr) => { - if let Some(socket) = self.socket_v6.as_ref() { - if self.socket_conf.auto_multicast { + let Some(socket) = self.socket.as_ref() else { + return Err(error_msg!( + gst::ResourceError::OpenWrite, + ["Socket not available"] + )); + }; + + if self.socket_conf.auto_multicast { + for iface in &self.multicast_ifaces { + if !iface.address.is_ipv6() { + gst::debug!( + CAT, + "Skipping the IPv4 version of the interface {}", + iface.name + ); + continue; + } + + gst::debug!(CAT, "interface {} leaving the multicast", iface.name); socket .as_ref() - .leave_multicast_v6(&addr, 0) + .leave_multicast_v6(&addr, iface.index.unwrap_or(0)) .map_err(|err| { error_msg!( gst::ResourceError::OpenWrite, @@ -952,6 +1084,11 @@ impl ObjectImpl for UdpSink { .blurb("A comma separated list of host:port pairs with destinations") .default_value(Some(DEFAULT_CLIENTS)) .build(), + glib::ParamSpecString::builder("multicast-iface") + .nick("Multicast Interface") + .blurb("The network interface on which to join the multicast group. (Supports only single interface)") + .default_value(DEFAULT_MULTICAST_IFACE) + .build(), ] }); @@ -1119,6 +1256,9 @@ impl ObjectImpl for UdpSink { value.get::().expect("type checked upstream").into(), ); } + "multicast-iface" => { + settings.multicast_iface = value.get().expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -1164,6 +1304,7 @@ impl ObjectImpl for UdpSink { } "context" => settings.context.to_value(), "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), + "multicast-iface" => settings.multicast_iface.to_value(), _ => unimplemented!(), } } diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index bff3eb02..14a71393 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -35,6 +35,7 @@ use std::time::Duration; use crate::runtime::prelude::*; use crate::runtime::{task, Async, Context, PadSrc, Task, TaskState}; +use crate::net; use crate::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead}; use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::pin_mut; @@ -51,6 +52,7 @@ const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true; const DEFAULT_MULTICAST_LOOP: bool = true; const DEFAULT_BUFFER_SIZE: u32 = 0; +const DEFAULT_MULTICAST_IFACE: Option<&str> = None; #[derive(Debug, Default)] struct State { @@ -71,6 +73,7 @@ struct Settings { retrieve_sender_address: bool, multicast_loop: bool, buffer_size: u32, + multicast_iface: Option, } impl Default for Settings { @@ -88,6 +91,7 @@ impl Default for Settings { retrieve_sender_address: DEFAULT_RETRIEVE_SENDER_ADDRESS, multicast_loop: DEFAULT_MULTICAST_LOOP, buffer_size: DEFAULT_BUFFER_SIZE, + multicast_iface: DEFAULT_MULTICAST_IFACE.map(Into::into), } } } @@ -194,6 +198,8 @@ struct UdpSrcTask { need_initial_events: bool, need_segment: bool, event_receiver: Receiver, + multicast_ifaces: Vec, + multicast_addr: Option, } impl UdpSrcTask { @@ -205,6 +211,8 @@ impl UdpSrcTask { need_initial_events: true, need_segment: true, event_receiver, + multicast_ifaces: Vec::::new(), + multicast_addr: None, } } } @@ -258,7 +266,10 @@ impl TaskImpl for UdpSrcTask { ["Invalid address '{}' set: {}", addr, err] )); } - Ok(addr) => addr, + Ok(addr) => { + self.multicast_addr = Some(addr); + addr + } }, }; let port = settings.port; @@ -363,18 +374,91 @@ impl TaskImpl for UdpSrcTask { })?; if addr.is_multicast() { - // TODO: Multicast interface configuration, going to be tricky + if let Some(multicast_iface) = &settings.multicast_iface { + let multi_ifaces: Vec = + multicast_iface.split(',').map(|s| s.to_string()).collect(); + + let iter = getifaddrs::getifaddrs().map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to get interfaces: {}", err] + ) + })?; + + iter.for_each(|iface| { + let ip_ver = if iface.address.is_ipv4() { + "IPv4" + } else { + "IPv6" + }; + + for m in &multi_ifaces { + if &iface.name == m { + self.multicast_ifaces.push(iface.clone()); + gst::debug!( + CAT, + obj = self.element, + "Interface {m} available, version: {ip_ver}" + ); + } else { + // check if name matches the interface description (Friendly name) on Windows + #[cfg(windows)] + if &iface.description == m { + self.multicast_ifaces.push(iface.clone()); + gst::debug!( + CAT, + obj = self.element, + "Interface {m} available, version: {ip_ver}" + ); + } + } + } + }); + } + + if self.multicast_ifaces.is_empty() { + gst::warning!( + CAT, + obj = self.element, + "No suitable network interfaces found, adding default iface" + ); + + self.multicast_ifaces.push(getifaddrs::Interface { + name: "default".to_owned(), + #[cfg(windows)] + description: "default".to_owned(), + address: IpAddr::V4(Ipv4Addr::UNSPECIFIED), + #[cfg(not(windows))] + associated_address: None, + netmask: None, + flags: getifaddrs::InterfaceFlags::UP, + index: Some(0), + }); + } + match addr { IpAddr::V4(addr) => { - socket - .as_ref() - .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to join multicast group: {}", err] - ) - })?; + for iface in &self.multicast_ifaces { + if !iface.address.is_ipv4() { + gst::debug!( + CAT, + "Skipping the IPv6 version of the interface {}", + iface.name + ); + continue; + } + + gst::debug!(CAT, "interface {} joining the multicast", iface.name); + // use the custom written API to be able to pass the interface index + // for all types of target OS + net::imp::join_multicast_v4(socket.as_ref(), &addr, iface) + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to join multicast group: {}", err] + ) + })?; + } socket .as_ref() @@ -391,12 +475,27 @@ impl TaskImpl for UdpSrcTask { })?; } IpAddr::V6(addr) => { - socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to join multicast group: {}", err] - ) - })?; + for iface in &self.multicast_ifaces { + if !iface.address.is_ipv6() { + gst::debug!( + CAT, + "Skipping the IPv4 version of the interface {}", + iface.name + ); + continue; + } + + gst::debug!(CAT, "interface {} joining the multicast", iface.name); + socket + .as_ref() + .join_multicast_v6(&addr, iface.index.unwrap_or(0)) + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to join multicast group: {}", err] + ) + })?; + } socket .as_ref() @@ -466,6 +565,47 @@ impl TaskImpl for UdpSrcTask { async move { gst::debug!(CAT, obj = self.element, "Unpreparing Task"); let udpsrc = self.element.imp(); + if let Some(reader) = &self.socket { + let socket = &reader.get().0; + if let Some(addr) = self.multicast_addr { + match addr { + IpAddr::V4(addr) => { + for iface in &self.multicast_ifaces { + if !iface.address.is_ipv4() { + gst::debug!( + CAT, + "Skipping the IPv6 version of the interface {}", + iface.name + ); + continue; + } + + gst::debug!(CAT, "interface {} leaving the multicast", iface.name); + net::imp::leave_multicast_v4(socket.as_ref(), &addr, iface) + .unwrap(); + } + } + IpAddr::V6(addr) => { + for iface in &self.multicast_ifaces { + if !iface.address.is_ipv6() { + gst::debug!( + CAT, + "Skipping the IPv4 version of the interface {}", + iface.name + ); + continue; + } + + gst::debug!(CAT, "interface {} leaving the multicast", iface.name); + socket + .as_ref() + .leave_multicast_v6(&addr, iface.index.unwrap_or(0)) + .unwrap(); + } + } + } + } + } udpsrc.settings.lock().unwrap().used_socket = None; self.element.notify("used-socket"); } @@ -825,6 +965,12 @@ impl ObjectImpl for UdpSrc { .maximum(u32::MAX) .default_value(DEFAULT_BUFFER_SIZE) .build(), + glib::ParamSpecString::builder("multicast-iface") + .nick("Multicast Interface") + .blurb("The network interface on which to join the multicast group. This allows multiple interfaces + separated by comma. (\"eth0,eth1\")") + .default_value(DEFAULT_MULTICAST_IFACE) + .build(), ]; @@ -898,6 +1044,9 @@ impl ObjectImpl for UdpSrc { "buffer-size" => { settings.buffer_size = value.get().expect("type checked upstream"); } + "multicast-iface" => { + settings.multicast_iface = value.get().expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -925,6 +1074,7 @@ impl ObjectImpl for UdpSrc { "retrieve-sender-address" => settings.retrieve_sender_address.to_value(), "loop" => settings.multicast_loop.to_value(), "buffer-size" => settings.buffer_size.to_value(), + "multicast-iface" => settings.multicast_iface.to_value(), _ => unimplemented!(), } }