activitypub-federation-rust/src/activity_sending.rs
Nutomic 7def01a19a
Avoid running ci checks twice (#105)
* Avoid running ci checks twice

* upgrade rust

* move clippy config to cargo.toml
2024-04-09 11:28:57 +02:00

349 lines
11 KiB
Rust

//! Signing and sending of outgoing activities, one task per target inbox
//!
#![doc = include_str!("../docs/09_sending_activities.md")]
use crate::{
config::Data,
error::Error,
http_signatures::sign_request,
reqwest_shim::ResponseExt,
traits::{ActivityHandler, Actor},
FEDERATION_CONTENT_TYPE,
};
use bytes::Bytes;
use futures::StreamExt;
use http::StatusCode;
use httpdate::fmt_http_date;
use itertools::Itertools;
use openssl::pkey::{PKey, Private};
use reqwest::{
header::{HeaderMap, HeaderName, HeaderValue},
Response,
};
use reqwest_middleware::ClientWithMiddleware;
use serde::Serialize;
use std::{
fmt::{Debug, Display},
time::{Duration, SystemTime},
};
use tracing::debug;
use url::Url;
#[derive(Clone, Debug)]
/// All info needed to sign and send one activity to one inbox. You should generally use
/// [crate::activity_queue::queue_activity] unless you want to implement your own queue.
///
/// Instances are created through [SendActivityTask::prepare]; all fields are crate-private.
pub struct SendActivityTask {
/// Id of the sending actor, taken from the activity's `actor()`. Handed to the request signer.
pub(crate) actor_id: Url,
/// Id of the activity being delivered. Only used for log and error messages in this file.
pub(crate) activity_id: Url,
/// The activity serialized to JSON; this is the request body that gets signed and sent.
pub(crate) activity: Bytes,
/// Url of the inbox which the activity is POSTed to.
pub(crate) inbox: Url,
/// Private key of the sending actor, used to sign the request.
pub(crate) private_key: PKey<Private>,
/// Copied from the federation config and passed through to the request signer.
/// NOTE(review): presumably switches to a signature format for older receivers - confirm
/// against `http_signatures::sign_request`.
pub(crate) http_signature_compat: bool,
}
/// Renders a task as `<activity id> to <inbox url>`, the form embedded in this file's log and
/// error messages.
impl Display for SendActivityTask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            activity_id, inbox, ..
        } = self;
        f.write_fmt(format_args!("{} to {}", activity_id, inbox))
    }
}
impl SendActivityTask {
    /// Prepare an activity for sending
    ///
    /// Returns one task for every inbox that remains after duplicates, local inboxes and
    /// inboxes failing url validation have been dropped.
    ///
    /// - `activity`: The activity to be sent, gets converted to json
    /// - `actor`: The sending actor, its private key is used to sign the requests
    /// - `inboxes`: List of remote actor inboxes that should receive the activity. Ignores local actor
    ///   inboxes. Should be built by calling [crate::traits::Actor::shared_inbox_or_inbox]
    ///   for each target actor.
    /// - `data`: Request data; its config is used for local url detection and url validation
    ///
    /// # Errors
    ///
    /// Fails if the activity cannot be serialized or the actor's private key cannot be loaded.
    pub async fn prepare<Activity, Datatype, ActorType>(
        activity: &Activity,
        actor: &ActorType,
        inboxes: Vec<Url>,
        data: &Data<Datatype>,
    ) -> Result<Vec<SendActivityTask>, Error>
    where
        Activity: ActivityHandler + Serialize + Debug,
        Datatype: Clone,
        ActorType: Actor,
    {
        let tasks = build_tasks(activity, actor, inboxes, data).await?;
        Ok(tasks)
    }

    /// Signs this task's request and sends it, using the http client and request timeout from
    /// the config in `data`.
    ///
    /// See [SendActivityTask::handle_response] for how the response status maps to `Ok`/`Err`.
    pub async fn sign_and_send<Datatype: Clone>(&self, data: &Data<Datatype>) -> Result<(), Error> {
        let config = &data.config;
        self.sign_and_send_internal(&config.client, config.request_timeout)
            .await
    }

