diff --git a/src/jobs.rs b/src/jobs.rs index 6dfdbe9..5327ed0 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -7,8 +7,8 @@ mod nodeinfo; mod process_listeners; pub(crate) use self::{ - contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, - instance::QueryInstance, nodeinfo::QueryNodeinfo, + contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, + nodeinfo::QueryNodeinfo, }; use crate::{ @@ -22,7 +22,7 @@ use background_jobs::{ memory_storage::{ActixTimer, Storage}, Job, Manager, QueueHandle, WorkerConfig, }; -use std::time::Duration; +use std::{convert::TryFrom, num::NonZeroUsize, time::Duration}; fn debug_object(activity: &serde_json::Value) -> &serde_json::Value { let mut object = &activity["object"]["type"]; @@ -44,6 +44,12 @@ pub(crate) fn create_workers( media: MediaCache, config: Config, ) -> (Manager, JobServer) { + let parallelism = std::thread::available_parallelism() + .map(|p| p.get()) + .unwrap_or(1) as u64; + + let parallelism = (parallelism / 2).max(1); + let shared = WorkerConfig::new_managed(Storage::new(ActixTimer), move |queue_handle| { JobState::new( state.clone(), @@ -64,8 +70,10 @@ pub(crate) fn create_workers( .register::() .register::() .register::() - .set_worker_count("default", 16) - .start(); + .set_worker_count("maintenance", parallelism) + .set_worker_count("apub", parallelism) + .set_worker_count("deliver", parallelism * 3) + .start_with_threads(NonZeroUsize::try_from(parallelism as usize).expect("nonzero")); shared.every(Duration::from_secs(60 * 5), Listeners); diff --git a/src/jobs/apub/announce.rs b/src/jobs/apub/announce.rs index 26048c1..93dec67 100644 --- a/src/jobs/apub/announce.rs +++ b/src/jobs/apub/announce.rs @@ -67,6 +67,7 @@ impl ActixJob for Announce { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::apub::Announce"; + const QUEUE: &'static str = "apub"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/apub/follow.rs b/src/jobs/apub/follow.rs index d78b544..10cae22 100644 --- a/src/jobs/apub/follow.rs +++ b/src/jobs/apub/follow.rs @@ -116,6 +116,7 @@ impl ActixJob for Follow { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::apub::Follow"; + const QUEUE: &'static str = "apub"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/apub/forward.rs b/src/jobs/apub/forward.rs index 1f72a32..f5e191b 100644 --- a/src/jobs/apub/forward.rs +++ b/src/jobs/apub/forward.rs @@ -52,6 +52,7 @@ impl ActixJob for Forward { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::apub::Forward"; + const QUEUE: &'static str = "apub"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/apub/reject.rs b/src/jobs/apub/reject.rs index f6ee0e7..2384426 100644 --- a/src/jobs/apub/reject.rs +++ b/src/jobs/apub/reject.rs @@ -38,6 +38,7 @@ impl ActixJob for Reject { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::apub::Reject"; + const QUEUE: &'static str = "apub"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/apub/undo.rs b/src/jobs/apub/undo.rs index 0359bf2..b55d4ae 100644 --- a/src/jobs/apub/undo.rs +++ b/src/jobs/apub/undo.rs @@ -53,6 +53,7 @@ impl ActixJob for Undo { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::apub::Undo"; + const QUEUE: &'static str = "apub"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/contact.rs b/src/jobs/contact.rs index 288941a..a98ac8f 100644 --- a/src/jobs/contact.rs +++ b/src/jobs/contact.rs @@ -86,6 +86,7 @@ impl ActixJob for QueryContact { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::QueryContact"; + const QUEUE: &'static str = "maintenance"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/deliver.rs b/src/jobs/deliver.rs index 9c01d8d..223608e 100644 --- a/src/jobs/deliver.rs +++ b/src/jobs/deliver.rs @@ -55,6 +55,7 @@ impl ActixJob for Deliver { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::Deliver"; + const QUEUE: &'static str = "deliver"; const BACKOFF: Backoff = Backoff::Exponential(8); fn run(self, state: Self::State) -> Self::Future { diff --git a/src/jobs/deliver_many.rs b/src/jobs/deliver_many.rs index 58cdc1f..fc9107c 100644 --- a/src/jobs/deliver_many.rs +++ b/src/jobs/deliver_many.rs @@ -50,6 +50,7 @@ impl ActixJob for DeliverMany { type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "relay::jobs::DeliverMany"; + const QUEUE: &'static str = "deliver"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index d3e2719..4339845 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -110,6 +110,7 @@ impl ActixJob for QueryInstance { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::QueryInstance"; + const QUEUE: &'static str = "maintenance"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs index dbe2717..fe86ad8 100644 --- a/src/jobs/nodeinfo.rs +++ b/src/jobs/nodeinfo.rs @@ -100,6 +100,7 @@ impl ActixJob for QueryNodeinfo { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::QueryNodeinfo"; + const QUEUE: &'static str = "maintenance"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/process_listeners.rs b/src/jobs/process_listeners.rs index bba4e00..1cad2e4 100644 --- a/src/jobs/process_listeners.rs +++ b/src/jobs/process_listeners.rs @@ -28,6 +28,7 @@ impl ActixJob for Listeners { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::Listeners"; + const QUEUE: &'static str = "maintenance"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) })