2021-07-17 17:45:06 +00:00
|
|
|
use std::any::type_name;
|
2021-03-29 02:05:20 +00:00
|
|
|
use std::collections::HashMap;
|
|
|
|
use std::error::Error;
|
|
|
|
use std::fmt::Display;
|
|
|
|
use std::future::Future;
|
|
|
|
use std::sync::Arc;
|
2021-12-01 18:22:43 +00:00
|
|
|
use std::time::Instant;
|
2021-03-29 02:05:20 +00:00
|
|
|
|
2021-07-17 17:45:06 +00:00
|
|
|
use anymap2::any::CloneAnySendSync;
|
|
|
|
use anymap2::Map;
|
2021-03-29 02:05:20 +00:00
|
|
|
use sqlx::{Pool, Postgres};
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
|
|
use crate::hidden::{BuildFn, RunFn};
|
|
|
|
use crate::utils::Opaque;
|
2021-03-29 20:39:07 +00:00
|
|
|
use crate::{JobBuilder, JobRunnerOptions};
|
2021-03-29 02:05:20 +00:00
|
|
|
|
2023-10-11 22:19:06 +00:00
|
|
|
/// Boxed, type-erased error as it is handed to the registry's error handler.
/// The payload must be `Send` and `'static`; `Sync` is not required.
type BoxedError = Box<dyn Error + Send + 'static>;
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
/// Stores a mapping from job name to job. Can be used to construct
|
|
|
|
/// a job runner.
|
|
|
|
pub struct JobRegistry {
|
2023-10-11 22:19:06 +00:00
|
|
|
#[allow(clippy::type_complexity)]
|
|
|
|
error_handler: Arc<dyn Fn(&str, BoxedError) + Send + Sync>,
|
2021-03-29 20:39:07 +00:00
|
|
|
job_map: HashMap<&'static str, &'static NamedJob>,
|
2021-07-17 17:45:06 +00:00
|
|
|
context: Map<dyn CloneAnySendSync + Send + Sync>,
|
2021-03-29 02:05:20 +00:00
|
|
|
}
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
/// Error returned when a job is received whose name is not in the registry.
///
/// This is a field-less marker type, so it is cheap to copy and can be
/// compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UnknownJobError;

impl Error for UnknownJobError {}