    /// Same as [SendActivityTask::sign_and_send], but takes the client and timeout directly.
    ///
    /// Builds a POST request to the inbox with the federation headers, signs it with the task's
    /// private key, executes it and evaluates the response.
    pub(crate) async fn sign_and_send_internal(
        &self,
        client: &ClientWithMiddleware,
        timeout: Duration,
    ) -> Result<(), Error> {
        debug!("Sending {} to {}", self.activity_id, self.inbox);

        let headers = generate_request_headers(&self.inbox);
        let unsigned = client
            .post(self.inbox.to_string())
            .timeout(timeout)
            .headers(headers);

        let signed = sign_request(
            unsigned,
            &self.actor_id,
            self.activity.clone(),
            self.private_key.clone(),
            self.http_signature_compat,
        )
        .await?;

        let response = client.execute(signed).await?;
        self.handle_response(response).await
    }

    /// Decides from the HTTP status code whether delivery of this activity is finished.
    ///
    /// - Success status: the activity was delivered, `Ok` is returned.
    /// - Client error other than 408 (Request Timeout) and 429 (Too Many Requests): the receiver
    ///   rejected the activity. This is only logged and `Ok` is returned as well.
    /// - Any other status: `Err` with the status and response text is returned and the activity
    ///   send should be retried later.
    ///
    /// A failure to read the response text is propagated as `Err` too.
    ///
    /// Equivalent code in mastodon: https://github.com/mastodon/mastodon/blob/v4.2.8/app/helpers/jsonld_helper.rb#L215-L217
    async fn handle_response(&self, response: Response) -> Result<(), Error> {
        let status = response.status();
        if status.is_success() {
            debug!("Activity {self} delivered successfully");
            return Ok(());
        }

        // Every non-success path reports the response text, so read it once up front.
        let text = response.text_limited().await?;

        let rejected = status.is_client_error()
            && status != StatusCode::REQUEST_TIMEOUT
            && status != StatusCode::TOO_MANY_REQUESTS;
        if rejected {
            debug!("Activity {self} was rejected, aborting: {text}");
            return Ok(());
        }

        Err(Error::Other(format!(
            "Activity {self} failure with status {status}: {text}",
        )))
    }
}
pub(crate) async fn build_tasks<'a, Activity, Datatype, ActorType>(
activity: &'a Activity,
actor: &ActorType,
inboxes: Vec<Url>,
data: &Data<Datatype>,
) -> Result<Vec<SendActivityTask>, Error>
where
Activity: ActivityHandler + Serialize + Debug,
Datatype: Clone,
ActorType: Actor,
{
let config = &data.config;
let actor_id = activity.actor();
let activity_id = activity.id();
let activity_serialized: Bytes = serde_json::to_vec(activity)
.map_err(|e| Error::SerializeOutgoingActivity(e, format!("{:?}", activity)))?
.into();
let private_key = get_pkey_cached(data, actor).await?;
Ok(futures::stream::iter(
inboxes
.into_iter()
.unique()
.filter(|i| !config.is_local_url(i)),
)
.filter_map(|inbox| async {
if let Err(err) = config.verify_url_valid(&inbox).await {
debug!("inbox url invalid, skipping: {inbox}: {err}");
return None;
};
Some(SendActivityTask {
actor_id: actor_id.clone(),
activity_id: activity_id.clone(),
inbox,
activity: activity_serialized.clone(),
private_key: private_key.clone(),
http_signature_compat: config.http_signature_compat,
})
})
.collect()
.await)
}
pub(crate) async fn get_pkey_cached<ActorType>(
data: &Data<impl Clone>,
actor: &ActorType,
) -> Result<PKey<Private>, Error>
where
ActorType: Actor,
{
let actor_id = actor.id();
// PKey is internally like an Arc<>, so cloning is ok
data.config
.actor_pkey_cache
.try_get_with_by_ref(&actor_id, async {
let private_key_pem = actor.private_key_pem().ok_or_else(|| {
Error::Other(format!(
"Actor {actor_id} does not contain a private key for signing"
))
})?;
// This is a mostly expensive blocking call, we don't want to tie up other tasks while this is happening
let pkey = tokio::task::spawn_blocking(move || {
PKey::private_key_from_pem(private_key_pem.as_bytes()).map_err(|err| {
Error::Other(format!("Could not create private key from PEM data:{err}"))
})
})
.await
.map_err(|err| Error::Other(format!("Error joining: {err}")))??;
std::result::Result::<PKey<Private>, Error>::Ok(pkey)
})
.await
.map_err(|e| Error::Other(format!("cloned error: {e}")))
}
/// Builds the `content-type`, `host` and `date` headers for a request to `inbox_url`.
///
/// The host value is the url's domain, followed by `:<port>` if `Url::port` returns one. The
/// date is the current system time in HTTP date format.
///
/// # Panics
///
/// Panics if `inbox_url` has no domain, or if the host or date string is not a valid header
/// value.
pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
    let domain = inbox_url.domain().expect("read inbox domain");
    let host = match inbox_url.port() {
        Some(port) => format!("{}:{}", domain, port),
        None => domain.to_string(),
    };

    let content_type_value = HeaderValue::from_static(FEDERATION_CONTENT_TYPE);
    let host_value = HeaderValue::from_str(&host).expect("Hostname is valid");
    let date_value =
        HeaderValue::from_str(&fmt_http_date(SystemTime::now())).expect("Date is valid");

    let mut headers = HeaderMap::new();
    headers.insert(HeaderName::from_static("content-type"), content_type_value);
    headers.insert(HeaderName::from_static("host"), host_value);
    headers.insert("date", date_value);
    headers
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::{config::FederationConfig, http_signatures::generate_actor_keypair};
    use std::{
        sync::{atomic::AtomicUsize, Arc},
        time::Instant,
    };
    use tracing::info;

    /// Task fixture shared by the tests: an empty JSON object addressed to the local test
    /// server, signed with a freshly generated keypair.
    fn localhost_task() -> SendActivityTask {
        let keypair = generate_actor_keypair().unwrap();
        SendActivityTask {
            actor_id: "http://localhost:8001".parse().unwrap(),
            activity_id: "http://localhost:8001/activity".parse().unwrap(),
            activity: "{}".into(),
            inbox: "http://localhost:8001".parse().unwrap(),
            private_key: keypair.private_key().unwrap(),
            http_signature_compat: true,
        }
    }

    /// Response with the given status and an empty body.
    fn response_with(status: StatusCode) -> Response {
        http::Response::builder()
            .status(status)
            .body(Vec::<u8>::new())
            .unwrap()
            .into()
    }

    // Inbox handler of the test server. Despite its name it currently never fails: it logs
    // the request and always answers with success.
    async fn dodgy_handler(headers: HeaderMap, body: Bytes) -> Result<(), StatusCode> {
        debug!("Headers:{:?}", headers);
        debug!("Body len:{}", body.len());
        Ok(())
    }

    // Serves `dodgy_handler` on port 8001 until the test process exits.
    async fn test_server() {
        use axum::{routing::post, Router};

        // Counter attached as router state; the handler does not read it at the moment.
        let state = Arc::new(AtomicUsize::new(0));
        let app = Router::new()
            .route("/", post(dodgy_handler))
            .with_state(state);

        axum::Server::bind(&"0.0.0.0:8001".parse().unwrap())
            .serve(app.into_make_service())
            .await
            .unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    // Sends 100 messages
    async fn test_activity_sending() -> anyhow::Result<()> {
        let num_messages: usize = 100;

        tokio::spawn(test_server());

        /*
        // uncomment for debug logs & stats
        use tracing::log::LevelFilter;
        env_logger::builder()
        .filter_level(LevelFilter::Warn)
        .filter_module("activitypub_federation", LevelFilter::Info)
        .format_timestamp(None)
        .init();
        */

        let message = localhost_task();
        let data = FederationConfig::builder()
            .app_data(())
            .domain("localhost")
            .build()
            .await?
            .to_request_data();

        let start = Instant::now();
        for _ in 0..num_messages {
            message.clone().sign_and_send(&data).await?;
        }
        info!("Queue Sent: {:?}", start.elapsed());
        Ok(())
    }

    #[tokio::test]
    async fn test_handle_response() {
        let message = localhost_task();

        // Delivered, or rejected for good: no retry wanted.
        for status in [StatusCode::OK, StatusCode::BAD_REQUEST] {
            let outcome = message.handle_response(response_with(status)).await;
            assert!(outcome.is_ok());
        }

        // Everything else must be reported as an error.
        for status in [
            StatusCode::MOVED_PERMANENTLY,
            StatusCode::REQUEST_TIMEOUT,
            StatusCode::TOO_MANY_REQUESTS,
            StatusCode::INTERNAL_SERVER_ERROR,
        ] {
            let outcome = message.handle_response(response_with(status)).await;
            assert!(outcome.is_err());
        }
    }
}