From 346664396c34f5bddce8bd6d8b3eebf2be5be2da Mon Sep 17 00:00:00 2001 From: asonix Date: Fri, 23 Jun 2023 15:08:59 -0500 Subject: [PATCH] Run workers on handler threads --- src/jobs.rs | 21 ++++++++------------- src/main.rs | 13 ++++--------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/src/jobs.rs b/src/jobs.rs index 0bad1ac..e2b6aef 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -21,9 +21,9 @@ use crate::{ }; use background_jobs::{ memory_storage::{ActixTimer, Storage}, - Job, Manager, QueueHandle, WorkerConfig, + Job, QueueHandle, WorkerConfig, }; -use std::{convert::TryFrom, num::NonZeroUsize, time::Duration}; +use std::time::Duration; fn debug_object(activity: &serde_json::Value) -> &serde_json::Value { let mut object = &activity["object"]["type"]; @@ -44,11 +44,8 @@ pub(crate) fn create_workers( actors: ActorCache, media: MediaCache, config: Config, -) -> (Manager, JobServer) { - let parallelism = std::thread::available_parallelism() - .unwrap_or_else(|_| NonZeroUsize::try_from(1).expect("nonzero")); - - let shared = WorkerConfig::new_managed(Storage::new(ActixTimer), move |queue_handle| { +) -> JobServer { + let queue_handle = WorkerConfig::new(Storage::new(ActixTimer), move |queue_handle| { JobState::new( state.clone(), actors.clone(), @@ -72,14 +69,12 @@ pub(crate) fn create_workers( .set_worker_count("maintenance", 2) .set_worker_count("apub", 2) .set_worker_count("deliver", 8) - .start_with_threads(parallelism); + .start(); - shared.every(Duration::from_secs(60 * 5), Listeners); - shared.every(Duration::from_secs(60 * 10), RecordLastOnline); + queue_handle.every(Duration::from_secs(60 * 5), Listeners); + queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline); - let job_server = JobServer::new(shared.queue_handle().clone()); - - (shared, job_server) + JobServer::new(queue_handle) } #[derive(Clone, Debug)] diff --git a/src/main.rs b/src/main.rs index 64945ae..6ce2f22 100644 --- a/src/main.rs +++ b/src/main.rs @@ -246,10 +246,6 @@ async fn do_server_main( tracing::warn!("Creating state"); let state = State::build(db.clone()).await?; - tracing::warn!("Creating workers"); - let (manager, job_server) = - create_workers(state.clone(), actors.clone(), media.clone(), config.clone()); - if let Some((token, admin_handle)) = config.telegram_info() { tracing::warn!("Creating telegram handler"); telegram::start(admin_handle.to_owned(), db.clone(), token); @@ -261,13 +257,16 @@ async fn do_server_main( let server = HttpServer::new(move || { let requests = state.requests(&config); + let job_server = + create_workers(state.clone(), actors.clone(), media.clone(), config.clone()); + let app = App::new() .app_data(web::Data::new(db.clone())) .app_data(web::Data::new(state.clone())) .app_data(web::Data::new(requests.clone())) .app_data(web::Data::new(actors.clone())) .app_data(web::Data::new(config.clone())) - .app_data(web::Data::new(job_server.clone())) + .app_data(web::Data::new(job_server)) .app_data(web::Data::new(media.clone())) .app_data(web::Data::new(collector.clone())); @@ -336,10 +335,6 @@ async fn do_server_main( tracing::warn!("Server closed"); - drop(manager); - - tracing::warn!("Main complete"); - Ok(()) }