sqlxmq/src/registry.rs

165 lines
5.1 KiB
Rust
Raw Permalink Normal View History

use std::any::type_name;
2021-03-29 02:05:20 +00:00
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Display;
use std::future::Future;
use std::sync::Arc;
2021-12-01 18:22:43 +00:00
use std::time::Instant;
2021-03-29 02:05:20 +00:00
use anymap2::any::CloneAnySendSync;
use anymap2::Map;
2021-03-29 02:05:20 +00:00
use sqlx::{Pool, Postgres};
use uuid::Uuid;
use crate::hidden::{BuildFn, RunFn};
use crate::utils::Opaque;
use crate::{JobBuilder, JobRunnerOptions};
2021-03-29 02:05:20 +00:00
2023-10-11 22:19:06 +00:00
type BoxedError = Box<dyn Error + Send + 'static>;
/// Stores a mapping from job name to job. Can be used to construct
/// a job runner.
pub struct JobRegistry {
2023-10-11 22:19:06 +00:00
#[allow(clippy::type_complexity)]
error_handler: Arc<dyn Fn(&str, BoxedError) + Send + Sync>,
job_map: HashMap<&'static str, &'static NamedJob>,
context: Map<dyn CloneAnySendSync + Send + Sync>,
2021-03-29 02:05:20 +00:00
}
/// Error returned when a job is received whose name is not in the registry.
2021-03-29 02:05:20 +00:00
#[derive(Debug)]
pub struct UnknownJobError;
2021-03-29 02:05:20 +00:00
impl Error for UnknownJobError {}
impl Display for UnknownJobError {
2021-03-29 02:05:20 +00:00
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("Unknown job")
2021-03-29 02:05:20 +00:00
}
}
impl JobRegistry {
/// Construct a new job registry from the provided job list.
pub fn new(jobs: &[&'static NamedJob]) -> Self {
let mut job_map = HashMap::new();
for &job in jobs {
if job_map.insert(job.name(), job).is_some() {
panic!("Duplicate job registered: {}", job.name());
2021-03-29 02:05:20 +00:00
}
}
Self {
error_handler: Arc::new(Self::default_error_handler),
job_map,
context: Map::new(),
2021-03-29 02:05:20 +00:00
}
}
/// Set a function to be called whenever a job returns an error.
2021-03-29 02:05:20 +00:00
pub fn set_error_handler(
&mut self,
2023-10-11 22:19:06 +00:00
error_handler: impl Fn(&str, BoxedError) + Send + Sync + 'static,
2021-03-29 02:05:20 +00:00
) -> &mut Self {
self.error_handler = Arc::new(error_handler);
self
}
/// Provide context for the jobs.
pub fn set_context<C: Clone + Send + Sync + 'static>(&mut self, context: C) -> &mut Self {
self.context.insert(context);
self
}
/// Access job context. Will panic if context with this type has not been provided.
pub fn context<C: Clone + Send + Sync + 'static>(&self) -> C {
if let Some(c) = self.context.get::<C>() {
c.clone()
} else {
panic!(
"No context of type `{}` has been provided.",
type_name::<C>()
);
}
}
/// Look-up a job by name.
pub fn resolve_job(&self, name: &str) -> Option<&'static NamedJob> {
self.job_map.get(name).copied()
2021-03-29 02:05:20 +00:00
}
/// The default error handler implementation, which simply logs the error.
2023-10-11 22:19:06 +00:00
pub fn default_error_handler(name: &str, error: BoxedError) {
2021-12-01 18:22:43 +00:00
log::error!("Job `{}` failed: {}", name, error);
2021-03-29 02:05:20 +00:00
}
#[doc(hidden)]
pub fn spawn_internal<E: Into<Box<dyn Error + Send + Sync + 'static>>>(
&self,
name: &'static str,
f: impl Future<Output = Result<(), E>> + Send + 'static,
) {
let error_handler = self.error_handler.clone();
tokio::spawn(async move {
2021-12-01 18:22:43 +00:00
let start_time = Instant::now();
log::info!("Job `{}` started.", name);
2021-03-29 02:05:20 +00:00
if let Err(e) = f.await {
error_handler(name, e.into());
2021-12-01 18:22:43 +00:00
} else {
log::info!(
"Job `{}` completed in {}s.",
name,
start_time.elapsed().as_secs_f64()
);
2021-03-29 02:05:20 +00:00
}
});
}
/// Construct a job runner from this registry and the provided connection
2021-03-29 02:05:20 +00:00
/// pool.
pub fn runner(self, pool: &Pool<Postgres>) -> JobRunnerOptions {
JobRunnerOptions::new(pool, move |current_job| {
if let Some(job) = self.resolve_job(current_job.name()) {
(job.run_fn.0 .0)(&self, current_job);
2021-03-29 02:05:20 +00:00
} else {
(self.error_handler)(current_job.name(), Box::new(UnknownJobError))
2021-03-29 02:05:20 +00:00
}
})
}
}
/// Type for a named job. Functions annotated with `#[job]` are
/// transformed into static variables whose type is `&'static NamedJob`.
2021-03-29 02:05:20 +00:00
#[derive(Debug)]
pub struct NamedJob {
2021-03-29 02:05:20 +00:00
name: &'static str,
build_fn: Opaque<BuildFn>,
run_fn: Opaque<RunFn>,
}
impl NamedJob {
2021-03-29 02:05:20 +00:00
#[doc(hidden)]
pub const fn new_internal(name: &'static str, build_fn: BuildFn, run_fn: RunFn) -> Self {
Self {
name,
build_fn: Opaque(build_fn),
run_fn: Opaque(run_fn),
}
}
/// Initialize a job builder with the name and defaults of this job.
2021-03-29 22:06:47 +00:00
pub fn builder(&self) -> JobBuilder<'static> {
let mut builder = JobBuilder::new(self.name);
2021-03-29 02:05:20 +00:00
(self.build_fn.0 .0)(&mut builder);
builder
}
/// Initialize a job builder with the name and defaults of this job,
/// using the provided job ID.
2021-03-29 22:06:47 +00:00
pub fn builder_with_id(&self, id: Uuid) -> JobBuilder<'static> {
let mut builder = JobBuilder::new_with_id(id, self.name);
2021-03-29 02:05:20 +00:00
(self.build_fn.0 .0)(&mut builder);
builder
}
/// Returns the name of this job.
2021-03-29 02:05:20 +00:00
pub const fn name(&self) -> &'static str {
self.name
}
}