Remove actix-rt and replace with tokio tasks

This commit is contained in:
cetra3 2023-06-14 14:52:16 +09:30
parent 6ac6e2d90e
commit 86bce7cf65
18 changed files with 206 additions and 105 deletions

View file

@ -23,7 +23,6 @@ openssl = "0.10.54"
once_cell = "1.18.0"
http = "0.2.9"
sha2 = "0.10.6"
background-jobs = "0.13.0"
thiserror = "1.0.40"
derive_builder = "0.12.0"
itertools = "0.10.5"
@ -37,6 +36,7 @@ futures-core = { version = "0.3.28", default-features = false }
pin-project-lite = "0.2.9"
activitystreams-kinds = "0.3.0"
regex = { version = "1.8.4", default-features = false, features = ["std"] }
tokio = { version = "1.21.2", features = ["sync", "rt", "time"]}
# Actix-web
actix-web = { version = "4.3.1", default-features = false, optional = true }
@ -58,7 +58,7 @@ env_logger = "0.10.0"
tower-http = { version = "0.4.0", features = ["map-request-body", "util"] }
axum = { version = "0.6.18", features = ["http1", "tokio", "query"], default-features = false }
axum-macros = "0.3.7"
actix-rt = "2.8.0"
tokio = { version = "1.21.2", features = ["full"]}
[profile.dev]
strip = "symbols"

View file

@ -5,12 +5,13 @@ Next we need to do some configuration. Most importantly we need to specify the d
```
# use activitypub_federation::config::FederationConfig;
# let db_connection = ();
# let _ = actix_rt::System::new();
# tokio::runtime::Runtime::new().unwrap().block_on(async {
let config = FederationConfig::builder()
.domain("example.com")
.app_data(db_connection)
.build()?;
.build().await?;
# Ok::<(), anyhow::Error>(())
# }).unwrap()
```
`debug` is necessary to test federation with http and localhost URLs, but it should never be used in production. The `worker_count` value can be adjusted depending on the instance size. A lower value saves resources on a small instance, while a higher value is necessary on larger instances to keep up with send jobs. `url_verifier` can be used to implement a domain blacklist.

View file

