ts/async_wrapper: remove fd from reactor before dropping its handle

The I/O handle was dropped prior to removing it from the reactor,
which caused `Poller::delete` to fail due to an invalid file
descriptor. This used to happen silently unless the same fd was
added again, e.g. by changing states in the pipeline as follow:

    Null -> Playing -> Null -> Playing.

In which case `Poller::add` failed due to an already existing file.

This commit makes sure the fd is removed from the reactor prior to
dropping the handle. In order to achieve this, a new task is spawned
on the `Context` on which the I/O was originally registered, allowing
it to access the proper `Reactor`. The I/O can then safely be dropped.

Because the I/O handle is moved to the spawned future, this solution
requires adding the `Send + 'static` bounds to the I/O handle used
within the `Async` wrapper. This appears not too restrictive for
existing implementations though. Other attempts were considered,
but they would cause deadlocks.

This new approach also solves a potential race condition where a
fd could be re-registered in a `Reactor` before it was removed.
This commit is contained in:
François Laignel 2022-06-14 20:38:25 +02:00 committed by Sebastian Dröge
parent 06273ed628
commit a45f944edd
4 changed files with 62 additions and 72 deletions

View file

@ -19,7 +19,7 @@ gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gst
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
once_cell = "1"
pin-project-lite = "0.2.0"
polling = "2.0.0"
polling = "2.2.0"
rand = "0.8"
slab = "0.4.2"
socket2 = {features = ["all"], version = "0.4"}

View file

