core: Paranoid no-lock-in-async

This commit is contained in:
asonix 2022-12-09 17:38:22 -06:00
parent 9869fe7cb3
commit ba8899bed8
3 changed files with 119 additions and 88 deletions

View file

@ -1,21 +1,19 @@
use std::{ use std::{
future::Future, future::Future,
panic::AssertUnwindSafe,
pin::Pin, pin::Pin,
sync::Mutex,
task::{Context, Poll}, task::{Context, Poll},
}; };
pub(crate) struct CatchUnwindFuture<F> { pub(crate) struct CatchUnwindFuture<F> {
future: Mutex<F>, future: F,
} }
pub(crate) fn catch_unwind<F>(future: F) -> CatchUnwindFuture<F> pub(crate) fn catch_unwind<F>(future: F) -> CatchUnwindFuture<F>
where where
F: Future + Unpin, F: Future + Unpin,
{ {
CatchUnwindFuture { CatchUnwindFuture { future }
future: Mutex::new(future),
}
} }
impl<F> Future for CatchUnwindFuture<F> impl<F> Future for CatchUnwindFuture<F>
@ -25,13 +23,12 @@ where
type Output = std::thread::Result<F::Output>; type Output = std::thread::Result<F::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let future = &self.future; let future = &mut self.get_mut().future;
let waker = cx.waker().clone(); let waker = cx.waker().clone();
let res = std::panic::catch_unwind(|| { let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
let mut context = Context::from_waker(&waker); let mut context = Context::from_waker(&waker);
let mut guard = future.lock().unwrap(); Pin::new(future).poll(&mut context)
Pin::new(&mut *guard).poll(&mut context) }));
});
match res { match res {
Ok(poll) => poll.map(Ok), Ok(poll) => poll.map(Ok),

View file

@ -1,6 +1,9 @@
use crate::{catch_unwind::catch_unwind, Job, JobError, JobInfo, ReturnJobInfo}; use crate::{catch_unwind::catch_unwind, Job, JobError, JobInfo, ReturnJobInfo};
use serde_json::Value; use serde_json::Value;
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Instant}; use std::{
collections::HashMap, future::Future, panic::AssertUnwindSafe, pin::Pin, sync::Arc,
time::Instant,
};
use tracing::Span; use tracing::Span;
use tracing_futures::Instrument; use tracing_futures::Instrument;
use uuid::Uuid; use uuid::Uuid;
@ -165,13 +168,7 @@ where
let start = Instant::now(); let start = Instant::now();
let state_mtx = std::sync::Mutex::new(state); let res = match std::panic::catch_unwind(AssertUnwindSafe(|| (process_fn)(args, state))) {
let process_mtx = std::sync::Mutex::new(process_fn);
let res = match std::panic::catch_unwind(|| {
let state = state_mtx.lock().unwrap().clone();
(process_mtx.lock().unwrap())(args, state)
}) {
Ok(fut) => catch_unwind(fut).await, Ok(fut) => catch_unwind(fut).await,
Err(e) => Err(e), Err(e) => Err(e),
}; };

View file

@ -138,10 +138,11 @@ pub trait Storage: Clone + Send {
/// A default, in-memory implementation of a storage mechanism /// A default, in-memory implementation of a storage mechanism
pub mod memory_storage { pub mod memory_storage {
use super::JobInfo; use super::JobInfo;
use event_listener::Event; use event_listener::{Event, EventListener};
use std::{ use std::{
collections::HashMap, collections::HashMap,
convert::Infallible, convert::Infallible,
future::Future,
sync::Arc, sync::Arc,
sync::Mutex, sync::Mutex,
time::{Duration, SystemTime}, time::{Duration, SystemTime},
@ -154,7 +155,7 @@ pub mod memory_storage {
/// Race a future against the clock, returning an empty tuple if the clock wins /// Race a future against the clock, returning an empty tuple if the clock wins
async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()> async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()>
where where
F: std::future::Future + Send + Sync; F: Future + Send + Sync;
} }
#[derive(Clone)] #[derive(Clone)]
@ -186,40 +187,21 @@ pub mod memory_storage {
timer, timer,
} }
} }
fn contains_job(&self, uuid: &Uuid) -> bool {
self.inner.lock().unwrap().jobs.contains_key(uuid)
} }
#[async_trait::async_trait] fn insert_job(&self, job: JobInfo) {
impl<T: Timer + Send + Sync + Clone> super::Storage for Storage<T> {
type Error = Infallible;
async fn generate_id(&self) -> Result<Uuid, Self::Error> {
let uuid = loop {
let uuid = Uuid::new_v4();
if !self.inner.lock().unwrap().jobs.contains_key(&uuid) {
break uuid;
}
};
Ok(uuid)
}
async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error> {
self.inner.lock().unwrap().jobs.insert(job.id(), job); self.inner.lock().unwrap().jobs.insert(job.id(), job);
Ok(())
} }
async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, Self::Error> { fn get_job(&self, id: &Uuid) -> Option<JobInfo> {
let j = self.inner.lock().unwrap().jobs.get(&id).cloned(); self.inner.lock().unwrap().jobs.get(id).cloned()
Ok(j)
} }
async fn fetch_job_from_queue(&self, queue: &str) -> Result<JobInfo, Self::Error> { fn try_deque(&self, queue: &str, now: SystemTime) -> Option<JobInfo> {
loop {
let listener = {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
let now = SystemTime::now();
let j = inner.job_queues.iter().find_map(|(k, v)| { let j = inner.job_queues.iter().find_map(|(k, v)| {
if v == queue { if v == queue {
@ -233,16 +215,22 @@ pub mod memory_storage {
None None
}); });
let duration = if let Some(j) = j { if let Some(job) = j {
if inner.job_queues.remove(&j.id()).is_some() { inner.job_queues.remove(&job.id());
return Ok(j); return Some(job);
} else {
continue;
} }
} else {
inner.job_queues.iter().fold( None
Duration::from_secs(5), }
|duration, (id, v_queue)| {
fn listener(&self, queue: &str, now: SystemTime) -> (Duration, EventListener) {
let mut inner = self.inner.lock().unwrap();
let duration =
inner
.job_queues
.iter()
.fold(Duration::from_secs(5), |duration, (id, v_queue)| {
if v_queue == queue { if v_queue == queue {
if let Some(job) = inner.jobs.get(id) { if let Some(job) = inner.jobs.get(id) {
if let Some(ready_at) = job.next_queue() { if let Some(ready_at) = job.next_queue() {
@ -258,45 +246,94 @@ pub mod memory_storage {
} }
duration duration
}, });
)
};
self.timer.timeout( let listener = inner.queues.entry(queue.to_string()).or_default().listen();
duration,
inner.queues.entry(queue.to_string()).or_default().listen(),
)
};
let _ = listener.await; (duration, listener)
}
} }
async fn queue_job(&self, queue: &str, id: Uuid) -> Result<(), Self::Error> { fn queue_and_notify(&self, queue: &str, id: Uuid) {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
inner.job_queues.insert(id, queue.to_owned()); inner.job_queues.insert(id, queue.to_owned());
inner.queues.entry(queue.to_string()).or_default().notify(1); inner.queues.entry(queue.to_string()).or_default().notify(1);
}
fn mark_running(&self, job_id: Uuid, worker_id: Uuid) {
let mut inner = self.inner.lock().unwrap();
inner.worker_ids.insert(job_id, worker_id);
inner.worker_ids_inverse.insert(worker_id, job_id);
}
fn purge_job(&self, job_id: Uuid) {
let mut inner = self.inner.lock().unwrap();
inner.jobs.remove(&job_id);
inner.job_queues.remove(&job_id);
if let Some(worker_id) = inner.worker_ids.remove(&job_id) {
inner.worker_ids_inverse.remove(&worker_id);
}
}
}
#[async_trait::async_trait]
impl<T: Timer + Send + Sync + Clone> super::Storage for Storage<T> {
type Error = Infallible;
async fn generate_id(&self) -> Result<Uuid, Self::Error> {
let uuid = loop {
let uuid = Uuid::new_v4();
if !self.contains_job(&uuid) {
break uuid;
}
};
Ok(uuid)
}
async fn save_job(&self, job: JobInfo) -> Result<(), Self::Error> {
self.insert_job(job);
Ok(())
}
async fn fetch_job(&self, id: Uuid) -> Result<Option<JobInfo>, Self::Error> {
Ok(self.get_job(&id))
}
async fn fetch_job_from_queue(&self, queue: &str) -> Result<JobInfo, Self::Error> {
loop {
let now = SystemTime::now();
if let Some(job) = self.try_deque(queue, now) {
return Ok(job);
}
let (duration, listener) = self.listener(queue, now);
let _ = self.timer.timeout(duration, listener).await;
}
}
async fn queue_job(&self, queue: &str, id: Uuid) -> Result<(), Self::Error> {
self.queue_and_notify(queue, id);
Ok(()) Ok(())
} }
async fn run_job(&self, id: Uuid, worker_id: Uuid) -> Result<(), Self::Error> { async fn run_job(&self, id: Uuid, worker_id: Uuid) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().unwrap(); self.mark_running(id, worker_id);
inner.worker_ids.insert(id, worker_id);
inner.worker_ids_inverse.insert(worker_id, id);
Ok(()) Ok(())
} }
async fn delete_job(&self, id: Uuid) -> Result<(), Self::Error> { async fn delete_job(&self, id: Uuid) -> Result<(), Self::Error> {
let mut inner = self.inner.lock().unwrap(); self.purge_job(id);
inner.jobs.remove(&id);
inner.job_queues.remove(&id);
if let Some(worker_id) = inner.worker_ids.remove(&id) {
inner.worker_ids_inverse.remove(&worker_id);
}
Ok(()) Ok(())
} }
} }