From a45f944edda5c111159eab0915bccbeeaa60b556 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Tue, 14 Jun 2022 20:38:25 +0200 Subject: [PATCH] ts/async_wrapper: remove fd from reactor before dropping its handle The I/O handle was dropped prior to removing it from the reactor, which caused `Poller::delete` to fail due to an invalid file descriptor. This used to happen silently unless the same fd was added again, e.g. by changing states in the pipeline as follow: Null -> Playing -> Null -> Playing. In which case `Poller::add` failed due to an already existing file. This commit makes sure the fd is removed from the reactor prior to dropping the handle. In order to achieve this, a new task is spawned on the `Context` on which the I/O was originally registered, allowing it to access the proper `Reactor`. The I/O can then safely be dropped. Because the I/O handle is moved to the spawned future, this solution requires adding the `Send + 'static` bounds to the I/O handle used within the `Async` wrapper. This appears not too restrictive for existing implementations though. Other attempts were considered, but they would cause deadlocks. This new approach also solves a potential race condition where a fd could be re-registered in a `Reactor` before it was removed. --- generic/threadshare/Cargo.toml | 2 +- .../src/runtime/executor/async_wrapper.rs | 53 +++++++++++-------- .../src/runtime/executor/reactor.rs | 50 +++++++++-------- .../src/runtime/executor/scheduler.rs | 29 +--------- 4 files changed, 62 insertions(+), 72 deletions(-) diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index e6882be5..bc2229d2 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -19,7 +19,7 @@ gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gst gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } once_cell = "1" pin-project-lite = "0.2.0" -polling = "2.0.0" +polling = "2.2.0" rand = "0.8" slab = "0.4.2" socket2 = {features = ["all"], version = "0.4"} diff --git a/generic/threadshare/src/runtime/executor/async_wrapper.rs b/generic/threadshare/src/runtime/executor/async_wrapper.rs index 7d0c53dc..dfb17b70 100644 --- a/generic/threadshare/src/runtime/executor/async_wrapper.rs +++ b/generic/threadshare/src/runtime/executor/async_wrapper.rs @@ -28,6 +28,8 @@ use std::os::windows::io::{AsRawSocket, RawSocket}; use socket2::{Domain, Protocol, SockAddr, Socket, Type}; +use crate::runtime::RUNTIME_CAT; + use super::scheduler::{self, Scheduler}; use super::{Reactor, Readable, ReadableOwned, Source, Writable, WritableOwned}; @@ -87,7 +89,7 @@ use super::{Reactor, Readable, ReadableOwned, Source, Writable, WritableOwned}; /// [`Shutdown`][`std::net::Shutdown`]. /// #[derive(Debug)] -pub struct Async { +pub struct Async { /// A source registered in the reactor. pub(super) source: Arc, @@ -95,13 +97,13 @@ pub struct Async { io: Option, // The [`Handle`] on the [`Scheduler`] on which this Async wrapper is registered. - handle: scheduler::HandleWeak, + sched: scheduler::HandleWeak, } -impl Unpin for Async {} +impl Unpin for Async {} #[cfg(unix)] -impl Async { +impl Async { /// Creates an async I/O handle. /// /// This method will put the handle in non-blocking mode and register it in @@ -132,7 +134,7 @@ impl Async { Ok(Async { source, io: Some(io), - handle: Scheduler::current() + sched: Scheduler::current() .expect("Attempt to create an Async wrapper outside of a Context") .downgrade(), }) @@ -140,14 +142,14 @@ impl Async { } #[cfg(unix)] -impl AsRawFd for Async { +impl AsRawFd for Async { fn as_raw_fd(&self) -> RawFd { self.source.raw } } #[cfg(windows)] -impl Async { +impl Async { /// Creates an async I/O handle. /// /// This method will put the handle in non-blocking mode and register it in @@ -183,7 +185,7 @@ impl Async { Ok(Async { source, io: Some(io), - handle: Scheduler::current() + sched: Scheduler::current() .expect("Attempt to create an Async wrapper outside of a Context") .downgrade(), }) @@ -191,13 +193,13 @@ impl Async { } #[cfg(windows)] -impl AsRawSocket for Async { +impl AsRawSocket for Async { fn as_raw_socket(&self) -> RawSocket { self.source.raw } } -impl Async { +impl Async { /// Gets a reference to the inner I/O handle. pub fn get_ref(&self) -> &T { self.io.as_ref().unwrap() @@ -358,32 +360,39 @@ impl Async { } } -impl AsRef for Async { +impl AsRef for Async { fn as_ref(&self) -> &T { self.get_ref() } } -impl AsMut for Async { +impl AsMut for Async { fn as_mut(&mut self) -> &mut T { self.get_mut() } } -impl Drop for Async { +impl Drop for Async { fn drop(&mut self) { if let Some(io) = self.io.take() { - // Drop the I/O handle to close it. - drop(io); - - if let Some(handle) = self.handle.upgrade() { - handle.remove_soure(Arc::clone(&self.source)); + if let Some(sched) = self.sched.upgrade() { + let source = Arc::clone(&self.source); + sched.spawn_and_awake(async move { + Reactor::with_mut(|reactor| { + if let Err(err) = reactor.remove_io(&source) { + gst::error!(RUNTIME_CAT, "Failed to remove fd {}: {}", source.raw, err); + } + }); + drop(io); + }); + } else { + drop(io); } } } } -impl AsyncRead for Async { +impl AsyncRead for Async { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -413,7 +422,7 @@ impl AsyncRead for Async { } } -impl AsyncRead for &Async +impl AsyncRead for &Async where for<'a> &'a T: Read, { @@ -446,7 +455,7 @@ where } } -impl AsyncWrite for Async { +impl AsyncWrite for Async { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -490,7 +499,7 @@ impl AsyncWrite for Async { } } -impl AsyncWrite for &Async +impl AsyncWrite for &Async where for<'a> &'a T: Write, { diff --git a/generic/threadshare/src/runtime/executor/reactor.rs b/generic/threadshare/src/runtime/executor/reactor.rs index 455436cd..6769f47a 100644 --- a/generic/threadshare/src/runtime/executor/reactor.rs +++ b/generic/threadshare/src/runtime/executor/reactor.rs @@ -186,6 +186,12 @@ impl Reactor { // Register the file descriptor. if let Err(err) = self.poller.add(raw, Event::none(source.key)) { + gst::error!( + crate::runtime::RUNTIME_CAT, + "Failed to register fd {}: {}", + source.raw, + err, + ); self.sources.remove(source.key); return Err(err); } @@ -465,27 +471,27 @@ impl Source { } /// Waits until the I/O source is readable. - pub fn readable(handle: &Async) -> Readable<'_, T> { + pub fn readable(handle: &Async) -> Readable<'_, T> { Readable(Self::ready(handle, READ)) } /// Waits until the I/O source is readable. - pub fn readable_owned(handle: Arc>) -> ReadableOwned { + pub fn readable_owned(handle: Arc>) -> ReadableOwned { ReadableOwned(Self::ready(handle, READ)) } /// Waits until the I/O source is writable. - pub fn writable(handle: &Async) -> Writable<'_, T> { + pub fn writable(handle: &Async) -> Writable<'_, T> { Writable(Self::ready(handle, WRITE)) } /// Waits until the I/O source is writable. - pub fn writable_owned(handle: Arc>) -> WritableOwned { + pub fn writable_owned(handle: Arc>) -> WritableOwned { WritableOwned(Self::ready(handle, WRITE)) } /// Waits until the I/O source is readable or writable. - fn ready> + Clone, T>(handle: H, dir: usize) -> Ready { + fn ready> + Clone, T: Send + 'static>(handle: H, dir: usize) -> Ready { Ready { handle, dir, @@ -498,9 +504,9 @@ impl Source { /// Future for [`Async::readable`](crate::runtime::Async::readable). #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Readable<'a, T>(Ready<&'a Async, T>); +pub struct Readable<'a, T: Send + 'static>(Ready<&'a Async, T>); -impl Future for Readable<'_, T> { +impl Future for Readable<'_, T> { type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -510,7 +516,7 @@ impl Future for Readable<'_, T> { } } -impl fmt::Debug for Readable<'_, T> { +impl fmt::Debug for Readable<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Readable").finish() } @@ -518,9 +524,9 @@ impl fmt::Debug for Readable<'_, T> { /// Future for [`Async::readable_owned`](crate::runtime::Async::readable_owned). #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct ReadableOwned(Ready>, T>); +pub struct ReadableOwned(Ready>, T>); -impl Future for ReadableOwned { +impl Future for ReadableOwned { type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -534,7 +540,7 @@ impl Future for ReadableOwned { } } -impl fmt::Debug for ReadableOwned { +impl fmt::Debug for ReadableOwned { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ReadableOwned").finish() } @@ -542,9 +548,9 @@ impl fmt::Debug for ReadableOwned { /// Future for [`Async::writable`](crate::runtime::Async::writable). #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Writable<'a, T>(Ready<&'a Async, T>); +pub struct Writable<'a, T: Send + 'static>(Ready<&'a Async, T>); -impl Future for Writable<'_, T> { +impl Future for Writable<'_, T> { type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -554,7 +560,7 @@ impl Future for Writable<'_, T> { } } -impl fmt::Debug for Writable<'_, T> { +impl fmt::Debug for Writable<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Writable").finish() } @@ -562,9 +568,9 @@ impl fmt::Debug for Writable<'_, T> { /// Future for [`Async::writable_owned`](crate::runtime::Async::writable_owned). #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct WritableOwned(Ready>, T>); +pub struct WritableOwned(Ready>, T>); -impl Future for WritableOwned { +impl Future for WritableOwned { type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -578,13 +584,13 @@ impl Future for WritableOwned { } } -impl fmt::Debug for WritableOwned { +impl fmt::Debug for WritableOwned { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WritableOwned").finish() } } -struct Ready>, T> { +struct Ready>, T: Send + 'static> { handle: H, dir: usize, ticks: Option<(usize, usize)>, @@ -592,9 +598,9 @@ struct Ready>, T> { _guard: Option>, } -impl>, T> Unpin for Ready {} +impl>, T: Send + 'static> Unpin for Ready {} -impl> + Clone, T> Future for Ready { +impl> + Clone, T: Send + 'static> Future for Ready { type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -656,14 +662,14 @@ impl> + Clone, T> Future for Ready { } /// Remove waker when dropped. -struct RemoveOnDrop>, T> { +struct RemoveOnDrop>, T: Send + 'static> { handle: H, dir: usize, key: usize, _marker: PhantomData T>, } -impl>, T> Drop for RemoveOnDrop { +impl>, T: Send + 'static + 'static> Drop for RemoveOnDrop { fn drop(&mut self) { let mut state = self.handle.borrow().source.state.lock().unwrap(); let wakers = &mut state[self.dir].wakers; diff --git a/generic/threadshare/src/runtime/executor/scheduler.rs b/generic/threadshare/src/runtime/executor/scheduler.rs index d6d76be2..14cc4340 100644 --- a/generic/threadshare/src/runtime/executor/scheduler.rs +++ b/generic/threadshare/src/runtime/executor/scheduler.rs @@ -3,8 +3,6 @@ // // Take a look at the license at the top of the repository in the LICENSE file. -use concurrent_queue::ConcurrentQueue; - use futures::channel::oneshot; use futures::pin_mut; @@ -22,22 +20,18 @@ use std::time::{Duration, Instant}; use waker_fn::waker_fn; use super::task::{SubTaskOutput, TaskId, TaskQueue}; -use super::{CallOnDrop, JoinHandle, Reactor, Source}; +use super::{CallOnDrop, JoinHandle, Reactor}; use crate::runtime::RUNTIME_CAT; thread_local! { static CURRENT_SCHEDULER: RefCell> = RefCell::new(None); } -#[derive(Debug)] -struct CleanUpOps(Arc); - #[derive(Debug)] pub(super) struct Scheduler { context_name: Arc, max_throttling: Duration, tasks: TaskQueue, - cleanup_ops: ConcurrentQueue, must_awake: Mutex, must_awake_cvar: Condvar, } @@ -108,7 +102,6 @@ impl Scheduler { context_name: context_name.clone(), max_throttling, tasks: TaskQueue::new(context_name), - cleanup_ops: ConcurrentQueue::bounded(1000), must_awake: Mutex::new(false), must_awake_cvar: Condvar::new(), })); @@ -191,13 +184,7 @@ impl Scheduler { break Ok(t); } - Reactor::with_mut(|reactor| { - while let Ok(op) = self.cleanup_ops.pop() { - let _ = reactor.remove_io(&op.0); - } - - reactor.react().ok() - }); + Reactor::with_mut(|reactor| reactor.react().ok()); while let Ok(runnable) = self.tasks.pop_runnable() { panic::catch_unwind(|| runnable.run()).map_err(|err| { @@ -424,18 +411,6 @@ impl Handle { self.0.scheduler.wake_up(); } - pub fn remove_soure(&self, source: Arc) { - if self - .0 - .scheduler - .cleanup_ops - .push(CleanUpOps(source)) - .is_err() - { - gst::warning!(RUNTIME_CAT, "scheduler: cleanup_ops is full"); - } - } - pub fn has_sub_tasks(&self, task_id: TaskId) -> bool { self.0.scheduler.tasks.has_sub_tasks(task_id) }