@ -28,6 +28,8 @@ use std::os::windows::io::{AsRawSocket, RawSocket};
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use crate::runtime::RUNTIME_CAT;
use super::scheduler::{self, Scheduler};
use super::{Reactor, Readable, ReadableOwned, Source, Writable, WritableOwned};
@ -87,7 +89,7 @@ use super::{Reactor, Readable, ReadableOwned, Source, Writable, WritableOwned};
/// [`Shutdown`][`std::net::Shutdown`].
///
#[derive(Debug)]
pub struct Async<T> {
pub struct Async<T: Send + 'static> {
/// A source registered in the reactor.
pub(super) source: Arc<Source>,
@ -95,13 +97,13 @@ pub struct Async<T> {
io: Option<T>,
// The [`Handle`] on the [`Scheduler`] on which this Async wrapper is registered.
handle: scheduler::HandleWeak,
sched: scheduler::HandleWeak,
}
impl<T> Unpin for Async<T> {}
impl<T: Send + 'static> Unpin for Async<T> {}
#[cfg(unix)]
impl<T: AsRawFd> Async<T> {
impl<T: AsRawFd + Send + 'static> Async<T> {
/// Creates an async I/O handle.
///
/// This method will put the handle in non-blocking mode and register it in
@ -132,7 +134,7 @@ impl<T: AsRawFd> Async<T> {
Ok(Async {
source,
io: Some(io),
handle: Scheduler::current()
sched: Scheduler::current()
.expect("Attempt to create an Async wrapper outside of a Context")
.downgrade(),
})
@ -140,14 +142,14 @@ impl<T: AsRawFd> Async<T> {
}
#[cfg(unix)]
impl<T: AsRawFd> AsRawFd for Async<T> {
impl<T: AsRawFd + Send + 'static> AsRawFd for Async<T> {
fn as_raw_fd(&self) -> RawFd {
self.source.raw
}
}
#[cfg(windows)]
impl<T: AsRawSocket> Async<T> {
impl<T: AsRawSocket + Send + 'static> Async<T> {
/// Creates an async I/O handle.
///
/// This method will put the handle in non-blocking mode and register it in
@ -183,7 +185,7 @@ impl<T: AsRawSocket> Async<T> {
Ok(Async {
source,
io: Some(io),
handle: Scheduler::current()
sched: Scheduler::current()
.expect("Attempt to create an Async wrapper outside of a Context")
.downgrade(),
})
@ -191,13 +193,13 @@ impl<T: AsRawSocket> Async<T> {
}
#[cfg(windows)]
impl<T: AsRawSocket> AsRawSocket for Async<T> {
impl<T: AsRawSocket + Send + 'static> AsRawSocket for Async<T> {
fn as_raw_socket(&self) -> RawSocket {
self.source.raw
}
}
impl<T> Async<T> {
impl<T: Send + 'static> Async<T> {
/// Gets a reference to the inner I/O handle.
pub fn get_ref(&self) -> &T {
self.io.as_ref().unwrap()
@ -358,32 +360,39 @@ impl<T> Async<T> {
}
}
impl<T> AsRef<T> for Async<T> {
impl<T: Send + 'static> AsRef<T> for Async<T> {
fn as_ref(&self) -> &T {
self.get_ref()
}
}
impl<T> AsMut<T> for Async<T> {
impl<T: Send + 'static> AsMut<T> for Async<T> {
fn as_mut(&mut self) -> &mut T {
self.get_mut()
}
}
impl<T> Drop for Async<T> {
impl<T: Send + 'static> Drop for Async<T> {
fn drop(&mut self) {
if let Some(io) = self.io.take() {
// Drop the I/O handle to close it.
if let Some(sched) = self.sched.upgrade() {
let source = Arc::clone(&self.source);
sched.spawn_and_awake(async move {
Reactor::with_mut(|reactor| {
if let Err(err) = reactor.remove_io(&source) {
gst::error!(RUNTIME_CAT, "Failed to remove fd {}: {}", source.raw, err);
}
});
drop(io);
});
} else {
drop(io);
if let Some(handle) = self.handle.upgrade() {
handle.remove_soure(Arc::clone(&self.source));
}
}
}
}
impl<T: Read> AsyncRead for Async<T> {
impl<T: Read + Send + 'static> AsyncRead for Async<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -413,7 +422,7 @@ impl<T: Read> AsyncRead for Async<T> {
}
}
impl<T> AsyncRead for &Async<T>
impl<T: Send + 'static> AsyncRead for &Async<T>
where
for<'a> &'a T: Read,
{
@ -446,7 +455,7 @@ where
}
}
impl<T: Write> AsyncWrite for Async<T> {
impl<T: Write + Send + 'static> AsyncWrite for Async<T> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -490,7 +499,7 @@ impl<T: Write> AsyncWrite for Async<T> {
}
}
impl<T> AsyncWrite for &Async<T>
impl<T: Send + 'static> AsyncWrite for &Async<T>
where
for<'a> &'a T: Write,
{

View file

@ -186,6 +186,12 @@ impl Reactor {
// Register the file descriptor.
if let Err(err) = self.poller.add(raw, Event::none(source.key)) {
gst::error!(
crate::runtime::RUNTIME_CAT,
"Failed to register fd {}: {}",
source.raw,
err,
);
self.sources.remove(source.key);
return Err(err);
}
@ -465,27 +471,27 @@ impl Source {
}
/// Waits until the I/O source is readable.
pub fn readable<T>(handle: &Async<T>) -> Readable<'_, T> {
pub fn readable<T: Send + 'static>(handle: &Async<T>) -> Readable<'_, T> {
Readable(Self::ready(handle, READ))
}
/// Waits until the I/O source is readable.
pub fn readable_owned<T>(handle: Arc<Async<T>>) -> ReadableOwned<T> {
pub fn readable_owned<T: Send + 'static>(handle: Arc<Async<T>>) -> ReadableOwned<T> {
ReadableOwned(Self::ready(handle, READ))
}
/// Waits until the I/O source is writable.
pub fn writable<T>(handle: &Async<T>) -> Writable<'_, T> {
pub fn writable<T: Send + 'static>(handle: &Async<T>) -> Writable<'_, T> {
Writable(Self::ready(handle, WRITE))
}
/// Waits until the I/O source is writable.
pub fn writable_owned<T>(handle: Arc<Async<T>>) -> WritableOwned<T> {
pub fn writable_owned<T: Send + 'static>(handle: Arc<Async<T>>) -> WritableOwned<T> {
WritableOwned(Self::ready(handle, WRITE))
}
/// Waits until the I/O source is readable or writable.
fn ready<H: Borrow<Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
fn ready<H: Borrow<Async<T>> + Clone, T: Send + 'static>(handle: H, dir: usize) -> Ready<H, T> {
Ready {
handle,
dir,
@ -498,9 +504,9 @@ impl Source {
/// Future for [`Async::readable`](crate::runtime::Async::readable).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Readable<'a, T>(Ready<&'a Async<T>, T>);
pub struct Readable<'a, T: Send + 'static>(Ready<&'a Async<T>, T>);
impl<T> Future for Readable<'_, T> {
impl<T: Send + 'static> Future for Readable<'_, T> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -510,7 +516,7 @@ impl<T> Future for Readable<'_, T> {
}
}
impl<T> fmt::Debug for Readable<'_, T> {
impl<T: Send + 'static> fmt::Debug for Readable<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Readable").finish()
}
@ -518,9 +524,9 @@ impl<T> fmt::Debug for Readable<'_, T> {
/// Future for [`Async::readable_owned`](crate::runtime::Async::readable_owned).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadableOwned<T>(Ready<Arc<Async<T>>, T>);
pub struct ReadableOwned<T: Send + 'static>(Ready<Arc<Async<T>>, T>);
impl<T> Future for ReadableOwned<T> {
impl<T: Send + 'static> Future for ReadableOwned<T> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -534,7 +540,7 @@ impl<T> Future for ReadableOwned<T> {
}
}
impl<T> fmt::Debug for ReadableOwned<T> {
impl<T: Send + 'static> fmt::Debug for ReadableOwned<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReadableOwned").finish()
}
@ -542,9 +548,9 @@ impl<T> fmt::Debug for ReadableOwned<T> {
/// Future for [`Async::writable`](crate::runtime::Async::writable).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Writable<'a, T>(Ready<&'a Async<T>, T>);
pub struct Writable<'a, T: Send + 'static>(Ready<&'a Async<T>, T>);
impl<T> Future for Writable<'_, T> {
impl<T: Send + 'static> Future for Writable<'_, T> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -554,7 +560,7 @@ impl<T> Future for Writable<'_, T> {
}
}
impl<T> fmt::Debug for Writable<'_, T> {
impl<T: Send + 'static> fmt::Debug for Writable<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Writable").finish()
}
@ -562,9 +568,9 @@ impl<T> fmt::Debug for Writable<'_, T> {
/// Future for [`Async::writable_owned`](crate::runtime::Async::writable_owned).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WritableOwned<T>(Ready<Arc<Async<T>>, T>);
pub struct WritableOwned<T: Send + 'static>(Ready<Arc<Async<T>>, T>);
impl<T> Future for WritableOwned<T> {
impl<T: Send + 'static> Future for WritableOwned<T> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -578,13 +584,13 @@ impl<T> Future for WritableOwned<T> {
}
}
impl<T> fmt::Debug for WritableOwned<T> {
impl<T: Send + 'static> fmt::Debug for WritableOwned<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WritableOwned").finish()
}
}
struct Ready<H: Borrow<Async<T>>, T> {
struct Ready<H: Borrow<Async<T>>, T: Send + 'static> {
handle: H,
dir: usize,
ticks: Option<(usize, usize)>,
@ -592,9 +598,9 @@ struct Ready<H: Borrow<Async<T>>, T> {
_guard: Option<RemoveOnDrop<H, T>>,
}
impl<H: Borrow<Async<T>>, T> Unpin for Ready<H, T> {}
impl<H: Borrow<Async<T>>, T: Send + 'static> Unpin for Ready<H, T> {}
impl<H: Borrow<Async<T>> + Clone, T> Future for Ready<H, T> {
impl<H: Borrow<Async<T>> + Clone, T: Send + 'static> Future for Ready<H, T> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -656,14 +662,14 @@ impl<H: Borrow<Async<T>> + Clone, T> Future for Ready<H, T> {
}
/// Remove waker when dropped.
struct RemoveOnDrop<H: Borrow<Async<T>>, T> {
struct RemoveOnDrop<H: Borrow<Async<T>>, T: Send + 'static> {
handle: H,
dir: usize,
key: usize,
_marker: PhantomData<fn() -> T>,
}
impl<H: Borrow<Async<T>>, T> Drop for RemoveOnDrop<H, T> {
impl<H: Borrow<Async<T>>, T: Send + 'static + 'static> Drop for RemoveOnDrop<H, T> {
fn drop(&mut self) {
let mut state = self.handle.borrow().source.state.lock().unwrap();
let wakers = &mut state[self.dir].wakers;

View file

@ -3,8 +3,6 @@
//
// Take a look at the license at the top of the repository in the LICENSE file.
use concurrent_queue::ConcurrentQueue;
use futures::channel::oneshot;
use futures::pin_mut;
@ -22,22 +20,18 @@ use std::time::{Duration, Instant};
use waker_fn::waker_fn;
use super::task::{SubTaskOutput, TaskId, TaskQueue};
use super::{CallOnDrop, JoinHandle, Reactor, Source};
use super::{CallOnDrop, JoinHandle, Reactor};
use crate::runtime::RUNTIME_CAT;
thread_local! {
static CURRENT_SCHEDULER: RefCell<Option<HandleWeak>> = RefCell::new(None);
}
#[derive(Debug)]
struct CleanUpOps(Arc<Source>);
#[derive(Debug)]
pub(super) struct Scheduler {
context_name: Arc<str>,
max_throttling: Duration,
tasks: TaskQueue,
cleanup_ops: ConcurrentQueue<CleanUpOps>,
must_awake: Mutex<bool>,
must_awake_cvar: Condvar,
}
@ -108,7 +102,6 @@ impl Scheduler {
context_name: context_name.clone(),
max_throttling,
tasks: TaskQueue::new(context_name),
cleanup_ops: ConcurrentQueue::bounded(1000),
must_awake: Mutex::new(false),
must_awake_cvar: Condvar::new(),
}));
@ -191,13 +184,7 @@ impl Scheduler {
break Ok(t);
}
Reactor::with_mut(|reactor| {
while let Ok(op) = self.cleanup_ops.pop() {
let _ = reactor.remove_io(&op.0);
}
reactor.react().ok()
});
Reactor::with_mut(|reactor| reactor.react().ok());
while let Ok(runnable) = self.tasks.pop_runnable() {
panic::catch_unwind(|| runnable.run()).map_err(|err| {
@ -424,18 +411,6 @@ impl Handle {
self.0.scheduler.wake_up();
}
pub fn remove_soure(&self, source: Arc<Source>) {
if self
.0
.scheduler
.cleanup_ops
.push(CleanUpOps(source))
.is_err()
{
gst::warning!(RUNTIME_CAT, "scheduler: cleanup_ops is full");
}
}
pub fn has_sub_tasks(&self, task_id: TaskId) -> bool {
self.0.scheduler.tasks.has_sub_tasks(task_id)
}