Use newest background-jobs

This commit is contained in:
asonix 2020-04-20 19:56:50 -05:00
parent 91a57985c9
commit f016f14efe
14 changed files with 54 additions and 157 deletions

16
Cargo.lock generated
View file

@ -470,8 +470,8 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs" name = "background-jobs"
version = "0.8.0-alpha.0" version = "0.8.0-alpha.1"
source = "git+https://git.asonix.dog/Aardwolf/background-jobs#ca1c07366692c44fce6ae351a4a6762089660a20" source = "git+https://git.asonix.dog/Aardwolf/background-jobs#f8fa1bb5ef71724053c8fe68b75d900a51cb4f45"
dependencies = [ dependencies = [
"background-jobs-actix", "background-jobs-actix",
"background-jobs-core", "background-jobs-core",
@ -479,8 +479,8 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs-actix" name = "background-jobs-actix"
version = "0.7.0-alpha.0" version = "0.8.0-alpha.0"
source = "git+https://git.asonix.dog/Aardwolf/background-jobs#ca1c07366692c44fce6ae351a4a6762089660a20" source = "git+https://git.asonix.dog/Aardwolf/background-jobs#f8fa1bb5ef71724053c8fe68b75d900a51cb4f45"
dependencies = [ dependencies = [
"actix", "actix",
"actix-rt", "actix-rt",
@ -500,8 +500,8 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs-core" name = "background-jobs-core"
version = "0.7.0" version = "0.8.0-alpha.0"
source = "git+https://git.asonix.dog/Aardwolf/background-jobs#ca1c07366692c44fce6ae351a4a6762089660a20" source = "git+https://git.asonix.dog/Aardwolf/background-jobs#f8fa1bb5ef71724053c8fe68b75d900a51cb4f45"
dependencies = [ dependencies = [
"actix", "actix",
"anyhow", "anyhow",
@ -2312,9 +2312,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]] [[package]]
name = "standback" name = "standback"
version = "0.2.4" version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d747fd6d33f130039c9518cffa45a83d53986642ca4f872497df668e3f2b6b4d" checksum = "6389164ce46e8a68e1b373787efcca3b6b6620bb50b12d4e8d14380838db316f"
[[package]] [[package]]
name = "static_assertions" name = "static_assertions"

View file

@ -21,8 +21,8 @@ actix-webfinger = "0.3.0-alpha.3"
activitystreams = "0.5.0" activitystreams = "0.5.0"
ammonia = "3.1.0" ammonia = "3.1.0"
async-trait = "0.1.24" async-trait = "0.1.24"
background-jobs = { version = "0.8.0-alpha.0", git = "https://git.asonix.dog/Aardwolf/background-jobs", default-features = false, features = ["background-jobs-actix"] } background-jobs = { version = "0.8.0-alpha.1", git = "https://git.asonix.dog/Aardwolf/background-jobs", default-features = false, features = ["background-jobs-actix"] }
background-jobs-core = { version = "0.7.0", git = "https://git.asonix.dog/Aardwolf/background-jobs" } background-jobs-core = { version = "0.8.0-alpha.0", git = "https://git.asonix.dog/Aardwolf/background-jobs" }
bytes = "0.5.4" bytes = "0.5.4"
base64 = "0.12" base64 = "0.12"
bb8-postgres = { version = "0.4.0", features = ["with-serde_json-1", "with-uuid-0_8", "with-chrono-0_4"] } bb8-postgres = { version = "0.4.0", features = ["with-serde_json-1", "with-uuid-0_8", "with-chrono-0_4"] }

View file

@ -8,7 +8,7 @@ use crate::{
}, },
}; };
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use background_jobs::{ActixJob, Processor}; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -17,9 +17,6 @@ pub struct Announce {
actor: Actor, actor: Actor,
} }
#[derive(Clone, Debug)]
pub struct AnnounceProcessor;
impl Announce { impl Announce {
pub fn new(object_id: XsdAnyUri, actor: Actor) -> Self { pub fn new(object_id: XsdAnyUri, actor: Actor) -> Self {
Announce { object_id, actor } Announce { object_id, actor }
@ -60,18 +57,12 @@ fn generate_announce(
} }
impl ActixJob for Announce { impl ActixJob for Announce {
type Processor = AnnounceProcessor;
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "AnnounceProcessor";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(self.perform(state))
} }
} }
impl Processor for AnnounceProcessor {
type Job = Announce;
const NAME: &'static str = "AnnounceProcessor";
const QUEUE: &'static str = "default";
}

View file

@ -6,7 +6,7 @@ use crate::{
jobs::{apub::prepare_activity, Deliver, JobState}, jobs::{apub::prepare_activity, Deliver, JobState},
}; };
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use background_jobs::{ActixJob, Processor}; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -16,9 +16,6 @@ pub struct Follow {
actor: Actor, actor: Actor,
} }
#[derive(Clone, Debug)]
pub struct FollowProcessor;
impl Follow { impl Follow {
pub fn new(is_listener: bool, input: AcceptedObjects, actor: Actor) -> Self { pub fn new(is_listener: bool, input: AcceptedObjects, actor: Actor) -> Self {
Follow { Follow {
@ -105,18 +102,12 @@ fn generate_accept_follow(
} }
impl ActixJob for Follow { impl ActixJob for Follow {
type Processor = FollowProcessor;
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "FollowProcessor";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(self.perform(state))
} }
} }
impl Processor for FollowProcessor {
type Job = Follow;
const NAME: &'static str = "FollowProcessor";
const QUEUE: &'static str = "default";
}

View file

@ -3,7 +3,7 @@ use crate::{
data::Actor, data::Actor,
jobs::{apub::get_inboxes, DeliverMany, JobState}, jobs::{apub::get_inboxes, DeliverMany, JobState},
}; };
use background_jobs::{ActixJob, Processor}; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -12,9 +12,6 @@ pub struct Forward {
actor: Actor, actor: Actor,
} }
#[derive(Clone, Debug)]
pub struct ForwardProcessor;
impl Forward { impl Forward {
pub fn new(input: AcceptedObjects, actor: Actor) -> Self { pub fn new(input: AcceptedObjects, actor: Actor) -> Self {
Forward { input, actor } Forward { input, actor }
@ -34,18 +31,12 @@ impl Forward {
} }
impl ActixJob for Forward { impl ActixJob for Forward {
type Processor = ForwardProcessor;
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "ForwardProcessor";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(self.perform(state))
} }
} }
impl Processor for ForwardProcessor {
type Job = Forward;
const NAME: &'static str = "ForwardProcessor";
const QUEUE: &'static str = "default";
}

View file

@ -14,13 +14,7 @@ mod forward;
mod reject; mod reject;
mod undo; mod undo;
pub use self::{ pub use self::{announce::Announce, follow::Follow, forward::Forward, reject::Reject, undo::Undo};
announce::{Announce, AnnounceProcessor},
follow::{Follow, FollowProcessor},
forward::{Forward, ForwardProcessor},
reject::{Reject, RejectProcessor},
undo::{Undo, UndoProcessor},
};
async fn get_inboxes( async fn get_inboxes(
state: &State, state: &State,

View file

@ -4,15 +4,12 @@ use crate::{
jobs::{apub::generate_undo_follow, Deliver, JobState}, jobs::{apub::generate_undo_follow, Deliver, JobState},
}; };
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use background_jobs::{ActixJob, Processor}; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Reject(pub Actor); pub struct Reject(pub Actor);
#[derive(Clone, Debug)]
pub struct RejectProcessor;
impl Reject { impl Reject {
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { async fn perform(self, state: JobState) -> Result<(), anyhow::Error> {
if let Some(_) = state.actors.unfollower(&self.0).await? { if let Some(_) = state.actors.unfollower(&self.0).await? {
@ -29,18 +26,12 @@ impl Reject {
} }
impl ActixJob for Reject { impl ActixJob for Reject {
type Processor = RejectProcessor;
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "RejectProcessor";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(self.perform(state))
} }
} }
impl Processor for RejectProcessor {
type Job = Reject;
const NAME: &'static str = "RejectProcessor";
const QUEUE: &'static str = "default";
}

View file

@ -5,7 +5,7 @@ use crate::{
jobs::{apub::generate_undo_follow, Deliver, JobState}, jobs::{apub::generate_undo_follow, Deliver, JobState},
}; };
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use background_jobs::{ActixJob, Processor}; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -14,9 +14,6 @@ pub struct Undo {
actor: Actor, actor: Actor,
} }
#[derive(Clone, Debug)]
pub struct UndoProcessor;
impl Undo { impl Undo {
pub fn new(input: AcceptedObjects, actor: Actor) -> Self { pub fn new(input: AcceptedObjects, actor: Actor) -> Self {
Undo { input, actor } Undo { input, actor }
@ -42,18 +39,12 @@ impl Undo {
} }
impl ActixJob for Undo { impl ActixJob for Undo {
type Processor = UndoProcessor;
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "UndoProcessor";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(self.perform(state))
} }
} }
impl Processor for UndoProcessor {
type Job = Undo;
const NAME: &'static str = "UndoProcessor";
const QUEUE: &'static str = "default";
}

View file

@ -1,7 +1,7 @@
use crate::{error::MyError, jobs::JobState}; use crate::{error::MyError, jobs::JobState};
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use anyhow::Error; use anyhow::Error;
use background_jobs::{ActixJob, Backoff, Processor}; use background_jobs::{ActixJob, Backoff};
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -22,14 +22,13 @@ impl Deliver {
} }
} }
#[derive(Clone, Debug)]
pub struct DeliverProcessor;
impl ActixJob for Deliver { impl ActixJob for Deliver {
type State = JobState; type State = JobState;
type Processor = DeliverProcessor;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
const NAME: &'static str = "DeliverProcessor";
const BACKOFF: Backoff = Backoff::Exponential(8);
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move { Box::pin(async move {
state.requests.deliver(self.to, &self.data).await?; state.requests.deliver(self.to, &self.data).await?;
@ -38,11 +37,3 @@ impl ActixJob for Deliver {
}) })
} }
} }
impl Processor for DeliverProcessor {
type Job = Deliver;
const NAME: &'static str = "DeliverProcessor";
const QUEUE: &'static str = "default";
const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(8);
}

