2023-01-01 19:38:14 +00:00
|
|
|
use std::collections::BTreeMap;
|
2022-07-24 21:19:17 +00:00
|
|
|
use std::time::Duration;
|
|
|
|
|
2021-11-17 21:35:17 +00:00
|
|
|
use actix_web::http::Method;
|
2022-10-19 18:39:47 +00:00
|
|
|
use reqwest::{Client, Proxy};
|
2021-10-30 22:35:18 +00:00
|
|
|
use rsa::RsaPrivateKey;
|
2022-12-11 18:41:08 +00:00
|
|
|
use serde::{Deserialize, Serialize};
|
2022-12-04 22:31:02 +00:00
|
|
|
use serde_json::Value;
|
2022-07-24 21:19:17 +00:00
|
|
|
use tokio::time::sleep;
|
2021-10-30 22:35:18 +00:00
|
|
|
|
2022-07-14 10:18:03 +00:00
|
|
|
use crate::config::Instance;
|
2023-01-17 23:14:18 +00:00
|
|
|
use crate::database::{
|
|
|
|
get_database_client,
|
|
|
|
DatabaseClient,
|
|
|
|
DatabaseError,
|
|
|
|
DbPool,
|
|
|
|
};
|
2022-10-23 16:04:02 +00:00
|
|
|
use crate::http_signatures::create::{
|
|
|
|
create_http_signature,
|
|
|
|
HttpSignatureError,
|
|
|
|
};
|
2022-10-20 23:18:45 +00:00
|
|
|
use crate::json_signatures::create::{
|
2022-11-01 00:02:25 +00:00
|
|
|
is_object_signed,
|
2022-10-20 23:18:45 +00:00
|
|
|
sign_object,
|
|
|
|
JsonSignatureError,
|
|
|
|
};
|
2022-12-03 21:06:15 +00:00
|
|
|
use crate::models::{
|
|
|
|
profiles::queries::set_reachability_status,
|
|
|
|
users::types::User,
|
|
|
|
};
|
2022-11-13 18:43:57 +00:00
|
|
|
use crate::utils::crypto_rsa::deserialize_private_key;
|
2022-07-23 21:37:21 +00:00
|
|
|
use super::actors::types::Actor;
|
2023-02-07 18:28:19 +00:00
|
|
|
use super::constants::AP_MEDIA_TYPE;
|
|
|
|
use super::identifiers::{local_actor_id, local_actor_key_id};
|
2022-12-11 18:41:08 +00:00
|
|
|
use super::queues::OutgoingActivityJobData;
|
2021-04-09 00:22:17 +00:00
|
|
|
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
|
|
pub enum DelivererError {
|
|
|
|
#[error("key error")]
|
|
|
|
KeyDeserializationError(#[from] rsa::pkcs8::Error),
|
|
|
|
|
|
|
|
#[error(transparent)]
|
2022-10-23 16:04:02 +00:00
|
|
|
HttpSignatureError(#[from] HttpSignatureError),
|
2021-04-09 00:22:17 +00:00
|
|
|
|
2022-10-20 23:18:45 +00:00
|
|
|
#[error(transparent)]
|
|
|
|
JsonSignatureError(#[from] JsonSignatureError),
|
|
|
|
|
2021-04-09 00:22:17 +00:00
|
|
|
#[error("activity serialization error")]
|
|
|
|
SerializationError(#[from] serde_json::Error),
|
|
|
|
|
|
|
|
#[error(transparent)]
|
|
|
|
RequestError(#[from] reqwest::Error),
|
|
|
|
|
|
|
|
#[error("http error {0:?}")]
|
|
|
|
HttpError(reqwest::StatusCode),
|
2022-12-03 21:06:15 +00:00
|
|
|
|
|
|
|
#[error(transparent)]
|
|
|
|
DatabaseError(#[from] DatabaseError),
|
2021-04-09 00:22:17 +00:00
|
|
|
}
|
|
|
|
|
2022-10-19 18:39:47 +00:00
|
|
|
fn build_client(instance: &Instance) -> reqwest::Result<Client> {
|
|
|
|
let mut client_builder = Client::builder();
|
|
|
|
if let Some(ref proxy_url) = instance.proxy_url {
|
|
|
|
let proxy = Proxy::all(proxy_url)?;
|
|
|
|
client_builder = client_builder.proxy(proxy);
|
|
|
|
};
|
|
|
|
client_builder.build()
|
|
|
|
}
|
|
|
|
|
2021-04-09 00:22:17 +00:00
|
|
|
async fn send_activity(
|
2021-11-18 14:56:52 +00:00
|
|
|
instance: &Instance,
|
2021-10-30 22:35:18 +00:00
|
|
|
actor_key: &RsaPrivateKey,
|
|
|
|
actor_key_id: &str,
|
|
|
|
activity_json: &str,
|
2021-04-09 00:22:17 +00:00
|
|
|
inbox_url: &str,
|
|
|
|
) -> Result<(), DelivererError> {
|
|
|
|
let headers = create_http_signature(
|
2021-11-17 21:35:17 +00:00
|
|
|
Method::POST,
|
2021-10-30 22:35:18 +00:00
|
|
|
inbox_url,
|
|
|
|
activity_json,
|
2021-04-09 00:22:17 +00:00
|
|
|
actor_key,
|
|
|
|
actor_key_id,
|
|
|
|
)?;
|
|
|
|
|
2022-10-19 18:39:47 +00:00
|
|
|
let client = build_client(instance)?;
|
2021-11-17 21:35:17 +00:00
|
|
|
let request = client.post(inbox_url)
|
|
|
|
.header("Host", headers.host)
|
|
|
|
.header("Date", headers.date)
|
|
|
|
.header("Digest", headers.digest.unwrap())
|
|
|
|
.header("Signature", headers.signature)
|
2022-10-01 16:56:57 +00:00
|
|
|
.header(reqwest::header::CONTENT_TYPE, AP_MEDIA_TYPE)
|
2022-02-08 19:51:40 +00:00
|
|
|
.header(reqwest::header::USER_AGENT, instance.agent())
|
2021-11-17 21:35:17 +00:00
|
|
|
.body(activity_json.to_owned());
|
|
|
|
|
2021-11-18 14:56:52 +00:00
|
|
|
if instance.is_private {
|
2021-12-31 15:29:44 +00:00
|
|
|
log::info!(
|
2021-11-18 14:56:52 +00:00
|
|
|
"private mode: not sending activity to {}",
|
|
|
|
inbox_url,
|
|
|
|
);
|
|
|
|
} else {
|
|
|
|
let response = request.send().await?;
|
|
|
|
let response_status = response.status();
|
2022-09-04 11:57:31 +00:00
|
|
|
let response_text: String = response.text().await?
|
2023-01-24 21:36:36 +00:00
|
|
|
.chars().filter(|chr| *chr != '\n' && *chr != '\r').take(75)
|
|
|
|
.collect();
|
2021-11-18 14:56:52 +00:00
|
|
|
log::info!(
|
2023-01-24 21:36:36 +00:00
|
|
|
"response from {}: [{}] {}",
|
2021-12-24 00:44:01 +00:00
|
|
|
inbox_url,
|
2023-01-24 21:36:36 +00:00
|
|
|
response_status.as_str(),
|
2021-11-18 14:56:52 +00:00
|
|
|
response_text,
|
|
|
|
);
|
|
|
|
if response_status.is_client_error() || response_status.is_server_error() {
|
|
|
|
return Err(DelivererError::HttpError(response_status));
|
|
|
|
};
|
2021-04-09 00:22:17 +00:00
|
|
|
};
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-08-03 14:57:46 +00:00
|
|
|
/// Returns the delay to wait before the retry number `retry_count`.
///
/// The delay is `3 * 10^retry_count` seconds:
/// 30 secs, 5 mins, 50 mins, 8 hours (approx.), and so on.
/// For large values of `retry_count` the result saturates
/// at `u64::MAX` seconds instead of overflowing.
///
/// `retry_count` is expected to be greater than zero
/// (checked only when debug assertions are enabled).
fn backoff(retry_count: u32) -> Duration {
    debug_assert!(retry_count > 0);
    let seconds = 10_u64.saturating_pow(retry_count).saturating_mul(3);
    Duration::from_secs(seconds)
}
|
|
|
|
|
2022-12-11 18:41:08 +00:00
|
|
|
#[derive(Deserialize, Serialize)]
|
|
|
|
pub struct Recipient {
|
2022-12-31 17:14:36 +00:00
|
|
|
id: String,
|
|
|
|
inbox: String,
|
|
|
|
}
|
|
|
|
|
2021-10-30 22:35:18 +00:00
|
|
|
async fn deliver_activity_worker(
|
2022-12-03 21:06:15 +00:00
|
|
|
maybe_db_pool: Option<DbPool>,
|
2021-11-18 14:56:52 +00:00
|
|
|
instance: Instance,
|
2021-10-30 22:35:18 +00:00
|
|
|
sender: User,
|
2022-12-04 22:31:02 +00:00
|
|
|
activity: Value,
|
2022-12-31 17:14:36 +00:00
|
|
|
recipients: Vec<Recipient>,
|
2021-10-30 22:35:18 +00:00
|
|
|
) -> Result<(), DelivererError> {
|
|
|
|
let actor_key = deserialize_private_key(&sender.private_key)?;
|
2023-02-07 18:28:19 +00:00
|
|
|
let actor_id = local_actor_id(
|
|
|
|
&instance.url(),
|
|
|
|
&sender.profile.username,
|
2021-10-30 22:35:18 +00:00
|
|
|
);
|
2023-02-07 18:28:19 +00:00
|
|
|
let actor_key_id = local_actor_key_id(&actor_id);
|
2022-12-04 22:31:02 +00:00
|
|
|
let activity_signed = if is_object_signed(&activity) {
|
2022-11-01 00:02:25 +00:00
|
|
|
log::warn!("activity is already signed");
|
2022-12-04 22:31:02 +00:00
|
|
|
activity
|
2022-11-01 00:02:25 +00:00
|
|
|
} else {
|
2022-12-04 22:31:02 +00:00
|
|
|
sign_object(&activity, &actor_key, &actor_key_id)?
|
2022-11-01 00:02:25 +00:00
|
|
|
};
|
2022-10-20 23:18:45 +00:00
|
|
|
let activity_json = serde_json::to_string(&activity_signed)?;
|
2022-12-31 17:14:36 +00:00
|
|
|
|
|
|
|
if recipients.is_empty() {
|
2022-06-01 00:53:35 +00:00
|
|
|
return Ok(());
|
|
|
|
};
|
2022-12-31 17:14:36 +00:00
|
|
|
let mut queue: Vec<_> = recipients.into_iter()
|
2022-12-13 20:34:51 +00:00
|
|
|
// is_delivered: false
|
2022-12-31 17:14:36 +00:00
|
|
|
.map(|recipient| (recipient, false))
|
|
|
|
.collect();
|
|
|
|
log::info!(
|
|
|
|
"sending activity to {} inboxes: {}",
|
|
|
|
queue.len(),
|
|
|
|
activity_json,
|
|
|
|
);
|
|
|
|
|
2022-07-24 21:19:17 +00:00
|
|
|
let mut retry_count = 0;
|
|
|
|
let max_retries = 2;
|
2022-10-17 23:29:04 +00:00
|
|
|
|
2022-12-31 17:14:36 +00:00
|
|
|
while queue.iter().any(|(_, is_delivered)| !is_delivered) &&
|
2022-12-13 20:34:51 +00:00
|
|
|
retry_count <= max_retries
|
|
|
|
{
|
2022-07-24 21:19:17 +00:00
|
|
|
if retry_count > 0 {
|
2022-08-03 14:57:46 +00:00
|
|
|
// Wait before next attempt
|
|
|
|
sleep(backoff(retry_count)).await;
|
2022-07-24 21:19:17 +00:00
|
|
|
};
|
2022-12-31 17:14:36 +00:00
|
|
|
for (recipient, is_delivered) in queue.iter_mut() {
|
2022-12-13 20:34:51 +00:00
|
|
|
if *is_delivered {
|
|
|
|
continue;
|
|
|
|
};
|
2022-07-24 21:19:17 +00:00
|
|
|
if let Err(error) = send_activity(
|
|
|
|
&instance,
|
|
|
|
&actor_key,
|
|
|
|
&actor_key_id,
|
|
|
|
&activity_json,
|
2022-12-31 17:14:36 +00:00
|
|
|
&recipient.inbox,
|
2022-07-24 21:19:17 +00:00
|
|
|
).await {
|
2023-02-02 20:49:50 +00:00
|
|
|
log::warn!(
|
2022-07-24 21:19:17 +00:00
|
|
|
"failed to deliver activity to {} (attempt #{}): {}",
|
2022-12-31 17:14:36 +00:00
|
|
|
recipient.inbox,
|
2022-07-24 21:19:17 +00:00
|
|
|
retry_count + 1,
|
|
|
|
error,
|
|
|
|
);
|
2022-10-17 23:29:04 +00:00
|
|
|
} else {
|
2022-12-13 20:34:51 +00:00
|
|
|
*is_delivered = true;
|
2022-07-24 21:19:17 +00:00
|
|
|
};
|
2021-12-21 00:14:12 +00:00
|
|
|
};
|
2022-07-24 21:19:17 +00:00
|
|
|
retry_count += 1;
|
2021-04-09 00:22:17 +00:00
|
|
|
};
|
2022-12-03 21:06:15 +00:00
|
|
|
|
|
|
|
if let Some(ref db_pool) = maybe_db_pool {
|
|
|
|
// Get connection from pool only after finishing delivery
|
|
|
|
let db_client = &**get_database_client(db_pool).await?;
|
|
|
|
for (recipient, is_delivered) in queue {
|
|
|
|
set_reachability_status(
|
|
|
|
db_client,
|
|
|
|
&recipient.id,
|
|
|
|
is_delivered,
|
|
|
|
).await?;
|
|
|
|
};
|
|
|
|
};
|
2021-10-30 22:35:18 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-12-04 22:31:02 +00:00
|
|
|
pub struct OutgoingActivity {
|
2023-01-04 19:26:21 +00:00
|
|
|
pub db_pool: Option<DbPool>, // needed to track unreachable actors (optional)
|
2022-12-11 18:41:08 +00:00
|
|
|
pub instance: Instance,
|
|
|
|
pub sender: User,
|
2022-12-04 22:31:02 +00:00
|
|
|
pub activity: Value,
|
2022-12-11 18:41:08 +00:00
|
|
|
pub recipients: Vec<Recipient>,
|
2022-06-01 00:53:35 +00:00
|
|
|
}
|
|
|
|
|
2022-12-04 22:31:02 +00:00
|
|
|
impl OutgoingActivity {
|
|
|
|
pub fn new(
|
|
|
|
instance: &Instance,
|
|
|
|
sender: &User,
|
|
|
|
activity: impl Serialize,
|
|
|
|
recipients: Vec<Actor>,
|
|
|
|
) -> Self {
|
2022-12-31 17:14:36 +00:00
|
|
|
// Sort and de-duplicate recipients
|
|
|
|
let mut recipient_map = BTreeMap::new();
|
|
|
|
for actor in recipients {
|
|
|
|
if !recipient_map.contains_key(&actor.id) {
|
|
|
|
let recipient = Recipient {
|
|
|
|
id: actor.id.clone(),
|
|
|
|
inbox: actor.inbox,
|
|
|
|
};
|
|
|
|
recipient_map.insert(actor.id, recipient);
|
|
|
|
};
|
|
|
|
};
|
2022-12-04 22:31:02 +00:00
|
|
|
Self {
|
2023-01-04 19:26:21 +00:00
|
|
|
db_pool: None,
|
2022-12-04 22:31:02 +00:00
|
|
|
instance: instance.clone(),
|
|
|
|
sender: sender.clone(),
|
|
|
|
activity: serde_json::to_value(activity)
|
|
|
|
.expect("activity should be serializable"),
|
2022-12-31 17:14:36 +00:00
|
|
|
recipients: recipient_map.into_values().collect(),
|
2022-12-04 22:31:02 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-01-04 19:26:21 +00:00
|
|
|
pub async fn deliver(
|
|
|
|
self,
|
|
|
|
) -> Result<(), DelivererError> {
|
2022-06-01 00:53:35 +00:00
|
|
|
deliver_activity_worker(
|
2023-01-04 19:26:21 +00:00
|
|
|
self.db_pool,
|
2022-06-01 00:53:35 +00:00
|
|
|
self.instance,
|
|
|
|
self.sender,
|
|
|
|
self.activity,
|
2022-12-31 17:14:36 +00:00
|
|
|
self.recipients,
|
2022-06-01 00:53:35 +00:00
|
|
|
).await
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn spawn_deliver(self) -> () {
|
2022-07-17 00:00:35 +00:00
|
|
|
tokio::spawn(async move {
|
2023-01-04 19:26:21 +00:00
|
|
|
self.deliver().await.unwrap_or_else(|err| {
|
2022-12-03 21:06:15 +00:00
|
|
|
log::error!("{}", err);
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2022-12-11 18:41:08 +00:00
|
|
|
pub async fn enqueue(
|
|
|
|
self,
|
2023-01-17 23:14:18 +00:00
|
|
|
db_client: &impl DatabaseClient,
|
2022-12-11 18:41:08 +00:00
|
|
|
) -> Result<(), DatabaseError> {
|
|
|
|
let job_data = OutgoingActivityJobData {
|
|
|
|
activity: self.activity,
|
|
|
|
sender_id: self.sender.id,
|
|
|
|
recipients: self.recipients,
|
|
|
|
};
|
|
|
|
job_data.into_job(db_client).await
|
|
|
|
}
|
2022-06-01 00:53:35 +00:00
|
|
|
}
|
2022-08-03 14:57:46 +00:00
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks all four delays listed in the comment on `backoff`:
    /// 30 secs, 5 mins, 50 mins and (roughly) 8 hours.
    #[test]
    fn test_backoff() {
        assert_eq!(backoff(1).as_secs(), 30);
        assert_eq!(backoff(2).as_secs(), 300);
        assert_eq!(backoff(3).as_secs(), 3000);
        assert_eq!(backoff(4).as_secs(), 30000);
    }
}
|