fedimovies/src/activitypub/deliverer.rs

127 lines
3.5 KiB
Rust
Raw Normal View History

2021-10-30 22:35:18 +00:00
use rsa::RsaPrivateKey;
2021-04-09 00:22:17 +00:00
use crate::config::{Environment, Config};
use crate::http_signatures::create::{create_http_signature, SignatureError};
use crate::models::users::types::User;
use crate::utils::crypto::deserialize_private_key;
use super::activity::Activity;
use super::actor::Actor;
use super::constants::ACTIVITY_CONTENT_TYPE;
use super::views::get_actor_url;
#[derive(thiserror::Error, Debug)]
pub enum DelivererError {
#[error("key error")]
KeyDeserializationError(#[from] rsa::pkcs8::Error),
#[error(transparent)]
SignatureError(#[from] SignatureError),
#[error("activity serialization error")]
SerializationError(#[from] serde_json::Error),
#[error(transparent)]
RequestError(#[from] reqwest::Error),
#[error("http error {0:?}")]
HttpError(reqwest::StatusCode),
}
async fn send_activity(
config: &Config,
2021-10-30 22:35:18 +00:00
actor_key: &RsaPrivateKey,
actor_key_id: &str,
activity_json: &str,
2021-04-09 00:22:17 +00:00
inbox_url: &str,
) -> Result<(), DelivererError> {
log::info!("sending activity: {}", activity_json);
let headers = create_http_signature(
2021-10-30 22:35:18 +00:00
inbox_url,
activity_json,
2021-04-09 00:22:17 +00:00
actor_key,
actor_key_id,
)?;
match config.environment {
Environment::Development => {
log::info!(
"development mode: not sending activity to {}",
inbox_url,
);
},
Environment::Production => {
let client = reqwest::Client::new();
// Default timeout is 30s
let response = client.post(inbox_url)
.header("Host", headers.host)
.header("Date", headers.date)
.header("Digest", headers.digest)
.header("Signature", headers.signature)
.header("Content-Type", ACTIVITY_CONTENT_TYPE)
2021-10-30 22:35:18 +00:00
.body(activity_json.to_owned())
2021-04-09 00:22:17 +00:00
.send()
.await?;
let response_status = response.status();
let response_text = response.text().await?;
log::info!(
"remote server response: {}",
response_text,
);
if response_status.is_client_error() || response_status.is_server_error() {
return Err(DelivererError::HttpError(response_status));
}
},
};
Ok(())
}
2021-10-30 22:35:18 +00:00
async fn deliver_activity_worker(
config: Config,
sender: User,
2021-04-09 00:22:17 +00:00
activity: Activity,
recipients: Vec<Actor>,
2021-10-30 22:35:18 +00:00
) -> Result<(), DelivererError> {
let actor_key = deserialize_private_key(&sender.private_key)?;
let actor_key_id = format!(
"{}#main-key",
get_actor_url(
&config.instance_url(),
&sender.profile.username,
),
);
let activity_json = serde_json::to_string(&activity)?;
for recipient in recipients {
2021-04-09 00:22:17 +00:00
// TODO: retry on error
2021-10-30 22:35:18 +00:00
if let Err(err) = send_activity(
&config,
&actor_key,
&actor_key_id,
&activity_json,
&recipient.inbox,
).await {
2021-04-09 00:22:17 +00:00
log::error!("{}", err);
}
};
2021-10-30 22:35:18 +00:00
Ok(())
}
/// Starts delivery of `activity` to `recipients` in a spawned task.
///
/// `config` and `sender` are cloned so that the spawned task owns all of
/// its data. This function does not wait for the delivery to finish and
/// does not report its outcome: an error returned by the worker is only
/// logged.
pub fn deliver_activity(
    config: &Config,
    sender: &User,
    activity: Activity,
    recipients: Vec<Actor>,
) {
    let config = config.clone();
    let sender = sender.clone();
    actix_rt::spawn(async move {
        let result = deliver_activity_worker(
            config,
            sender,
            activity,
            recipients,
        ).await;
        if let Err(err) = result {
            log::error!("{}", err);
        }
    });
}