threadshare: udpsrc: add multicast-iface property

Filter the valid interfaces from the given list.
To join the multicast, use the interface ip address in case of IPv4
and interface index in case of IPv6 address
This commit is contained in:
Taruntej Kanakamalla 2024-01-02 12:57:19 +05:30
parent 9f27bde36a
commit 7733a45e2b
4 changed files with 189 additions and 15 deletions

92
Cargo.lock generated
View file

@ -1444,6 +1444,23 @@ dependencies = [
"system-deps",
]
[[package]]
name = "default-net"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85dc7576d8346d3c86ad64dc64d26d0f6c970ba4795b850f15ee94467d8e53eb"
dependencies = [
"dlopen2",
"libc",
"memalloc",
"netlink-packet-core",
"netlink-packet-route",
"netlink-sys",
"once_cell",
"system-configuration",
"windows",
]
[[package]]
name = "der"
version = "0.6.1"
@ -1490,6 +1507,17 @@ dependencies = [
"subtle",
]
[[package]]
name = "dlopen2"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09b4f5f101177ff01b8ec4ecc81eead416a8aa42819a2869311b3420fa114ffa"
dependencies = [
"libc",
"once_cell",
"winapi",
]
[[package]]
name = "dssim-core"
version = "3.2.8"
@ -2776,6 +2804,7 @@ dependencies = [
"cfg-if",
"clap",
"concurrent-queue",
"default-net",
"flume",
"futures",
"gio",
@ -4458,6 +4487,12 @@ dependencies = [
"digest 0.10.7",
]
[[package]]
name = "memalloc"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df39d232f5c40b0891c10216992c2f250c054105cb1e56f0fc9032db6203ecc1"
[[package]]
name = "memchr"
version = "2.7.1"
@ -4600,6 +4635,54 @@ dependencies = [
"tempfile",
]
[[package]]
name = "netlink-packet-core"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72724faf704479d67b388da142b186f916188505e7e0b26719019c525882eda4"
dependencies = [
"anyhow",
"byteorder",
"netlink-packet-utils",
]
[[package]]
name = "netlink-packet-route"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "053998cea5a306971f88580d0829e90f270f940befd7cf928da179d4187a5a66"
dependencies = [
"anyhow",
"bitflags 1.3.2",
"byteorder",
"libc",
"netlink-packet-core",
"netlink-packet-utils",
]
[[package]]
name = "netlink-packet-utils"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ede8a08c71ad5a95cdd0e4e52facd37190977039a4704eb82a283f713747d34"
dependencies = [
"anyhow",
"byteorder",
"paste",
"thiserror",
]
[[package]]
name = "netlink-sys"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6471bf08e7ac0135876a9581bf3217ef0333c191c128d34878079f42ee150411"
dependencies = [
"bytes",
"libc",
"log",
]
[[package]]
name = "new_debug_unreachable"
version = "1.0.6"
@ -7010,6 +7093,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f"
dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "windows-core"
version = "0.52.0"

View file

@ -10032,6 +10032,18 @@
"type": "guint",
"writable": true
},
"multicast-iface": {
"blurb": "The network interface on which to join the multicast group. This allows multiple interfaces separated by comma. ( e.g. eth0,eth1,wlan0)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"port": {
"blurb": "Port to listen on",
"conditionally-available": false,

View file

@ -27,6 +27,7 @@ rustix = { version = "0.38.2", default-features = false, features = ["std", "fs"
slab = "0.4.7"
socket2 = {features = ["all"], version = "0.5"}
waker-fn = "1.1"
default-net = "0.21.0"
# Used by examples
clap = { version = "4", features = ["derive"], optional = true }

View file

@ -49,6 +49,7 @@ const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true;
const DEFAULT_MULTICAST_IFACE: Option<&str> = None;
#[derive(Debug, Clone)]
struct Settings {
@ -62,6 +63,7 @@ struct Settings {
context: String,
context_wait: Duration,
retrieve_sender_address: bool,
multicast_iface: Option<String>,
}
impl Default for Settings {
@ -77,6 +79,7 @@ impl Default for Settings {
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
retrieve_sender_address: DEFAULT_RETRIEVE_SENDER_ADDRESS,
multicast_iface: DEFAULT_MULTICAST_IFACE.map(Into::into),
}
}
}
@ -327,26 +330,83 @@ impl TaskImpl for UdpSrcTask {
})?;
if addr.is_multicast() {
// TODO: Multicast interface configuration, going to be tricky
match addr {
IpAddr::V4(addr) => {
socket
.as_ref()
.join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
.map_err(|err| {
// If the 'multicast-iface' property is specified,
// fetch all the interfaces available and filter the available
// interfaces among the ones specified in the `multicast-iface`
if let Some(iface_name) = &settings.multicast_iface {
let preferred_ifaces: Vec<&str> = iface_name.split(",").collect();
let all_ifaces = default_net::get_interfaces();
let ifaces = all_ifaces.iter().filter(|iface| {
preferred_ifaces.iter().position(|&p| {
iface.name == p.to_string() ||
iface.friendly_name == Some(p.to_string())
}).is_some()
});
for iface in ifaces {
match addr {
IpAddr::V4(addr) => {
let ip_addr = if iface.ipv4.is_empty() {
// No Ipv4 address on this interface, use 0.0.0.0
// as the default address to join multicast
Ipv4Addr::new(0,0,0,0)
} else {
// Use the first ipv4 address in the list
gst::debug!(CAT, obj:self.element, "Joining {} ({:?}) to multicast", iface.name, iface.friendly_name);
iface.ipv4.first().unwrap().addr
};
//TODO: Change this to use interface index instead of the IP address
// Would be ideal if we could rewrite/re-use `join_multicast_v4`
// in gstreamer/libs/gst/helpers/ptp/net.rs
// We could use `socket2::join_multicast_v4` but it is
// not supported for all the OS.
socket
.as_ref()
.join_multicast_v4(&addr, &ip_addr)
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["{} {:?} Failed to join multicast group: {}", iface.name, iface.friendly_name, err]
)
})?;
}
IpAddr::V6(addr) => {
let idx = iface.index;
gst::debug!(CAT, obj:self.element, "Joining {} ({:?}) to multicast", iface.name, iface.friendly_name);
socket.as_ref().join_multicast_v6(&addr, idx).map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
}
}
} else {
match addr {
IpAddr::V4(addr) => {
socket
.as_ref()
.join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
IpAddr::V6(addr) => {
socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
IpAddr::V6(addr) => {
socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
}
}
}
@ -679,6 +739,11 @@ impl ObjectImpl for UdpSrc {
.blurb("Whether to retrieve the sender address and add it to buffers as meta. Disabling this might result in minor performance improvements in certain scenarios")
.default_value(DEFAULT_RETRIEVE_SENDER_ADDRESS)
.build(),
glib::ParamSpecString::builder("multicast-iface")
.nick("Multicast Interface")
.blurb("The network interface on which to join the multicast group. This allows multiple interfaces separated by comma. ( e.g. eth0,eth1,wlan0)")
.default_value(DEFAULT_MULTICAST_IFACE)
.build(),
];
#[cfg(not(windows))]
@ -745,6 +810,9 @@ impl ObjectImpl for UdpSrc {
"retrieve-sender-address" => {
settings.retrieve_sender_address = value.get().expect("type checked upstream");
}
"multicast-iface" => {
settings.multicast_iface = value.get().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
@ -770,6 +838,7 @@ impl ObjectImpl for UdpSrc {
"context" => settings.context.to_value(),
"context-wait" => (settings.context_wait.as_millis() as u32).to_value(),
"retrieve-sender-address" => settings.retrieve_sender_address.to_value(),
"multicast-iface" => settings.multicast_iface.to_value(),
_ => unimplemented!(),
}
}