ptp: Set port-reuse socket options before binding the socket

Otherwise it only works if GStreamer is binding the first socket on this
port.

Unfortunately this requires duplicating a bit more of Rust std because
`UdpSocket` can only be created already bound without allowing to set
any options between socket creation and binding.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4807>
This commit is contained in:
Sebastian Dröge 2023-06-08 13:40:47 +03:00 committed by GStreamer Marge Bot
parent 5e455e21cf
commit c9d9af2fee
3 changed files with 267 additions and 16 deletions

View file

@ -85,6 +85,12 @@ pub mod unix {
))] ))]
pub const SOL_SOCKET: c_int = 1; pub const SOL_SOCKET: c_int = 1;
#[cfg(target_os = "macos")]
pub const FIOCLEX: c_ulong = 0x20006601;
#[cfg(target_os = "macos")]
pub const SO_NOSIGPIPE: c_int = 0x1022;
#[cfg(any( #[cfg(any(
target_os = "solaris", target_os = "solaris",
target_os = "illumos", target_os = "illumos",
@ -145,6 +151,41 @@ pub mod unix {
))] ))]
pub const SO_REUSEPORT: c_int = 15; pub const SO_REUSEPORT: c_int = 15;
#[cfg(any(target_os = "freebsd", target_os = "dragonfly", target_os = "netbsd"))]
pub const SOCK_CLOEXEC: c_int = 0x10000000;
#[cfg(target_os = "openbsd")]
pub const SOCK_CLOEXEC: c_int = 0x8000;
#[cfg(any(target_os = "solaris", target_os = "illumos"))]
pub const SOCK_CLOEXEC: c_int = 0x080000;
#[cfg(all(
target_os = "linux",
any(target_arch = "sparc", target_arch = "sparc64"),
))]
pub const SOCK_CLOEXEC: c_int = 0x400000;
#[cfg(all(
target_os = "linux",
not(any(target_arch = "sparc", target_arch = "sparc64")),
))]
pub const SOCK_CLOEXEC: c_int = 0x80000;
#[cfg(any(
target_os = "freebsd",
target_os = "dragonfly",
target_os = "netbsd",
target_os = "openbsd",
target_os = "macos",
))]
pub const SOCK_DGRAM: c_int = 2;
#[cfg(any(target_os = "solaris", target_os = "illumos"))]
pub const SOCK_DGRAM: c_int = 1;
#[cfg(all(target_os = "linux", any(target_arch = "mips", target_arch = "mips64"),))]
pub const SOCK_DGRAM: c_int = 1;
#[cfg(all(
target_os = "linux",
not(any(target_arch = "mips", target_arch = "mips64")),
))]
pub const SOCK_DGRAM: c_int = 2;
pub const AF_INET: c_int = 2; pub const AF_INET: c_int = 2;
#[cfg(any( #[cfg(any(
target_os = "freebsd", target_os = "freebsd",
@ -227,6 +268,20 @@ pub mod unix {
pub fn setpriority(which: c_int, who: c_int, prio: c_int) -> c_int; pub fn setpriority(which: c_int, who: c_int, prio: c_int) -> c_int;
#[cfg_attr(target_os = "netbsd", link_name = "__socket30")]
#[cfg_attr(target_os = "illumos", link_name = "__xnet_socket")]
pub fn socket(domain: c_int, ty: c_int, protocol: c_int) -> c_int;
#[cfg_attr(target_os = "illumos", link_name = "__xnet_bind")]
#[cfg_attr(
all(target_os = "macos", target_arch = "x86"),
link_name = "bind$UNIX2003"
)]
pub fn bind(socket: c_int, address: *const sockaddr, address_len: u32) -> c_int;
#[cfg(target_os = "macos")]
pub fn ioctl(fd: c_int, request: c_ulong, ...) -> c_int;
#[cfg(test)] #[cfg(test)]
pub fn pipe(pipefd: *mut i32) -> i32; pub fn pipe(pipefd: *mut i32) -> i32;
} }
@ -665,6 +720,7 @@ pub mod windows {
// //
// XXX: Once meson has cargo subproject support all of the below can be replaced with the windows-sys crate. // XXX: Once meson has cargo subproject support all of the below can be replaced with the windows-sys crate.
pub const INVALID_HANDLE_VALUE: HANDLE = (-1 as isize as usize) as HANDLE; pub const INVALID_HANDLE_VALUE: HANDLE = (-1 as isize as usize) as HANDLE;
pub const INVALID_SOCKET: SOCKET = (-1 as isize as usize) as SOCKET;
pub const STD_INPUT_HANDLE: i32 = -10; pub const STD_INPUT_HANDLE: i32 = -10;
pub const STD_OUTPUT_HANDLE: i32 = -11; pub const STD_OUTPUT_HANDLE: i32 = -11;
@ -827,10 +883,26 @@ pub mod windows {
value: *const c_void, value: *const c_void,
option_len: i32, option_len: i32,
) -> i32; ) -> i32;
pub fn WSASocketW(
af: i32,
ty: i32,
protocol: i32,
lpprotocolinfo: *const c_void,
g: u32,
dwflags: u32,
) -> SOCKET;
pub fn bind(s: SOCKET, name: *const SOCKADDR, namelen: i32) -> i32;
pub fn closesocket(socket: SOCKET) -> i32;
} }
pub const AF_INET: u32 = 2; pub const AF_INET: u32 = 2;
pub const SOCK_DGRAM: u16 = 2u16;
pub const WSA_FLAG_OVERLAPPED: u32 = 1u32;
pub const WSA_FLAG_NO_HANDLE_INHERIT: u32 = 128u32;
pub const GAA_FLAG_SKIP_ANYCAST: u32 = 0x0002; pub const GAA_FLAG_SKIP_ANYCAST: u32 = 0x0002;
pub const GAA_FLAG_SKIP_MULTICAST: u32 = 0x0004; pub const GAA_FLAG_SKIP_MULTICAST: u32 = 0x0004;
pub const GAA_FLAG_SKIP_DNS_SERVER: u32 = 0x0008; pub const GAA_FLAG_SKIP_DNS_SERVER: u32 = 0x0008;
@ -873,11 +945,13 @@ pub mod windows {
pub anonymous: IP_ADAPTER_UNICAST_ADDRESS_LH_0_0, pub anonymous: IP_ADAPTER_UNICAST_ADDRESS_LH_0_0,
} }
// XXX: Actually SOCKADDR_IN but we don't care about others
#[repr(C)] #[repr(C)]
pub struct SOCKADDR { pub struct SOCKADDR {
pub sa_family: u16, pub sa_family: u16,
pub sin_port: u16, pub sin_port: u16,
pub in_addr: IN_ADDR, pub in_addr: IN_ADDR,
pub sin_zero: [u8; 8],
} }
#[repr(C)] #[repr(C)]

