Use tokio rather than actix-rt

This commit is contained in:
asonix 2024-01-14 15:56:07 -05:00
parent 417553e643
commit d862bf8106
8 changed files with 56 additions and 61 deletions

44
Cargo.lock generated
View file

@ -388,7 +388,6 @@ version = "0.3.106-beta.2"
dependencies = [ dependencies = [
"activitystreams", "activitystreams",
"activitystreams-ext", "activitystreams-ext",
"actix-rt",
"actix-web", "actix-web",
"actix-webfinger", "actix-webfinger",
"ammonia", "ammonia",
@ -567,35 +566,17 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs" name = "background-jobs"
version = "0.17.0" version = "0.17.0"
source = "git+https://git.asonix.dog/asonix/background-jobs#2727645ca9d44ceefcc7e954694323eb55fd38ef" source = "git+https://git.asonix.dog/asonix/background-jobs#5a95a71b7c450dc0e303316f43572d0916eeef93"
dependencies = [ dependencies = [
"background-jobs-actix",
"background-jobs-core", "background-jobs-core",
"background-jobs-metrics", "background-jobs-metrics",
] "background-jobs-tokio",
[[package]]
name = "background-jobs-actix"
version = "0.17.0"
source = "git+https://git.asonix.dog/asonix/background-jobs#2727645ca9d44ceefcc7e954694323eb55fd38ef"
dependencies = [
"actix-rt",
"anyhow",
"async-trait",
"background-jobs-core",
"metrics",
"serde",
"serde_json",
"thiserror",
"tokio",
"tracing",
"uuid",
] ]
[[package]] [[package]]
name = "background-jobs-core" name = "background-jobs-core"
version = "0.17.0" version = "0.17.0"
source = "git+https://git.asonix.dog/asonix/background-jobs#2727645ca9d44ceefcc7e954694323eb55fd38ef" source = "git+https://git.asonix.dog/asonix/background-jobs#5a95a71b7c450dc0e303316f43572d0916eeef93"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@ -612,7 +593,7 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs-metrics" name = "background-jobs-metrics"
version = "0.17.0" version = "0.17.0"
source = "git+https://git.asonix.dog/asonix/background-jobs#2727645ca9d44ceefcc7e954694323eb55fd38ef" source = "git+https://git.asonix.dog/asonix/background-jobs#5a95a71b7c450dc0e303316f43572d0916eeef93"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"background-jobs-core", "background-jobs-core",
@ -622,6 +603,22 @@ dependencies = [
"uuid", "uuid",
] ]
[[package]]
name = "background-jobs-tokio"
version = "0.17.0"
source = "git+https://git.asonix.dog/asonix/background-jobs#5a95a71b7c450dc0e303316f43572d0916eeef93"
dependencies = [
"anyhow",
"async-trait",
"background-jobs-core",
"metrics",
"serde",
"serde_json",
"tokio",
"tracing",
"uuid",
]
[[package]] [[package]]
name = "backtrace" name = "backtrace"
version = "0.3.69" version = "0.3.69"
@ -3790,6 +3787,7 @@ dependencies = [
"bytes", "bytes",
"libc", "libc",
"mio", "mio",
"num_cpus",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",

View file

@ -22,7 +22,6 @@ default = []
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
actix-rt = "2.7.0"
actix-web = { version = "4.4.0", default-features = false, features = ["compress-brotli", "compress-gzip", "rustls-0_21"] } actix-web = { version = "4.4.0", default-features = false, features = ["compress-brotli", "compress-gzip", "rustls-0_21"] }
actix-webfinger = { version = "0.5.0", default-features = false } actix-webfinger = { version = "0.5.0", default-features = false }
activitystreams = "0.7.0-alpha.25" activitystreams = "0.7.0-alpha.25"
@ -79,7 +78,7 @@ tracing-subscriber = { version = "0.3", features = [
"env-filter", "env-filter",
"fmt", "fmt",
] } ] }
tokio = { version = "1", features = ["macros", "sync"] } tokio = { version = "1", features = ["full", "tracing"] }
uuid = { version = "1", features = ["v4", "serde"] } uuid = { version = "1", features = ["v4", "serde"] }
streem = "0.2.0" streem = "0.2.0"
@ -87,7 +86,7 @@ streem = "0.2.0"
version = "0.17.0" version = "0.17.0"
git = "https://git.asonix.dog/asonix/background-jobs" git = "https://git.asonix.dog/asonix/background-jobs"
default-features = false default-features = false
features = ["background-jobs-actix", "background-jobs-metrics", "error-logging"] features = ["error-logging", "metrics", "tokio"]
[dependencies.http-signature-normalization-actix] [dependencies.http-signature-normalization-actix]
version = "0.11.0" version = "0.11.0"

