threadshare: udp: add multicast-iface property

similar to the non threadshare counterparts, the ts-udpsink can accept
only one multicast interface and the ts-udpsrc can accept a list of
interfaces to be listening on for the multicast.

Use the getifaddrs crate to get the available network interfaces and filter
the desired interfaces from the available interfaces

Reuse a custom api written for PTP helper to join and leave multicast group
for IPv4 based addresses. Continue to use the UdpSocket crate's _multicast_v6
to join/leave an IPv6 multicast group

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1420>
This commit is contained in:
Taruntej Kanakamalla 2024-07-15 20:08:36 +05:30 committed by GStreamer Marge Bot
parent 580e06a002
commit b16379d00b
8 changed files with 780 additions and 71 deletions

14
Cargo.lock generated
View file

@ -2203,6 +2203,17 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "getifaddrs"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5145f1db081f9c1bd68384b4602d701efbd1b1fb8fa1a4938bd8eebfaa34ea76"
dependencies = [
"bitflags 2.6.0",
"libc",
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.15" version = "0.2.15"
@ -3066,6 +3077,7 @@ dependencies = [
"concurrent-queue", "concurrent-queue",
"flume", "flume",
"futures", "futures",
"getifaddrs",
"gio", "gio",
"gst-plugin-version-helper", "gst-plugin-version-helper",
"gstreamer", "gstreamer",
@ -3074,6 +3086,7 @@ dependencies = [
"gstreamer-check", "gstreamer-check",
"gstreamer-net", "gstreamer-net",
"gstreamer-rtp", "gstreamer-rtp",
"libc",
"pin-project-lite", "pin-project-lite",
"pkg-config", "pkg-config",
"polling", "polling",
@ -3083,6 +3096,7 @@ dependencies = [
"socket2", "socket2",
"waker-fn", "waker-fn",
"winapi", "winapi",
"windows-sys 0.59.0",
] ]
[[package]] [[package]]

View file

@ -14220,6 +14220,18 @@
"type": "gboolean", "type": "gboolean",
"writable": true "writable": true
}, },
"multicast-iface": {
"blurb": "The network interface on which to join the multicast group. (Supports only single interface)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"qos-dscp": { "qos-dscp": {
"blurb": "Quality of Service, differentiated services code point (-1 default)", "blurb": "Quality of Service, differentiated services code point (-1 default)",
"conditionally-available": false, "conditionally-available": false,
@ -14468,6 +14480,18 @@
"type": "guint", "type": "guint",
"writable": true "writable": true
}, },
"multicast-iface": {
"blurb": "The network interface on which to join the multicast group. This allows multiple interfaces\n separated by comma. (\"eth0,eth1\")",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"port": { "port": {
"blurb": "Port to listen on", "blurb": "Port to listen on",
"conditionally-available": false, "conditionally-available": false,

View file

@ -26,6 +26,9 @@ rustix = { version = "0.38.2", default-features = false, features = ["std", "fs"
slab = "0.4.7" slab = "0.4.7"
socket2 = {features = ["all"], version = "0.5"} socket2 = {features = ["all"], version = "0.5"}
waker-fn = "1.1" waker-fn = "1.1"
getifaddrs = "0.1"
libc = "0.2"
windows-sys = "0.59.0"
# Used by examples # Used by examples
clap = { version = "4", features = ["derive"], optional = true } clap = { version = "4", features = ["derive"], optional = true }

View file

@ -28,6 +28,8 @@ mod tcpclientsrc;
mod udpsink; mod udpsink;
mod udpsrc; mod udpsrc;
pub mod net;
use gst::glib; use gst::glib;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {

View file

@ -0,0 +1,371 @@
// GStreamer
//
// Copyright (C) 2015-2023 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use getifaddrs::Interface;
#[cfg(unix)]
pub mod imp {
use super::*;
use std::{io, mem, net::UdpSocket, os::unix::io::AsRawFd};
use libc::{
in_addr, ip_mreqn, setsockopt, IPPROTO_IP, IP_ADD_MEMBERSHIP, IP_DROP_MEMBERSHIP,
IP_MULTICAST_IF,
};
#[cfg(target_os = "macos")]
use libc::ip_mreq;
#[cfg(any(target_os = "solaris", target_os = "illumos", target_os = "macos"))]
use std::net::IpAddr;
use std::net::Ipv4Addr;
/// Join multicast address for a given interface.
pub fn join_multicast_v4(
socket: &UdpSocket,
addr: &Ipv4Addr,
iface: &Interface,
) -> Result<(), io::Error> {
multicast_group_operation_v4(socket, addr, iface, true)
}
/// Leave multicast address for a given interface.
pub fn leave_multicast_v4(
socket: &UdpSocket,
addr: &Ipv4Addr,
iface: &Interface,
) -> Result<(), io::Error> {
multicast_group_operation_v4(socket, addr, iface, false)
}
fn multicast_group_operation_v4(
socket: &UdpSocket,
addr: &Ipv4Addr,
iface: &Interface,
join: bool,
) -> Result<(), io::Error> {
let index = iface.index.unwrap_or(0);
#[cfg(not(any(target_os = "solaris", target_os = "illumos", target_os = "macos")))]
{
let group_op: i32 = if join {
IP_ADD_MEMBERSHIP
} else {
IP_DROP_MEMBERSHIP
};
let mreqn = ip_mreqn {
imr_multiaddr: in_addr {
s_addr: u32::from_ne_bytes(addr.octets()),
},
imr_address: in_addr {
s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()),
},
imr_ifindex: index as _,
};
// SAFETY: Requires a valid ip_mreq or ip_mreqn struct to be passed together
// with its size for checking which of the two it is. On errors a negative
// integer is returned.
unsafe {
if setsockopt(
socket.as_raw_fd(),
IPPROTO_IP,
group_op,
&mreqn as *const _ as *const _,
mem::size_of_val(&mreqn) as _,
) < 0
{
return Err(io::Error::last_os_error());
}
}
#[cfg(not(any(target_os = "openbsd", target_os = "dragonfly", target_os = "netbsd")))]
{
let mreqn = ip_mreqn {
imr_multiaddr: in_addr {
s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()),
},
imr_address: in_addr {
s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()),
},
imr_ifindex: index as _,
};
// SAFETY: Requires a valid ip_mreq or ip_mreqn struct to be passed together
// with its size for checking which of the two it is. On errors a negative
// integer is returned.
unsafe {
if setsockopt(
socket.as_raw_fd(),
IPPROTO_IP,
IP_MULTICAST_IF,
&mreqn as *const _ as *const _,
mem::size_of_val(&mreqn) as _,
) < 0
{
return Err(io::Error::last_os_error());
}
}
}
#[cfg(any(target_os = "openbsd", target_os = "dragonfly"))]
{
let addr = in_addr {
s_addr: u32::from_ne_bytes(ip_addr.octets()),
};
// SAFETY: Requires a valid in_addr struct to be passed together with its size for
// checking which of the two it is. On errors a negative integer is returned.
unsafe {
if setsockopt(
socket.as_raw_fd(),
IPPROTO_IP,
IP_MULTICAST_IF,
&addr as *const _ as *const _,
mem::size_of_val(&addr) as _,
) < 0
{
return Err(io::Error::last_os_error());
}
}
}
#[cfg(target_os = "netbsd")]
{
let idx = (index as u32).to_be();
// SAFETY: Requires a valid in_addr struct or interface index in network byte order
// to be passed together with its size for checking which of the two it is. On
// errors a negative integer is returned.
unsafe {
if setsockopt(
socket.as_raw_fd(),
IPPROTO_IP,
IP_MULTICAST_IF,
&idx as *const _ as *const _,
mem::size_of_val(&idx) as _,
) < 0
{
return Err(io::Error::last_os_error());
}
}
}
Ok(())
}
#[cfg(any(target_os = "solaris", target_os = "illumos"))]
{
let ip_addr = match iface.address {
IpAddr::V4(ipv4_addr) => ipv4_addr,
IpAddr::V6(_) => return Err(io::Error::other("Interface address is IPv6")),
};
if join {
socket.join_multicast_v4(addr, &ip_addr).with_context(|| {
format!(
"Failed joining multicast group for interface {} at address {}",
iface.name, ip_addr
)
})?;
} else {
socket.leave_multicast_v4(addr, &ip_addr).with_context(|| {
format!(
"Failed leave multicast group for interface {} at address {}",
iface.name, ip_addr
)
})?;
}
// SAFETY: Requires a valid in_addr struct to be passed together with its size for
// checking which of the two it is. On errors a negative integer is returned.
unsafe {
if setsockopt(
socket.as_raw_fd(),
IPPROTO_IP,
IP_MULTICAST_IF,
&addr as *const _ as *const _,
mem::size_of_val(&addr) as _,
) < 0
{
return Err(io::Error::last_os_error());
}
}
Ok(())
}
#[cfg(target_os = "macos")]
{
let ip_addr = match iface.address {
IpAddr::V4(ipv4_addr) => ipv4_addr,
IpAddr::V6(_) => return Err(io::Error::other("Interface address is IPv6")),
};
let mreq = ip_mreq {
imr_multiaddr: in_addr {
s_addr: u32::from_ne_bytes(addr.octets()),
},
imr_address: in_addr {
s_addr: u32::from_ne_bytes(ip_addr.octets()),
},
};
let mreqn = ip_mreqn {
imr_multiaddr: in_addr {
s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()),
},
imr_address: in_addr {
s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()),
},
imr_ifindex: index as _,
};
let group_op: i32 = if join {
IP_ADD_MEMBERSHIP
} else {
IP_DROP_MEMBERSHIP
};
// SAFETY: Requires a valid ip_mreq struct to be passed together with its size for checking
// validity. On errors a negative integer is returned.
unsafe {
if setsockopt(
socket.as_raw_fd(),
IPPROTO_IP,
group_op,
&mreq as *const _ as *const _,
mem::size_of_val(&mreq) as _,
) < 0
{
return Err(io::Error::last_os_error());
}
}
// SAFETY: Requires a valid ip_mreqn struct to be passed together
// with its size for checking which of the two it is. On errors a negative
// integer is returned.
unsafe {
if setsockopt(
socket.as_raw_fd(),
IPPROTO_IP,
IP_MULTICAST_IF,
&mreqn as *const _ as *const _,
mem::size_of_val(&mreqn) as _,
) < 0
{
return Err(io::Error::last_os_error());
}
}
Ok(())
}
}
}
#[cfg(windows)]
pub mod imp {
use super::*;
use std::{
io, mem,
net::{Ipv4Addr, UdpSocket},
os::windows::io::AsRawSocket,
};
use windows_sys::Win32::Networking::WinSock::{
setsockopt, WSAGetLastError, IN_ADDR, IN_ADDR_0, IPPROTO_IP, IP_ADD_MEMBERSHIP,
IP_DROP_MEMBERSHIP, IP_MREQ, IP_MULTICAST_IF,
};
/// Join multicast address for a given interface.
pub fn join_multicast_v4(
socket: &UdpSocket,
addr: &Ipv4Addr,
iface: &Interface,
) -> Result<(), io::Error> {
multicast_group_operation_v4(socket, addr, iface, IP_ADD_MEMBERSHIP)
// let ip_addr = Ipv4Addr::new(0, 0, 0, iface.index.unwrap() as u8);
// socket.join_multicast_v4(addr, &ip_addr).unwrap();
// return Ok(());
}
/// Leave multicast address for a given interface.
pub fn leave_multicast_v4(
socket: &UdpSocket,
addr: &Ipv4Addr,
iface: &Interface,
) -> Result<(), io::Error> {
multicast_group_operation_v4(socket, addr, iface, IP_DROP_MEMBERSHIP)
// let ip_addr = Ipv4Addr::new(0, 0, 0, iface.index.unwrap() as u8);
// socket.leave_multicast_v4(addr, &ip_addr).unwrap();
// return Ok(());
}
fn multicast_group_operation_v4(
socket: &UdpSocket,
addr: &Ipv4Addr,
iface: &Interface,
group_op: i32,
) -> Result<(), io::Error> {
let index = iface.index.unwrap_or(0);
let mreq = IP_MREQ {
imr_multiaddr: IN_ADDR {
S_un: IN_ADDR_0 {
S_addr: u32::from_ne_bytes(addr.octets()),
},
},
imr_interface: IN_ADDR {
S_un: IN_ADDR_0 {
S_addr: u32::from_ne_bytes(Ipv4Addr::new(0, 0, 0, index as u8).octets()),
},
},
};
// SAFETY: Requires a valid ip_mreq struct to be passed together with its size for checking
// validity. On errors a negative integer is returned.
unsafe {
if setsockopt(
socket.as_raw_socket() as usize,
IPPROTO_IP,
group_op,
&mreq as *const _ as *const _,
mem::size_of_val(&mreq) as _,
) < 0
{
return Err(io::Error::from_raw_os_error(WSAGetLastError()));
}
}
let ip_addr = IN_ADDR {
S_un: IN_ADDR_0 {
S_addr: u32::from_ne_bytes(Ipv4Addr::new(0, 0, 0, index as u8).octets()),
},
};
// SAFETY: Requires a valid IN_ADDR struct to be passed together with its size for checking
// which of the two it is. On errors a negative integer is returned.
unsafe {
if setsockopt(
socket.as_raw_socket() as usize,
IPPROTO_IP,
IP_MULTICAST_IF,
&ip_addr as *const _ as *const _,
mem::size_of_val(&ip_addr) as _,
) < 0
{
return Err(io::Error::last_os_error());
}
}
Ok(())
}
}

View file

@ -99,6 +99,10 @@ impl<T: SocketRead> Socket<T> {
self.clock = clock; self.clock = clock;
self.base_time = base_time; self.base_time = base_time;
} }
pub fn get(&self) -> &T {
&self.reader
}
} }
#[derive(Debug)] #[derive(Debug)]