@ -22,12 +22,12 @@ The next step is to allow other servers to fetch our actors and objects. For thi
# use http::HeaderMap;
# async fn generate_user_html(_: String, _: Data<DbConnection>) -> axum::response::Response { todo!() }
#[actix_rt::main]
#[tokio::main]
async fn main() -> Result<(), Error> {
let data = FederationConfig::builder()
.domain("example.com")
.app_data(DbConnection)
.build()?;
.build().await?;
let app = axum::Router::new()
.route("/user/:name", get(http_get_user))

View file

@ -7,18 +7,17 @@ After setting up our structs, implementing traits and initializing configuration
# use activitypub_federation::traits::tests::DbUser;
# use activitypub_federation::config::FederationConfig;
# let db_connection = activitypub_federation::traits::tests::DbConnection;
# let _ = actix_rt::System::new();
# actix_rt::Runtime::new().unwrap().block_on(async {
# tokio::runtime::Runtime::new().unwrap().block_on(async {
let config = FederationConfig::builder()
.domain("example.com")
.app_data(db_connection)
.build()?;
.build().await?;
let user_id = ObjectId::<DbUser>::parse("https://mastodon.social/@LemmyDev")?;
let data = config.to_request_data();
let user = user_id.dereference(&data).await;
assert!(user.is_ok());
# Ok::<(), anyhow::Error>(())
}).unwrap()
# }).unwrap()
```
`dereference` retrieves the object JSON at the given URL, and uses serde to convert it to `Person`. It then calls your method `Object::from_json` which inserts it in the database and returns a `DbUser` struct. `request_data` contains the federation config as well as a counter of outgoing HTTP requests. If this counter exceeds the configured maximum, further requests are aborted in order to avoid recursive fetching which could allow for a denial of service attack.
@ -32,9 +31,8 @@ We can similarly dereference a user over webfinger with the following method. It
# use activitypub_federation::fetch::webfinger::webfinger_resolve_actor;
# use activitypub_federation::traits::tests::DbUser;
# let db_connection = DbConnection;
# let _ = actix_rt::System::new();
# actix_rt::Runtime::new().unwrap().block_on(async {
# let config = FederationConfig::builder().domain("example.com").app_data(db_connection).build()?;
# tokio::runtime::Runtime::new().unwrap().block_on(async {
# let config = FederationConfig::builder().domain("example.com").app_data(db_connection).build().await?;
# let data = config.to_request_data();
let user: DbUser = webfinger_resolve_actor("nutomic@lemmy.ml", &data).await?;
# Ok::<(), anyhow::Error>(())

View file

@ -9,13 +9,12 @@ To send an activity we need to initialize our previously defined struct, and pic
# use activitypub_federation::traits::Actor;
# use activitypub_federation::fetch::object_id::ObjectId;
# use activitypub_federation::traits::tests::{DB_USER, DbConnection, Follow};
# let _ = actix_rt::System::new();
# actix_rt::Runtime::new().unwrap().block_on(async {
# tokio::runtime::Runtime::new().unwrap().block_on(async {
# let db_connection = DbConnection;
# let config = FederationConfig::builder()
# .domain("example.com")
# .app_data(db_connection)
# .build()?;
# .build().await?;
# let data = config.to_request_data();
# let sender = DB_USER.clone();
# let recipient = DB_USER.clone();

View file

@ -61,9 +61,9 @@ impl Object for SearchableDbObjects {
}
}
#[actix_rt::main]
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
# let config = FederationConfig::builder().domain("example.com").app_data(DbConnection).build().unwrap();
# let config = FederationConfig::builder().domain("example.com").app_data(DbConnection).build().await.unwrap();
# let data = config.to_request_data();
let query = "https://example.com/id/413";
let query_result = ObjectId::<SearchableDbObjects>::parse(query)?

View file

@ -28,7 +28,7 @@ const DOMAIN: &str = "example.com";
const LOCAL_USER_NAME: &str = "alison";
const BIND_ADDRESS: &str = "localhost:8003";
#[actix_rt::main]
#[tokio::main]
async fn main() -> Result<(), Error> {
env_logger::builder()
.filter_level(LevelFilter::Warn)
@ -47,7 +47,8 @@ async fn main() -> Result<(), Error> {
let config = FederationConfig::builder()
.domain(DOMAIN)
.app_data(database)
.build()?;
.build()
.await?;
info!("Listen with HTTP server on {BIND_ADDRESS}");
let config = config.clone();

View file

@ -30,7 +30,7 @@ pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
})
.bind(hostname)?
.run();
actix_rt::spawn(server);
tokio::spawn(server);
Ok(())
}

View file

@ -41,7 +41,7 @@ pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
.expect("Failed to lookup domain name");
let server = axum::Server::bind(&addr).serve(app.into_make_service());
actix_rt::spawn(server);
tokio::spawn(server);
Ok(())
}

View file

@ -11,7 +11,7 @@ use std::{
};
use url::Url;
pub fn new_instance(
pub async fn new_instance(
hostname: &str,
name: String,
) -> Result<FederationConfig<DatabaseHandle>, Error> {
@ -29,7 +29,8 @@ pub fn new_instance(
.signed_fetch_actor(&system_user)
.app_data(database)
.debug(true)
.build()?;
.build()
.await?;
Ok(config)
}

View file

@ -17,7 +17,7 @@ mod instance;
mod objects;
mod utils;
#[actix_rt::main]
#[tokio::main]
async fn main() -> Result<(), Error> {
env_logger::builder()
.filter_level(LevelFilter::Warn)
@ -32,8 +32,8 @@ async fn main() -> Result<(), Error> {
.map(|arg| Webserver::from_str(&arg).unwrap())
.unwrap_or(Webserver::Axum);
let alpha = new_instance("localhost:8001", "alpha".to_string())?;
let beta = new_instance("localhost:8002", "beta".to_string())?;
let alpha = new_instance("localhost:8001", "alpha".to_string()).await?;
let beta = new_instance("localhost:8002", "beta".to_string()).await?;
listen(&alpha, &webserver)?;
listen(&beta, &webserver)?;
info!("Local instances started");

View file

@ -11,25 +11,22 @@ use crate::{
FEDERATION_CONTENT_TYPE,
};
use anyhow::anyhow;
use background_jobs::{
memory_storage::{ActixTimer, Storage},
ActixJob,
Backoff,
Manager,
MaxRetries,
WorkerConfig,
};
use futures_core::Future;
use http::{header::HeaderName, HeaderMap, HeaderValue};
use httpdate::fmt_http_date;
use itertools::Itertools;
use reqwest_middleware::ClientWithMiddleware;
use serde::{Deserialize, Serialize};
use std::{
fmt::Debug,
future::Future,
pin::Pin,
fmt::{Debug, Display},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, SystemTime},
};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tracing::{debug, info, warn};
use url::Url;
@ -85,7 +82,7 @@ where
http_signature_compat: config.http_signature_compat,
};
if config.debug {
let res = do_send(message, &config.client, config.request_timeout).await;
let res = do_send(&message, &config.client, config.request_timeout).await;
// Don't fail on error, as we intentionally do some invalid actions in tests, to verify that
// they are rejected on the receiving side. These errors shouldn't bubble up to make the API
// call fail. This matches the behaviour in production.
@ -94,15 +91,16 @@ where
}
} else {
activity_queue.queue(message).await?;
let stats = activity_queue.get_stats().await?;
let stats = activity_queue.get_stats();
let running = stats.running.load(Ordering::Relaxed);
let stats_fmt = format!(
"Activity queue stats: pending: {}, running: {}, dead (this hour): {}, complete (this hour): {}",
stats.pending,
stats.running,
stats.dead.this_hour(),
stats.complete.this_hour()
"Activity queue stats: pending: {}, running: {}, dead: {}, complete: {}",
stats.pending.load(Ordering::Relaxed),
running,
stats.dead.load(Ordering::Relaxed),
stats.complete.load(Ordering::Relaxed),
);
if stats.running as u64 == config.worker_count {
if running == config.worker_count {
warn!("Reached max number of send activity workers ({}). Consider increasing worker count to avoid federation delays", config.worker_count);
warn!(stats_fmt);
} else {
@ -124,25 +122,8 @@ struct SendActivityTask {
http_signature_compat: bool,
}
impl ActixJob for SendActivityTask {
type State = QueueState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "SendActivityTask";
const MAX_RETRIES: MaxRetries = MaxRetries::Count(3);
/// This gives the following retry intervals:
/// - 60s (one minute, for service restart)
/// - 60min (one hour, for instance maintenance)
/// - 60h (2.5 days, for major incident with rebuild from backup)
const BACKOFF: Backoff = Backoff::Exponential(60);
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move { do_send(self, &state.client, state.timeout).await })
}
}
async fn do_send(
task: SendActivityTask,
task: &SendActivityTask,
client: &ClientWithMiddleware,
timeout: Duration,
) -> Result<(), anyhow::Error> {
@ -153,9 +134,9 @@ async fn do_send(
.headers(generate_request_headers(&task.inbox));
let request = sign_request(
request_builder,
task.actor_id,
task.activity,
task.private_key,
&task.actor_id,
task.activity.clone(),
task.private_key.clone(),
task.http_signature_compat,
)
.await?;
@ -220,27 +201,145 @@ pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
headers
}
/// A simple activity queue which spawns tokio workers to send out requests.
///
/// When a queue is created, one tokio task is spawned per worker, and each worker gets its own
/// unbounded mpsc channel (i.e., all queued messages are held in memory). Incoming messages are
/// distributed over the workers in round-robin order.
pub(crate) struct ActivityQueue {
/// Sending halves of the worker channels, one per spawned "background" task
senders: Vec<UnboundedSender<SendActivityTask>>,
/// Monotonically increasing counter used for round-robin selection from `senders`
/// (taken modulo the number of senders when a message is queued)
last_sender_idx: AtomicUsize,
/// Stats shared between the queue and its workers
stats: Arc<Stats>,
}
/// Simple stat counters to show where we're up to with sending messages.
///
/// This is a lock-free way to share things between tasks: every counter is an atomic which is
/// updated independently of the others. When reading these values it's possible (but extremely
/// unlikely) to get stale data if a worker task is in the middle of transitioning a message from
/// one state to the next.
#[derive(Default)]
struct Stats {
// Messages which were queued but not yet picked up by a worker
pending: AtomicUsize,
// Messages which a worker is currently sending (this includes waiting between retries)
running: AtomicUsize,
// Messages which were given up on after all retries failed
dead: AtomicUsize,
// Messages which were sent successfully
complete: AtomicUsize,
}
/// We need to retry activity sending in case the target instance is temporarily unreachable.
/// In this case, the task is kept by its worker and resent when the instance is hopefully back
/// up. This list shows the retry intervals, and which events of the target instance can be
/// covered:
/// - 60s (one minute, service restart)
/// - 60min (one hour, instance maintenance)
/// - 60h (2.5 days, major incident with rebuild from backup)
/// TODO: make the intervals configurable
///
/// Maximum number of retries after the first failed attempt.
const MAX_RETRIES: usize = 3;
/// Base of the exponential backoff in seconds: the n-th retry waits `BACKOFF^n` seconds.
const BACKOFF: usize = 60;
/// A tokio spawned worker which is responsible for submitting requests to federated servers
async fn worker(
client: ClientWithMiddleware,
timeout: Duration,
mut receiver: UnboundedReceiver<SendActivityTask>,
stats: Arc<Stats>,
) {
while let Some(message) = receiver.recv().await {
// Update our counters as we're now "running" and not "pending"
stats.pending.fetch_sub(1, Ordering::Relaxed);
stats.running.fetch_add(1, Ordering::Relaxed);
// This will use the retry helper method below, with an exponential backoff
// If the task is sleeping, tokio will use work-stealing to keep it busy with something else
let outcome = retry(|| do_send(&message, &client, timeout), MAX_RETRIES, BACKOFF).await;
// "Running" has finished, check the outcome
stats.running.fetch_sub(1, Ordering::Relaxed);
match outcome {
Ok(_) => {
stats.complete.fetch_add(1, Ordering::Relaxed);
}
// We might want to do something here
Err(_err) => {
stats.dead.fetch_add(1, Ordering::Relaxed);
}
}
}
}
impl ActivityQueue {
fn new(client: ClientWithMiddleware, timeout: Duration, worker_count: usize) -> Self {
// Keep a vec of senders to send our messages to
let mut senders = Vec::with_capacity(worker_count);
let stats: Arc<Stats> = Default::default();
// Spawn our workers
for _ in 0..worker_count {
let (sender, receiver) = unbounded_channel();
tokio::spawn(worker(client.clone(), timeout, receiver, stats.clone()));
senders.push(sender);
}
Self {
senders,
last_sender_idx: AtomicUsize::new(0),
stats,
}
}
async fn queue(&self, message: SendActivityTask) -> Result<(), anyhow::Error> {
// really basic round-robin to our workers, we just do mod on the len of senders
let idx_to_send = self.last_sender_idx.fetch_add(1, Ordering::Relaxed) % self.senders.len();
// Set a queue to pending
self.stats.pending.fetch_add(1, Ordering::Relaxed);
// Send to one of our workers
self.senders[idx_to_send].send(message)?;
Ok(())
}
fn get_stats(&self) -> &Stats {
&self.stats
}
}
/// Creates an activity queue using tokio spawned tasks
/// Note: requires a tokio runtime
pub(crate) fn create_activity_queue(
client: ClientWithMiddleware,
worker_count: u64,
worker_count: usize,
request_timeout: Duration,
debug: bool,
) -> Manager {
) -> ActivityQueue {
// queue is not used in debug mode, so don't create any workers to avoid log spam
let worker_count = if debug { 0 } else { worker_count };
// Configure and start our workers
WorkerConfig::new_managed(Storage::new(ActixTimer), move |_| QueueState {
client: client.clone(),
timeout: request_timeout,
})
.register::<SendActivityTask>()
.set_worker_count("default", worker_count)
.start()
ActivityQueue::new(client, request_timeout, worker_count)
}
#[derive(Clone)]
struct QueueState {
client: ClientWithMiddleware,
timeout: Duration,
/// Retries a future action factory function up to `amount` times with an exponential backoff
/// timer between tries.
///
/// `action` is called once, and again after every failure until it either succeeds or `amount`
/// retries have been used up. Before the n-th retry the task sleeps for `sleep_seconds^n`
/// seconds. Every failure which is followed by a retry is logged as a warning.
///
/// Returns the first `Ok` value, or the error of the last attempt once all retries failed.
async fn retry<T, E: Display, F: Future<Output = Result<T, E>>, A: FnMut() -> F>(
    mut action: A,
    amount: usize,
    sleep_seconds: usize,
) -> Result<T, E> {
    // Widening conversion, `usize` is never larger than 64 bits on supported targets
    let base = sleep_seconds as u64;
    let mut count = 0;
    // Exponent for the next sleep, tracked separately as `u32` so no truncating cast is needed
    let mut exponent: u32 = 0;
    loop {
        match action().await {
            Ok(val) => return Ok(val),
            Err(err) => {
                if count >= amount {
                    return Err(err);
                }
                count += 1;
                exponent = exponent.saturating_add(1);
                warn!("{err}");
                // Saturate instead of overflowing: a plain `pow` panics in debug builds and
                // silently wraps to a much too short delay in release builds.
                let sleep_amt = base.saturating_pow(exponent);
                tokio::time::sleep(Duration::from_secs(sleep_amt)).await;
            }
        }
    }
}

View file

@ -65,7 +65,7 @@ mod test {
use reqwest_middleware::ClientWithMiddleware;
use url::Url;
#[actix_rt::test]
#[tokio::test]
async fn test_receive_activity() {
let (body, incoming_request, config) = setup_receive_test().await;
receive_activity::<Follow, DbUser, DbConnection>(
@ -77,7 +77,7 @@ mod test {
.unwrap();
}
#[actix_rt::test]
#[tokio::test]
async fn test_receive_activity_invalid_body_signature() {
let (_, incoming_request, config) = setup_receive_test().await;
let err = receive_activity::<Follow, DbUser, DbConnection>(
@ -93,7 +93,7 @@ mod test {
assert_eq!(e, &Error::ActivityBodyDigestInvalid)
}
#[actix_rt::test]
#[tokio::test]
async fn test_receive_activity_invalid_path() {
let (body, incoming_request, config) = setup_receive_test().await;
let incoming_request = incoming_request.uri("/wrong");
@ -125,7 +125,7 @@ mod test {
let body = serde_json::to_string(&activity).unwrap();
let outgoing_request = sign_request(
request_builder,
activity.actor.into_inner(),
&activity.actor.into_inner(),
body.to_string(),
DB_USER_KEYPAIR.private_key.clone(),
false,
@ -142,6 +142,7 @@ mod test {
.app_data(DbConnection)
.debug(true)
.build()
.await
.unwrap();
(body, incoming_request, config)
}

View file

@ -4,24 +4,24 @@
//!
//! ```
//! # use activitypub_federation::config::FederationConfig;
//! # let _ = actix_rt::System::new();
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let settings = FederationConfig::builder()
//! .domain("example.com")
//! .app_data(())
//! .http_fetch_limit(50)
//! .worker_count(16)
//! .build()?;
//! .build().await?;
//! # Ok::<(), anyhow::Error>(())
//! # }).unwrap()
//! ```
use crate::{
activity_queue::create_activity_queue,
activity_queue::{create_activity_queue, ActivityQueue},
error::Error,
protocol::verification::verify_domains_match,
traits::{ActivityHandler, Actor},
};
use async_trait::async_trait;
use background_jobs::Manager;
use derive_builder::Builder;
use dyn_clone::{clone_trait_object, DynClone};
use reqwest_middleware::ClientWithMiddleware;
@ -56,7 +56,7 @@ pub struct FederationConfig<T: Clone> {
pub(crate) client: ClientWithMiddleware,
/// Number of worker tasks for sending outgoing activities
#[builder(default = "64")]
pub(crate) worker_count: u64,
pub(crate) worker_count: usize,
/// Run library in debug mode. This allows usage of http and localhost urls. It also sends
/// outgoing activities synchronously, not in background thread. This helps to make tests
/// more consistent. Do not use for production.
@ -83,7 +83,7 @@ pub struct FederationConfig<T: Clone> {
/// Queue for sending outgoing activities. Only optional to make builder work, it's always
/// present once constructed.
#[builder(setter(skip))]
pub(crate) activity_queue: Option<Arc<Manager>>,
pub(crate) activity_queue: Option<Arc<ActivityQueue>>,
}
impl<T: Clone> FederationConfig<T> {
@ -188,7 +188,8 @@ impl<T: Clone> FederationConfigBuilder<T> {
///
/// Values which are not explicitly specified use the defaults. Also initializes the
/// queue for outgoing activities, which is stored internally in the config struct.
pub fn build(&mut self) -> Result<FederationConfig<T>, FederationConfigBuilderError> {
/// Requires a tokio runtime for the background queue.
pub async fn build(&mut self) -> Result<FederationConfig<T>, FederationConfigBuilderError> {
let mut config = self.partial_build()?;
let queue = create_activity_queue(
config.client.clone(),

View file

@ -56,7 +56,7 @@ pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
let res = if let Some((actor_id, private_key_pem)) = config.signed_fetch_actor.as_deref() {
let req = sign_request(
req,
actor_id.clone(),
actor_id,
String::new(),
private_key_pem.clone(),
data.config.http_signature_compat,

View file

@ -36,13 +36,12 @@ where
/// # use activitypub_federation::config::FederationConfig;
/// # use activitypub_federation::error::Error::NotFound;
/// # use activitypub_federation::traits::tests::{DbConnection, DbUser};
/// # let _ = actix_rt::System::new();
/// # actix_rt::Runtime::new().unwrap().block_on(async {
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// # let db_connection = DbConnection;
/// let config = FederationConfig::builder()
/// .domain("example.com")
/// .app_data(db_connection)
/// .build()?;
/// .build().await?;
/// let request_data = config.to_request_data();
/// let object_id = ObjectId::<DbUser>::parse("https://lemmy.ml/u/nutomic")?;
/// // Attempt to fetch object from local database or fall back to remote server

View file

@ -199,12 +199,13 @@ mod tests {
traits::tests::{DbConnection, DbUser},
};
#[actix_rt::test]
#[tokio::test]
async fn test_webfinger() {
let config = FederationConfig::builder()
.domain("example.com")
.app_data(DbConnection)
.build()
.await
.unwrap();
let data = config.to_request_data();
let res =

View file

@ -62,7 +62,7 @@ pub fn generate_actor_keypair() -> Result<Keypair, std::io::Error> {
/// `activity` as request body. The request is signed with `private_key` and then sent.
pub(crate) async fn sign_request(
request_builder: RequestBuilder,
actor_id: Url,
actor_id: &Url,
activity: String,
private_key: String,
http_signature_compat: bool,
@ -70,7 +70,7 @@ pub(crate) async fn sign_request(
static CONFIG: Lazy<Config> = Lazy::new(Config::new);
static CONFIG_COMPAT: Lazy<Config> = Lazy::new(|| Config::new().mastodon_compat());
let key_id = main_key_id(&actor_id);
let key_id = main_key_id(actor_id);
let sig_conf = match http_signature_compat {
false => CONFIG.clone(),
true => CONFIG_COMPAT.clone(),
@ -80,7 +80,7 @@ pub(crate) async fn sign_request(
sig_conf.clone(),
key_id,
Sha256::new(),
activity,
activity.clone(),
move |signing_string| {
let private_key = PKey::private_key_from_pem(private_key.as_bytes())?;
let mut signer = Signer::new(MessageDigest::sha256(), &private_key)?;
@ -259,7 +259,7 @@ pub mod test {
static INBOX_URL: Lazy<Url> =
Lazy::new(|| Url::parse("https://example.com/u/alice/inbox").unwrap());
#[actix_rt::test]
#[tokio::test]
async fn test_sign() {
let mut headers = generate_request_headers(&INBOX_URL);
// use hardcoded date in order to test against hardcoded signature
@ -273,8 +273,8 @@ pub mod test {
.headers(headers);
let request = sign_request(
request_builder,
ACTOR_ID.clone(),
"my activity".to_string(),
&ACTOR_ID,
"my activity".into(),
test_keypair().private_key,
// set this to prevent created/expires headers to be generated and inserted
// automatically from current time
@ -301,7 +301,7 @@ pub mod test {
assert_eq!(signature, expected_signature);
}
#[actix_rt::test]
#[tokio::test]
async fn test_verify() {
let headers = generate_request_headers(&INBOX_URL);
let request_builder = ClientWithMiddleware::from(Client::new())
@ -309,7 +309,7 @@ pub mod test {
.headers(headers);
let request = sign_request(
request_builder,
ACTOR_ID.clone(),
&ACTOR_ID,
"my activity".to_string(),
test_keypair().private_key,
false,