Update to newest background-jobs, implement Job rather than ActixJob

This commit is contained in:
asonix 2024-01-08 17:00:15 -06:00
parent 36aa9120ea
commit c640567206
16 changed files with 144 additions and 113 deletions

129
Cargo.lock generated
View file

@ -66,7 +66,7 @@ dependencies = [
"actix-tls", "actix-tls",
"actix-utils", "actix-utils",
"ahash 0.8.7", "ahash 0.8.7",
"base64 0.21.5", "base64 0.21.6",
"bitflags 2.4.1", "bitflags 2.4.1",
"brotli", "brotli",
"bytes", "bytes",
@ -395,7 +395,7 @@ dependencies = [
"anyhow", "anyhow",
"async-cpupool", "async-cpupool",
"background-jobs", "background-jobs",
"base64 0.21.5", "base64 0.21.6",
"bcrypt", "bcrypt",
"clap", "clap",
"config", "config",
@ -438,7 +438,6 @@ dependencies = [
"tracing", "tracing",
"tracing-actix-web", "tracing-actix-web",
"tracing-error", "tracing-error",
"tracing-futures",
"tracing-log", "tracing-log",
"tracing-opentelemetry", "tracing-opentelemetry",
"tracing-subscriber", "tracing-subscriber",
@ -508,6 +507,12 @@ dependencies = [
"syn 2.0.48", "syn 2.0.48",
] ]
[[package]]
name = "atomic"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.1.0" version = "1.1.0"
@ -561,19 +566,18 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs" name = "background-jobs"
version = "0.16.0" version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://git.asonix.dog/asonix/background-jobs#e02de4a15340eefd41d130d785ed59019558986f"
checksum = "220c1b532c3b8532a43282f0871cf43d6238421f0e72084cb1f6ddb65fc0e8e6"
dependencies = [ dependencies = [
"background-jobs-actix", "background-jobs-actix",
"background-jobs-core", "background-jobs-core",
"background-jobs-metrics",
] ]
[[package]] [[package]]
name = "background-jobs-actix" name = "background-jobs-actix"
version = "0.16.0" version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://git.asonix.dog/asonix/background-jobs#e02de4a15340eefd41d130d785ed59019558986f"
checksum = "d084a3dec6f0bd656a7c388e255e988a340b397985bfe7bebdb0ebebb34b50b6"
dependencies = [ dependencies = [
"actix-rt", "actix-rt",
"anyhow", "anyhow",
@ -585,17 +589,14 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"tracing", "tracing",
"tracing-futures",
"uuid", "uuid",
] ]
[[package]] [[package]]
name = "background-jobs-core" name = "background-jobs-core"
version = "0.16.0" version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://git.asonix.dog/asonix/background-jobs#e02de4a15340eefd41d130d785ed59019558986f"
checksum = "c585c87a70e090f8f0b52cd25951ba156e7faca26464e11611fc6ab0d700815c"
dependencies = [ dependencies = [
"actix-rt",
"anyhow", "anyhow",
"async-trait", "async-trait",
"event-listener", "event-listener",
@ -605,7 +606,19 @@ dependencies = [
"thiserror", "thiserror",
"time", "time",
"tracing", "tracing",
"tracing-futures", "uuid",
]
[[package]]
name = "background-jobs-metrics"
version = "0.17.0"
source = "git+https://git.asonix.dog/asonix/background-jobs#e02de4a15340eefd41d130d785ed59019558986f"
dependencies = [
"async-trait",
"background-jobs-core",
"metrics",
"metrics-util",
"tracing",
"uuid", "uuid",
] ]
@ -632,9 +645,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.21.5" version = "0.21.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" checksum = "c79fed4cdb43e993fcdadc7e58a09fd0e3e649c4436fa11da71c9f1f3ee7feb9"
[[package]] [[package]]
name = "base64-simd" name = "base64-simd"
@ -657,7 +670,7 @@ version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d1c9c15093eb224f0baa400f38fcd713fc1391a6f1c389d886beef146d60a3" checksum = "28d1c9c15093eb224f0baa400f38fcd713fc1391a6f1c389d886beef146d60a3"
dependencies = [ dependencies = [
"base64 0.21.5", "base64 0.21.6",
"blowfish", "blowfish",
"getrandom", "getrandom",
"subtle", "subtle",
@ -819,9 +832,9 @@ dependencies = [
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.4.13" version = "4.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52bdc885e4cacc7f7c9eedc1ef6da641603180c783c41a15c264944deeaab642" checksum = "33e92c5c1a78c62968ec57dbc2440366a2d6e5a23faf829970ff1585dc6b18e2"
dependencies = [ dependencies = [
"clap_builder", "clap_builder",
"clap_derive", "clap_derive",
@ -829,9 +842,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_builder" name = "clap_builder"
version = "4.4.12" version = "4.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb7fb5e4e979aec3be7791562fcba452f94ad85e954da024396433e0e25a79e9" checksum = "f4323769dc8a61e2c39ad7dc26f6f2800524691a44d74fe3d1071a5c24db6370"
dependencies = [ dependencies = [
"anstream", "anstream",
"anstyle", "anstyle",
@ -863,6 +876,15 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "concurrent-queue"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363"
dependencies = [
"crossbeam-utils",
]
[[package]] [[package]]
name = "config" name = "config"
version = "0.13.4" version = "0.13.4"
@ -987,44 +1009,37 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.10" version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82a9b73a36529d9c47029b9fb3a6f0ea3cc916a261195352ba19e770fc1748b2" checksum = "176dc175b78f56c0f321911d9c8eb2b77a78a4860b9c19db83835fea1a46649b"
dependencies = [ dependencies = [
"cfg-if",
"crossbeam-utils", "crossbeam-utils",
] ]
[[package]] [[package]]
name = "crossbeam-deque" name = "crossbeam-deque"
version = "0.8.4" version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fca89a0e215bab21874660c67903c5f143333cab1da83d041c7ded6053774751" checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [ dependencies = [
"cfg-if",
"crossbeam-epoch", "crossbeam-epoch",
"crossbeam-utils", "crossbeam-utils",
] ]
[[package]] [[package]]
name = "crossbeam-epoch" name = "crossbeam-epoch"
version = "0.9.17" version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e3681d554572a651dda4186cd47240627c3d0114d45a95f6ad27f2f22e7548d" checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [ dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils", "crossbeam-utils",
] ]
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.18" version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "crypto-common" name = "crypto-common"
@ -1251,9 +1266,14 @@ dependencies = [
[[package]] [[package]]
name = "event-listener" name = "event-listener"
version = "2.5.3" version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]] [[package]]
name = "fastrand" name = "fastrand"
@ -1511,7 +1531,7 @@ version = "7.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
dependencies = [ dependencies = [
"base64 0.21.5", "base64 0.21.6",
"byteorder", "byteorder",
"flate2", "flate2",
"nom", "nom",
@ -1584,7 +1604,7 @@ dependencies = [
"actix-http", "actix-http",
"actix-rt", "actix-rt",
"actix-web", "actix-web",
"base64 0.21.5", "base64 0.21.6",
"futures-core", "futures-core",
"http-signature-normalization", "http-signature-normalization",
"ring", "ring",
@ -1603,7 +1623,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86048ef6b1d59bcb2cdde0100bb16b1a29ce78ab6dd4a90706ba0791a2831b5a" checksum = "86048ef6b1d59bcb2cdde0100bb16b1a29ce78ab6dd4a90706ba0791a2831b5a"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"base64 0.21.5", "base64 0.21.6",
"http-signature-normalization", "http-signature-normalization",
"httpdate", "httpdate",
"reqwest", "reqwest",
@ -1992,7 +2012,7 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83a4c4718a371ddfb7806378f23617876eea8b82e5ff1324516bcd283249d9ea" checksum = "83a4c4718a371ddfb7806378f23617876eea8b82e5ff1324516bcd283249d9ea"
dependencies = [ dependencies = [
"base64 0.21.5", "base64 0.21.6",
"hyper", "hyper",
"indexmap 1.9.3", "indexmap 1.9.3",
"ipnet", "ipnet",
@ -2398,6 +2418,12 @@ dependencies = [
"vlq", "vlq",
] ]
[[package]]
name = "parking"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.11.2" version = "0.11.2"
@ -2981,7 +3007,7 @@ version = "0.11.23"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41"
dependencies = [ dependencies = [
"base64 0.21.5", "base64 0.21.6",
"bytes", "bytes",
"encoding_rs", "encoding_rs",
"futures-core", "futures-core",
@ -3129,7 +3155,7 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88ca6a6947c6fe6454c93c3bb65b92f9680e6f9e906e75e30631110f2227344c" checksum = "88ca6a6947c6fe6454c93c3bb65b92f9680e6f9e906e75e30631110f2227344c"
dependencies = [ dependencies = [
"base64 0.21.5", "base64 0.21.6",
"num-bigint-dig", "num-bigint-dig",
"rsa", "rsa",
"thiserror", "thiserror",
@ -3158,7 +3184,7 @@ version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b0a930679d54e46fa4e66be3d9a333026da04d2b659e42aab4dfd1586452815" checksum = "9b0a930679d54e46fa4e66be3d9a333026da04d2b659e42aab4dfd1586452815"
dependencies = [ dependencies = [
"base64 0.21.5", "base64 0.21.6",
"bytecount", "bytecount",
"itertools 0.11.0", "itertools 0.11.0",
"md5", "md5",
@ -3216,7 +3242,7 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c"
dependencies = [ dependencies = [
"base64 0.21.5", "base64 0.21.6",
] ]
[[package]] [[package]]
@ -3514,9 +3540,9 @@ dependencies = [
[[package]] [[package]]
name = "strsim" name = "strsim"
version = "0.10.1" version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccbca6f34534eb78dbee83f6b2c9442fea7113f43d9e80ea320f0972ae5dc08d" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]] [[package]]
name = "subtle" name = "subtle"
@ -3880,7 +3906,7 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"axum", "axum",
"base64 0.21.5", "base64 0.21.6",
"bytes", "bytes",
"futures-core", "futures-core",
"futures-util", "futures-util",
@ -3909,7 +3935,7 @@ dependencies = [
"async-stream", "async-stream",
"async-trait", "async-trait",
"axum", "axum",
"base64 0.21.5", "base64 0.21.6",
"bytes", "bytes",
"h2", "h2",
"http", "http",
@ -4162,6 +4188,7 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
dependencies = [ dependencies = [
"atomic",
"getrandom", "getrandom",
"serde", "serde",
] ]

View file

@ -72,7 +72,6 @@ thiserror = "1.0"
time = { version = "0.3.17", features = ["serde"] } time = { version = "0.3.17", features = ["serde"] }
tracing = "0.1" tracing = "0.1"
tracing-error = "0.2" tracing-error = "0.2"
tracing-futures = "0.2"
tracing-log = "0.2" tracing-log = "0.2"
tracing-opentelemetry = "0.22" tracing-opentelemetry = "0.22"
tracing-subscriber = { version = "0.3", features = [ tracing-subscriber = { version = "0.3", features = [
@ -85,9 +84,10 @@ uuid = { version = "1", features = ["v4", "serde"] }
streem = "0.2.0" streem = "0.2.0"
[dependencies.background-jobs] [dependencies.background-jobs]
version = "0.16.0" version = "0.17.0"
git = "https://git.asonix.dog/asonix/background-jobs"
default-features = false default-features = false
features = ["background-jobs-actix", "error-logging"] features = ["background-jobs-actix", "background-jobs-metrics", "error-logging"]
[dependencies.http-signature-normalization-actix] [dependencies.http-signature-normalization-actix]
version = "0.11.0" version = "0.11.0"

View file

@ -1,3 +1,4 @@
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
pub(crate) type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>; pub(crate) type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
pub(crate) type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

View file

@ -20,6 +20,7 @@ use crate::{
}; };
use background_jobs::{ use background_jobs::{
memory_storage::{ActixTimer, Storage}, memory_storage::{ActixTimer, Storage},
metrics::MetricsStorage,
Job, QueueHandle, WorkerConfig, Job, QueueHandle, WorkerConfig,
}; };
use std::time::Duration; use std::time::Duration;
@ -46,15 +47,18 @@ pub(crate) fn create_workers(
) -> JobServer { ) -> JobServer {
let deliver_concurrency = config.deliver_concurrency(); let deliver_concurrency = config.deliver_concurrency();
let queue_handle = WorkerConfig::new(Storage::new(ActixTimer), move |queue_handle| { let queue_handle = WorkerConfig::new(
JobState::new( MetricsStorage::wrap(Storage::new(ActixTimer)),
state.clone(), move |queue_handle| {
actors.clone(), JobState::new(
JobServer::new(queue_handle), state.clone(),
media.clone(), actors.clone(),
config.clone(), JobServer::new(queue_handle),
) media.clone(),
}) config.clone(),
)
},
)
.register::<Deliver>() .register::<Deliver>()
.register::<DeliverMany>() .register::<DeliverMany>()
.register::<QueryNodeinfo>() .register::<QueryNodeinfo>()

View file

@ -2,14 +2,14 @@ use crate::{
config::{Config, UrlKind}, config::{Config, UrlKind},
db::Actor, db::Actor,
error::Error, error::Error,
future::BoxFuture,
jobs::{ jobs::{
apub::{get_inboxes, prepare_activity}, apub::{get_inboxes, prepare_activity},
DeliverMany, JobState, DeliverMany, JobState,
}, },
}; };
use activitystreams::{activity::Announce as AsAnnounce, iri_string::types::IriString}; use activitystreams::{activity::Announce as AsAnnounce, iri_string::types::IriString};
use background_jobs::ActixJob; use background_jobs::Job;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Announce { pub(crate) struct Announce {
@ -62,9 +62,9 @@ fn generate_announce(
) )
} }
impl ActixJob for Announce { impl Job for Announce {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = BoxFuture<'static, anyhow::Result<()>>;
const NAME: &'static str = "relay::jobs::apub::Announce"; const NAME: &'static str = "relay::jobs::apub::Announce";
const QUEUE: &'static str = "apub"; const QUEUE: &'static str = "apub";

View file

@ -3,6 +3,7 @@ use crate::{
config::{Config, UrlKind}, config::{Config, UrlKind},
db::Actor, db::Actor,
error::{Error, ErrorKind}, error::{Error, ErrorKind},
future::BoxFuture,
jobs::{apub::prepare_activity, Deliver, JobState, QueryInstance, QueryNodeinfo}, jobs::{apub::prepare_activity, Deliver, JobState, QueryInstance, QueryNodeinfo},
}; };
use activitystreams::{ use activitystreams::{
@ -10,8 +11,7 @@ use activitystreams::{
iri_string::types::IriString, iri_string::types::IriString,
prelude::*, prelude::*,
}; };
use background_jobs::ActixJob; use background_jobs::Job;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Follow { pub(crate) struct Follow {
@ -111,9 +111,9 @@ fn generate_accept_follow(
) )
} }
impl ActixJob for Follow { impl Job for Follow {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = BoxFuture<'static, anyhow::Result<()>>;
const NAME: &'static str = "relay::jobs::apub::Follow"; const NAME: &'static str = "relay::jobs::apub::Follow";
const QUEUE: &'static str = "apub"; const QUEUE: &'static str = "apub";

View file

@ -2,11 +2,11 @@ use crate::{
apub::AcceptedActivities, apub::AcceptedActivities,
db::Actor, db::Actor,
error::{Error, ErrorKind}, error::{Error, ErrorKind},
future::BoxFuture,
jobs::{apub::get_inboxes, DeliverMany, JobState}, jobs::{apub::get_inboxes, DeliverMany, JobState},
}; };
use activitystreams::prelude::*; use activitystreams::prelude::*;
use background_jobs::ActixJob; use background_jobs::Job;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Forward { pub(crate) struct Forward {
@ -47,9 +47,9 @@ impl Forward {
} }
} }
impl ActixJob for Forward { impl Job for Forward {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = BoxFuture<'static, anyhow::Result<()>>;
const NAME: &'static str = "relay::jobs::apub::Forward"; const NAME: &'static str = "relay::jobs::apub::Forward";
const QUEUE: &'static str = "apub"; const QUEUE: &'static str = "apub";

View file

@ -2,10 +2,10 @@ use crate::{
config::UrlKind, config::UrlKind,
db::Actor, db::Actor,
error::Error, error::Error,
future::BoxFuture,
jobs::{apub::generate_undo_follow, Deliver, JobState}, jobs::{apub::generate_undo_follow, Deliver, JobState},
}; };
use background_jobs::ActixJob; use background_jobs::Job;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Reject(pub(crate) Actor); pub(crate) struct Reject(pub(crate) Actor);
@ -33,9 +33,9 @@ impl Reject {
} }
} }
impl ActixJob for Reject { impl Job for Reject {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = BoxFuture<'static, anyhow::Result<()>>;
const NAME: &'static str = "relay::jobs::apub::Reject"; const NAME: &'static str = "relay::jobs::apub::Reject";
const QUEUE: &'static str = "apub"; const QUEUE: &'static str = "apub";

View file

@ -3,11 +3,11 @@ use crate::{
config::UrlKind, config::UrlKind,
db::Actor, db::Actor,
error::Error, error::Error,
future::BoxFuture,
jobs::{apub::generate_undo_follow, Deliver, JobState}, jobs::{apub::generate_undo_follow, Deliver, JobState},
}; };
use activitystreams::prelude::BaseExt; use activitystreams::prelude::BaseExt;
use background_jobs::ActixJob; use background_jobs::Job;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Undo { pub(crate) struct Undo {
@ -48,9 +48,9 @@ impl Undo {
} }
} }
impl ActixJob for Undo { impl Job for Undo {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = BoxFuture<'static, anyhow::Result<()>>;
const NAME: &'static str = "relay::jobs::apub::Undo"; const NAME: &'static str = "relay::jobs::apub::Undo";
const QUEUE: &'static str = "apub"; const QUEUE: &'static str = "apub";

View file

@ -1,12 +1,12 @@
use crate::{ use crate::{
apub::AcceptedActors, apub::AcceptedActors,
error::{Error, ErrorKind}, error::{Error, ErrorKind},
future::BoxFuture,
jobs::JobState, jobs::JobState,
requests::BreakerStrategy, requests::BreakerStrategy,
}; };
use activitystreams::{iri_string::types::IriString, object::Image, prelude::*}; use activitystreams::{iri_string::types::IriString, object::Image, prelude::*};
use background_jobs::ActixJob; use background_jobs::Job;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct QueryContact { pub(crate) struct QueryContact {
@ -85,9 +85,9 @@ fn to_contact(contact: AcceptedActors) -> Option<(String, String, IriString, Iri
Some((username, display_name, url, avatar)) Some((username, display_name, url, avatar))
} }
impl ActixJob for QueryContact { impl Job for QueryContact {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = BoxFuture<'static, anyhow::Result<()>>;
const NAME: &'static str = "relay::jobs::QueryContact"; const NAME: &'static str = "relay::jobs::QueryContact";
const QUEUE: &'static str = "maintenance"; const QUEUE: &'static str = "maintenance";

View file

@ -1,11 +1,11 @@
use crate::{ use crate::{
error::Error, error::Error,
future::BoxFuture,
jobs::{debug_object, JobState}, jobs::{debug_object, JobState},
requests::BreakerStrategy, requests::BreakerStrategy,
}; };
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use background_jobs::{ActixJob, Backoff}; use background_jobs::{Backoff, Job};
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Deliver { pub(crate) struct Deliver {
@ -56,9 +56,9 @@ impl Deliver {
} }
} }
impl ActixJob for Deliver { impl Job for Deliver {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = BoxFuture<'static, anyhow::Result<()>>;
const NAME: &'static str = "relay::jobs::Deliver"; const NAME: &'static str = "relay::jobs::Deliver";
const QUEUE: &'static str = "deliver"; const QUEUE: &'static str = "deliver";

View file

@ -1,10 +1,10 @@
use crate::{ use crate::{
error::Error, error::Error,
future::LocalBoxFuture, future::BoxFuture,
jobs::{debug_object, Deliver, JobState}, jobs::{debug_object, Deliver, JobState},
}; };
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use background_jobs::ActixJob; use background_jobs::Job;
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct DeliverMany { pub(crate) struct DeliverMany {
@ -45,9 +45,9 @@ impl DeliverMany {
} }
} }
impl ActixJob for DeliverMany { impl Job for DeliverMany {
type State = JobState; type State = JobState;
type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; type Future = BoxFuture<'static, anyhow::Result<()>>;
const NAME: &'static str = "relay::jobs::DeliverMany"; const NAME: &'static str = "relay::jobs::DeliverMany";
const QUEUE: &'static str = "deliver"; const QUEUE: &'static str = "deliver";

View file

@ -1,12 +1,12 @@
use crate::{ use crate::{
config::UrlKind, config::UrlKind,
error::{Error, ErrorKind}, error::{Error, ErrorKind},
future::BoxFuture,
jobs::{Boolish, JobState}, jobs::{Boolish, JobState},
requests::BreakerStrategy, requests::BreakerStrategy,
}; };
use activitystreams::{iri, iri_string::types::IriString}; use activitystreams::{iri, iri_string::types::IriString};
use background_jobs::ActixJob; use background_jobs::Job;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct QueryInstance { pub(crate) struct QueryInstance {
@ -165,9 +165,9 @@ impl QueryInstance {
} }
} }
impl ActixJob for QueryInstance { impl Job for QueryInstance {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = BoxFuture<'static, anyhow::Result<()>>;
const NAME: &'static str = "relay::jobs::QueryInstance"; const NAME: &'static str = "relay::jobs::QueryInstance";
const QUEUE: &'static str = "maintenance"; const QUEUE: &'static str = "maintenance";

View file

@ -1,18 +1,18 @@
use crate::{ use crate::{
error::{Error, ErrorKind}, error::{Error, ErrorKind},
future::BoxFuture,
jobs::{Boolish, JobState, QueryContact}, jobs::{Boolish, JobState, QueryContact},
requests::BreakerStrategy, requests::BreakerStrategy,
}; };
use activitystreams::{iri, iri_string::types::IriString, primitives::OneOrMany}; use activitystreams::{iri, iri_string::types::IriString, primitives::OneOrMany};
use background_jobs::ActixJob; use background_jobs::Job;
use std::{fmt::Debug, future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct QueryNodeinfo { pub(crate) struct QueryNodeinfo {
actor_id: IriString, actor_id: IriString,
} }
impl Debug for QueryNodeinfo { impl std::fmt::Debug for QueryNodeinfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QueryNodeinfo") f.debug_struct("QueryNodeinfo")
.field("actor_id", &self.actor_id.to_string()) .field("actor_id", &self.actor_id.to_string())
@ -104,9 +104,9 @@ impl QueryNodeinfo {
} }
} }
impl ActixJob for QueryNodeinfo { impl Job for QueryNodeinfo {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = BoxFuture<'static, anyhow::Result<()>>;
const NAME: &'static str = "relay::jobs::QueryNodeinfo"; const NAME: &'static str = "relay::jobs::QueryNodeinfo";
const QUEUE: &'static str = "maintenance"; const QUEUE: &'static str = "maintenance";

View file

@ -1,9 +1,9 @@
use crate::{ use crate::{
error::Error, error::Error,
future::BoxFuture,
jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState}, jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState},
}; };
use background_jobs::ActixJob; use background_jobs::Job;
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Listeners; pub(crate) struct Listeners;
@ -23,9 +23,9 @@ impl Listeners {
} }
} }
impl ActixJob for Listeners { impl Job for Listeners {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = BoxFuture<'static, anyhow::Result<()>>;
const NAME: &'static str = "relay::jobs::Listeners"; const NAME: &'static str = "relay::jobs::Listeners";
const QUEUE: &'static str = "maintenance"; const QUEUE: &'static str = "maintenance";

View file

@ -1,6 +1,5 @@
use crate::{error::Error, jobs::JobState}; use crate::{error::Error, future::BoxFuture, jobs::JobState};
use background_jobs::{ActixJob, Backoff}; use background_jobs::{Backoff, Job};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct RecordLastOnline; pub(crate) struct RecordLastOnline;
@ -14,9 +13,9 @@ impl RecordLastOnline {
} }
} }
impl ActixJob for RecordLastOnline { impl Job for RecordLastOnline {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = BoxFuture<'static, anyhow::Result<()>>;
const NAME: &'static str = "relay::jobs::RecordLastOnline"; const NAME: &'static str = "relay::jobs::RecordLastOnline";
const QUEUE: &'static str = "maintenance"; const QUEUE: &'static str = "maintenance";