Update background jobs

This commit is contained in:
Aode (lion) 2021-10-29 19:26:57 -05:00
parent e7bbf3454b
commit 8021dca1dd
3 changed files with 21 additions and 30 deletions

22
Cargo.lock generated
View file

@ -408,7 +408,7 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs" name = "background-jobs"
version = "0.11.0" version = "0.11.0"
source = "git+https://git.asonix.dog/asonix/background-jobs?branch=main#7e1e89e77701854d3a983f9c82fc255270b6823f" source = "git+https://git.asonix.dog/asonix/background-jobs?branch=main#70ab459ae91d26a702c61b3d35d702dfbf0d61b6"
dependencies = [ dependencies = [
"background-jobs-actix", "background-jobs-actix",
"background-jobs-core", "background-jobs-core",
@ -417,7 +417,7 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs-actix" name = "background-jobs-actix"
version = "0.11.0" version = "0.11.0"
source = "git+https://git.asonix.dog/asonix/background-jobs?branch=main#7e1e89e77701854d3a983f9c82fc255270b6823f" source = "git+https://git.asonix.dog/asonix/background-jobs?branch=main#70ab459ae91d26a702c61b3d35d702dfbf0d61b6"
dependencies = [ dependencies = [
"actix-rt", "actix-rt",
"anyhow", "anyhow",
@ -438,7 +438,7 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs-core" name = "background-jobs-core"
version = "0.10.0" version = "0.10.0"
source = "git+https://git.asonix.dog/asonix/background-jobs?branch=main#7e1e89e77701854d3a983f9c82fc255270b6823f" source = "git+https://git.asonix.dog/asonix/background-jobs?branch=main#70ab459ae91d26a702c61b3d35d702dfbf0d61b6"
dependencies = [ dependencies = [
"actix-rt", "actix-rt",
"anyhow", "anyhow",
@ -2479,9 +2479,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.12.0" version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc" checksum = "588b2d10a336da58d877567cd8fb8a14b463e2104910f8132cd054b4b96e29ee"
dependencies = [ dependencies = [
"autocfg 1.0.1", "autocfg 1.0.1",
"bytes", "bytes",
@ -2509,9 +2509,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "1.5.0" version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2dd85aeaba7b68df939bd357c6afb36c87951be9e80bf9c859f2fc3e9fca0fd" checksum = "114383b041aa6212c579467afa0075fbbdd0718de036100bc0ba7961d8cb9095"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2531,9 +2531,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-stream" name = "tokio-stream"
version = "0.1.7" version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f" checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite",
@ -2542,9 +2542,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.6.8" version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d3725d3efa29485e87311c5b699de63cde14b00ed4d256b8318aa30ca452cd" checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",

View file

@ -23,30 +23,19 @@ use crate::{
use background_jobs::{memory_storage::Storage, Job, QueueHandle, WorkerConfig}; use background_jobs::{memory_storage::Storage, Job, QueueHandle, WorkerConfig};
use std::time::Duration; use std::time::Duration;
pub(crate) fn create_server() -> JobServer {
let shared = background_jobs::create_server(Storage::new());
shared.every(Duration::from_secs(60 * 5), Listeners);
JobServer::new(shared)
}
pub(crate) fn create_workers( pub(crate) fn create_workers(
db: Db, db: Db,
state: State, state: State,
actors: ActorCache, actors: ActorCache,
job_server: JobServer,
media: MediaCache, media: MediaCache,
config: Config, config: Config,
) { ) -> JobServer {
let remote_handle = job_server.remote.clone(); let queue_handle = WorkerConfig::new(Storage::new(), move |queue_handle| {
WorkerConfig::new(move || {
JobState::new( JobState::new(
db.clone(), db.clone(),
state.clone(), state.clone(),
actors.clone(), actors.clone(),
job_server.clone(), JobServer::new(queue_handle),
media.clone(), media.clone(),
config.clone(), config.clone(),
) )
@ -64,7 +53,11 @@ pub(crate) fn create_workers(
.register::<apub::Reject>() .register::<apub::Reject>()
.register::<apub::Undo>() .register::<apub::Undo>()
.set_worker_count("default", 16) .set_worker_count("default", 16)
.start(remote_handle); .start();
queue_handle.every(Duration::from_secs(60 * 5), Listeners);
JobServer::new(queue_handle)
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View file

@ -23,7 +23,7 @@ use self::{
config::Config, config::Config,
data::{ActorCache, MediaCache, State}, data::{ActorCache, MediaCache, State},
db::Db, db::Db,
jobs::{create_server, create_workers}, jobs::create_workers,
middleware::{DebugPayload, RelayResolver}, middleware::{DebugPayload, RelayResolver},
routes::{actor, inbox, index, nodeinfo, nodeinfo_meta, statics}, routes::{actor, inbox, index, nodeinfo, nodeinfo_meta, statics},
}; };
@ -96,13 +96,11 @@ async fn main() -> Result<(), anyhow::Error> {
let media = MediaCache::new(db.clone()); let media = MediaCache::new(db.clone());
let state = State::build(db.clone()).await?; let state = State::build(db.clone()).await?;
let actors = ActorCache::new(db.clone()); let actors = ActorCache::new(db.clone());
let job_server = create_server();
create_workers( let job_server = create_workers(
db.clone(), db.clone(),
state.clone(), state.clone(),
actors.clone(), actors.clone(),
job_server.clone(),
media.clone(), media.clone(),
config.clone(), config.clone(),
); );