2020-03-23 22:17:53 +00:00
|
|
|
use crate::error::MyError;
|
2020-09-07 21:51:02 +00:00
|
|
|
use activitystreams::url::Url;
|
2020-12-29 17:27:14 +00:00
|
|
|
use actix_web::{client::Client, http::header::Date, web::Bytes};
|
|
|
|
use async_mutex::Mutex;
|
|
|
|
use async_rwlock::RwLock;
|
2020-12-23 18:06:15 +00:00
|
|
|
use chrono::{DateTime, Utc};
|
2020-03-18 04:35:20 +00:00
|
|
|
use http_signature_normalization_actix::prelude::*;
|
2020-05-23 23:56:27 +00:00
|
|
|
use log::{debug, info, warn};
|
2020-07-25 14:33:35 +00:00
|
|
|
use rsa::{hash::Hash, padding::PaddingScheme, RSAPrivateKey};
|
2020-03-18 04:35:20 +00:00
|
|
|
use sha2::{Digest, Sha256};
|
2020-05-23 23:56:27 +00:00
|
|
|
use std::{
|
|
|
|
cell::RefCell,
|
2020-12-23 18:06:15 +00:00
|
|
|
collections::HashMap,
|
2020-05-23 23:56:27 +00:00
|
|
|
rc::Rc,
|
2020-12-23 18:06:15 +00:00
|
|
|
sync::{
|
|
|
|
atomic::{AtomicUsize, Ordering},
|
2020-12-29 17:27:14 +00:00
|
|
|
Arc,
|
2020-12-23 18:06:15 +00:00
|
|
|
},
|
2020-05-23 23:56:27 +00:00
|
|
|
time::SystemTime,
|
|
|
|
};
|
2020-03-18 04:35:20 +00:00
|
|
|
|
2020-12-23 18:06:15 +00:00
|
|
|
#[derive(Clone)]
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) struct Breakers {
|
2020-12-29 17:27:14 +00:00
|
|
|
inner: Arc<RwLock<HashMap<String, Arc<Mutex<Breaker>>>>>,
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Breakers {
|
2020-12-29 17:27:14 +00:00
|
|
|
async fn should_try(&self, url: &Url) -> bool {
|
2020-12-23 18:06:15 +00:00
|
|
|
if let Some(domain) = url.domain() {
|
2020-12-29 17:27:14 +00:00
|
|
|
if let Some(breaker) = self.inner.read().await.get(domain) {
|
|
|
|
breaker.lock().await.should_try()
|
|
|
|
} else {
|
|
|
|
true
|
|
|
|
}
|
2020-12-23 18:06:15 +00:00
|
|
|
} else {
|
|
|
|
false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-29 17:27:14 +00:00
|
|
|
async fn fail(&self, url: &Url) {
|
2020-12-23 18:06:15 +00:00
|
|
|
if let Some(domain) = url.domain() {
|
2021-02-10 05:45:13 +00:00
|
|
|
let should_write = {
|
|
|
|
let read = self.inner.read().await;
|
|
|
|
|
|
|
|
if let Some(breaker) = read.get(domain) {
|
|
|
|
let owned_breaker = Arc::clone(&breaker);
|
|
|
|
drop(breaker);
|
|
|
|
owned_breaker.lock().await.fail();
|
|
|
|
false
|
|
|
|
} else {
|
|
|
|
true
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if should_write {
|
2020-12-29 17:27:14 +00:00
|
|
|
let mut hm = self.inner.write().await;
|
|
|
|
let breaker = hm
|
|
|
|
.entry(domain.to_owned())
|
|
|
|
.or_insert(Arc::new(Mutex::new(Breaker::default())));
|
|
|
|
breaker.lock().await.fail();
|
|
|
|
}
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-29 17:27:14 +00:00
|
|
|
async fn succeed(&self, url: &Url) {
|
2020-12-23 18:06:15 +00:00
|
|
|
if let Some(domain) = url.domain() {
|
2021-02-10 05:45:13 +00:00
|
|
|
let should_write = {
|
|
|
|
let read = self.inner.read().await;
|
|
|
|
|
|
|
|
if let Some(breaker) = read.get(domain) {
|
|
|
|
let owned_breaker = Arc::clone(&breaker);
|
|
|
|
drop(breaker);
|
|
|
|
owned_breaker.lock().await.succeed();
|
|
|
|
false
|
|
|
|
} else {
|
|
|
|
true
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if should_write {
|
2020-12-29 17:27:14 +00:00
|
|
|
let mut hm = self.inner.write().await;
|
|
|
|
let breaker = hm
|
|
|
|
.entry(domain.to_owned())
|
|
|
|
.or_insert(Arc::new(Mutex::new(Breaker::default())));
|
|
|
|
breaker.lock().await.succeed();
|
|
|
|
}
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for Breakers {
|
|
|
|
fn default() -> Self {
|
|
|
|
Breakers {
|
2020-12-29 17:27:14 +00:00
|
|
|
inner: Arc::new(RwLock::new(HashMap::new())),
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
struct Breaker {
|
|
|
|
failures: usize,
|
|
|
|
last_attempt: DateTime<Utc>,
|
|
|
|
last_success: DateTime<Utc>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Breaker {
|
|
|
|
const fn failure_threshold() -> usize {
|
|
|
|
10
|
|
|
|
}
|
|
|
|
|
|
|
|
fn failure_wait() -> chrono::Duration {
|
|
|
|
chrono::Duration::days(1)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn should_try(&self) -> bool {
|
|
|
|
self.failures < Self::failure_threshold()
|
|
|
|
|| self.last_attempt + Self::failure_wait() < Utc::now()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn fail(&mut self) {
|
|
|
|
self.failures += 1;
|
|
|
|
self.last_attempt = Utc::now();
|
|
|
|
}
|
|
|
|
|
|
|
|
fn succeed(&mut self) {
|
|
|
|
self.failures = 0;
|
|
|
|
self.last_attempt = Utc::now();
|
|
|
|
self.last_success = Utc::now();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for Breaker {
|
|
|
|
fn default() -> Self {
|
|
|
|
let now = Utc::now();
|
|
|
|
|
|
|
|
Breaker {
|
|
|
|
failures: 0,
|
|
|
|
last_attempt: now,
|
|
|
|
last_success: now,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-18 04:35:20 +00:00
|
|
|
#[derive(Clone)]
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) struct Requests {
|
2020-05-23 23:56:27 +00:00
|
|
|
client: Rc<RefCell<Client>>,
|
|
|
|
consecutive_errors: Rc<AtomicUsize>,
|
|
|
|
error_limit: usize,
|
2020-03-18 04:35:20 +00:00
|
|
|
key_id: String,
|
2020-05-23 23:56:27 +00:00
|
|
|
user_agent: String,
|
2020-03-18 04:35:20 +00:00
|
|
|
private_key: RSAPrivateKey,
|
|
|
|
config: Config,
|
2020-12-23 18:06:15 +00:00
|
|
|
breakers: Breakers,
|
2020-03-18 04:35:20 +00:00
|
|
|
}
|
2020-03-17 17:15:16 +00:00
|
|
|
|
2020-03-18 04:35:20 +00:00
|
|
|
impl Requests {
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) fn new(
|
2020-12-23 18:06:15 +00:00
|
|
|
key_id: String,
|
|
|
|
private_key: RSAPrivateKey,
|
|
|
|
user_agent: String,
|
|
|
|
breakers: Breakers,
|
|
|
|
) -> Self {
|
2020-03-18 04:35:20 +00:00
|
|
|
Requests {
|
2020-05-23 23:56:27 +00:00
|
|
|
client: Rc::new(RefCell::new(
|
2020-09-14 00:46:13 +00:00
|
|
|
Client::builder()
|
2020-05-23 23:56:27 +00:00
|
|
|
.header("User-Agent", user_agent.clone())
|
|
|
|
.finish(),
|
|
|
|
)),
|
|
|
|
consecutive_errors: Rc::new(AtomicUsize::new(0)),
|
|
|
|
error_limit: 3,
|
2020-03-18 04:35:20 +00:00
|
|
|
key_id,
|
2020-05-23 23:56:27 +00:00
|
|
|
user_agent,
|
2020-03-18 04:35:20 +00:00
|
|
|
private_key,
|
2020-09-30 00:33:50 +00:00
|
|
|
config: Config::default().mastodon_compat(),
|
2020-12-23 18:06:15 +00:00
|
|
|
breakers,
|
2020-03-18 04:35:20 +00:00
|
|
|
}
|
2020-03-17 17:15:16 +00:00
|
|
|
}
|
|
|
|
|
2020-05-23 23:56:27 +00:00
|
|
|
fn count_err(&self) {
|
|
|
|
let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
|
|
|
|
if count + 1 >= self.error_limit {
|
|
|
|
warn!("{} consecutive errors, rebuilding http client", count);
|
2020-09-14 00:46:13 +00:00
|
|
|
*self.client.borrow_mut() = Client::builder()
|
2020-05-23 23:56:27 +00:00
|
|
|
.header("User-Agent", self.user_agent.clone())
|
|
|
|
.finish();
|
|
|
|
self.reset_err();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn reset_err(&self) {
|
|
|
|
self.consecutive_errors.swap(0, Ordering::Relaxed);
|
|
|
|
}
|
|
|
|
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, MyError>
|
2020-07-10 22:24:47 +00:00
|
|
|
where
|
|
|
|
T: serde::de::DeserializeOwned,
|
|
|
|
{
|
|
|
|
self.do_fetch(url, "application/json").await
|
|
|
|
}
|
|
|
|
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, MyError>
|
2020-07-10 22:24:47 +00:00
|
|
|
where
|
|
|
|
T: serde::de::DeserializeOwned,
|
|
|
|
{
|
|
|
|
self.do_fetch(url, "application/activity+json").await
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn do_fetch<T>(&self, url: &str, accept: &str) -> Result<T, MyError>
|
2020-03-18 04:35:20 +00:00
|
|
|
where
|
|
|
|
T: serde::de::DeserializeOwned,
|
|
|
|
{
|
2020-12-23 18:06:15 +00:00
|
|
|
let parsed_url = url.parse::<Url>()?;
|
|
|
|
|
2020-12-29 17:27:14 +00:00
|
|
|
if !self.breakers.should_try(&parsed_url).await {
|
2020-12-23 18:06:15 +00:00
|
|
|
return Err(MyError::Breaker);
|
|
|
|
}
|
|
|
|
|
2020-03-30 06:06:13 +00:00
|
|
|
let signer = self.signer();
|
|
|
|
|
2020-05-23 23:56:27 +00:00
|
|
|
let client: Client = self.client.borrow().clone();
|
2020-09-07 21:51:02 +00:00
|
|
|
let res = client
|
|
|
|
.get(url)
|
2020-07-10 22:24:47 +00:00
|
|
|
.header("Accept", accept)
|
2020-04-23 18:36:06 +00:00
|
|
|
.set(Date(SystemTime::now().into()))
|
2020-03-30 06:06:13 +00:00
|
|
|
.signature(
|
|
|
|
self.config.clone(),
|
|
|
|
self.key_id.clone(),
|
|
|
|
move |signing_string| signer.sign(signing_string),
|
|
|
|
)
|
|
|
|
.await?
|
2020-03-18 04:35:20 +00:00
|
|
|
.send()
|
2020-05-23 23:56:27 +00:00
|
|
|
.await;
|
|
|
|
|
|
|
|
if res.is_err() {
|
|
|
|
self.count_err();
|
2020-12-29 17:27:14 +00:00
|
|
|
self.breakers.fail(&parsed_url).await;
|
2020-05-23 23:56:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
let mut res = res.map_err(|e| MyError::SendRequest(url.to_string(), e.to_string()))?;
|
|
|
|
|
|
|
|
self.reset_err();
|
2020-03-18 04:35:20 +00:00
|
|
|
|
|
|
|
if !res.status().is_success() {
|
|
|
|
if let Ok(bytes) = res.body().await {
|
|
|
|
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
|
2020-03-21 20:24:05 +00:00
|
|
|
if !s.is_empty() {
|
2020-04-21 21:39:02 +00:00
|
|
|
debug!("Response from {}, {}", url, s);
|
2020-03-21 20:24:05 +00:00
|
|
|
}
|
2020-03-18 04:35:20 +00:00
|
|
|
}
|
2020-03-17 21:53:31 +00:00
|
|
|
}
|
2020-03-18 04:35:20 +00:00
|
|
|
|
2020-12-29 17:27:14 +00:00
|
|
|
self.breakers.fail(&parsed_url).await;
|
2020-12-23 18:06:15 +00:00
|
|
|
|
2020-07-25 15:13:00 +00:00
|
|
|
return Err(MyError::Status(url.to_string(), res.status()));
|
2020-03-17 21:53:31 +00:00
|
|
|
}
|
|
|
|
|
2020-12-29 17:27:14 +00:00
|
|
|
self.breakers.succeed(&parsed_url).await;
|
2020-12-23 18:06:15 +00:00
|
|
|
|
2020-07-10 22:47:41 +00:00
|
|
|
let body = res
|
|
|
|
.body()
|
2020-04-21 21:39:02 +00:00
|
|
|
.await
|
2020-07-10 22:47:41 +00:00
|
|
|
.map_err(|e| MyError::ReceiveResponse(url.to_string(), e.to_string()))?;
|
|
|
|
|
|
|
|
Ok(serde_json::from_slice(body.as_ref())?)
|
2020-03-17 21:53:31 +00:00
|
|
|
}
|
|
|
|
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), MyError> {
|
2020-12-23 18:06:15 +00:00
|
|
|
let parsed_url = url.parse::<Url>()?;
|
|
|
|
|
2020-12-29 17:27:14 +00:00
|
|
|
if !self.breakers.should_try(&parsed_url).await {
|
2020-12-23 18:06:15 +00:00
|
|
|
return Err(MyError::Breaker);
|
|
|
|
}
|
|
|
|
|
2020-03-26 03:53:10 +00:00
|
|
|
info!("Fetching bytes for {}", url);
|
2020-03-30 06:06:13 +00:00
|
|
|
let signer = self.signer();
|
|
|
|
|
2020-05-23 23:56:27 +00:00
|
|
|
let client: Client = self.client.borrow().clone();
|
2020-09-07 21:51:02 +00:00
|
|
|
let res = client
|
|
|
|
.get(url)
|
2020-04-21 19:18:18 +00:00
|
|
|
.header("Accept", "*/*")
|
2020-04-23 18:36:06 +00:00
|
|
|
.set(Date(SystemTime::now().into()))
|
2020-03-30 06:06:13 +00:00
|
|
|
.signature(
|
|
|
|
self.config.clone(),
|
|
|
|
self.key_id.clone(),
|
|
|
|
move |signing_string| signer.sign(signing_string),
|
|
|
|
)
|
|
|
|
.await?
|
2020-03-26 03:26:45 +00:00
|
|
|
.send()
|
2020-05-23 23:56:27 +00:00
|
|
|
.await;
|
|
|
|
|
|
|
|
if res.is_err() {
|
2020-12-29 17:27:14 +00:00
|
|
|
self.breakers.fail(&parsed_url).await;
|
2020-05-23 23:56:27 +00:00
|
|
|
self.count_err();
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut res = res.map_err(|e| MyError::SendRequest(url.to_string(), e.to_string()))?;
|
|
|
|
|
|
|
|
self.reset_err();
|
2020-03-26 03:26:45 +00:00
|
|
|
|
|
|
|
let content_type = if let Some(content_type) = res.headers().get("content-type") {
|
|
|
|
if let Ok(s) = content_type.to_str() {
|
|
|
|
s.to_owned()
|
|
|
|
} else {
|
|
|
|
return Err(MyError::ContentType);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
return Err(MyError::ContentType);
|
|
|
|
};
|
|
|
|
|
|
|
|
if !res.status().is_success() {
|
|
|
|
if let Ok(bytes) = res.body().await {
|
|
|
|
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
|
|
|
|
if !s.is_empty() {
|
2020-04-21 21:39:02 +00:00
|
|
|
debug!("Response from {}, {}", url, s);
|
2020-03-26 03:26:45 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-29 17:27:14 +00:00
|
|
|
self.breakers.fail(&parsed_url).await;
|
2020-12-23 18:06:15 +00:00
|
|
|
|
2020-07-25 15:13:00 +00:00
|
|
|
return Err(MyError::Status(url.to_string(), res.status()));
|
2020-03-26 03:26:45 +00:00
|
|
|
}
|
|
|
|
|
2020-12-29 17:27:14 +00:00
|
|
|
self.breakers.succeed(&parsed_url).await;
|
2020-12-23 18:06:15 +00:00
|
|
|
|
2020-03-26 03:26:45 +00:00
|
|
|
let bytes = match res.body().limit(1024 * 1024 * 4).await {
|
|
|
|
Err(e) => {
|
2020-04-21 21:39:02 +00:00
|
|
|
return Err(MyError::ReceiveResponse(url.to_string(), e.to_string()));
|
2020-03-26 03:26:45 +00:00
|
|
|
}
|
|
|
|
Ok(bytes) => bytes,
|
|
|
|
};
|
|
|
|
|
|
|
|
Ok((content_type, bytes))
|
|
|
|
}
|
|
|
|
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) async fn deliver<T>(&self, inbox: Url, item: &T) -> Result<(), MyError>
|
2020-03-18 04:35:20 +00:00
|
|
|
where
|
|
|
|
T: serde::ser::Serialize,
|
|
|
|
{
|
2020-12-29 17:27:14 +00:00
|
|
|
if !self.breakers.should_try(&inbox).await {
|
2020-12-23 18:06:15 +00:00
|
|
|
return Err(MyError::Breaker);
|
|
|
|
}
|
|
|
|
|
2020-03-30 06:06:13 +00:00
|
|
|
let signer = self.signer();
|
2020-03-18 04:35:20 +00:00
|
|
|
let item_string = serde_json::to_string(item)?;
|
|
|
|
|
2020-05-23 23:56:27 +00:00
|
|
|
let client: Client = self.client.borrow().clone();
|
2020-09-07 21:51:02 +00:00
|
|
|
let res = client
|
|
|
|
.post(inbox.as_str())
|
2020-03-18 04:35:20 +00:00
|
|
|
.header("Accept", "application/activity+json")
|
|
|
|
.header("Content-Type", "application/activity+json")
|
2020-04-23 18:36:06 +00:00
|
|
|
.set(Date(SystemTime::now().into()))
|
2020-03-18 04:35:20 +00:00
|
|
|
.signature_with_digest(
|
2020-03-30 06:06:13 +00:00
|
|
|
self.config.clone(),
|
|
|
|
self.key_id.clone(),
|
|
|
|
Sha256::new(),
|
2020-03-18 04:35:20 +00:00
|
|
|
item_string,
|
2020-03-30 06:06:13 +00:00
|
|
|
move |signing_string| signer.sign(signing_string),
|
|
|
|
)
|
|
|
|
.await?
|
2020-03-18 04:35:20 +00:00
|
|
|
.send()
|
2020-05-23 23:56:27 +00:00
|
|
|
.await;
|
|
|
|
|
|
|
|
if res.is_err() {
|
|
|
|
self.count_err();
|
2020-12-29 17:27:14 +00:00
|
|
|
self.breakers.fail(&inbox).await;
|
2020-05-23 23:56:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
let mut res = res.map_err(|e| MyError::SendRequest(inbox.to_string(), e.to_string()))?;
|
|
|
|
|
|
|
|
self.reset_err();
|
2020-03-18 04:35:20 +00:00
|
|
|
|
|
|
|
if !res.status().is_success() {
|
|
|
|
if let Ok(bytes) = res.body().await {
|
|
|
|
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
|
2020-03-21 20:24:05 +00:00
|
|
|
if !s.is_empty() {
|
2020-04-21 21:39:02 +00:00
|
|
|
debug!("Response from {}, {}", inbox.as_str(), s);
|
2020-03-21 20:24:05 +00:00
|
|
|
}
|
2020-03-18 04:35:20 +00:00
|
|
|
}
|
|
|
|
}
|
2020-12-23 18:06:15 +00:00
|
|
|
|
2020-12-29 17:27:14 +00:00
|
|
|
self.breakers.fail(&inbox).await;
|
2020-07-25 15:13:00 +00:00
|
|
|
return Err(MyError::Status(inbox.to_string(), res.status()));
|
2020-03-18 04:35:20 +00:00
|
|
|
}
|
2020-03-17 17:15:16 +00:00
|
|
|
|
2020-12-29 17:27:14 +00:00
|
|
|
self.breakers.succeed(&inbox).await;
|
2020-12-23 18:06:15 +00:00
|
|
|
|
2020-03-18 04:35:20 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
2020-03-17 17:15:16 +00:00
|
|
|
|
2020-03-30 06:06:13 +00:00
|
|
|
fn signer(&self) -> Signer {
|
|
|
|
Signer {
|
|
|
|
private_key: self.private_key.clone(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
struct Signer {
|
|
|
|
private_key: RSAPrivateKey,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Signer {
|
2020-03-21 20:24:05 +00:00
|
|
|
fn sign(&self, signing_string: &str) -> Result<String, MyError> {
|
2020-03-18 04:35:20 +00:00
|
|
|
let hashed = Sha256::digest(signing_string.as_bytes());
|
2020-07-25 14:33:35 +00:00
|
|
|
let bytes = self.private_key.sign(
|
|
|
|
PaddingScheme::PKCS1v15Sign {
|
|
|
|
hash: Some(Hash::SHA2_256),
|
|
|
|
},
|
|
|
|
&hashed,
|
|
|
|
)?;
|
2020-03-18 04:35:20 +00:00
|
|
|
Ok(base64::encode(bytes))
|
|
|
|
}
|
2020-03-17 17:15:16 +00:00
|
|
|
}
|