Add breakers for requests to down domains

This commit is contained in:
asonix 2020-12-23 12:06:15 -06:00
parent 55cb25f54b
commit e2da563a1c
5 changed files with 552 additions and 348 deletions

748
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -25,11 +25,12 @@ async-trait = "0.1.24"
background-jobs = "0.8.0" background-jobs = "0.8.0"
bytes = "0.5.4" bytes = "0.5.4"
base64 = "0.13" base64 = "0.13"
chrono = "0.4.19"
config = "0.10.1" config = "0.10.1"
deadpool = "0.5.1" deadpool = "0.5.1"
deadpool-postgres = "0.5.5" deadpool-postgres = "0.5.5"
dotenv = "0.15.0" dotenv = "0.15.0"
env_logger = "0.7.1" env_logger = "0.8.2"
futures = "0.3.4" futures = "0.3.4"
http-signature-normalization-actix = { version = "0.4.0", default-features = false, features = ["sha-2"] } http-signature-normalization-actix = { version = "0.4.0", default-features = false, features = ["sha-2"] }
log = "0.4" log = "0.4"
@ -54,7 +55,7 @@ uuid = { version = "0.8", features = ["v4", "serde"] }
[build-dependencies] [build-dependencies]
anyhow = "1.0" anyhow = "1.0"
dotenv = "0.15.0" dotenv = "0.15.0"
ructe = { version = "0.12.0", features = ["sass", "mime03"] } ructe = { version = "0.13.0", features = ["sass", "mime03"] }
[profile.dev.package.rsa] [profile.dev.package.rsa]
opt-level = 3 opt-level = 3

View file

@ -3,7 +3,7 @@ use crate::{
data::NodeCache, data::NodeCache,
db::Db, db::Db,
error::MyError, error::MyError,
requests::Requests, requests::{Breakers, Requests},
}; };
use activitystreams::url::Url; use activitystreams::url::Url;
use actix_rt::{ use actix_rt::{
@ -29,6 +29,7 @@ pub struct State {
whitelists: Arc<RwLock<HashSet<String>>>, whitelists: Arc<RwLock<HashSet<String>>>,
listeners: Arc<RwLock<HashSet<Url>>>, listeners: Arc<RwLock<HashSet<Url>>>,
node_cache: NodeCache, node_cache: NodeCache,
breakers: Breakers,
} }
impl State { impl State {
@ -46,6 +47,7 @@ impl State {
self.config.software_version(), self.config.software_version(),
self.config.generate_url(UrlKind::Index), self.config.generate_url(UrlKind::Index),
), ),
self.breakers.clone(),
) )
} }
@ -188,6 +190,7 @@ impl State {
whitelists: Arc::new(RwLock::new(whitelists)), whitelists: Arc::new(RwLock::new(whitelists)),
listeners: listeners.clone(), listeners: listeners.clone(),
node_cache: NodeCache::new(db.clone(), listeners), node_cache: NodeCache::new(db.clone(), listeners),
breakers: Breakers::default(),
}; };
state.spawn_rehydrate(db.clone()); state.spawn_rehydrate(db.clone());

View file