View file

@ -28,13 +28,14 @@ use gst::{element_error, error_msg};
use std::sync::LazyLock; use std::sync::LazyLock;
use crate::net;
use crate::runtime::executor::block_on_or_add_sub_task; use crate::runtime::executor::block_on_or_add_sub_task;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{self, Async, Context, PadSink}; use crate::runtime::{self, Async, Context, PadSink};
use crate::socket::{wrap_socket, GioSocketWrapper}; use crate::socket::{wrap_socket, GioSocketWrapper};
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
@ -57,6 +58,7 @@ const DEFAULT_QOS_DSCP: i32 = -1;
const DEFAULT_CLIENTS: &str = ""; const DEFAULT_CLIENTS: &str = "";
const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
const DEFAULT_MULTICAST_IFACE: Option<&str> = None;
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
struct SocketConf { struct SocketConf {
@ -92,6 +94,7 @@ struct Settings {
qos_dscp: i32, qos_dscp: i32,
context: String, context: String,
context_wait: Duration, context_wait: Duration,
multicast_iface: Option<String>,
} }
impl Default for Settings { impl Default for Settings {
@ -110,6 +113,7 @@ impl Default for Settings {
qos_dscp: DEFAULT_QOS_DSCP, qos_dscp: DEFAULT_QOS_DSCP,
context: DEFAULT_CONTEXT.into(), context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT, context_wait: DEFAULT_CONTEXT_WAIT,
multicast_iface: DEFAULT_MULTICAST_IFACE.map(Into::into),
} }
} }
} }
@ -128,7 +132,7 @@ struct UdpSinkPadHandler(Arc<futures::lock::Mutex<UdpSinkPadHandlerInner>>);
impl UdpSinkPadHandler { impl UdpSinkPadHandler {
fn prepare( fn prepare(
&self, &self,
_imp: &UdpSink, imp: &UdpSink,
socket: Option<Async<UdpSocket>>, socket: Option<Async<UdpSocket>>,
socket_v6: Option<Async<UdpSocket>>, socket_v6: Option<Async<UdpSocket>>,
settings: &Settings, settings: &Settings,
@ -141,6 +145,58 @@ impl UdpSinkPadHandler {
inner.socket = socket; inner.socket = socket;
inner.socket_v6 = socket_v6; inner.socket_v6 = socket_v6;
if let Some(multicast_iface) = &settings.multicast_iface {
gst::debug!(
CAT,
imp = imp,
"searching for interface: {}",
multicast_iface
);
// The 'InterfaceFilter::name' only checks for the 'name' field , it does not check
// whether the given name with the interface 'description' (Friendly Name) on Windows
// So we first get all the interfaces and then apply filter
// for name and description (Friendly Name) of each interface.
let ifaces = getifaddrs::getifaddrs().map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to find interface {}: {}", multicast_iface, err]
)
})?;
let iface_filter = ifaces.filter(|i| {
let ip_ver = if i.address.is_ipv4() { "IPv4" } else { "IPv6" };
if &i.name == multicast_iface {
gst::debug!(
CAT,
imp = imp,
"Found interface: {}, version: {ip_ver}",
i.name,
);
true
} else {
#[cfg(windows)]
if &i.description == multicast_iface {
gst::debug!(
CAT,
imp = imp,
"Found interface: {}, version: {ip_ver}",
i.description,
);
return true;
}
gst::trace!(CAT, imp = imp, "skipping interface {}", i.name);
false
}
});
inner.multicast_ifaces = iface_filter.collect();
}
for addr in inner.clients.iter() { for addr in inner.clients.iter() {
inner.configure_client(addr)?; inner.configure_client(addr)?;
} }
@ -365,6 +421,7 @@ struct UdpSinkPadHandlerInner {
clients: BTreeSet<SocketAddr>, clients: BTreeSet<SocketAddr>,
socket_conf: SocketConf, socket_conf: SocketConf,
segment: Option<gst::Segment>, segment: Option<gst::Segment>,
multicast_ifaces: Vec<getifaddrs::Interface>,
} }
impl Default for UdpSinkPadHandlerInner { impl Default for UdpSinkPadHandlerInner {
@ -381,6 +438,7 @@ impl Default for UdpSinkPadHandlerInner {
)]), )]),
socket_conf: Default::default(), socket_conf: Default::default(),
segment: None, segment: None,
multicast_ifaces: Vec::<getifaddrs::Interface>::new(),
} }
} }
} }
@ -391,62 +449,103 @@ impl UdpSinkPadHandlerInner {
if client.ip().is_multicast() { if client.ip().is_multicast() {
match client.ip() { match client.ip() {
IpAddr::V4(addr) => { IpAddr::V4(addr) => {
if let Some(socket) = self.socket.as_ref() { let Some(socket) = self.socket.as_ref() else {
if self.socket_conf.auto_multicast { return Ok(());
};
if self.socket_conf.auto_multicast {
for iface in &self.multicast_ifaces {
if !iface.address.is_ipv4() {
gst::debug!(
CAT,
"Skipping the IPv6 version of the interface {}",
iface.name
);
continue;
}
gst::debug!(CAT, "interface {} joining the multicast", iface.name);
net::imp::join_multicast_v4(socket.as_ref(), &addr, iface).map_err(
|err| {
error_msg!(
gst::ResourceError::OpenWrite,
[
"Failed to join multicast group on iface {} for {:?}: {}",
iface.name,
client,
err
]
)
},
)?;
}
}
if self.socket_conf.multicast_loop {
socket.as_ref().set_multicast_loop_v4(true).map_err(|err| {
error_msg!(
gst::ResourceError::OpenWrite,
["Failed to set multicast loop for {:?}: {}", client, err]
)
})?;
}
socket
.as_ref()
.set_multicast_ttl_v4(self.socket_conf.ttl_mc)
.map_err(|err| {
error_msg!(
gst::ResourceError::OpenWrite,
["Failed to set multicast ttl for {:?}: {}", client, err]
)
})?;
}
IpAddr::V6(addr) => {
let Some(socket) = self.socket.as_ref() else {
return Err(error_msg!(
gst::ResourceError::OpenWrite,
["Socket not available"]
));
};
if self.socket_conf.auto_multicast {
for iface in &self.multicast_ifaces {
if !iface.address.is_ipv6() {
gst::debug!(
CAT,
"Skipping the IPv4 version of the interface {}",
iface.name
);
continue;
}
gst::debug!(CAT, "interface {} joining the multicast", iface.name);
socket socket
.as_ref() .as_ref()
.join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) .join_multicast_v6(&addr, iface.index.unwrap_or(0))
.map_err(|err| { .map_err(|err| {
error_msg!( error_msg!(
gst::ResourceError::OpenWrite, gst::ResourceError::OpenWrite,
[ [
"Failed to join multicast group for {:?}: {}", "Failed to join multicast group on iface {} for {:?}: {}",
iface.name,
client, client,
err err
] ]
) )
})?; })?;
} }
if self.socket_conf.multicast_loop { }
socket.as_ref().set_multicast_loop_v4(true).map_err(|err| {
error_msg!(
gst::ResourceError::OpenWrite,
["Failed to set multicast loop for {:?}: {}", client, err]
)
})?;
}
socket if self.socket_conf.multicast_loop {
.as_ref() socket.as_ref().set_multicast_loop_v6(true).map_err(|err| {
.set_multicast_ttl_v4(self.socket_conf.ttl_mc) error_msg!(
.map_err(|err| { gst::ResourceError::OpenWrite,
error_msg!( ["Failed to set multicast loop for {:?}: {}", client, err]
gst::ResourceError::OpenWrite, )
["Failed to set multicast ttl for {:?}: {}", client, err] })?;
)
})?;
}
}
IpAddr::V6(addr) => {
if let Some(socket) = self.socket_v6.as_ref() {
if self.socket_conf.auto_multicast {
socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| {
error_msg!(
gst::ResourceError::OpenWrite,
["Failed to join multicast group for {:?}: {}", client, err]
)
})?;
}
if self.socket_conf.multicast_loop {
socket.as_ref().set_multicast_loop_v6(true).map_err(|err| {
error_msg!(
gst::ResourceError::OpenWrite,
["Failed to set multicast loop for {:?}: {}", client, err]
)
})?;
}
/* FIXME no API for set_multicast_ttl_v6 ? */
} }
/* FIXME no API for set_multicast_ttl_v6 ? */
} }
} }
} else { } else {
@ -487,12 +586,27 @@ impl UdpSinkPadHandlerInner {
if client.ip().is_multicast() { if client.ip().is_multicast() {
match client.ip() { match client.ip() {
IpAddr::V4(addr) => { IpAddr::V4(addr) => {
if let Some(socket) = self.socket.as_ref() { let Some(socket) = self.socket.as_ref() else {
if self.socket_conf.auto_multicast { return Err(error_msg!(
socket gst::ResourceError::OpenWrite,
.as_ref() ["Socket not available"]
.leave_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) ));
.map_err(|err| { };
if self.socket_conf.auto_multicast {
for iface in &self.multicast_ifaces {
if !iface.address.is_ipv4() {
gst::debug!(
CAT,
"Skipping the IPv6 version of the interface {}",
iface.name
);
continue;
}
gst::debug!(CAT, "interface {} leaving the multicast", iface.name);
net::imp::leave_multicast_v4(socket.as_ref(), &addr, iface).map_err(
|err| {
error_msg!( error_msg!(
gst::ResourceError::OpenWrite, gst::ResourceError::OpenWrite,
[ [
@ -501,16 +615,34 @@ impl UdpSinkPadHandlerInner {
err err
] ]
) )
})?; },
)?;
} }
} }
} }
IpAddr::V6(addr) => { IpAddr::V6(addr) => {
if let Some(socket) = self.socket_v6.as_ref() { let Some(socket) = self.socket.as_ref() else {
if self.socket_conf.auto_multicast { return Err(error_msg!(
gst::ResourceError::OpenWrite,
["Socket not available"]
));
};
if self.socket_conf.auto_multicast {
for iface in &self.multicast_ifaces {
if !iface.address.is_ipv6() {
gst::debug!(
CAT,
"Skipping the IPv4 version of the interface {}",
iface.name
);
continue;
}
gst::debug!(CAT, "interface {} leaving the multicast", iface.name);
socket socket
.as_ref() .as_ref()
.leave_multicast_v6(&addr, 0) .leave_multicast_v6(&addr, iface.index.unwrap_or(0))
.map_err(|err| { .map_err(|err| {
error_msg!( error_msg!(
gst::ResourceError::OpenWrite, gst::ResourceError::OpenWrite,
@ -952,6 +1084,11 @@ impl ObjectImpl for UdpSink {
.blurb("A comma separated list of host:port pairs with destinations") .blurb("A comma separated list of host:port pairs with destinations")
.default_value(Some(DEFAULT_CLIENTS)) .default_value(Some(DEFAULT_CLIENTS))
.build(), .build(),
glib::ParamSpecString::builder("multicast-iface")
.nick("Multicast Interface")
.blurb("The network interface on which to join the multicast group. (Supports only single interface)")
.default_value(DEFAULT_MULTICAST_IFACE)
.build(),
] ]
}); });
@ -1119,6 +1256,9 @@ impl ObjectImpl for UdpSink {
value.get::<u32>().expect("type checked upstream").into(), value.get::<u32>().expect("type checked upstream").into(),
); );
} }
"multicast-iface" => {
settings.multicast_iface = value.get().expect("type checked upstream");
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -1164,6 +1304,7 @@ impl ObjectImpl for UdpSink {
} }
"context" => settings.context.to_value(), "context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(), "context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"multicast-iface" => settings.multicast_iface.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }

View file

@ -35,6 +35,7 @@ use std::time::Duration;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{task, Async, Context, PadSrc, Task, TaskState}; use crate::runtime::{task, Async, Context, PadSrc, Task, TaskState};
use crate::net;
use crate::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead}; use crate::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead};
use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::pin_mut; use futures::pin_mut;
@ -51,6 +52,7 @@ const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true; const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true;
const DEFAULT_MULTICAST_LOOP: bool = true; const DEFAULT_MULTICAST_LOOP: bool = true;
const DEFAULT_BUFFER_SIZE: u32 = 0; const DEFAULT_BUFFER_SIZE: u32 = 0;
const DEFAULT_MULTICAST_IFACE: Option<&str> = None;
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct State { struct State {
@ -71,6 +73,7 @@ struct Settings {
retrieve_sender_address: bool, retrieve_sender_address: bool,
multicast_loop: bool, multicast_loop: bool,
buffer_size: u32, buffer_size: u32,
multicast_iface: Option<String>,
} }
impl Default for Settings { impl Default for Settings {
@ -88,6 +91,7 @@ impl Default for Settings {
retrieve_sender_address: DEFAULT_RETRIEVE_SENDER_ADDRESS, retrieve_sender_address: DEFAULT_RETRIEVE_SENDER_ADDRESS,
multicast_loop: DEFAULT_MULTICAST_LOOP, multicast_loop: DEFAULT_MULTICAST_LOOP,
buffer_size: DEFAULT_BUFFER_SIZE, buffer_size: DEFAULT_BUFFER_SIZE,
multicast_iface: DEFAULT_MULTICAST_IFACE.map(Into::into),
} }
} }
} }
@ -194,6 +198,8 @@ struct UdpSrcTask {
need_initial_events: bool, need_initial_events: bool,
need_segment: bool, need_segment: bool,
event_receiver: Receiver<gst::Event>, event_receiver: Receiver<gst::Event>,
multicast_ifaces: Vec<getifaddrs::Interface>,
multicast_addr: Option<IpAddr>,
} }
impl UdpSrcTask { impl UdpSrcTask {
@ -205,6 +211,8 @@ impl UdpSrcTask {
need_initial_events: true, need_initial_events: true,
need_segment: true, need_segment: true,
event_receiver, event_receiver,
multicast_ifaces: Vec::<getifaddrs::Interface>::new(),
multicast_addr: None,
} }
} }
} }
@ -258,7 +266,10 @@ impl TaskImpl for UdpSrcTask {
["Invalid address '{}' set: {}", addr, err] ["Invalid address '{}' set: {}", addr, err]
)); ));
} }
Ok(addr) => addr, Ok(addr) => {
self.multicast_addr = Some(addr);
addr
}
}, },
}; };
let port = settings.port; let port = settings.port;
@ -363,18 +374,91 @@ impl TaskImpl for UdpSrcTask {
})?; })?;
if addr.is_multicast() { if addr.is_multicast() {
// TODO: Multicast interface configuration, going to be tricky if let Some(multicast_iface) = &settings.multicast_iface {
let multi_ifaces: Vec<String> =
multicast_iface.split(',').map(|s| s.to_string()).collect();
let iter = getifaddrs::getifaddrs().map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to get interfaces: {}", err]
)
})?;
iter.for_each(|iface| {
let ip_ver = if iface.address.is_ipv4() {
"IPv4"
} else {
"IPv6"
};
for m in &multi_ifaces {
if &iface.name == m {
self.multicast_ifaces.push(iface.clone());
gst::debug!(
CAT,
obj = self.element,
"Interface {m} available, version: {ip_ver}"
);
} else {
// check if name matches the interface description (Friendly name) on Windows
#[cfg(windows)]
if &iface.description == m {
self.multicast_ifaces.push(iface.clone());
gst::debug!(
CAT,
obj = self.element,
"Interface {m} available, version: {ip_ver}"
);
}
}
}
});
}
if self.multicast_ifaces.is_empty() {
gst::warning!(
CAT,
obj = self.element,
"No suitable network interfaces found, adding default iface"
);
self.multicast_ifaces.push(getifaddrs::Interface {
name: "default".to_owned(),
#[cfg(windows)]
description: "default".to_owned(),
address: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
#[cfg(not(windows))]
associated_address: None,
netmask: None,
flags: getifaddrs::InterfaceFlags::UP,
index: Some(0),
});
}
match addr { match addr {
IpAddr::V4(addr) => { IpAddr::V4(addr) => {
socket for iface in &self.multicast_ifaces {
.as_ref() if !iface.address.is_ipv4() {
.join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) gst::debug!(
.map_err(|err| { CAT,
gst::error_msg!( "Skipping the IPv6 version of the interface {}",
gst::ResourceError::OpenRead, iface.name
["Failed to join multicast group: {}", err] );
) continue;
})?; }
gst::debug!(CAT, "interface {} joining the multicast", iface.name);
// use the custom written API to be able to pass the interface index
// for all types of target OS
net::imp::join_multicast_v4(socket.as_ref(), &addr, iface)
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
socket socket
.as_ref() .as_ref()
@ -391,12 +475,27 @@ impl TaskImpl for UdpSrcTask {
})?; })?;
} }
IpAddr::V6(addr) => { IpAddr::V6(addr) => {
socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| { for iface in &self.multicast_ifaces {
gst::error_msg!( if !iface.address.is_ipv6() {
gst::ResourceError::OpenRead, gst::debug!(
["Failed to join multicast group: {}", err] CAT,
) "Skipping the IPv4 version of the interface {}",
})?; iface.name
);
continue;
}
gst::debug!(CAT, "interface {} joining the multicast", iface.name);
socket
.as_ref()
.join_multicast_v6(&addr, iface.index.unwrap_or(0))
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
socket socket
.as_ref() .as_ref()
@ -466,6 +565,47 @@ impl TaskImpl for UdpSrcTask {
async move { async move {
gst::debug!(CAT, obj = self.element, "Unpreparing Task"); gst::debug!(CAT, obj = self.element, "Unpreparing Task");
let udpsrc = self.element.imp(); let udpsrc = self.element.imp();
if let Some(reader) = &self.socket {
let socket = &reader.get().0;
if let Some(addr) = self.multicast_addr {
match addr {
IpAddr::V4(addr) => {
for iface in &self.multicast_ifaces {
if !iface.address.is_ipv4() {
gst::debug!(
CAT,
"Skipping the IPv6 version of the interface {}",
iface.name
);
continue;
}
gst::debug!(CAT, "interface {} leaving the multicast", iface.name);
net::imp::leave_multicast_v4(socket.as_ref(), &addr, iface)
.unwrap();
}
}
IpAddr::V6(addr) => {
for iface in &self.multicast_ifaces {
if !iface.address.is_ipv6() {
gst::debug!(
CAT,
"Skipping the IPv4 version of the interface {}",
iface.name
);
continue;
}
gst::debug!(CAT, "interface {} leaving the multicast", iface.name);
socket
.as_ref()
.leave_multicast_v6(&addr, iface.index.unwrap_or(0))
.unwrap();
}
}
}
}
}
udpsrc.settings.lock().unwrap().used_socket = None; udpsrc.settings.lock().unwrap().used_socket = None;
self.element.notify("used-socket"); self.element.notify("used-socket");
} }
@ -825,6 +965,12 @@ impl ObjectImpl for UdpSrc {
.maximum(u32::MAX) .maximum(u32::MAX)
.default_value(DEFAULT_BUFFER_SIZE) .default_value(DEFAULT_BUFFER_SIZE)
.build(), .build(),
glib::ParamSpecString::builder("multicast-iface")
.nick("Multicast Interface")
.blurb("The network interface on which to join the multicast group. This allows multiple interfaces
separated by comma. (\"eth0,eth1\")")
.default_value(DEFAULT_MULTICAST_IFACE)
.build(),
]; ];
@ -898,6 +1044,9 @@ impl ObjectImpl for UdpSrc {
"buffer-size" => { "buffer-size" => {
settings.buffer_size = value.get().expect("type checked upstream"); settings.buffer_size = value.get().expect("type checked upstream");
} }
"multicast-iface" => {
settings.multicast_iface = value.get().expect("type checked upstream");
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -925,6 +1074,7 @@ impl ObjectImpl for UdpSrc {
"retrieve-sender-address" => settings.retrieve_sender_address.to_value(), "retrieve-sender-address" => settings.retrieve_sender_address.to_value(),
"loop" => settings.multicast_loop.to_value(), "loop" => settings.multicast_loop.to_value(),
"buffer-size" => settings.buffer_size.to_value(), "buffer-size" => settings.buffer_size.to_value(),
"multicast-iface" => settings.multicast_iface.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }