mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-02-16 12:55:13 +00:00
generic: threadshare: port to polling 3.1.0
Also use `rustix` & `std::ffi` instead of `libc`. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1336>
This commit is contained in:
parent
fe4273ca2a
commit
436798b360
9 changed files with 551 additions and 112 deletions
|
@ -10,10 +10,10 @@ rust-version = "1.70"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
async-task = "4.3.0"
|
async-task = "4.3.0"
|
||||||
concurrent-queue = "2"
|
cfg-if = "1"
|
||||||
|
concurrent-queue = "2.2.0"
|
||||||
flume = "0.11"
|
flume = "0.11"
|
||||||
futures = "0.3.21"
|
futures = "0.3.28"
|
||||||
libc = "0.2"
|
|
||||||
gio = { git = "https://github.com/gtk-rs/gtk-rs-core" }
|
gio = { git = "https://github.com/gtk-rs/gtk-rs-core" }
|
||||||
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
||||||
gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
||||||
|
@ -21,8 +21,9 @@ gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gst
|
||||||
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
||||||
once_cell = "1"
|
once_cell = "1"
|
||||||
pin-project-lite = "0.2.0"
|
pin-project-lite = "0.2.0"
|
||||||
polling = "2.2.0"
|
polling = "3.1.0"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
|
rustix = { version = "0.38.2", default-features = false, features = ["std", "fs", "net"] }
|
||||||
slab = "0.4.7"
|
slab = "0.4.7"
|
||||||
socket2 = {features = ["all"], version = "0.5"}
|
socket2 = {features = ["all"], version = "0.5"}
|
||||||
waker-fn = "1.1"
|
waker-fn = "1.1"
|
||||||
|
|
|
@ -21,7 +21,7 @@ use glib::ffi::{gboolean, gpointer, GList, GType};
|
||||||
use gst::glib;
|
use gst::glib;
|
||||||
|
|
||||||
use gst::ffi::GstClockTime;
|
use gst::ffi::GstClockTime;
|
||||||
use libc::{c_int, c_uint, c_ulonglong, c_ushort, c_void};
|
use std::ffi::{c_int, c_uint, c_ulonglong, c_ushort, c_void};
|
||||||
|
|
||||||
#[repr(C)]
|
#[repr(C)]
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
|
|
|
@ -18,20 +18,20 @@ use std::task::{Context, Poll};
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
use std::{
|
use std::{
|
||||||
os::unix::io::{AsRawFd, RawFd},
|
os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd},
|
||||||
os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
|
os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
|
||||||
path::Path,
|
path::Path,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(windows)]
|
#[cfg(windows)]
|
||||||
use std::os::windows::io::{AsRawSocket, RawSocket};
|
use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket};
|
||||||
|
|
||||||
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
|
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
|
||||||
|
|
||||||
use crate::runtime::RUNTIME_CAT;
|
use crate::runtime::RUNTIME_CAT;
|
||||||
|
|
||||||
use super::scheduler::{self, Scheduler};
|
use super::scheduler::{self, Scheduler};
|
||||||
use super::{Reactor, Readable, ReadableOwned, Source, Writable, WritableOwned};
|
use super::{Reactor, Readable, ReadableOwned, Registration, Source, Writable, WritableOwned};
|
||||||
|
|
||||||
/// Async adapter for I/O types.
|
/// Async adapter for I/O types.
|
||||||
///
|
///
|
||||||
|
@ -103,11 +103,11 @@ pub struct Async<T: Send + 'static> {
|
||||||
impl<T: Send + 'static> Unpin for Async<T> {}
|
impl<T: Send + 'static> Unpin for Async<T> {}
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
impl<T: AsRawFd + Send + 'static> Async<T> {
|
impl<T: AsFd + Send + 'static> Async<T> {
|
||||||
/// Creates an async I/O handle.
|
/// Creates an async I/O handle.
|
||||||
///
|
///
|
||||||
/// This method will put the handle in non-blocking mode and register it in
|
/// This method will put the handle in non-blocking mode and register it in
|
||||||
/// [epoll]/[kqueue]/[event ports]/[wepoll].
|
/// [epoll]/[kqueue]/[event ports]/[IOCP].
|
||||||
///
|
///
|
||||||
/// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
|
/// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
|
||||||
/// `AsRawSocket`.
|
/// `AsRawSocket`.
|
||||||
|
@ -115,22 +115,33 @@ impl<T: AsRawFd + Send + 'static> Async<T> {
|
||||||
/// [epoll]: https://en.wikipedia.org/wiki/Epoll
|
/// [epoll]: https://en.wikipedia.org/wiki/Epoll
|
||||||
/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
|
/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
|
||||||
/// [event ports]: https://illumos.org/man/port_create
|
/// [event ports]: https://illumos.org/man/port_create
|
||||||
/// [wepoll]: https://github.com/piscisaureus/wepoll
|
/// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
|
||||||
pub fn new(io: T) -> io::Result<Async<T>> {
|
pub fn new(io: T) -> io::Result<Async<T>> {
|
||||||
let fd = io.as_raw_fd();
|
|
||||||
|
|
||||||
// Put the file descriptor in non-blocking mode.
|
// Put the file descriptor in non-blocking mode.
|
||||||
unsafe {
|
let fd = io.as_fd();
|
||||||
let mut res = libc::fcntl(fd, libc::F_GETFL);
|
cfg_if::cfg_if! {
|
||||||
if res != -1 {
|
// ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
|
||||||
res = libc::fcntl(fd, libc::F_SETFL, res | libc::O_NONBLOCK);
|
// for now, as with the standard library, because it seems to behave
|
||||||
}
|
// differently depending on the platform.
|
||||||
if res == -1 {
|
// https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
|
||||||
return Err(io::Error::last_os_error());
|
// https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
|
||||||
|
// https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
|
||||||
|
if #[cfg(target_os = "linux")] {
|
||||||
|
rustix::io::ioctl_fionbio(fd, true)?;
|
||||||
|
} else {
|
||||||
|
let previous = rustix::fs::fcntl_getfl(fd)?;
|
||||||
|
let new = previous | rustix::fs::OFlags::NONBLOCK;
|
||||||
|
if new != previous {
|
||||||
|
rustix::fs::fcntl_setfl(fd, new)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let source = Reactor::with_mut(|reactor| reactor.insert_io(fd))?;
|
// SAFETY: It is impossible to drop the I/O source while it is registered through
|
||||||
|
// this type.
|
||||||
|
let registration = unsafe { Registration::new(fd) };
|
||||||
|
|
||||||
|
let source = Reactor::with_mut(|reactor| reactor.insert_io(registration))?;
|
||||||
Ok(Async {
|
Ok(Async {
|
||||||
source,
|
source,
|
||||||
io: Some(io),
|
io: Some(io),
|
||||||
|
@ -144,16 +155,41 @@ impl<T: AsRawFd + Send + 'static> Async<T> {
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
impl<T: AsRawFd + Send + 'static> AsRawFd for Async<T> {
|
impl<T: AsRawFd + Send + 'static> AsRawFd for Async<T> {
|
||||||
fn as_raw_fd(&self) -> RawFd {
|
fn as_raw_fd(&self) -> RawFd {
|
||||||
self.source.raw
|
self.get_ref().as_raw_fd()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
impl<T: AsFd + Send + 'static> AsFd for Async<T> {
|
||||||
|
fn as_fd(&self) -> BorrowedFd<'_> {
|
||||||
|
self.get_ref().as_fd()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
impl<T: AsFd + From<OwnedFd> + Send + 'static> TryFrom<OwnedFd> for Async<T> {
|
||||||
|
type Error = io::Error;
|
||||||
|
|
||||||
|
fn try_from(value: OwnedFd) -> Result<Self, Self::Error> {
|
||||||
|
Async::new(value.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
impl<T: Into<OwnedFd> + Send + 'static> TryFrom<Async<T>> for OwnedFd {
|
||||||
|
type Error = io::Error;
|
||||||
|
|
||||||
|
fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
|
||||||
|
value.into_inner().map(Into::into)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(windows)]
|
#[cfg(windows)]
|
||||||
impl<T: AsRawSocket + Send + 'static> Async<T> {
|
impl<T: AsSocket + Send + 'static> Async<T> {
|
||||||
/// Creates an async I/O handle.
|
/// Creates an async I/O handle.
|
||||||
///
|
///
|
||||||
/// This method will put the handle in non-blocking mode and register it in
|
/// This method will put the handle in non-blocking mode and register it in
|
||||||
/// [epoll]/[kqueue]/[event ports]/[wepoll].
|
/// [epoll]/[kqueue]/[event ports]/[IOCP].
|
||||||
///
|
///
|
||||||
/// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
|
/// On Unix systems, the handle must implement `AsRawFd`, while on Windows it must implement
|
||||||
/// `AsRawSocket`.
|
/// `AsRawSocket`.
|
||||||
|
@ -161,27 +197,24 @@ impl<T: AsRawSocket + Send + 'static> Async<T> {
|
||||||
/// [epoll]: https://en.wikipedia.org/wiki/Epoll
|
/// [epoll]: https://en.wikipedia.org/wiki/Epoll
|
||||||
/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
|
/// [kqueue]: https://en.wikipedia.org/wiki/Kqueue
|
||||||
/// [event ports]: https://illumos.org/man/port_create
|
/// [event ports]: https://illumos.org/man/port_create
|
||||||
/// [wepoll]: https://github.com/piscisaureus/wepoll
|
/// [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
|
||||||
pub fn new(io: T) -> io::Result<Async<T>> {
|
pub fn new(io: T) -> io::Result<Async<T>> {
|
||||||
let sock = io.as_raw_socket();
|
let borrowed = io.as_socket();
|
||||||
|
|
||||||
// Put the socket in non-blocking mode.
|
// Put the socket in non-blocking mode.
|
||||||
unsafe {
|
//
|
||||||
use winapi::ctypes;
|
// Safety: We assume `as_raw_socket()` returns a valid fd. When we can
|
||||||
use winapi::um::winsock2;
|
// depend on Rust >= 1.63, where `AsFd` is stabilized, and when
|
||||||
|
// `TimerFd` implements it, we can remove this unsafe and simplify this.
|
||||||
|
rustix::io::ioctl_fionbio(borrowed, true)?;
|
||||||
|
|
||||||
let mut nonblocking = true as ctypes::c_ulong;
|
// Create the registration.
|
||||||
let res = winsock2::ioctlsocket(
|
//
|
||||||
sock as winsock2::SOCKET,
|
// SAFETY: It is impossible to drop the I/O source while it is registered through
|
||||||
winsock2::FIONBIO,
|
// this type.
|
||||||
&mut nonblocking,
|
let registration = unsafe { Registration::new(borrowed) };
|
||||||
);
|
|
||||||
if res != 0 {
|
|
||||||
return Err(io::Error::last_os_error());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let source = Reactor::with_mut(|reactor| reactor.insert_io(sock))?;
|
let source = Reactor::with_mut(|reactor| reactor.insert_io(registration))?;
|
||||||
Ok(Async {
|
Ok(Async {
|
||||||
source,
|
source,
|
||||||
io: Some(io),
|
io: Some(io),
|
||||||
|
@ -195,7 +228,32 @@ impl<T: AsRawSocket + Send + 'static> Async<T> {
|
||||||
#[cfg(windows)]
|
#[cfg(windows)]
|
||||||
impl<T: AsRawSocket + Send + 'static> AsRawSocket for Async<T> {
|
impl<T: AsRawSocket + Send + 'static> AsRawSocket for Async<T> {
|
||||||
fn as_raw_socket(&self) -> RawSocket {
|
fn as_raw_socket(&self) -> RawSocket {
|
||||||
self.source.raw
|
self.get_ref().as_raw_socket()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(windows)]
|
||||||
|
impl<T: AsSocket + Send + 'static> AsSocket for Async<T> {
|
||||||
|
fn as_socket(&self) -> BorrowedSocket<'_> {
|
||||||
|
self.get_ref().as_socket()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(windows)]
|
||||||
|
impl<T: AsSocket + From<OwnedSocket> + Send + 'static> TryFrom<OwnedSocket> for Async<T> {
|
||||||
|
type Error = io::Error;
|
||||||
|
|
||||||
|
fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> {
|
||||||
|
Async::new(value.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(windows)]
|
||||||
|
impl<T: Into<OwnedSocket> + Send + 'static> TryFrom<Async<T>> for OwnedSocket {
|
||||||
|
type Error = io::Error;
|
||||||
|
|
||||||
|
fn try_from(value: Async<T>) -> Result<Self, Self::Error> {
|
||||||
|
value.into_inner().map(Into::into)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -380,7 +438,12 @@ impl<T: Send + 'static> Drop for Async<T> {
|
||||||
sched.spawn_and_unpark(async move {
|
sched.spawn_and_unpark(async move {
|
||||||
Reactor::with_mut(|reactor| {
|
Reactor::with_mut(|reactor| {
|
||||||
if let Err(err) = reactor.remove_io(&source) {
|
if let Err(err) = reactor.remove_io(&source) {
|
||||||
gst::error!(RUNTIME_CAT, "Failed to remove fd {}: {}", source.raw, err);
|
gst::error!(
|
||||||
|
RUNTIME_CAT,
|
||||||
|
"Failed to remove fd {:?}: {}",
|
||||||
|
source.registration,
|
||||||
|
err
|
||||||
|
);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
drop(io);
|
drop(io);
|
||||||
|
@ -392,6 +455,94 @@ impl<T: Send + 'static> Drop for Async<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Types whose I/O trait implementations do not drop the underlying I/O source.
|
||||||
|
///
|
||||||
|
/// The resource contained inside of the [`Async`] cannot be invalidated. This invalidation can
|
||||||
|
/// happen if the inner resource (the [`TcpStream`], [`UnixListener`] or other `T`) is moved out
|
||||||
|
/// and dropped before the [`Async`]. Because of this, functions that grant mutable access to
|
||||||
|
/// the inner type are unsafe, as there is no way to guarantee that the source won't be dropped
|
||||||
|
/// and a dangling handle won't be left behind.
|
||||||
|
///
|
||||||
|
/// Unfortunately this extends to implementations of [`Read`] and [`Write`]. Since methods on those
|
||||||
|
/// traits take `&mut`, there is no guarantee that the implementor of those traits won't move the
|
||||||
|
/// source out while the method is being run.
|
||||||
|
///
|
||||||
|
/// This trait is an antidote to this predicament. By implementing this trait, the user pledges
|
||||||
|
/// that using any I/O traits won't destroy the source. This way, [`Async`] can implement the
|
||||||
|
/// `async` version of these I/O traits, like [`AsyncRead`] and [`AsyncWrite`].
|
||||||
|
///
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// Any I/O trait implementations for this type must not drop the underlying I/O source. Traits
|
||||||
|
/// affected by this trait include [`Read`], [`Write`], [`Seek`] and [`BufRead`].
|
||||||
|
///
|
||||||
|
/// This trait is implemented by default on top of `libstd` types. In addition, it is implemented
|
||||||
|
/// for immutable reference types, as it is impossible to invalidate any outstanding references
|
||||||
|
/// while holding an immutable reference, even with interior mutability. As Rust's current pinning
|
||||||
|
/// system relies on similar guarantees, I believe that this approach is robust.
|
||||||
|
///
|
||||||
|
/// [`BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
|
||||||
|
/// [`Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
|
||||||
|
/// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
|
||||||
|
/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
|
||||||
|
///
|
||||||
|
/// [`AsyncRead`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncRead.html
|
||||||
|
/// [`AsyncWrite`]: https://docs.rs/futures-io/latest/futures_io/trait.AsyncWrite.html
|
||||||
|
pub unsafe trait IoSafe {}
|
||||||
|
|
||||||
|
/// Reference types can't be mutated.
|
||||||
|
///
|
||||||
|
/// The worst thing that can happen is that external state is used to change what kind of pointer
|
||||||
|
/// `as_fd()` returns. For instance:
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # #[cfg(unix)] {
|
||||||
|
/// use std::cell::Cell;
|
||||||
|
/// use std::net::TcpStream;
|
||||||
|
/// use std::os::unix::io::{AsFd, BorrowedFd};
|
||||||
|
///
|
||||||
|
/// struct Bar {
|
||||||
|
/// flag: Cell<bool>,
|
||||||
|
/// a: TcpStream,
|
||||||
|
/// b: TcpStream
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// impl AsFd for Bar {
|
||||||
|
/// fn as_fd(&self) -> BorrowedFd<'_> {
|
||||||
|
/// if self.flag.replace(!self.flag.get()) {
|
||||||
|
/// self.a.as_fd()
|
||||||
|
/// } else {
|
||||||
|
/// self.b.as_fd()
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
/// # }
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// We solve this problem by only calling `as_fd()` once to get the original source. Implementations
|
||||||
|
/// like this are considered buggy (but not unsound) and are thus not really supported by `async-io`.
|
||||||
|
unsafe impl<T: ?Sized> IoSafe for &T {}
|
||||||
|
|
||||||
|
// Can be implemented on top of libstd types.
|
||||||
|
unsafe impl IoSafe for std::fs::File {}
|
||||||
|
unsafe impl IoSafe for std::io::Stderr {}
|
||||||
|
unsafe impl IoSafe for std::io::Stdin {}
|
||||||
|
unsafe impl IoSafe for std::io::Stdout {}
|
||||||
|
unsafe impl IoSafe for std::io::StderrLock<'_> {}
|
||||||
|
unsafe impl IoSafe for std::io::StdinLock<'_> {}
|
||||||
|
unsafe impl IoSafe for std::io::StdoutLock<'_> {}
|
||||||
|
unsafe impl IoSafe for std::net::TcpStream {}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
unsafe impl IoSafe for std::os::unix::net::UnixStream {}
|
||||||
|
|
||||||
|
unsafe impl<T: IoSafe + Read> IoSafe for std::io::BufReader<T> {}
|
||||||
|
unsafe impl<T: IoSafe + Write> IoSafe for std::io::BufWriter<T> {}
|
||||||
|
unsafe impl<T: IoSafe + Write> IoSafe for std::io::LineWriter<T> {}
|
||||||
|
unsafe impl<T: IoSafe + ?Sized> IoSafe for &mut T {}
|
||||||
|
unsafe impl<T: IoSafe + ?Sized> IoSafe for Box<T> {}
|
||||||
|
unsafe impl<T: Clone + IoSafe + ?Sized> IoSafe for std::borrow::Cow<'_, T> {}
|
||||||
|
|
||||||
impl<T: Read + Send + 'static> AsyncRead for Async<T> {
|
impl<T: Read + Send + 'static> AsyncRead for Async<T> {
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
mut self: Pin<&mut Self>,
|
mut self: Pin<&mut Self>,
|
||||||
|
@ -422,6 +573,8 @@ impl<T: Read + Send + 'static> AsyncRead for Async<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Since this is through a reference, we can't mutate the inner I/O source.
|
||||||
|
// Therefore this is safe!
|
||||||
impl<T: Send + 'static> AsyncRead for &Async<T>
|
impl<T: Send + 'static> AsyncRead for &Async<T>
|
||||||
where
|
where
|
||||||
for<'a> &'a T: Read,
|
for<'a> &'a T: Read,
|
||||||
|
@ -886,7 +1039,7 @@ fn connect(addr: SockAddr, domain: Domain, protocol: Option<Protocol>) -> io::Re
|
||||||
match socket.connect(&addr) {
|
match socket.connect(&addr) {
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
|
Err(err) if err.raw_os_error() == Some(rustix::io::Errno::INPROGRESS.raw_os_error()) => {}
|
||||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||||
Err(err) => return Err(err),
|
Err(err) => return Err(err),
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ mod join;
|
||||||
pub use join::JoinHandle;
|
pub use join::JoinHandle;
|
||||||
|
|
||||||
pub mod reactor;
|
pub mod reactor;
|
||||||
use reactor::{Reactor, Readable, ReadableOwned, Source, Writable, WritableOwned};
|
use reactor::{Reactor, Readable, ReadableOwned, Registration, Source, Writable, WritableOwned};
|
||||||
|
|
||||||
// We need the `Mutex<bool>` to work in pair with `Condvar`.
|
// We need the `Mutex<bool>` to work in pair with `Condvar`.
|
||||||
#[allow(clippy::mutex_atomic)]
|
#[allow(clippy::mutex_atomic)]
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
|
|
||||||
use concurrent_queue::ConcurrentQueue;
|
use concurrent_queue::ConcurrentQueue;
|
||||||
use futures::ready;
|
use futures::ready;
|
||||||
use polling::{Event, Poller};
|
use polling::{Event, Events, Poller};
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
|
|
||||||
use std::borrow::Borrow;
|
use std::borrow::Borrow;
|
||||||
|
@ -18,10 +18,6 @@ use std::future::Future;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
#[cfg(unix)]
|
|
||||||
use std::os::unix::io::RawFd;
|
|
||||||
#[cfg(windows)]
|
|
||||||
use std::os::windows::io::RawSocket;
|
|
||||||
use std::panic;
|
use std::panic;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
@ -29,6 +25,31 @@ use std::sync::{Arc, Mutex};
|
||||||
use std::task::{Context, Poll, Waker};
|
use std::task::{Context, Poll, Waker};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
// Choose the proper implementation of `Registration` based on the target platform.
|
||||||
|
cfg_if::cfg_if! {
|
||||||
|
if #[cfg(windows)] {
|
||||||
|
mod windows;
|
||||||
|
pub use windows::Registration;
|
||||||
|
} else if #[cfg(any(
|
||||||
|
target_os = "macos",
|
||||||
|
target_os = "ios",
|
||||||
|
target_os = "tvos",
|
||||||
|
target_os = "watchos",
|
||||||
|
target_os = "freebsd",
|
||||||
|
target_os = "netbsd",
|
||||||
|
target_os = "openbsd",
|
||||||
|
target_os = "dragonfly",
|
||||||
|
))] {
|
||||||
|
mod kqueue;
|
||||||
|
pub use kqueue::Registration;
|
||||||
|
} else if #[cfg(unix)] {
|
||||||
|
mod unix;
|
||||||
|
pub use unix::Registration;
|
||||||
|
} else {
|
||||||
|
compile_error!("unsupported platform");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
use crate::runtime::{Async, RUNTIME_CAT};
|
use crate::runtime::{Async, RUNTIME_CAT};
|
||||||
|
|
||||||
const READ: usize = 0;
|
const READ: usize = 0;
|
||||||
|
@ -71,7 +92,7 @@ pub(super) struct Reactor {
|
||||||
/// Temporary storage for I/O events when polling the reactor.
|
/// Temporary storage for I/O events when polling the reactor.
|
||||||
///
|
///
|
||||||
/// Holding a lock on this event list implies the exclusive right to poll I/O.
|
/// Holding a lock on this event list implies the exclusive right to poll I/O.
|
||||||
events: Vec<Event>,
|
events: Events,
|
||||||
|
|
||||||
/// An ordered map of registered regular timers.
|
/// An ordered map of registered regular timers.
|
||||||
///
|
///
|
||||||
|
@ -106,7 +127,7 @@ impl Reactor {
|
||||||
half_max_throttling: max_throttling / 2,
|
half_max_throttling: max_throttling / 2,
|
||||||
wakers: Vec::new(),
|
wakers: Vec::new(),
|
||||||
sources: Slab::new(),
|
sources: Slab::new(),
|
||||||
events: Vec::new(),
|
events: Events::new(),
|
||||||
timers: BTreeMap::new(),
|
timers: BTreeMap::new(),
|
||||||
after_timers: BTreeMap::new(),
|
after_timers: BTreeMap::new(),
|
||||||
timer_ops: ConcurrentQueue::bounded(1000),
|
timer_ops: ConcurrentQueue::bounded(1000),
|
||||||
|
@ -210,16 +231,12 @@ impl Reactor {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Registers an I/O source in the reactor.
|
/// Registers an I/O source in the reactor.
|
||||||
pub fn insert_io(
|
pub fn insert_io(&mut self, raw: Registration) -> io::Result<Arc<Source>> {
|
||||||
&mut self,
|
|
||||||
#[cfg(unix)] raw: RawFd,
|
|
||||||
#[cfg(windows)] raw: RawSocket,
|
|
||||||
) -> io::Result<Arc<Source>> {
|
|
||||||
// Create an I/O source for this file descriptor.
|
// Create an I/O source for this file descriptor.
|
||||||
let source = {
|
let source = {
|
||||||
let key = self.sources.vacant_entry().key();
|
let key = self.sources.vacant_entry().key();
|
||||||
let source = Arc::new(Source {
|
let source = Arc::new(Source {
|
||||||
raw,
|
registration: raw,
|
||||||
key,
|
key,
|
||||||
state: Default::default(),
|
state: Default::default(),
|
||||||
});
|
});
|
||||||
|
@ -228,11 +245,11 @@ impl Reactor {
|
||||||
};
|
};
|
||||||
|
|
||||||
// Register the file descriptor.
|
// Register the file descriptor.
|
||||||
if let Err(err) = self.poller.add(raw, Event::none(source.key)) {
|
if let Err(err) = source.registration.add(&self.poller, source.key) {
|
||||||
gst::error!(
|
gst::error!(
|
||||||
crate::runtime::RUNTIME_CAT,
|
crate::runtime::RUNTIME_CAT,
|
||||||
"Failed to register fd {}: {}",
|
"Failed to register fd {:?}: {}",
|
||||||
source.raw,
|
source.registration,
|
||||||
err,
|
err,
|
||||||
);
|
);
|
||||||
self.sources.remove(source.key);
|
self.sources.remove(source.key);
|
||||||
|
@ -245,7 +262,7 @@ impl Reactor {
|
||||||
/// Deregisters an I/O source from the reactor.
|
/// Deregisters an I/O source from the reactor.
|
||||||
pub fn remove_io(&mut self, source: &Source) -> io::Result<()> {
|
pub fn remove_io(&mut self, source: &Source) -> io::Result<()> {
|
||||||
self.sources.remove(source.key);
|
self.sources.remove(source.key);
|
||||||
self.poller.delete(source.raw)
|
source.registration.delete(&self.poller)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Registers a regular timer in the reactor.
|
/// Registers a regular timer in the reactor.
|
||||||
|
@ -414,14 +431,16 @@ impl Reactor {
|
||||||
// e.g. we were previously interested in both readability and writability,
|
// e.g. we were previously interested in both readability and writability,
|
||||||
// but only one of them was emitted.
|
// but only one of them was emitted.
|
||||||
if !state[READ].is_empty() || !state[WRITE].is_empty() {
|
if !state[READ].is_empty() || !state[WRITE].is_empty() {
|
||||||
self.poller.modify(
|
// Create the event that we are interested in.
|
||||||
source.raw,
|
let event = {
|
||||||
Event {
|
let mut event = Event::none(source.key);
|
||||||
key: source.key,
|
event.readable = !state[READ].is_empty();
|
||||||
readable: !state[READ].is_empty(),
|
event.writable = !state[WRITE].is_empty();
|
||||||
writable: !state[WRITE].is_empty(),
|
event
|
||||||
},
|
};
|
||||||
)?;
|
|
||||||
|
// Register interest in this event.
|
||||||
|
source.registration.modify(&self.poller, event)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -493,13 +512,8 @@ enum TimerOp {
|
||||||
/// A registered source of I/O events.
|
/// A registered source of I/O events.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(super) struct Source {
|
pub(super) struct Source {
|
||||||
/// Raw file descriptor on Unix platforms.
|
/// This source's registration into the reactor.
|
||||||
#[cfg(unix)]
|
pub(super) registration: Registration,
|
||||||
pub(super) raw: RawFd,
|
|
||||||
|
|
||||||
/// Raw socket handle on Windows.
|
|
||||||
#[cfg(windows)]
|
|
||||||
pub(super) raw: RawSocket,
|
|
||||||
|
|
||||||
/// The key of this source obtained during registration.
|
/// The key of this source obtained during registration.
|
||||||
key: usize,
|
key: usize,
|
||||||
|
@ -590,14 +604,15 @@ impl Source {
|
||||||
|
|
||||||
// Update interest in this I/O handle.
|
// Update interest in this I/O handle.
|
||||||
if was_empty {
|
if was_empty {
|
||||||
reactor.poller.modify(
|
let event = {
|
||||||
self.raw,
|
let mut event = Event::none(self.key);
|
||||||
Event {
|
event.readable = !state[READ].is_empty();
|
||||||
key: self.key,
|
event.writable = !state[WRITE].is_empty();
|
||||||
readable: !state[READ].is_empty(),
|
event
|
||||||
writable: !state[WRITE].is_empty(),
|
};
|
||||||
},
|
|
||||||
)?;
|
// Register interest in it.
|
||||||
|
self.registration.modify(&reactor.poller, event)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
|
@ -645,7 +660,11 @@ impl<T: Send + 'static> Future for Readable<'_, T> {
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
ready!(Pin::new(&mut self.0).poll(cx))?;
|
ready!(Pin::new(&mut self.0).poll(cx))?;
|
||||||
gst::trace!(RUNTIME_CAT, "readable: fd={}", self.0.handle.source.raw);
|
gst::trace!(
|
||||||
|
RUNTIME_CAT,
|
||||||
|
"readable: fd={:?}",
|
||||||
|
self.0.handle.source.registration
|
||||||
|
);
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -667,8 +686,8 @@ impl<T: Send + 'static> Future for ReadableOwned<T> {
|
||||||
ready!(Pin::new(&mut self.0).poll(cx))?;
|
ready!(Pin::new(&mut self.0).poll(cx))?;
|
||||||
gst::trace!(
|
gst::trace!(
|
||||||
RUNTIME_CAT,
|
RUNTIME_CAT,
|
||||||
"readable_owned: fd={}",
|
"readable_owned: fd={:?}",
|
||||||
self.0.handle.source.raw
|
self.0.handle.source.registration
|
||||||
);
|
);
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
@ -689,7 +708,11 @@ impl<T: Send + 'static> Future for Writable<'_, T> {
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
ready!(Pin::new(&mut self.0).poll(cx))?;
|
ready!(Pin::new(&mut self.0).poll(cx))?;
|
||||||
gst::trace!(RUNTIME_CAT, "writable: fd={}", self.0.handle.source.raw);
|
gst::trace!(
|
||||||
|
RUNTIME_CAT,
|
||||||
|
"writable: fd={:?}",
|
||||||
|
self.0.handle.source.registration
|
||||||
|
);
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -711,8 +734,8 @@ impl<T: Send + 'static> Future for WritableOwned<T> {
|
||||||
ready!(Pin::new(&mut self.0).poll(cx))?;
|
ready!(Pin::new(&mut self.0).poll(cx))?;
|
||||||
gst::trace!(
|
gst::trace!(
|
||||||
RUNTIME_CAT,
|
RUNTIME_CAT,
|
||||||
"writable_owned: fd={}",
|
"writable_owned: fd={:?}",
|
||||||
self.0.handle.source.raw
|
self.0.handle.source.registration
|
||||||
);
|
);
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
@ -780,14 +803,19 @@ impl<H: Borrow<Async<T>> + Clone, T: Send + 'static> Future for Ready<H, T> {
|
||||||
|
|
||||||
// Update interest in this I/O handle.
|
// Update interest in this I/O handle.
|
||||||
if was_empty {
|
if was_empty {
|
||||||
reactor.poller.modify(
|
// Create the event that we are interested in.
|
||||||
handle.borrow().source.raw,
|
let event = {
|
||||||
Event {
|
let mut event = Event::none(handle.borrow().source.key);
|
||||||
key: handle.borrow().source.key,
|
event.readable = !state[READ].is_empty();
|
||||||
readable: !state[READ].is_empty(),
|
event.writable = !state[WRITE].is_empty();
|
||||||
writable: !state[WRITE].is_empty(),
|
event
|
||||||
},
|
};
|
||||||
)?;
|
|
||||||
|
handle
|
||||||
|
.borrow()
|
||||||
|
.source
|
||||||
|
.registration
|
||||||
|
.modify(&reactor.poller, event)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
|
|
108
generic/threadshare/src/runtime/executor/reactor/kqueue.rs
Normal file
108
generic/threadshare/src/runtime/executor/reactor/kqueue.rs
Normal file
|
@ -0,0 +1,108 @@
|
||||||
|
// SPDX-License-Identifier: MIT OR Apache-2.0
|
||||||
|
|
||||||
|
use crate::os::kqueue::Signal;
|
||||||
|
|
||||||
|
use polling::os::kqueue::{PollerKqueueExt, Process, ProcessOps, Signal as PollSignal};
|
||||||
|
use polling::{Event, PollMode, Poller};
|
||||||
|
|
||||||
|
use std::fmt;
|
||||||
|
use std::io::Result;
|
||||||
|
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
|
||||||
|
use std::process::Child;
|
||||||
|
|
||||||
|
/// The raw registration into the reactor.
|
||||||
|
///
|
||||||
|
/// This needs to be public, since it is technically exposed through the `QueueableSealed` trait.
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub enum Registration {
|
||||||
|
/// Raw file descriptor for readability/writability.
|
||||||
|
///
|
||||||
|
///
|
||||||
|
/// # Invariant
|
||||||
|
///
|
||||||
|
/// This describes a valid file descriptor that has not been `close`d. It will not be
|
||||||
|
/// closed while this object is alive.
|
||||||
|
Fd(RawFd),
|
||||||
|
|
||||||
|
/// Raw signal number for signal delivery.
|
||||||
|
Signal(Signal),
|
||||||
|
|
||||||
|
/// Process for process termination.
|
||||||
|
Process(Child),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Registration {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Fd(raw) => fmt::Debug::fmt(raw, f),
|
||||||
|
Self::Signal(signal) => fmt::Debug::fmt(signal, f),
|
||||||
|
Self::Process(process) => fmt::Debug::fmt(process, f),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Registration {
|
||||||
|
/// Add this file descriptor into the reactor.
|
||||||
|
///
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// The provided file descriptor must be valid and not be closed while this object is alive.
|
||||||
|
pub(crate) unsafe fn new(f: impl AsFd) -> Self {
|
||||||
|
Self::Fd(f.as_fd().as_raw_fd())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Registers the object into the reactor.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> {
|
||||||
|
match self {
|
||||||
|
Self::Fd(raw) => {
|
||||||
|
// SAFETY: This object's existence validates the invariants of Poller::add
|
||||||
|
unsafe { poller.add(*raw, Event::none(token)) }
|
||||||
|
}
|
||||||
|
Self::Signal(signal) => {
|
||||||
|
poller.add_filter(PollSignal(signal.0), token, PollMode::Oneshot)
|
||||||
|
}
|
||||||
|
Self::Process(process) => poller.add_filter(
|
||||||
|
unsafe { Process::new(process, ProcessOps::Exit) },
|
||||||
|
token,
|
||||||
|
PollMode::Oneshot,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Re-registers the object into the reactor.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> {
|
||||||
|
match self {
|
||||||
|
Self::Fd(raw) => {
|
||||||
|
// SAFETY: self.raw is a valid file descriptor
|
||||||
|
let fd = unsafe { BorrowedFd::borrow_raw(*raw) };
|
||||||
|
poller.modify(fd, interest)
|
||||||
|
}
|
||||||
|
Self::Signal(signal) => {
|
||||||
|
poller.modify_filter(PollSignal(signal.0), interest.key, PollMode::Oneshot)
|
||||||
|
}
|
||||||
|
Self::Process(process) => poller.modify_filter(
|
||||||
|
unsafe { Process::new(process, ProcessOps::Exit) },
|
||||||
|
interest.key,
|
||||||
|
PollMode::Oneshot,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Deregisters the object from the reactor.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn delete(&self, poller: &Poller) -> Result<()> {
|
||||||
|
match self {
|
||||||
|
Self::Fd(raw) => {
|
||||||
|
// SAFETY: self.raw is a valid file descriptor
|
||||||
|
let fd = unsafe { BorrowedFd::borrow_raw(*raw) };
|
||||||
|
poller.delete(fd)
|
||||||
|
}
|
||||||
|
Self::Signal(signal) => poller.delete_filter(PollSignal(signal.0)),
|
||||||
|
Self::Process(process) => {
|
||||||
|
poller.delete_filter(unsafe { Process::new(process, ProcessOps::Exit) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
61
generic/threadshare/src/runtime/executor/reactor/unix.rs
Normal file
61
generic/threadshare/src/runtime/executor/reactor/unix.rs
Normal file
|
@ -0,0 +1,61 @@
|
||||||
|
// SPDX-License-Identifier: MIT OR Apache-2.0
|
||||||
|
|
||||||
|
use polling::{Event, Poller};
|
||||||
|
|
||||||
|
use std::fmt;
|
||||||
|
use std::io::Result;
|
||||||
|
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
|
||||||
|
|
||||||
|
/// The raw registration into the reactor.
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub struct Registration {
|
||||||
|
/// Raw file descriptor on Unix.
|
||||||
|
///
|
||||||
|
/// # Invariant
|
||||||
|
///
|
||||||
|
/// This describes a valid file descriptor that has not been `close`d. It will not be
|
||||||
|
/// closed while this object is alive.
|
||||||
|
raw: RawFd,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Registration {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
fmt::Debug::fmt(&self.raw, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Registration {
|
||||||
|
/// Add this file descriptor into the reactor.
|
||||||
|
///
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// The provided file descriptor must be valid and not be closed while this object is alive.
|
||||||
|
pub(crate) unsafe fn new(f: impl AsFd) -> Self {
|
||||||
|
Self {
|
||||||
|
raw: f.as_fd().as_raw_fd(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Registers the object into the reactor.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> {
|
||||||
|
// SAFETY: This object's existence validates the invariants of Poller::add
|
||||||
|
unsafe { poller.add(self.raw, Event::none(token)) }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Re-registers the object into the reactor.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> {
|
||||||
|
// SAFETY: self.raw is a valid file descriptor
|
||||||
|
let fd = unsafe { BorrowedFd::borrow_raw(self.raw) };
|
||||||
|
poller.modify(fd, interest)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Deregisters the object from the reactor.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn delete(&self, poller: &Poller) -> Result<()> {
|
||||||
|
// SAFETY: self.raw is a valid file descriptor
|
||||||
|
let fd = unsafe { BorrowedFd::borrow_raw(self.raw) };
|
||||||
|
poller.delete(fd)
|
||||||
|
}
|
||||||
|
}
|
60
generic/threadshare/src/runtime/executor/reactor/windows.rs
Normal file
60
generic/threadshare/src/runtime/executor/reactor/windows.rs
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
// SPDX-License-Identifier: MIT OR Apache-2.0
|
||||||
|
|
||||||
|
use polling::{Event, Poller};
|
||||||
|
use std::fmt;
|
||||||
|
use std::io::Result;
|
||||||
|
use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, RawSocket};
|
||||||
|
|
||||||
|
/// The raw registration into the reactor.
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub struct Registration {
|
||||||
|
/// Raw socket handle on Windows.
|
||||||
|
///
|
||||||
|
/// # Invariant
|
||||||
|
///
|
||||||
|
/// This describes a valid socket that has not been `close`d. It will not be
|
||||||
|
/// closed while this object is alive.
|
||||||
|
raw: RawSocket,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Registration {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
fmt::Debug::fmt(&self.raw, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Registration {
|
||||||
|
/// Add this file descriptor into the reactor.
|
||||||
|
///
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// The provided file descriptor must be valid and not be closed while this object is alive.
|
||||||
|
pub(crate) unsafe fn new(f: impl AsSocket) -> Self {
|
||||||
|
Self {
|
||||||
|
raw: f.as_socket().as_raw_socket(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Registers the object into the reactor.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> {
|
||||||
|
// SAFETY: This object's existence validates the invariants of Poller::add
|
||||||
|
unsafe { poller.add(self.raw, Event::none(token)) }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Re-registers the object into the reactor.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> {
|
||||||
|
// SAFETY: self.raw is a valid file descriptor
|
||||||
|
let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) };
|
||||||
|
poller.modify(fd, interest)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Deregisters the object from the reactor.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn delete(&self, poller: &Poller) -> Result<()> {
|
||||||
|
// SAFETY: self.raw is a valid file descriptor
|
||||||
|
let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) };
|
||||||
|
poller.delete(fd)
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,8 +25,6 @@ use gst::prelude::*;
|
||||||
|
|
||||||
use gst::glib::once_cell::sync::Lazy;
|
use gst::glib::once_cell::sync::Lazy;
|
||||||
|
|
||||||
use gio::prelude::*;
|
|
||||||
|
|
||||||
use std::error;
|
use std::error;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
@ -222,31 +220,57 @@ impl GioSocketWrapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(any(
|
||||||
pub fn set_tos(&self, qos_dscp: i32) -> Result<(), glib::Error> {
|
bsd,
|
||||||
use libc::{IPPROTO_IP, IPPROTO_IPV6, IPV6_TCLASS, IP_TOS};
|
linux_like,
|
||||||
|
target_os = "aix",
|
||||||
|
target_os = "fuchsia",
|
||||||
|
target_os = "haiku",
|
||||||
|
target_env = "newlib"
|
||||||
|
))]
|
||||||
|
pub fn set_tos(&self, qos_dscp: i32) -> rustix::io::Result<()> {
|
||||||
|
use gio::prelude::*;
|
||||||
|
use rustix::net::sockopt;
|
||||||
|
|
||||||
let tos = (qos_dscp & 0x3f) << 2;
|
let tos = (qos_dscp & 0x3f) << 2;
|
||||||
|
|
||||||
let socket = self.as_socket();
|
let socket = self.as_socket();
|
||||||
|
|
||||||
socket.set_option(IPPROTO_IP, IP_TOS, tos)?;
|
sockopt::set_ip_tos(socket, tos)?;
|
||||||
|
|
||||||
if socket.family() == gio::SocketFamily::Ipv6 {
|
if socket.family() == gio::SocketFamily::Ipv6 {
|
||||||
socket.set_option(IPPROTO_IPV6, IPV6_TCLASS, tos)?;
|
sockopt::set_ipv6_tclass(socket, tos)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(unix))]
|
#[cfg(not(any(
|
||||||
pub fn set_tos(&self, _qos_dscp: i32) -> Result<(), glib::Error> {
|
bsd,
|
||||||
|
linux_like,
|
||||||
|
target_os = "aix",
|
||||||
|
target_os = "fuchsia",
|
||||||
|
target_os = "haiku",
|
||||||
|
target_env = "newlib"
|
||||||
|
)))]
|
||||||
|
pub fn set_tos(&self, _qos_dscp: i32) -> rustix::io::Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(not(windows))]
|
||||||
pub fn get<T: FromRawFd>(&self) -> T {
|
pub fn get<T: FromRawFd>(&self) -> T {
|
||||||
unsafe { FromRawFd::from_raw_fd(libc::dup(gio::ffi::g_socket_get_fd(self.socket))) }
|
unsafe {
|
||||||
|
let borrowed =
|
||||||
|
rustix::fd::BorrowedFd::borrow_raw(gio::ffi::g_socket_get_fd(self.socket));
|
||||||
|
|
||||||
|
let dupped = rustix::io::dup(borrowed).unwrap();
|
||||||
|
let res = FromRawFd::from_raw_fd(dupped.as_raw_fd());
|
||||||
|
|
||||||
|
// We transferred ownership to T so don't drop dupped
|
||||||
|
std::mem::forget(dupped);
|
||||||
|
|
||||||
|
res
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(windows)]
|
#[cfg(windows)]
|
||||||
|
@ -308,7 +332,7 @@ unsafe fn dup_socket(socket: usize) -> usize {
|
||||||
pub fn wrap_socket(socket: &Async<UdpSocket>) -> Result<GioSocketWrapper, gst::ErrorMessage> {
|
pub fn wrap_socket(socket: &Async<UdpSocket>) -> Result<GioSocketWrapper, gst::ErrorMessage> {
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
unsafe {
|
unsafe {
|
||||||
let fd = libc::dup(socket.as_raw_fd());
|
let dupped = rustix::io::dup(socket).unwrap();
|
||||||
|
|
||||||
// This is unsafe because it allows us to share the fd between the socket and the
|
// This is unsafe because it allows us to share the fd between the socket and the
|
||||||
// GIO socket below, but safety of this is the job of the application
|
// GIO socket below, but safety of this is the job of the application
|
||||||
|
@ -319,14 +343,18 @@ pub fn wrap_socket(socket: &Async<UdpSocket>) -> Result<GioSocketWrapper, gst::E
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let fd = FdConverter(fd);
|
let fd = FdConverter(dupped.as_raw_fd());
|
||||||
|
|
||||||
let gio_socket = gio::Socket::from_fd(fd).map_err(|err| {
|
let gio_socket = gio::Socket::from_fd(fd);
|
||||||
|
// We transferred ownership to gio_socket so don't drop dupped
|
||||||
|
std::mem::forget(dupped);
|
||||||
|
let gio_socket = gio_socket.map_err(|err| {
|
||||||
gst::error_msg!(
|
gst::error_msg!(
|
||||||
gst::ResourceError::OpenWrite,
|
gst::ResourceError::OpenWrite,
|
||||||
["Failed to create wrapped GIO socket: {}", err]
|
["Failed to create wrapped GIO socket: {}", err]
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
Ok(GioSocketWrapper::new(&gio_socket))
|
Ok(GioSocketWrapper::new(&gio_socket))
|
||||||
}
|
}
|
||||||
#[cfg(windows)]
|
#[cfg(windows)]
|
||||||
|
|
Loading…
Reference in a new issue