forked from mirrors/relay
Make better use of cores for jobs
This commit is contained in:
parent
95f98ec052
commit
787c8312bc
12 changed files with 24 additions and 5 deletions
18
src/jobs.rs
18
src/jobs.rs
|
@ -7,8 +7,8 @@ mod nodeinfo;
|
||||||
mod process_listeners;
|
mod process_listeners;
|
||||||
|
|
||||||
pub(crate) use self::{
|
pub(crate) use self::{
|
||||||
contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany,
|
contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance,
|
||||||
instance::QueryInstance, nodeinfo::QueryNodeinfo,
|
nodeinfo::QueryNodeinfo,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -22,7 +22,7 @@ use background_jobs::{
|
||||||
memory_storage::{ActixTimer, Storage},
|
memory_storage::{ActixTimer, Storage},
|
||||||
Job, Manager, QueueHandle, WorkerConfig,
|
Job, Manager, QueueHandle, WorkerConfig,
|
||||||
};
|
};
|
||||||
use std::time::Duration;
|
use std::{convert::TryFrom, num::NonZeroUsize, time::Duration};
|
||||||
|
|
||||||
fn debug_object(activity: &serde_json::Value) -> &serde_json::Value {
|
fn debug_object(activity: &serde_json::Value) -> &serde_json::Value {
|
||||||
let mut object = &activity["object"]["type"];
|
let mut object = &activity["object"]["type"];
|
||||||
|
@ -44,6 +44,12 @@ pub(crate) fn create_workers(
|
||||||
media: MediaCache,
|
media: MediaCache,
|
||||||
config: Config,
|
config: Config,
|
||||||
) -> (Manager, JobServer) {
|
) -> (Manager, JobServer) {
|
||||||
|
let parallelism = std::thread::available_parallelism()
|
||||||
|
.map(|p| p.get())
|
||||||
|
.unwrap_or(1) as u64;
|
||||||
|
|
||||||
|
let parallelism = (parallelism / 2).max(1);
|
||||||
|
|
||||||
let shared = WorkerConfig::new_managed(Storage::new(ActixTimer), move |queue_handle| {
|
let shared = WorkerConfig::new_managed(Storage::new(ActixTimer), move |queue_handle| {
|
||||||
JobState::new(
|
JobState::new(
|
||||||
state.clone(),
|
state.clone(),
|
||||||
|
@ -64,8 +70,10 @@ pub(crate) fn create_workers(
|
||||||
.register::<apub::Forward>()
|
.register::<apub::Forward>()
|
||||||
.register::<apub::Reject>()
|
.register::<apub::Reject>()
|
||||||
.register::<apub::Undo>()
|
.register::<apub::Undo>()
|
||||||
.set_worker_count("default", 16)
|
.set_worker_count("maintenance", parallelism)
|
||||||
.start();
|
.set_worker_count("apub", parallelism)
|
||||||
|
.set_worker_count("deliver", parallelism * 3)
|
||||||
|
.start_with_threads(NonZeroUsize::try_from(parallelism as usize).expect("nonzero"));
|
||||||
|
|
||||||
shared.every(Duration::from_secs(60 * 5), Listeners);
|
shared.every(Duration::from_secs(60 * 5), Listeners);
|
||||||
|
|
||||||
|
|
|
@ -67,6 +67,7 @@ impl ActixJob for Announce {
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::apub::Announce";
|
const NAME: &'static str = "relay::jobs::apub::Announce";
|
||||||
|
const QUEUE: &'static str = "apub";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
|
|
|
@ -116,6 +116,7 @@ impl ActixJob for Follow {
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::apub::Follow";
|
const NAME: &'static str = "relay::jobs::apub::Follow";
|
||||||
|
const QUEUE: &'static str = "apub";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
|
|
|
@ -52,6 +52,7 @@ impl ActixJob for Forward {
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::apub::Forward";
|
const NAME: &'static str = "relay::jobs::apub::Forward";
|
||||||
|
const QUEUE: &'static str = "apub";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
|
|
|
@ -38,6 +38,7 @@ impl ActixJob for Reject {
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::apub::Reject";
|
const NAME: &'static str = "relay::jobs::apub::Reject";
|
||||||
|
const QUEUE: &'static str = "apub";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
|
|
|
@ -53,6 +53,7 @@ impl ActixJob for Undo {
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::apub::Undo";
|
const NAME: &'static str = "relay::jobs::apub::Undo";
|
||||||
|
const QUEUE: &'static str = "apub";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
|
|
|
@ -86,6 +86,7 @@ impl ActixJob for QueryContact {
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::QueryContact";
|
const NAME: &'static str = "relay::jobs::QueryContact";
|
||||||
|
const QUEUE: &'static str = "maintenance";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
|
|
|
@ -55,6 +55,7 @@ impl ActixJob for Deliver {
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::Deliver";
|
const NAME: &'static str = "relay::jobs::Deliver";
|
||||||
|
const QUEUE: &'static str = "deliver";
|
||||||
const BACKOFF: Backoff = Backoff::Exponential(8);
|
const BACKOFF: Backoff = Backoff::Exponential(8);
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
|
|
|
@ -50,6 +50,7 @@ impl ActixJob for DeliverMany {
|
||||||
type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;
|
type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::DeliverMany";
|
const NAME: &'static str = "relay::jobs::DeliverMany";
|
||||||
|
const QUEUE: &'static str = "deliver";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
|
|
|
@ -110,6 +110,7 @@ impl ActixJob for QueryInstance {
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::QueryInstance";
|
const NAME: &'static str = "relay::jobs::QueryInstance";
|
||||||
|
const QUEUE: &'static str = "maintenance";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
|
|
|
@ -100,6 +100,7 @@ impl ActixJob for QueryNodeinfo {
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::QueryNodeinfo";
|
const NAME: &'static str = "relay::jobs::QueryNodeinfo";
|
||||||
|
const QUEUE: &'static str = "maintenance";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
|
|
|
@ -28,6 +28,7 @@ impl ActixJob for Listeners {
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::Listeners";
|
const NAME: &'static str = "relay::jobs::Listeners";
|
||||||
|
const QUEUE: &'static str = "maintenance";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
|
|
Loading…
Reference in a new issue