View file

@ -19,7 +19,7 @@
use std::{ use std::{
io::{Read, Write}, io::{Read, Write},
net::{Ipv4Addr, SocketAddr, UdpSocket}, net::{Ipv4Addr, UdpSocket},
}; };
#[macro_use] #[macro_use]
@ -59,7 +59,7 @@ const MSG_TYPE_SEND_TIME_ACK: u8 = 3;
/// Create a new `UdpSocket` for the given port and configure it for PTP. /// Create a new `UdpSocket` for the given port and configure it for PTP.
fn create_socket(port: u16) -> Result<UdpSocket, Error> { fn create_socket(port: u16) -> Result<UdpSocket, Error> {
let socket = UdpSocket::bind(SocketAddr::from((Ipv4Addr::UNSPECIFIED, port))) let socket = net::create_udp_socket(&Ipv4Addr::UNSPECIFIED, port)
.with_context(|| format!("Failed to bind socket to port {}", port))?; .with_context(|| format!("Failed to bind socket to port {}", port))?;
socket socket
@ -70,8 +70,6 @@ fn create_socket(port: u16) -> Result<UdpSocket, Error> {
.set_multicast_ttl_v4(1) .set_multicast_ttl_v4(1)
.context("Failed to set multicast TTL on socket")?; .context("Failed to set multicast TTL on socket")?;
net::set_reuse(&socket);
Ok(socket) Ok(socket)
} }

View file

@ -262,7 +262,106 @@ mod imp {
Ok(if_infos) Ok(if_infos)
} }
// Join multicast address for a given interface. /// Create an `UdpSocket` and bind it to the given address but set `SO_REUSEADDR` and/or
/// `SO_REUSEPORT` before doing so.
///
/// `UdpSocket::bind()` does not allow setting custom options before binding.
pub fn create_udp_socket(addr: &Ipv4Addr, port: u16) -> Result<UdpSocket, io::Error> {
use std::os::unix::io::FromRawFd;
/// Helper struct to keep a raw fd and close it on drop
struct Fd(i32);
impl Drop for Fd {
fn drop(&mut self) {
unsafe {
// SAFETY: The integer is a valid fd by construction.
let _ = close(self.0);
}
}
}
// SAFETY: Calling socket() is safe at any time and will simply fail if invalid parameters
// are passed.
let fd = unsafe {
#[cfg(any(
target_os = "linux",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
target_os = "dragonfly",
target_os = "solaris",
target_os = "illumos",
))]
let ty = SOCK_DGRAM | SOCK_CLOEXEC;
#[cfg(target_os = "macos")]
let ty = SOCK_DGRAM;
let res = socket(AF_INET, ty, 0);
if res == -1 {
return Err(io::Error::last_os_error());
}
Fd(res)
};
// SAFETY: A valid socket fd is passed to ioctl() and setsockopt() and the parameters to
// setsockopt() are according to the type expected by SO_NOSIGPIPE.
#[cfg(target_os = "macos")]
unsafe {
let res = ioctl(fd.0, FIOCLEX);
if res == -1 {
return Err(io::Error::last_os_error());
}
let val = 1i32;
let res = setsockopt(
fd.0,
SOL_SOCKET,
SO_NOSIGPIPE,
&val as *const _ as *const _,
mem::size_of_val(&val) as _,
);
if res < 0 {
return Err(io::Error::last_os_error());
}
}
// SAFETY: A valid socket fd is passed here.
unsafe {
set_reuse(fd.0);
}
// SAFETY: A valid socket fd is passed together with a valid sockaddr_in and its size.
unsafe {
let addr = sockaddr_in {
sin_family: AF_INET as _,
sin_port: u16::to_be(port),
sin_addr: in_addr {
s_addr: u32::from_ne_bytes(addr.octets()),
},
sin_zero: [0u8; 8],
#[cfg(any(
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
target_os = "dragonfly",
target_os = "macos",
))]
sin_len: mem::size_of::<sockaddr_in>() as _,
};
let res = bind(
fd.0,
&addr as *const _ as *const _,
mem::size_of_val(&addr) as _,
);
if res < 0 {
return Err(io::Error::last_os_error());
}
}
unsafe { Ok(UdpSocket::from_raw_fd(mem::ManuallyDrop::new(fd).0)) }
}
/// Join multicast address for a given interface.
pub fn join_multicast_v4( pub fn join_multicast_v4(
socket: &UdpSocket, socket: &UdpSocket,
addr: &Ipv4Addr, addr: &Ipv4Addr,
@ -322,7 +421,9 @@ mod imp {
/// Allow multiple sockets to bind to the same address / port. /// Allow multiple sockets to bind to the same address / port.
/// ///
/// This is best-effort and might not actually do anything. /// This is best-effort and might not actually do anything.
pub fn set_reuse(socket: &UdpSocket) { ///
/// SAFETY: Must be called with a valid socket fd.
unsafe fn set_reuse(socket: i32) {
// SAFETY: SO_REUSEADDR takes an i32 value that can be 0/false or 1/true and // SAFETY: SO_REUSEADDR takes an i32 value that can be 0/false or 1/true and
// enables the given feature on the socket. // enables the given feature on the socket.
// //
@ -331,7 +432,7 @@ mod imp {
unsafe { unsafe {
let v = 1i32; let v = 1i32;
let res = setsockopt( let res = setsockopt(
socket.as_raw_fd(), socket,
SOL_SOCKET, SOL_SOCKET,
SO_REUSEADDR, SO_REUSEADDR,
&v as *const _ as *const _, &v as *const _ as *const _,
@ -352,7 +453,7 @@ mod imp {
unsafe { unsafe {
let v = 1i32; let v = 1i32;
let res = setsockopt( let res = setsockopt(
socket.as_raw_fd(), socket,
SOL_SOCKET, SOL_SOCKET,
SO_REUSEPORT, SO_REUSEPORT,
&v as *const _ as *const _, &v as *const _ as *const _,
@ -374,10 +475,10 @@ mod imp {
use std::{ use std::{
ffi::{CStr, OsString}, ffi::{CStr, OsString},
io, marker, mem, io, marker, mem,
net::UdpSocket, net::{Ipv4Addr, SocketAddr, UdpSocket},
os::{ os::{
raw::*, raw::*,
windows::{ffi::OsStringExt, io::AsRawSocket}, windows::{ffi::OsStringExt, io::AsRawSocket, raw::SOCKET},
}, },
ptr, slice, ptr, slice,
}; };
@ -635,6 +736,70 @@ mod imp {
Ok(if_infos) Ok(if_infos)
} }
/// Create an `UdpSocket` and bind it to the given address but set `SO_REUSEADDR` and/or
/// `SO_REUSEPORT` before doing so.
///
/// `UdpSocket::bind()` does not allow setting custom options before binding.
pub fn create_udp_socket(addr: &Ipv4Addr, port: u16) -> Result<UdpSocket, io::Error> {
use std::os::windows::io::FromRawSocket;
// XXX: Make sure Rust std is calling WSAStartup()
let _ = UdpSocket::bind(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)))?;
/// Helper struct to keep a raw socket and close it on drop
struct Socket(SOCKET);
impl Drop for Socket {
fn drop(&mut self) {
unsafe {
// SAFETY: The socket is valid by construction.
let _ = closesocket(self.0);
}
}
}
// SAFETY: Calling WSASocketW() is safe at any time and will simply fail if invalid parameters
// are passed or something else goes wrong.
let socket = unsafe {
let res = WSASocketW(
AF_INET as _,
SOCK_DGRAM as _,
0,
ptr::null_mut(),
0,
WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT,
);
if res == INVALID_SOCKET {
return Err(io::Error::from_raw_os_error(WSAGetLastError()));
}
Socket(res)
};
// SAFETY: A valid socket is passed here.
unsafe {
set_reuse(socket.0);
}
// SAFETY: A valid socket fd is passed together with a valid SOCKADDR and its size.
unsafe {
let addr = SOCKADDR {
sa_family: AF_INET as _,
sin_port: u16::to_be(port),
in_addr: IN_ADDR {
S_un: IN_ADDR_0 {
S_addr: u32::from_ne_bytes(addr.octets()),
},
},
sin_zero: [0; 8],
};
let res = bind(socket.0, &addr, mem::size_of_val(&addr) as _);
if res < 0 {
return Err(io::Error::from_raw_os_error(WSAGetLastError()));
}
}
unsafe { Ok(UdpSocket::from_raw_socket(mem::ManuallyDrop::new(socket).0)) }
}
// Join multicast address for a given interface. // Join multicast address for a given interface.
pub fn join_multicast_v4( pub fn join_multicast_v4(
socket: &UdpSocket, socket: &UdpSocket,
@ -679,7 +844,9 @@ mod imp {
/// Allow multiple sockets to bind to the same address / port. /// Allow multiple sockets to bind to the same address / port.
/// ///
/// This is best-effort and might not actually do anything. /// This is best-effort and might not actually do anything.
pub fn set_reuse(socket: &UdpSocket) { ///
/// SAFETY: Must be called with a valid socket.
unsafe fn set_reuse(socket: SOCKET) {
// SAFETY: SO_REUSEADDR takes an i32 value that can be 0/false or 1/true and // SAFETY: SO_REUSEADDR takes an i32 value that can be 0/false or 1/true and
// enables the given feature on the socket. // enables the given feature on the socket.
// //
@ -688,7 +855,7 @@ mod imp {
unsafe { unsafe {
let v = 1i32; let v = 1i32;
let res = setsockopt( let res = setsockopt(
socket.as_raw_socket(), socket,
SOL_SOCKET as i32, SOL_SOCKET as i32,
SO_REUSEADDR as i32, SO_REUSEADDR as i32,
&v as *const _ as *const _, &v as *const _ as *const _,
@ -716,7 +883,7 @@ mod test {
} }
#[test] #[test]
fn test_join_multicast() { fn test_create_socket_join_multicast() {
let ifaces = super::query_interfaces().unwrap(); let ifaces = super::query_interfaces().unwrap();
let iface = if ifaces.is_empty() { let iface = if ifaces.is_empty() {
return; return;
@ -724,12 +891,24 @@ mod test {
&ifaces[0] &ifaces[0]
}; };
let socket = std::net::UdpSocket::bind(std::net::SocketAddr::from(( let socket = super::create_udp_socket(&std::net::Ipv4Addr::UNSPECIFIED, 0).unwrap();
super::join_multicast_v4(&socket, &std::net::Ipv4Addr::new(224, 0, 0, 1), iface).unwrap();
let local_addr = socket.local_addr().unwrap();
let socket2 = std::net::UdpSocket::bind(std::net::SocketAddr::from((
std::net::Ipv4Addr::UNSPECIFIED, std::net::Ipv4Addr::UNSPECIFIED,
0, 0,
))) )))
.unwrap(); .unwrap();
super::set_reuse(&socket); socket2
super::join_multicast_v4(&socket, &std::net::Ipv4Addr::new(224, 0, 0, 1), iface).unwrap(); .send_to(
&[1, 2, 3, 4],
std::net::SocketAddr::from((std::net::Ipv4Addr::LOCALHOST, local_addr.port())),
)
.unwrap();
let mut buf = [0u8; 4];
socket.recv(&mut buf).unwrap();
assert_eq!(buf, [1, 2, 3, 4]);
} }
} }