View file

@ -750,6 +750,11 @@ mod tests {
{ {
let db = let db =
Db::build_inner(true, sled::Config::new().temporary(true).open().unwrap()).unwrap(); Db::build_inner(true, sled::Config::new().temporary(true).open().unwrap()).unwrap();
actix_rt::System::new().block_on((f)(db));
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on((f)(db));
} }
} }

View file

@ -1,5 +1,4 @@
use activitystreams::checked::CheckError; use activitystreams::checked::CheckError;
use actix_rt::task::JoinError;
use actix_web::{ use actix_web::{
error::{BlockingError, ResponseError}, error::{BlockingError, ResponseError},
http::StatusCode, http::StatusCode,
@ -7,6 +6,7 @@ use actix_web::{
}; };
use http_signature_normalization_reqwest::SignError; use http_signature_normalization_reqwest::SignError;
use std::{convert::Infallible, fmt::Debug, io}; use std::{convert::Infallible, fmt::Debug, io};
use tokio::task::JoinError;
use tracing_error::SpanTrace; use tracing_error::SpanTrace;
pub(crate) struct Error { pub(crate) struct Error {

View file

@ -19,9 +19,10 @@ use crate::{
jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline}, jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
}; };
use background_jobs::{ use background_jobs::{
memory_storage::{ActixTimer, Storage}, memory_storage::{Storage, TokioTimer},
metrics::MetricsStorage, metrics::MetricsStorage,
Job, QueueHandle, WorkerConfig, tokio::{QueueHandle, WorkerConfig},
Job,
}; };
use std::time::Duration; use std::time::Duration;
@ -44,11 +45,15 @@ pub(crate) fn create_workers(
actors: ActorCache, actors: ActorCache,
media: MediaCache, media: MediaCache,
config: Config, config: Config,
) -> JobServer { ) -> std::io::Result<JobServer> {
let parallelism = std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1) as u64;
let deliver_concurrency = config.deliver_concurrency(); let deliver_concurrency = config.deliver_concurrency();
let queue_handle = WorkerConfig::new( let queue_handle = WorkerConfig::new(
MetricsStorage::wrap(Storage::new(ActixTimer)), MetricsStorage::wrap(Storage::new(TokioTimer)),
move |queue_handle| { move |queue_handle| {
JobState::new( JobState::new(
state.clone(), state.clone(),
@ -71,15 +76,15 @@ pub(crate) fn create_workers(
.register::<apub::Forward>() .register::<apub::Forward>()
.register::<apub::Reject>() .register::<apub::Reject>()
.register::<apub::Undo>() .register::<apub::Undo>()
.set_worker_count("maintenance", 2) .set_worker_count("maintenance", 2 * parallelism)
.set_worker_count("apub", 2) .set_worker_count("apub", 2 * parallelism)
.set_worker_count("deliver", deliver_concurrency) .set_worker_count("deliver", deliver_concurrency * parallelism)
.start(); .start()?;
queue_handle.every(Duration::from_secs(60 * 5), Listeners); queue_handle.every(Duration::from_secs(60 * 5), Listeners)?;
queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline); queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline)?;
JobServer::new(queue_handle) Ok(JobServer::new(queue_handle))
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View file

@ -4,7 +4,6 @@
use std::time::Duration; use std::time::Duration;
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_rt::task::JoinHandle;
use actix_web::{middleware::Compress, web, App, HttpServer}; use actix_web::{middleware::Compress, web, App, HttpServer};
use collector::MemoryCollector; use collector::MemoryCollector;
#[cfg(feature = "console")] #[cfg(feature = "console")]
@ -18,6 +17,7 @@ use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::Resource; use opentelemetry_sdk::Resource;
use reqwest_middleware::ClientWithMiddleware; use reqwest_middleware::ClientWithMiddleware;
use rustls::ServerConfig; use rustls::ServerConfig;
use tokio::task::JoinHandle;
use tracing_actix_web::TracingLogger; use tracing_actix_web::TracingLogger;
use tracing_error::ErrorLayer; use tracing_error::ErrorLayer;
use tracing_log::LogTracer; use tracing_log::LogTracer;
@ -141,7 +141,7 @@ fn build_client(
Ok(client_with_middleware) Ok(client_with_middleware)
} }
#[actix_rt::main] #[tokio::main]
async fn main() -> Result<(), anyhow::Error> { async fn main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
@ -162,7 +162,7 @@ async fn main() -> Result<(), anyhow::Error> {
.with_http_listener(bind_addr) .with_http_listener(bind_addr)
.build()?; .build()?;
actix_rt::spawn(exporter); tokio::spawn(exporter);
let recorder = FanoutBuilder::default() let recorder = FanoutBuilder::default()
.add_recorder(recorder) .add_recorder(recorder)
.add_recorder(collector.clone()) .add_recorder(collector.clone())
@ -179,7 +179,7 @@ async fn main() -> Result<(), anyhow::Error> {
let actors = ActorCache::new(db.clone()); let actors = ActorCache::new(db.clone());
let media = MediaCache::new(db.clone()); let media = MediaCache::new(db.clone());
server_main(db, actors, media, collector, config).await??; server_main(db, actors, media, collector, config).await?;
tracing::warn!("Application exit"); tracing::warn!("Application exit");
@ -187,7 +187,7 @@ async fn main() -> Result<(), anyhow::Error> {
} }
fn client_main(config: Config, args: Args) -> JoinHandle<Result<(), anyhow::Error>> { fn client_main(config: Config, args: Args) -> JoinHandle<Result<(), anyhow::Error>> {
actix_rt::spawn(do_client_main(config, args)) tokio::spawn(do_client_main(config, args))
} }
async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> { async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
@ -273,19 +273,9 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error>
Ok(()) Ok(())
} }
fn server_main(
db: Db,
actors: ActorCache,
media: MediaCache,
collector: MemoryCollector,
config: Config,
) -> JoinHandle<Result<(), anyhow::Error>> {
actix_rt::spawn(do_server_main(db, actors, media, collector, config))
}
const VERIFY_RATIO: usize = 7; const VERIFY_RATIO: usize = 7;
async fn do_server_main( async fn server_main(
db: Db, db: Db,
actors: ActorCache, actors: ActorCache,
media: MediaCache, media: MediaCache,
@ -327,10 +317,8 @@ async fn do_server_main(
let bind_address = config.bind_address(); let bind_address = config.bind_address();
let sign_spawner2 = sign_spawner.clone(); let sign_spawner2 = sign_spawner.clone();
let verify_spawner2 = verify_spawner.clone(); let verify_spawner2 = verify_spawner.clone();
let job_server = create_workers(state.clone(), actors.clone(), media.clone(), config.clone())?;
let server = HttpServer::new(move || { let server = HttpServer::new(move || {
let job_server =
create_workers(state.clone(), actors.clone(), media.clone(), config.clone());
let app = App::new() let app = App::new()
.app_data(web::Data::new(db.clone())) .app_data(web::Data::new(db.clone()))
.app_data(web::Data::new(state.clone())) .app_data(web::Data::new(state.clone()))
@ -339,7 +327,7 @@ async fn do_server_main(
)) ))
.app_data(web::Data::new(actors.clone())) .app_data(web::Data::new(actors.clone()))
.app_data(web::Data::new(config.clone())) .app_data(web::Data::new(config.clone()))
.app_data(web::Data::new(job_server)) .app_data(web::Data::new(job_server.clone()))
.app_data(web::Data::new(media.clone())) .app_data(web::Data::new(media.clone()))
.app_data(web::Data::new(collector.clone())) .app_data(web::Data::new(collector.clone()))
.app_data(web::Data::new(verify_spawner.clone())); .app_data(web::Data::new(verify_spawner.clone()));

View file

@ -36,7 +36,7 @@ where
metrics::counter!("relay.spawner.wait-timer.start").increment(1); metrics::counter!("relay.spawner.wait-timer.start").increment(1);
let mut interval = actix_rt::time::interval(Duration::from_secs(5)); let mut interval = tokio::time::interval(Duration::from_secs(5));
// pass the first tick (instant) // pass the first tick (instant)
interval.tick().await; interval.tick().await;

View file

@ -46,7 +46,7 @@ pub(crate) fn start(admin_handle: String, db: Db, token: &str) {
let bot = Bot::new(token); let bot = Bot::new(token);
let admin_handle = Arc::new(admin_handle); let admin_handle = Arc::new(admin_handle);
actix_rt::spawn(async move { tokio::spawn(async move {
let command_handler = teloxide::filter_command::<Command, _>().endpoint( let command_handler = teloxide::filter_command::<Command, _>().endpoint(
move |bot: Bot, msg: Message, cmd: Command| { move |bot: Bot, msg: Message, cmd: Command| {
let admin_handle = admin_handle.clone(); let admin_handle = admin_handle.clone();