Move blocking setup out of actix systems

This commit is contained in:
asonix 2022-11-19 23:35:00 -06:00
parent a154fbb504
commit 3500f85f44

View file

@ -3,6 +3,7 @@
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_web::{web, App, HttpServer}; use actix_web::{web, App, HttpServer};
use collector::MemoryCollector;
#[cfg(feature = "console")] #[cfg(feature = "console")]
use console_subscriber::ConsoleLayer; use console_subscriber::ConsoleLayer;
use opentelemetry::{sdk::Resource, KeyValue}; use opentelemetry::{sdk::Resource, KeyValue};
@ -92,25 +93,39 @@ fn init_subscriber(
Ok(()) Ok(())
} }
#[actix_rt::main] fn main() -> Result<(), anyhow::Error> {
async fn main() -> Result<(), anyhow::Error> {
actix_rt::spawn(do_main()).await??;
tracing::warn!("Application exit");
Ok(())
}
async fn do_main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
let config = Config::build()?; let config = Config::build()?;
init_subscriber(Config::software_name(), config.opentelemetry_url())?; init_subscriber(Config::software_name(), config.opentelemetry_url())?;
let collector = collector::MemoryCollector::new(); let collector = MemoryCollector::new();
collector.install()?; collector.install()?;
let args = Args::new(); let args = Args::new();
if args.any() { if args.any() {
return client_main(config, args);
}
let db = Db::build(&config)?;
let actors = ActorCache::new(db.clone());
let media = MediaCache::new(db.clone());
server_main(db, actors, media, collector, config)?;
tracing::warn!("Application exit");
Ok(())
}
#[actix_rt::main]
async fn client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
actix_rt::spawn(do_client_main(config, args)).await?
}
async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
let client = requests::build_client(&config.user_agent()); let client = requests::build_client(&config.user_agent());
if !args.blocks().is_empty() || !args.allowed().is_empty() { if !args.blocks().is_empty() || !args.allowed().is_empty() {
@ -154,22 +169,40 @@ async fn do_main() -> Result<(), anyhow::Error> {
} }
return Ok(()); return Ok(());
} }
let db = Db::build(&config)?; #[actix_rt::main]
async fn server_main(
db: Db,
actors: ActorCache,
media: MediaCache,
collector: MemoryCollector,
config: Config,
) -> Result<(), anyhow::Error> {
actix_rt::spawn(do_server_main(db, actors, media, collector, config)).await?
}
let media = MediaCache::new(db.clone()); async fn do_server_main(
db: Db,
actors: ActorCache,
media: MediaCache,
collector: MemoryCollector,
config: Config,
) -> Result<(), anyhow::Error> {
tracing::warn!("Creating state");
let state = State::build(db.clone()).await?; let state = State::build(db.clone()).await?;
let actors = ActorCache::new(db.clone());
tracing::warn!("Creating workers");
let (manager, job_server) = let (manager, job_server) =
create_workers(state.clone(), actors.clone(), media.clone(), config.clone()); create_workers(state.clone(), actors.clone(), media.clone(), config.clone());
if let Some((token, admin_handle)) = config.telegram_info() { if let Some((token, admin_handle)) = config.telegram_info() {
tracing::warn!("Creating telegram handler");
telegram::start(admin_handle.to_owned(), db.clone(), token); telegram::start(admin_handle.to_owned(), db.clone(), token);
} }
let bind_address = config.bind_address(); let bind_address = config.bind_address();
tracing::warn!("Binding to {}:{}", bind_address.0, bind_address.1);
HttpServer::new(move || { HttpServer::new(move || {
let app = App::new() let app = App::new()
.app_data(web::Data::new(db.clone())) .app_data(web::Data::new(db.clone()))