relay/src/requests.rs

444 lines
12 KiB
Rust
Raw Normal View History

2021-09-18 17:55:39 +00:00
use crate::error::{Error, ErrorKind};
2020-09-07 21:51:02 +00:00
use activitystreams::url::Url;
2021-03-10 02:09:56 +00:00
use actix_web::{http::header::Date, web::Bytes};
use async_mutex::Mutex;
use async_rwlock::RwLock;
2021-03-10 02:09:56 +00:00
use awc::Client;
use chrono::{DateTime, Utc};
2020-03-18 04:35:20 +00:00
use http_signature_normalization_actix::prelude::*;
2021-08-01 20:12:06 +00:00
use rsa::{hash::Hash, padding::PaddingScheme, RsaPrivateKey};
2020-03-18 04:35:20 +00:00
use sha2::{Digest, Sha256};
use std::{
cell::RefCell,
collections::HashMap,
rc::Rc,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::SystemTime,
};
2021-09-18 17:55:39 +00:00
use tracing::{debug, info, warn};
2021-09-21 16:21:06 +00:00
use tracing_awc::Propagate;
2020-03-18 04:35:20 +00:00
#[derive(Clone)]
2021-02-10 04:17:20 +00:00
pub(crate) struct Breakers {
inner: Arc<RwLock<HashMap<String, Arc<Mutex<Breaker>>>>>,
}
2021-09-18 17:55:39 +00:00
impl std::fmt::Debug for Breakers {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Breakers").finish()
}
}
impl Breakers {
async fn should_try(&self, url: &Url) -> bool {
if let Some(domain) = url.domain() {
if let Some(breaker) = self.inner.read().await.get(domain) {
breaker.lock().await.should_try()
} else {
true
}
} else {
false
}
}
async fn fail(&self, url: &Url) {
if let Some(domain) = url.domain() {
let should_write = {
let read = self.inner.read().await;
if let Some(breaker) = read.get(domain) {
let owned_breaker = Arc::clone(&breaker);
drop(breaker);
owned_breaker.lock().await.fail();
false
} else {
true
}
};
if should_write {
let mut hm = self.inner.write().await;
let breaker = hm
.entry(domain.to_owned())
.or_insert(Arc::new(Mutex::new(Breaker::default())));
breaker.lock().await.fail();
}
}
}
async fn succeed(&self, url: &Url) {
if let Some(domain) = url.domain() {
let should_write = {
let read = self.inner.read().await;
if let Some(breaker) = read.get(domain) {
let owned_breaker = Arc::clone(&breaker);
drop(breaker);
owned_breaker.lock().await.succeed();
false
} else {
true
}
};
if should_write {
let mut hm = self.inner.write().await;
let breaker = hm
.entry(domain.to_owned())
.or_insert(Arc::new(Mutex::new(Breaker::default())));
breaker.lock().await.succeed();
}
}
}
}
impl Default for Breakers {
fn default() -> Self {
Breakers {
inner: Arc::new(RwLock::new(HashMap::new())),
}
}
}
2021-09-18 17:55:39 +00:00
#[derive(Debug)]
struct Breaker {
failures: usize,
last_attempt: DateTime<Utc>,
last_success: DateTime<Utc>,
}
impl Breaker {
const fn failure_threshold() -> usize {
10
}
fn failure_wait() -> chrono::Duration {
chrono::Duration::days(1)
}
fn should_try(&self) -> bool {
self.failures < Self::failure_threshold()
|| self.last_attempt + Self::failure_wait() < Utc::now()
}
fn fail(&mut self) {
self.failures += 1;
self.last_attempt = Utc::now();
}
fn succeed(&mut self) {
self.failures = 0;
self.last_attempt = Utc::now();
self.last_success = Utc::now();
}
}
impl Default for Breaker {
fn default() -> Self {
let now = Utc::now();
Breaker {
failures: 0,
last_attempt: now,
last_success: now,
}
}
}
2020-03-18 04:35:20 +00:00
#[derive(Clone)]
2021-02-10 04:17:20 +00:00
pub(crate) struct Requests {
client: Rc<RefCell<Client>>,
consecutive_errors: Rc<AtomicUsize>,
error_limit: usize,
2020-03-18 04:35:20 +00:00
key_id: String,
user_agent: String,
2021-08-01 20:12:06 +00:00
private_key: RsaPrivateKey,
2020-03-18 04:35:20 +00:00
config: Config,
breakers: Breakers,
2020-03-18 04:35:20 +00:00
}
2021-09-18 17:55:39 +00:00
impl std::fmt::Debug for Requests {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Requests")
.field("error_limit", &self.error_limit)
.field("key_id", &self.key_id)
.field("user_agent", &self.user_agent)
.field("config", &self.config)
.field("breakers", &self.breakers)
.finish()
}
}
2020-03-18 04:35:20 +00:00
impl Requests {
2021-02-10 04:17:20 +00:00
pub(crate) fn new(
key_id: String,
2021-08-01 20:12:06 +00:00
private_key: RsaPrivateKey,
user_agent: String,
breakers: Breakers,
) -> Self {
2020-03-18 04:35:20 +00:00
Requests {
client: Rc::new(RefCell::new(
2020-09-14 00:46:13 +00:00
Client::builder()
.header("User-Agent", user_agent.clone())
.finish(),
)),
consecutive_errors: Rc::new(AtomicUsize::new(0)),
error_limit: 3,
2020-03-18 04:35:20 +00:00
key_id,
user_agent,
2020-03-18 04:35:20 +00:00
private_key,
2020-09-30 00:33:50 +00:00
config: Config::default().mastodon_compat(),
breakers,
2020-03-18 04:35:20 +00:00
}
}
fn count_err(&self) {
let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
if count + 1 >= self.error_limit {
warn!("{} consecutive errors, rebuilding http client", count);
2020-09-14 00:46:13 +00:00
*self.client.borrow_mut() = Client::builder()
.header("User-Agent", self.user_agent.clone())
.finish();
self.reset_err();
}
}
fn reset_err(&self) {
self.consecutive_errors.swap(0, Ordering::Relaxed);
}
2021-09-21 18:26:31 +00:00
#[tracing::instrument(name = "Fetch Json")]
2021-09-18 17:55:39 +00:00
pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
self.do_fetch(url, "application/json").await
}
2021-09-21 18:26:31 +00:00
#[tracing::instrument(name = "Fetch Activity+Json")]
2021-09-18 17:55:39 +00:00
pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
self.do_fetch(url, "application/activity+json").await
}
2021-09-18 17:55:39 +00:00
async fn do_fetch<T>(&self, url: &str, accept: &str) -> Result<T, Error>
2020-03-18 04:35:20 +00:00
where
T: serde::de::DeserializeOwned,
{
let parsed_url = url.parse::<Url>()?;
if !self.breakers.should_try(&parsed_url).await {
2021-09-18 17:55:39 +00:00
return Err(ErrorKind::Breaker.into());
}
2020-03-30 06:06:13 +00:00
let signer = self.signer();
let client: Client = self.client.borrow().clone();
2020-09-07 21:51:02 +00:00
let res = client
.get(url)
2021-02-11 00:00:11 +00:00
.insert_header(("Accept", accept))
.insert_header(Date(SystemTime::now().into()))
2020-03-30 06:06:13 +00:00
.signature(
self.config.clone(),
self.key_id.clone(),
move |signing_string| signer.sign(signing_string),
)
.await?
2021-09-21 16:21:06 +00:00
.propagate()
2020-03-18 04:35:20 +00:00
.send()
.await;
if res.is_err() {
self.count_err();
self.breakers.fail(&parsed_url).await;
}
2021-09-18 17:55:39 +00:00
let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?;
self.reset_err();
2020-03-18 04:35:20 +00:00
if !res.status().is_success() {
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
debug!("Response from {}, {}", url, s);
}
2020-03-18 04:35:20 +00:00
}
}
2020-03-18 04:35:20 +00:00
self.breakers.fail(&parsed_url).await;
2021-09-18 17:55:39 +00:00
return Err(ErrorKind::Status(url.to_string(), res.status()).into());
}
self.breakers.succeed(&parsed_url).await;
let body = res
.body()
.await
2021-09-18 17:55:39 +00:00
.map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?;
Ok(serde_json::from_slice(body.as_ref())?)
}
2021-09-21 18:26:31 +00:00
#[tracing::instrument(name = "Fetch Bytes")]
2021-09-18 17:55:39 +00:00
pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> {
let parsed_url = url.parse::<Url>()?;
if !self.breakers.should_try(&parsed_url).await {
2021-09-18 17:55:39 +00:00
return Err(ErrorKind::Breaker.into());
}
info!("Fetching bytes for {}", url);
2020-03-30 06:06:13 +00:00
let signer = self.signer();
let client: Client = self.client.borrow().clone();
2020-09-07 21:51:02 +00:00
let res = client
.get(url)
2021-02-11 00:00:11 +00:00
.insert_header(("Accept", "*/*"))
.insert_header(Date(SystemTime::now().into()))
2020-03-30 06:06:13 +00:00
.signature(
self.config.clone(),
self.key_id.clone(),
move |signing_string| signer.sign(signing_string),
)
.await?
2021-09-21 16:21:06 +00:00
.propagate()
.send()
.await;
if res.is_err() {
self.breakers.fail(&parsed_url).await;
self.count_err();
}
2021-09-18 17:55:39 +00:00
let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?;
self.reset_err();
let content_type = if let Some(content_type) = res.headers().get("content-type") {
if let Ok(s) = content_type.to_str() {
s.to_owned()
} else {
2021-09-18 17:55:39 +00:00
return Err(ErrorKind::ContentType.into());
}
} else {
2021-09-18 17:55:39 +00:00
return Err(ErrorKind::ContentType.into());
};
if !res.status().is_success() {
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
debug!("Response from {}, {}", url, s);
}
}
}
self.breakers.fail(&parsed_url).await;
2021-09-18 17:55:39 +00:00
return Err(ErrorKind::Status(url.to_string(), res.status()).into());
}
self.breakers.succeed(&parsed_url).await;
let bytes = match res.body().limit(1024 * 1024 * 4).await {
Err(e) => {
2021-09-18 17:55:39 +00:00
return Err(ErrorKind::ReceiveResponse(url.to_string(), e.to_string()).into());
}
Ok(bytes) => bytes,
};
Ok((content_type, bytes))
}
2021-09-21 19:32:25 +00:00
#[tracing::instrument(
"Deliver to Inbox",
fields(self, inbox = inbox.to_string().as_str(), item)
)]
2021-09-18 17:55:39 +00:00
pub(crate) async fn deliver<T>(&self, inbox: Url, item: &T) -> Result<(), Error>
2020-03-18 04:35:20 +00:00
where
2021-09-21 18:26:31 +00:00
T: serde::ser::Serialize + std::fmt::Debug,
2020-03-18 04:35:20 +00:00
{
if !self.breakers.should_try(&inbox).await {
2021-09-18 17:55:39 +00:00
return Err(ErrorKind::Breaker.into());
}
2020-03-30 06:06:13 +00:00
let signer = self.signer();
2020-03-18 04:35:20 +00:00
let item_string = serde_json::to_string(item)?;
let client: Client = self.client.borrow().clone();
2021-09-21 16:21:06 +00:00
let (req, body) = client
2020-09-07 21:51:02 +00:00
.post(inbox.as_str())
2021-02-11 00:00:11 +00:00
.insert_header(("Accept", "application/activity+json"))
.insert_header(("Content-Type", "application/activity+json"))
.insert_header(Date(SystemTime::now().into()))
2020-03-18 04:35:20 +00:00
.signature_with_digest(
2020-03-30 06:06:13 +00:00
self.config.clone(),
self.key_id.clone(),
Sha256::new(),
2020-03-18 04:35:20 +00:00
item_string,
2020-03-30 06:06:13 +00:00
move |signing_string| signer.sign(signing_string),
)
.await?
2021-09-21 16:21:06 +00:00
.split();
let res = req.propagate().send_body(body).await;
if res.is_err() {
self.count_err();
self.breakers.fail(&inbox).await;
}
2021-09-18 17:55:39 +00:00
let mut res = res.map_err(|e| ErrorKind::SendRequest(inbox.to_string(), e.to_string()))?;
self.reset_err();
2020-03-18 04:35:20 +00:00
if !res.status().is_success() {
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
debug!("Response from {}, {}", inbox.as_str(), s);
}
2020-03-18 04:35:20 +00:00
}
}
self.breakers.fail(&inbox).await;
2021-09-18 17:55:39 +00:00
return Err(ErrorKind::Status(inbox.to_string(), res.status()).into());
2020-03-18 04:35:20 +00:00
}
self.breakers.succeed(&inbox).await;
2020-03-18 04:35:20 +00:00
Ok(())
}
2020-03-30 06:06:13 +00:00
fn signer(&self) -> Signer {
Signer {
private_key: self.private_key.clone(),
}
}
}
struct Signer {
2021-08-01 20:12:06 +00:00
private_key: RsaPrivateKey,
2020-03-30 06:06:13 +00:00
}
impl Signer {
2021-09-18 17:55:39 +00:00
fn sign(&self, signing_string: &str) -> Result<String, Error> {
2020-03-18 04:35:20 +00:00
let hashed = Sha256::digest(signing_string.as_bytes());
2020-07-25 14:33:35 +00:00
let bytes = self.private_key.sign(
PaddingScheme::PKCS1v15Sign {
hash: Some(Hash::SHA2_256),
},
&hashed,
)?;
2020-03-18 04:35:20 +00:00
Ok(base64::encode(bytes))
}
}