Run workers on handler threads

This commit is contained in:
asonix 2023-06-23 15:08:59 -05:00
parent 74f35faa22
commit 346664396c
2 changed files with 12 additions and 22 deletions

View file

@ -21,9 +21,9 @@ use crate::{
}; };
use background_jobs::{ use background_jobs::{
memory_storage::{ActixTimer, Storage}, memory_storage::{ActixTimer, Storage},
Job, Manager, QueueHandle, WorkerConfig, Job, QueueHandle, WorkerConfig,
}; };
use std::{convert::TryFrom, num::NonZeroUsize, time::Duration}; use std::time::Duration;
fn debug_object(activity: &serde_json::Value) -> &serde_json::Value { fn debug_object(activity: &serde_json::Value) -> &serde_json::Value {
let mut object = &activity["object"]["type"]; let mut object = &activity["object"]["type"];
@ -44,11 +44,8 @@ pub(crate) fn create_workers(
actors: ActorCache, actors: ActorCache,
media: MediaCache, media: MediaCache,
config: Config, config: Config,
) -> (Manager, JobServer) { ) -> JobServer {
let parallelism = std::thread::available_parallelism() let queue_handle = WorkerConfig::new(Storage::new(ActixTimer), move |queue_handle| {
.unwrap_or_else(|_| NonZeroUsize::try_from(1).expect("nonzero"));
let shared = WorkerConfig::new_managed(Storage::new(ActixTimer), move |queue_handle| {
JobState::new( JobState::new(
state.clone(), state.clone(),
actors.clone(), actors.clone(),
@ -72,14 +69,12 @@ pub(crate) fn create_workers(
.set_worker_count("maintenance", 2) .set_worker_count("maintenance", 2)
.set_worker_count("apub", 2) .set_worker_count("apub", 2)
.set_worker_count("deliver", 8) .set_worker_count("deliver", 8)
.start_with_threads(parallelism); .start();
shared.every(Duration::from_secs(60 * 5), Listeners); queue_handle.every(Duration::from_secs(60 * 5), Listeners);
shared.every(Duration::from_secs(60 * 10), RecordLastOnline); queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline);
let job_server = JobServer::new(shared.queue_handle().clone()); JobServer::new(queue_handle)
(shared, job_server)
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View file

@ -246,10 +246,6 @@ async fn do_server_main(
tracing::warn!("Creating state"); tracing::warn!("Creating state");
let state = State::build(db.clone()).await?; let state = State::build(db.clone()).await?;
tracing::warn!("Creating workers");
let (manager, job_server) =
create_workers(state.clone(), actors.clone(), media.clone(), config.clone());
if let Some((token, admin_handle)) = config.telegram_info() { if let Some((token, admin_handle)) = config.telegram_info() {
tracing::warn!("Creating telegram handler"); tracing::warn!("Creating telegram handler");
telegram::start(admin_handle.to_owned(), db.clone(), token); telegram::start(admin_handle.to_owned(), db.clone(), token);
@ -261,13 +257,16 @@ async fn do_server_main(
let server = HttpServer::new(move || { let server = HttpServer::new(move || {
let requests = state.requests(&config); let requests = state.requests(&config);
let job_server =
create_workers(state.clone(), actors.clone(), media.clone(), config.clone());
let app = App::new() let app = App::new()
.app_data(web::Data::new(db.clone())) .app_data(web::Data::new(db.clone()))
.app_data(web::Data::new(state.clone())) .app_data(web::Data::new(state.clone()))
.app_data(web::Data::new(requests.clone())) .app_data(web::Data::new(requests.clone()))
.app_data(web::Data::new(actors.clone())) .app_data(web::Data::new(actors.clone()))
.app_data(web::Data::new(config.clone())) .app_data(web::Data::new(config.clone()))
.app_data(web::Data::new(job_server.clone())) .app_data(web::Data::new(job_server))
.app_data(web::Data::new(media.clone())) .app_data(web::Data::new(media.clone()))
.app_data(web::Data::new(collector.clone())); .app_data(web::Data::new(collector.clone()));
@ -336,10 +335,6 @@ async fn do_server_main(
tracing::warn!("Server closed"); tracing::warn!("Server closed");
drop(manager);
tracing::warn!("Main complete");
Ok(()) Ok(())
} }