View file

@ -4,7 +4,7 @@ use crate::{
}; };
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use anyhow::Error; use anyhow::Error;
use background_jobs::{ActixJob, Processor}; use background_jobs::ActixJob;
use futures::future::{ready, Ready}; use futures::future::{ready, Ready};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -35,22 +35,13 @@ impl DeliverMany {
} }
} }
#[derive(Clone, Debug)]
pub struct DeliverManyProcessor;
impl ActixJob for DeliverMany { impl ActixJob for DeliverMany {
type State = JobState; type State = JobState;
type Processor = DeliverManyProcessor;
type Future = Ready<Result<(), Error>>; type Future = Ready<Result<(), Error>>;
const NAME: &'static str = "DeliverManyProcessor";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
ready(self.perform(state)) ready(self.perform(state))
} }
} }
impl Processor for DeliverManyProcessor {
type Job = DeliverMany;
const NAME: &'static str = "DeliverManyProcessor";
const QUEUE: &'static str = "default";
}

View file

@ -1,7 +1,7 @@
use crate::{config::UrlKind, jobs::JobState}; use crate::{config::UrlKind, jobs::JobState};
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use anyhow::Error; use anyhow::Error;
use background_jobs::{ActixJob, Processor}; use background_jobs::ActixJob;
use futures::join; use futures::join;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
@ -81,26 +81,17 @@ impl QueryInstance {
} }
} }
#[derive(Clone, Debug)]
pub struct InstanceProcessor;
impl ActixJob for QueryInstance { impl ActixJob for QueryInstance {
type State = JobState; type State = JobState;
type Processor = InstanceProcessor;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
const NAME: &'static str = "InstanceProcessor";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(self.perform(state))
} }
} }
impl Processor for InstanceProcessor {
type Job = QueryInstance;
const NAME: &'static str = "InstanceProcessor";
const QUEUE: &'static str = "default";
}
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
struct Instance { struct Instance {
title: String, title: String,

View file

@ -15,14 +15,7 @@ use crate::{
data::{ActorCache, Media, NodeCache, State}, data::{ActorCache, Media, NodeCache, State},
db::Db, db::Db,
error::MyError, error::MyError,
jobs::{ jobs::{process_listeners::Listeners, storage::Storage},
deliver::DeliverProcessor,
deliver_many::DeliverManyProcessor,
instance::InstanceProcessor,
nodeinfo::NodeinfoProcessor,
process_listeners::{Listeners, ListenersProcessor},
storage::Storage,
},
requests::Requests, requests::Requests,
}; };
use background_jobs::{Job, QueueHandle, WorkerConfig}; use background_jobs::{Job, QueueHandle, WorkerConfig};
@ -56,17 +49,17 @@ pub fn create_workers(
config.clone(), config.clone(),
) )
}) })
.register(DeliverProcessor) .register::<Deliver>()
.register(DeliverManyProcessor) .register::<Deliver>()
.register(NodeinfoProcessor) .register::<QueryNodeinfo>()
.register(InstanceProcessor) .register::<QueryInstance>()
.register(ListenersProcessor) .register::<Listeners>()
.register(apub::AnnounceProcessor) .register::<apub::Announce>()
.register(apub::FollowProcessor) .register::<apub::Follow>()
.register(apub::ForwardProcessor) .register::<apub::Forward>()
.register(apub::RejectProcessor) .register::<apub::Reject>()
.register(apub::UndoProcessor) .register::<apub::Undo>()
.set_processor_count("default", 4) .set_worker_count("default", 4)
.start(remote_handle); .start(remote_handle);
} }