impl Display for UnknownJobError {
    /// Writes the fixed message `Unknown job`.
    ///
    /// `Formatter::pad` is used rather than `write_str` so that width, fill,
    /// alignment and precision flags supplied by the caller are honoured; with
    /// a plain `{}` the output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad("Unknown job")
    }
}
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
impl JobRegistry {
|
|
|
|
/// Construct a new job registry from the provided job list.
|
|
|
|
pub fn new(jobs: &[&'static NamedJob]) -> Self {
|
|
|
|
let mut job_map = HashMap::new();
|
|
|
|
for &job in jobs {
|
|
|
|
if job_map.insert(job.name(), job).is_some() {
|
|
|
|
panic!("Duplicate job registered: {}", job.name());
|
2021-03-29 02:05:20 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
Self {
|
|
|
|
error_handler: Arc::new(Self::default_error_handler),
|
2021-03-29 20:39:07 +00:00
|
|
|
job_map,
|
2021-07-17 17:45:06 +00:00
|
|
|
context: Map::new(),
|
2021-03-29 02:05:20 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
/// Set a function to be called whenever a job returns an error.
|
2021-03-29 02:05:20 +00:00
|
|
|
pub fn set_error_handler(
|
|
|
|
&mut self,
|
2023-10-11 22:19:06 +00:00
|
|
|
error_handler: impl Fn(&str, BoxedError) + Send + Sync + 'static,
|
2021-03-29 02:05:20 +00:00
|
|
|
) -> &mut Self {
|
|
|
|
self.error_handler = Arc::new(error_handler);
|
|
|
|
self
|
|
|
|
}
|
|
|
|
|
2021-07-17 17:45:06 +00:00
|
|
|
/// Provide context for the jobs.
|
|
|
|
pub fn set_context<C: Clone + Send + Sync + 'static>(&mut self, context: C) -> &mut Self {
|
|
|
|
self.context.insert(context);
|
|
|
|
self
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Access job context. Will panic if context with this type has not been provided.
|
|
|
|
pub fn context<C: Clone + Send + Sync + 'static>(&self) -> C {
|
|
|
|
if let Some(c) = self.context.get::<C>() {
|
|
|
|
c.clone()
|
|
|
|
} else {
|
|
|
|
panic!(
|
|
|
|
"No context of type `{}` has been provided.",
|
|
|
|
type_name::<C>()
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
/// Look-up a job by name.
|
|
|
|
pub fn resolve_job(&self, name: &str) -> Option<&'static NamedJob> {
|
|
|
|
self.job_map.get(name).copied()
|
2021-03-29 02:05:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// The default error handler implementation, which simply logs the error.
|
2023-10-11 22:19:06 +00:00
|
|
|
pub fn default_error_handler(name: &str, error: BoxedError) {
|
2021-12-01 18:22:43 +00:00
|
|
|
log::error!("Job `{}` failed: {}", name, error);
|
2021-03-29 02:05:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[doc(hidden)]
|
|
|
|
pub fn spawn_internal<E: Into<Box<dyn Error + Send + Sync + 'static>>>(
|
|
|
|
&self,
|
|
|
|
name: &'static str,
|
|
|
|
f: impl Future<Output = Result<(), E>> + Send + 'static,
|
|
|
|
) {
|
|
|
|
let error_handler = self.error_handler.clone();
|
|
|
|
tokio::spawn(async move {
|
2021-12-01 18:22:43 +00:00
|
|
|
let start_time = Instant::now();
|
|
|
|
log::info!("Job `{}` started.", name);
|
2021-03-29 02:05:20 +00:00
|
|
|
if let Err(e) = f.await {
|
|
|
|
error_handler(name, e.into());
|
2021-12-01 18:22:43 +00:00
|
|
|
} else {
|
|
|
|
log::info!(
|
|
|
|
"Job `{}` completed in {}s.",
|
|
|
|
name,
|
|
|
|
start_time.elapsed().as_secs_f64()
|
|
|
|
);
|
2021-03-29 02:05:20 +00:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
/// Construct a job runner from this registry and the provided connection
|
2021-03-29 02:05:20 +00:00
|
|
|
/// pool.
|
2021-03-29 20:39:07 +00:00
|
|
|
pub fn runner(self, pool: &Pool<Postgres>) -> JobRunnerOptions {
|
|
|
|
JobRunnerOptions::new(pool, move |current_job| {
|
|
|
|
if let Some(job) = self.resolve_job(current_job.name()) {
|
|
|
|
(job.run_fn.0 .0)(&self, current_job);
|
2021-03-29 02:05:20 +00:00
|
|
|
} else {
|
2021-03-29 20:39:07 +00:00
|
|
|
(self.error_handler)(current_job.name(), Box::new(UnknownJobError))
|
2021-03-29 02:05:20 +00:00
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
/// Type for a named job. Functions annotated with `#[job]` are
|
|
|
|
/// transformed into static variables whose type is `&'static NamedJob`.
|
2021-03-29 02:05:20 +00:00
|
|
|
#[derive(Debug)]
|
2021-03-29 20:39:07 +00:00
|
|
|
pub struct NamedJob {
|
2021-03-29 02:05:20 +00:00
|
|
|
name: &'static str,
|
|
|
|
build_fn: Opaque<BuildFn>,
|
|
|
|
run_fn: Opaque<RunFn>,
|
|
|
|
}
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
impl NamedJob {
|
2021-03-29 02:05:20 +00:00
|
|
|
#[doc(hidden)]
|
|
|
|
pub const fn new_internal(name: &'static str, build_fn: BuildFn, run_fn: RunFn) -> Self {
|
|
|
|
Self {
|
|
|
|
name,
|
|
|
|
build_fn: Opaque(build_fn),
|
|
|
|
run_fn: Opaque(run_fn),
|
|
|
|
}
|
|
|
|
}
|
2021-03-29 20:39:07 +00:00
|
|
|
/// Initialize a job builder with the name and defaults of this job.
|
2021-03-29 22:06:47 +00:00
|
|
|
pub fn builder(&self) -> JobBuilder<'static> {
|
2021-03-29 20:39:07 +00:00
|
|
|
let mut builder = JobBuilder::new(self.name);
|
2021-03-29 02:05:20 +00:00
|
|
|
(self.build_fn.0 .0)(&mut builder);
|
|
|
|
builder
|
|
|
|
}
|
2021-03-29 20:39:07 +00:00
|
|
|
/// Initialize a job builder with the name and defaults of this job,
|
|
|
|
/// using the provided job ID.
|
2021-03-29 22:06:47 +00:00
|
|
|
pub fn builder_with_id(&self, id: Uuid) -> JobBuilder<'static> {
|
2021-03-29 20:39:07 +00:00
|
|
|
let mut builder = JobBuilder::new_with_id(id, self.name);
|
2021-03-29 02:05:20 +00:00
|
|
|
(self.build_fn.0 .0)(&mut builder);
|
|
|
|
builder
|
|
|
|
}
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
/// Returns the name of this job.
|
2021-03-29 02:05:20 +00:00
|
|
|
pub const fn name(&self) -> &'static str {
|
|
|
|
self.name
|
|
|
|
}
|
|
|
|
}
|