@ -119,6 +119,9 @@ pub enum MyError {
#[error("Blocking operation was canceled")] #[error("Blocking operation was canceled")]
Canceled, Canceled,
#[error("Not trying request due to failed breaker")]
Breaker,
} }
impl ResponseError for MyError { impl ResponseError for MyError {

View file

@ -2,17 +2,114 @@ use crate::error::MyError;
use activitystreams::url::Url; use activitystreams::url::Url;
use actix_web::{client::Client, http::header::Date}; use actix_web::{client::Client, http::header::Date};
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, Utc};
use http_signature_normalization_actix::prelude::*; use http_signature_normalization_actix::prelude::*;
use log::{debug, info, warn}; use log::{debug, info, warn};
use rsa::{hash::Hash, padding::PaddingScheme, RSAPrivateKey}; use rsa::{hash::Hash, padding::PaddingScheme, RSAPrivateKey};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::{ use std::{
cell::RefCell, cell::RefCell,
collections::HashMap,
rc::Rc, rc::Rc,
sync::atomic::{AtomicUsize, Ordering}, sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::SystemTime, time::SystemTime,
}; };
#[derive(Clone)]
pub struct Breakers {
inner: Arc<Mutex<HashMap<String, Breaker>>>,
}
impl Breakers {
pub fn new() -> Self {
Self::default()
}
fn should_try(&self, url: &Url) -> bool {
if let Some(domain) = url.domain() {
self.inner
.lock()
.expect("Breakers poisoned")
.get(domain)
.map(|breaker| breaker.should_try())
.unwrap_or(true)
} else {
false
}
}
fn fail(&self, url: &Url) {
if let Some(domain) = url.domain() {
let mut hm = self.inner.lock().expect("Breakers poisoned");
let entry = hm.entry(domain.to_owned()).or_insert(Breaker::default());
entry.fail();
}
}
fn succeed(&self, url: &Url) {
if let Some(domain) = url.domain() {
let mut hm = self.inner.lock().expect("Breakers poisoned");
let entry = hm.entry(domain.to_owned()).or_insert(Breaker::default());
entry.succeed();
}
}
}
impl Default for Breakers {
fn default() -> Self {
Breakers {
inner: Arc::new(Mutex::new(HashMap::new())),
}
}
}
struct Breaker {
failures: usize,
last_attempt: DateTime<Utc>,
last_success: DateTime<Utc>,
}
impl Breaker {
const fn failure_threshold() -> usize {
10
}
fn failure_wait() -> chrono::Duration {
chrono::Duration::days(1)
}
fn should_try(&self) -> bool {
self.failures < Self::failure_threshold()
|| self.last_attempt + Self::failure_wait() < Utc::now()
}
fn fail(&mut self) {
self.failures += 1;
self.last_attempt = Utc::now();
}
fn succeed(&mut self) {
self.failures = 0;
self.last_attempt = Utc::now();
self.last_success = Utc::now();
}
}
impl Default for Breaker {
fn default() -> Self {
let now = Utc::now();
Breaker {
failures: 0,
last_attempt: now,
last_success: now,
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct Requests { pub struct Requests {
client: Rc<RefCell<Client>>, client: Rc<RefCell<Client>>,
@ -22,10 +119,16 @@ pub struct Requests {
user_agent: String, user_agent: String,
private_key: RSAPrivateKey, private_key: RSAPrivateKey,
config: Config, config: Config,
breakers: Breakers,
} }
impl Requests { impl Requests {
pub fn new(key_id: String, private_key: RSAPrivateKey, user_agent: String) -> Self { pub fn new(
key_id: String,
private_key: RSAPrivateKey,
user_agent: String,
breakers: Breakers,
) -> Self {
Requests { Requests {
client: Rc::new(RefCell::new( client: Rc::new(RefCell::new(
Client::builder() Client::builder()
@ -38,6 +141,7 @@ impl Requests {
user_agent, user_agent,
private_key, private_key,
config: Config::default().mastodon_compat(), config: Config::default().mastodon_compat(),
breakers,
} }
} }
@ -74,6 +178,12 @@ impl Requests {
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
let parsed_url = url.parse::<Url>()?;
if !self.breakers.should_try(&parsed_url) {
return Err(MyError::Breaker);
}
let signer = self.signer(); let signer = self.signer();
let client: Client = self.client.borrow().clone(); let client: Client = self.client.borrow().clone();
@ -92,6 +202,7 @@ impl Requests {
if res.is_err() { if res.is_err() {
self.count_err(); self.count_err();
self.breakers.fail(&parsed_url);
} }
let mut res = res.map_err(|e| MyError::SendRequest(url.to_string(), e.to_string()))?; let mut res = res.map_err(|e| MyError::SendRequest(url.to_string(), e.to_string()))?;
@ -107,9 +218,13 @@ impl Requests {
} }
} }
self.breakers.fail(&parsed_url);
return Err(MyError::Status(url.to_string(), res.status())); return Err(MyError::Status(url.to_string(), res.status()));
} }
self.breakers.succeed(&parsed_url);
let body = res let body = res
.body() .body()
.await .await
@ -119,6 +234,12 @@ impl Requests {
} }
pub async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), MyError> { pub async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), MyError> {
let parsed_url = url.parse::<Url>()?;
if !self.breakers.should_try(&parsed_url) {
return Err(MyError::Breaker);
}
info!("Fetching bytes for {}", url); info!("Fetching bytes for {}", url);
let signer = self.signer(); let signer = self.signer();
@ -137,6 +258,7 @@ impl Requests {
.await; .await;
if res.is_err() { if res.is_err() {
self.breakers.fail(&parsed_url);
self.count_err(); self.count_err();
} }
@ -163,9 +285,13 @@ impl Requests {
} }
} }
self.breakers.fail(&parsed_url);
return Err(MyError::Status(url.to_string(), res.status())); return Err(MyError::Status(url.to_string(), res.status()));
} }
self.breakers.succeed(&parsed_url);
let bytes = match res.body().limit(1024 * 1024 * 4).await { let bytes = match res.body().limit(1024 * 1024 * 4).await {
Err(e) => { Err(e) => {
return Err(MyError::ReceiveResponse(url.to_string(), e.to_string())); return Err(MyError::ReceiveResponse(url.to_string(), e.to_string()));
@ -180,6 +306,10 @@ impl Requests {
where where
T: serde::ser::Serialize, T: serde::ser::Serialize,
{ {
if !self.breakers.should_try(&inbox) {
return Err(MyError::Breaker);
}
let signer = self.signer(); let signer = self.signer();
let item_string = serde_json::to_string(item)?; let item_string = serde_json::to_string(item)?;
@ -202,6 +332,7 @@ impl Requests {
if res.is_err() { if res.is_err() {
self.count_err(); self.count_err();
self.breakers.fail(&inbox);
} }
let mut res = res.map_err(|e| MyError::SendRequest(inbox.to_string(), e.to_string()))?; let mut res = res.map_err(|e| MyError::SendRequest(inbox.to_string(), e.to_string()))?;
@ -216,9 +347,13 @@ impl Requests {
} }
} }
} }
self.breakers.fail(&inbox);
return Err(MyError::Status(inbox.to_string(), res.status())); return Err(MyError::Status(inbox.to_string(), res.status()));
} }
self.breakers.succeed(&inbox);
Ok(()) Ok(())
} }