View file

@ -1,7 +1,7 @@
use crate::jobs::JobState; use crate::jobs::JobState;
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use anyhow::Error; use anyhow::Error;
use background_jobs::{ActixJob, Processor}; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@ -52,26 +52,17 @@ impl QueryNodeinfo {
} }
} }
#[derive(Clone, Debug)]
pub struct NodeinfoProcessor;
impl ActixJob for QueryNodeinfo { impl ActixJob for QueryNodeinfo {
type State = JobState; type State = JobState;
type Processor = NodeinfoProcessor;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
const NAME: &'static str = "NodeinfoProcessor";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(self.perform(state))
} }
} }
impl Processor for NodeinfoProcessor {
type Job = QueryNodeinfo;
const NAME: &'static str = "NodeinfoProcessor";
const QUEUE: &'static str = "default";
}
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct Nodeinfo { struct Nodeinfo {

View file

@ -1,14 +1,11 @@
use crate::jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState}; use crate::jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState};
use anyhow::Error; use anyhow::Error;
use background_jobs::{ActixJob, Processor}; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Listeners; pub struct Listeners;
#[derive(Clone, Debug)]
pub struct ListenersProcessor;
impl Listeners { impl Listeners {
async fn perform(self, state: JobState) -> Result<(), Error> { async fn perform(self, state: JobState) -> Result<(), Error> {
for listener in state.state.listeners().await { for listener in state.state.listeners().await {
@ -24,17 +21,11 @@ impl Listeners {
impl ActixJob for Listeners { impl ActixJob for Listeners {
type State = JobState; type State = JobState;
type Processor = ListenersProcessor;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
const NAME: &'static str = "ProcessListenersProcessor";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(self.perform(state))
} }
} }
impl Processor for ListenersProcessor {
type Job = Listeners;
const NAME: &'static str = "ProcessListenersProcessor";
const QUEUE: &'static str = "default";
}