From c9d9af2fee0d2eeca40d95fd78368dbaedfdb77e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 8 Jun 2023 13:40:47 +0300 Subject: [PATCH] ptp: Set port-reuse socket options before binding the socket Otherwise it only works if GStreamer is binding the first socket on this port. Unfortunately this requires duplicating a bit more of Rust std because `UdpSocket` can only be created already bound without allowing to set any options between socket creation and binding. Part-of: --- .../gstreamer/libs/gst/helpers/ptp/ffi.rs | 74 +++++++ .../gstreamer/libs/gst/helpers/ptp/main.rs | 6 +- .../gstreamer/libs/gst/helpers/ptp/net.rs | 203 ++++++++++++++++-- 3 files changed, 267 insertions(+), 16 deletions(-) diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs index 8e6cc04a75..451f2dd48d 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs @@ -85,6 +85,12 @@ pub mod unix { ))] pub const SOL_SOCKET: c_int = 1; + #[cfg(target_os = "macos")] + pub const FIOCLEX: c_ulong = 0x20006601; + + #[cfg(target_os = "macos")] + pub const SO_NOSIGPIPE: c_int = 0x1022; + #[cfg(any( target_os = "solaris", target_os = "illumos", @@ -145,6 +151,41 @@ pub mod unix { ))] pub const SO_REUSEPORT: c_int = 15; + #[cfg(any(target_os = "freebsd", target_os = "dragonfly", target_os = "netbsd"))] + pub const SOCK_CLOEXEC: c_int = 0x10000000; + #[cfg(target_os = "openbsd")] + pub const SOCK_CLOEXEC: c_int = 0x8000; + #[cfg(any(target_os = "solaris", target_os = "illumos"))] + pub const SOCK_CLOEXEC: c_int = 0x080000; + #[cfg(all( + target_os = "linux", + any(target_arch = "sparc", target_arch = "sparc64"), + ))] + pub const SOCK_CLOEXEC: c_int = 0x400000; + #[cfg(all( + target_os = "linux", + not(any(target_arch = "sparc", target_arch = "sparc64")), + ))] + pub const SOCK_CLOEXEC: c_int = 0x80000; + + #[cfg(any( + target_os = "freebsd", + target_os = "dragonfly", + target_os = "netbsd", + target_os = "openbsd", + target_os = "macos", + ))] + pub const SOCK_DGRAM: c_int = 2; + #[cfg(any(target_os = "solaris", target_os = "illumos"))] + pub const SOCK_DGRAM: c_int = 1; + #[cfg(all(target_os = "linux", any(target_arch = "mips", target_arch = "mips64"),))] + pub const SOCK_DGRAM: c_int = 1; + #[cfg(all( + target_os = "linux", + not(any(target_arch = "mips", target_arch = "mips64")), + ))] + pub const SOCK_DGRAM: c_int = 2; + pub const AF_INET: c_int = 2; #[cfg(any( target_os = "freebsd", @@ -227,6 +268,20 @@ pub mod unix { pub fn setpriority(which: c_int, who: c_int, prio: c_int) -> c_int; + #[cfg_attr(target_os = "netbsd", link_name = "__socket30")] + #[cfg_attr(target_os = "illumos", link_name = "__xnet_socket")] + pub fn socket(domain: c_int, ty: c_int, protocol: c_int) -> c_int; + + #[cfg_attr(target_os = "illumos", link_name = "__xnet_bind")] + #[cfg_attr( + all(target_os = "macos", target_arch = "x86"), + link_name = "bind$UNIX2003" + )] + pub fn bind(socket: c_int, address: *const sockaddr, address_len: u32) -> c_int; + + #[cfg(target_os = "macos")] + pub fn ioctl(fd: c_int, request: c_ulong, ...) -> c_int; + #[cfg(test)] pub fn pipe(pipefd: *mut i32) -> i32; } @@ -665,6 +720,7 @@ pub mod windows { // // XXX: Once meson has cargo subproject support all of the below can be replaced with the windows-sys crate. pub const INVALID_HANDLE_VALUE: HANDLE = (-1 as isize as usize) as HANDLE; + pub const INVALID_SOCKET: SOCKET = (-1 as isize as usize) as SOCKET; pub const STD_INPUT_HANDLE: i32 = -10; pub const STD_OUTPUT_HANDLE: i32 = -11; @@ -827,10 +883,26 @@ pub mod windows { value: *const c_void, option_len: i32, ) -> i32; + + pub fn WSASocketW( + af: i32, + ty: i32, + protocol: i32, + lpprotocolinfo: *const c_void, + g: u32, + dwflags: u32, + ) -> SOCKET; + pub fn bind(s: SOCKET, name: *const SOCKADDR, namelen: i32) -> i32; + pub fn closesocket(socket: SOCKET) -> i32; } pub const AF_INET: u32 = 2; + pub const SOCK_DGRAM: u16 = 2u16; + + pub const WSA_FLAG_OVERLAPPED: u32 = 1u32; + pub const WSA_FLAG_NO_HANDLE_INHERIT: u32 = 128u32; + pub const GAA_FLAG_SKIP_ANYCAST: u32 = 0x0002; pub const GAA_FLAG_SKIP_MULTICAST: u32 = 0x0004; pub const GAA_FLAG_SKIP_DNS_SERVER: u32 = 0x0008; @@ -873,11 +945,13 @@ pub mod windows { pub anonymous: IP_ADAPTER_UNICAST_ADDRESS_LH_0_0, } + // XXX: Actually SOCKADDR_IN but we don't care about others #[repr(C)] pub struct SOCKADDR { pub sa_family: u16, pub sin_port: u16, pub in_addr: IN_ADDR, + pub sin_zero: [u8; 8], } #[repr(C)] diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs index f2b0fb4831..8ce182345f 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs @@ -19,7 +19,7 @@ use std::{ io::{Read, Write}, - net::{Ipv4Addr, SocketAddr, UdpSocket}, + net::{Ipv4Addr, UdpSocket}, }; #[macro_use] @@ -59,7 +59,7 @@ const MSG_TYPE_SEND_TIME_ACK: u8 = 3; /// Create a new `UdpSocket` for the given port and configure it for PTP. fn create_socket(port: u16) -> Result { - let socket = UdpSocket::bind(SocketAddr::from((Ipv4Addr::UNSPECIFIED, port))) + let socket = net::create_udp_socket(&Ipv4Addr::UNSPECIFIED, port) .with_context(|| format!("Failed to bind socket to port {}", port))?; socket @@ -70,8 +70,6 @@ fn create_socket(port: u16) -> Result { .set_multicast_ttl_v4(1) .context("Failed to set multicast TTL on socket")?; - net::set_reuse(&socket); - Ok(socket) } diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/net.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/net.rs index b6d4db6fb1..4f83e85c27 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/net.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/net.rs @@ -262,7 +262,106 @@ mod imp { Ok(if_infos) } - // Join multicast address for a given interface. + /// Create an `UdpSocket` and bind it to the given address but set `SO_REUSEADDR` and/or + /// `SO_REUSEPORT` before doing so. + /// + /// `UdpSocket::bind()` does not allow setting custom options before binding. + pub fn create_udp_socket(addr: &Ipv4Addr, port: u16) -> Result { + use std::os::unix::io::FromRawFd; + + /// Helper struct to keep a raw fd and close it on drop + struct Fd(i32); + impl Drop for Fd { + fn drop(&mut self) { + unsafe { + // SAFETY: The integer is a valid fd by construction. + let _ = close(self.0); + } + } + } + + // SAFETY: Calling socket() is safe at any time and will simply fail if invalid parameters + // are passed. + let fd = unsafe { + #[cfg(any( + target_os = "linux", + target_os = "freebsd", + target_os = "openbsd", + target_os = "netbsd", + target_os = "dragonfly", + target_os = "solaris", + target_os = "illumos", + ))] + let ty = SOCK_DGRAM | SOCK_CLOEXEC; + #[cfg(target_os = "macos")] + let ty = SOCK_DGRAM; + + let res = socket(AF_INET, ty, 0); + if res == -1 { + return Err(io::Error::last_os_error()); + } + Fd(res) + }; + + // SAFETY: A valid socket fd is passed to ioctl() and setsockopt() and the parameters to + // setsockopt() are according to the type expected by SO_NOSIGPIPE. + #[cfg(target_os = "macos")] + unsafe { + let res = ioctl(fd.0, FIOCLEX); + if res == -1 { + return Err(io::Error::last_os_error()); + } + + let val = 1i32; + let res = setsockopt( + fd.0, + SOL_SOCKET, + SO_NOSIGPIPE, + &val as *const _ as *const _, + mem::size_of_val(&val) as _, + ); + if res < 0 { + return Err(io::Error::last_os_error()); + } + } + + // SAFETY: A valid socket fd is passed here. + unsafe { + set_reuse(fd.0); + } + + // SAFETY: A valid socket fd is passed together with a valid sockaddr_in and its size. + unsafe { + let addr = sockaddr_in { + sin_family: AF_INET as _, + sin_port: u16::to_be(port), + sin_addr: in_addr { + s_addr: u32::from_ne_bytes(addr.octets()), + }, + sin_zero: [0u8; 8], + #[cfg(any( + target_os = "freebsd", + target_os = "openbsd", + target_os = "netbsd", + target_os = "dragonfly", + target_os = "macos", + ))] + sin_len: mem::size_of::() as _, + }; + let res = bind( + fd.0, + &addr as *const _ as *const _, + mem::size_of_val(&addr) as _, + ); + if res < 0 { + return Err(io::Error::last_os_error()); + } + } + + unsafe { Ok(UdpSocket::from_raw_fd(mem::ManuallyDrop::new(fd).0)) } + } + + /// Join multicast address for a given interface. pub fn join_multicast_v4( socket: &UdpSocket, addr: &Ipv4Addr, @@ -322,7 +421,9 @@ mod imp { /// Allow multiple sockets to bind to the same address / port. /// /// This is best-effort and might not actually do anything. - pub fn set_reuse(socket: &UdpSocket) { + /// + /// SAFETY: Must be called with a valid socket fd. + unsafe fn set_reuse(socket: i32) { // SAFETY: SO_REUSEADDR takes an i32 value that can be 0/false or 1/true and // enables the given feature on the socket. // @@ -331,7 +432,7 @@ mod imp { unsafe { let v = 1i32; let res = setsockopt( - socket.as_raw_fd(), + socket, SOL_SOCKET, SO_REUSEADDR, &v as *const _ as *const _, @@ -352,7 +453,7 @@ mod imp { unsafe { let v = 1i32; let res = setsockopt( - socket.as_raw_fd(), + socket, SOL_SOCKET, SO_REUSEPORT, &v as *const _ as *const _, @@ -374,10 +475,10 @@ mod imp { use std::{ ffi::{CStr, OsString}, io, marker, mem, - net::UdpSocket, + net::{Ipv4Addr, SocketAddr, UdpSocket}, os::{ raw::*, - windows::{ffi::OsStringExt, io::AsRawSocket}, + windows::{ffi::OsStringExt, io::AsRawSocket, raw::SOCKET}, }, ptr, slice, }; @@ -635,6 +736,70 @@ mod imp { Ok(if_infos) } + /// Create an `UdpSocket` and bind it to the given address but set `SO_REUSEADDR` and/or + /// `SO_REUSEPORT` before doing so. + /// + /// `UdpSocket::bind()` does not allow setting custom options before binding. + pub fn create_udp_socket(addr: &Ipv4Addr, port: u16) -> Result { + use std::os::windows::io::FromRawSocket; + + // XXX: Make sure Rust std is calling WSAStartup() + let _ = UdpSocket::bind(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)))?; + + /// Helper struct to keep a raw socket and close it on drop + struct Socket(SOCKET); + impl Drop for Socket { + fn drop(&mut self) { + unsafe { + // SAFETY: The socket is valid by construction. + let _ = closesocket(self.0); + } + } + } + + // SAFETY: Calling WSASocketW() is safe at any time and will simply fail if invalid parameters + // are passed or something else goes wrong. + let socket = unsafe { + let res = WSASocketW( + AF_INET as _, + SOCK_DGRAM as _, + 0, + ptr::null_mut(), + 0, + WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT, + ); + if res == INVALID_SOCKET { + return Err(io::Error::from_raw_os_error(WSAGetLastError())); + } + Socket(res) + }; + + // SAFETY: A valid socket is passed here. + unsafe { + set_reuse(socket.0); + } + + // SAFETY: A valid socket fd is passed together with a valid SOCKADDR and its size. + unsafe { + let addr = SOCKADDR { + sa_family: AF_INET as _, + sin_port: u16::to_be(port), + in_addr: IN_ADDR { + S_un: IN_ADDR_0 { + S_addr: u32::from_ne_bytes(addr.octets()), + }, + }, + sin_zero: [0; 8], + }; + let res = bind(socket.0, &addr, mem::size_of_val(&addr) as _); + if res < 0 { + return Err(io::Error::from_raw_os_error(WSAGetLastError())); + } + } + + unsafe { Ok(UdpSocket::from_raw_socket(mem::ManuallyDrop::new(socket).0)) } + } + // Join multicast address for a given interface. pub fn join_multicast_v4( socket: &UdpSocket, @@ -679,7 +844,9 @@ mod imp { /// Allow multiple sockets to bind to the same address / port. /// /// This is best-effort and might not actually do anything. - pub fn set_reuse(socket: &UdpSocket) { + /// + /// SAFETY: Must be called with a valid socket. + unsafe fn set_reuse(socket: SOCKET) { // SAFETY: SO_REUSEADDR takes an i32 value that can be 0/false or 1/true and // enables the given feature on the socket. // @@ -688,7 +855,7 @@ mod imp { unsafe { let v = 1i32; let res = setsockopt( - socket.as_raw_socket(), + socket, SOL_SOCKET as i32, SO_REUSEADDR as i32, &v as *const _ as *const _, @@ -716,7 +883,7 @@ mod test { } #[test] - fn test_join_multicast() { + fn test_create_socket_join_multicast() { let ifaces = super::query_interfaces().unwrap(); let iface = if ifaces.is_empty() { return; @@ -724,12 +891,24 @@ mod test { &ifaces[0] }; - let socket = std::net::UdpSocket::bind(std::net::SocketAddr::from(( + let socket = super::create_udp_socket(&std::net::Ipv4Addr::UNSPECIFIED, 0).unwrap(); + super::join_multicast_v4(&socket, &std::net::Ipv4Addr::new(224, 0, 0, 1), iface).unwrap(); + + let local_addr = socket.local_addr().unwrap(); + + let socket2 = std::net::UdpSocket::bind(std::net::SocketAddr::from(( std::net::Ipv4Addr::UNSPECIFIED, 0, ))) .unwrap(); - super::set_reuse(&socket); - super::join_multicast_v4(&socket, &std::net::Ipv4Addr::new(224, 0, 0, 1), iface).unwrap(); + socket2 + .send_to( + &[1, 2, 3, 4], + std::net::SocketAddr::from((std::net::Ipv4Addr::LOCALHOST, local_addr.port())), + ) + .unwrap(); + let mut buf = [0u8; 4]; + socket.recv(&mut buf).unwrap(); + assert_eq!(buf, [1, 2, 3, 4]); } }