Enable different breaker failure cases for different endpoints

Additionally, don't count 4xx towards succeeding a breaker
This commit is contained in:
asonix 2023-09-08 19:11:24 -06:00
parent 5a6fbbcb77
commit 804d22ee81
8 changed files with 114 additions and 27 deletions

View file

@ -2,7 +2,7 @@ use crate::{
apub::AcceptedActors, apub::AcceptedActors,
db::{Actor, Db}, db::{Actor, Db},
error::{Error, ErrorKind}, error::{Error, ErrorKind},
requests::Requests, requests::{BreakerStrategy, Requests},
}; };
use activitystreams::{iri_string::types::IriString, prelude::*}; use activitystreams::{iri_string::types::IriString, prelude::*};
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
@ -71,7 +71,9 @@ impl ActorCache {
id: &IriString, id: &IriString,
requests: &Requests, requests: &Requests,
) -> Result<Actor, Error> { ) -> Result<Actor, Error> {
let accepted_actor = requests.fetch::<AcceptedActors>(id).await?; let accepted_actor = requests
.fetch::<AcceptedActors>(id, BreakerStrategy::Require2XX)
.await?;
let input_authority = id.authority_components().ok_or(ErrorKind::MissingDomain)?; let input_authority = id.authority_components().ok_or(ErrorKind::MissingDomain)?;
let accepted_actor_id = accepted_actor let accepted_actor_id = accepted_actor

View file

@ -2,6 +2,7 @@ use crate::{
apub::AcceptedActors, apub::AcceptedActors,
error::{Error, ErrorKind}, error::{Error, ErrorKind},
jobs::JobState, jobs::JobState,
requests::BreakerStrategy,
}; };
use activitystreams::{iri_string::types::IriString, object::Image, prelude::*}; use activitystreams::{iri_string::types::IriString, object::Image, prelude::*};
use background_jobs::ActixJob; use background_jobs::ActixJob;
@ -44,7 +45,7 @@ impl QueryContact {
let contact = match state let contact = match state
.state .state
.requests .requests
.fetch::<AcceptedActors>(&self.contact_id) .fetch::<AcceptedActors>(&self.contact_id, BreakerStrategy::Allow404AndBelow)
.await .await
{ {
Ok(contact) => contact, Ok(contact) => contact,

View file

@ -1,6 +1,7 @@
use crate::{ use crate::{
error::Error, error::Error,
jobs::{debug_object, JobState}, jobs::{debug_object, JobState},
requests::BreakerStrategy,
}; };
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use background_jobs::{ActixJob, Backoff}; use background_jobs::{ActixJob, Backoff};
@ -35,7 +36,12 @@ impl Deliver {
#[tracing::instrument(name = "Deliver", skip(state))] #[tracing::instrument(name = "Deliver", skip(state))]
async fn permform(self, state: JobState) -> Result<(), Error> { async fn permform(self, state: JobState) -> Result<(), Error> {
if let Err(e) = state.state.requests.deliver(&self.to, &self.data).await { if let Err(e) = state
.state
.requests
.deliver(&self.to, &self.data, BreakerStrategy::Allow401AndBelow)
.await
{
if e.is_breaker() { if e.is_breaker() {
tracing::debug!("Not trying due to failed breaker"); tracing::debug!("Not trying due to failed breaker");
return Ok(()); return Ok(());

View file

@ -2,6 +2,7 @@ use crate::{
config::UrlKind, config::UrlKind,
error::{Error, ErrorKind}, error::{Error, ErrorKind},
jobs::{Boolish, JobState}, jobs::{Boolish, JobState},
requests::BreakerStrategy,
}; };
use activitystreams::{iri, iri_string::types::IriString}; use activitystreams::{iri, iri_string::types::IriString};
use background_jobs::ActixJob; use background_jobs::ActixJob;
@ -42,7 +43,10 @@ impl QueryInstance {
state state
.state .state
.requests .requests
.fetch_json::<Instance>(&mastodon_instance_uri) .fetch_json::<Instance>(
&mastodon_instance_uri,
BreakerStrategy::Allow404AndBelow,
)
.await .await
} }
InstanceApiType::Misskey => { InstanceApiType::Misskey => {
@ -50,7 +54,10 @@ impl QueryInstance {
state state
.state .state
.requests .requests
.fetch_json_msky::<MisskeyMeta>(&msky_meta_uri) .fetch_json_msky::<MisskeyMeta>(
&msky_meta_uri,
BreakerStrategy::Allow404AndBelow,
)
.await .await
.map(|res| res.into()) .map(|res| res.into())
} }

View file

@ -1,6 +1,7 @@
use crate::{ use crate::{
error::{Error, ErrorKind}, error::{Error, ErrorKind},
jobs::{Boolish, JobState, QueryContact}, jobs::{Boolish, JobState, QueryContact},
requests::BreakerStrategy,
}; };
use activitystreams::{iri, iri_string::types::IriString, primitives::OneOrMany}; use activitystreams::{iri, iri_string::types::IriString, primitives::OneOrMany};
use background_jobs::ActixJob; use background_jobs::ActixJob;
@ -45,7 +46,7 @@ impl QueryNodeinfo {
let well_known = match state let well_known = match state
.state .state
.requests .requests
.fetch_json::<WellKnown>(&well_known_uri) .fetch_json::<WellKnown>(&well_known_uri, BreakerStrategy::Allow404AndBelow)
.await .await
{ {
Ok(well_known) => well_known, Ok(well_known) => well_known,
@ -62,7 +63,12 @@ impl QueryNodeinfo {
return Ok(()); return Ok(());
}; };
let nodeinfo = match state.state.requests.fetch_json::<Nodeinfo>(&href).await { let nodeinfo = match state
.state
.requests
.fetch_json::<Nodeinfo>(&href, BreakerStrategy::Require2XX)
.await
{
Ok(nodeinfo) => nodeinfo, Ok(nodeinfo) => nodeinfo,
Err(e) if e.is_breaker() => { Err(e) if e.is_breaker() => {
tracing::debug!("Not retrying due to failed breaker"); tracing::debug!("Not retrying due to failed breaker");

View file

@ -2,7 +2,7 @@ use crate::{
apub::AcceptedActors, apub::AcceptedActors,
data::{ActorCache, State}, data::{ActorCache, State},
error::{Error, ErrorKind}, error::{Error, ErrorKind},
requests::Requests, requests::{BreakerStrategy, Requests},
spawner::Spawner, spawner::Spawner,
}; };
use activitystreams::{base::BaseExt, iri, iri_string::types::IriString}; use activitystreams::{base::BaseExt, iri, iri_string::types::IriString};
@ -70,7 +70,11 @@ impl MyVerify {
actor_id actor_id
} else { } else {
match self.0.fetch::<PublicKeyResponse>(&public_key_id).await { match self
.0
.fetch::<PublicKeyResponse>(&public_key_id, BreakerStrategy::Require2XX)
.await
{
Ok(res) => res.actor_id().ok_or(ErrorKind::MissingId), Ok(res) => res.actor_id().ok_or(ErrorKind::MissingId),
Err(e) => { Err(e) => {
if e.is_gone() { if e.is_gone() {

View file

@ -24,6 +24,16 @@ const ONE_MINUTE: u64 = 60 * ONE_SECOND;
const ONE_HOUR: u64 = 60 * ONE_MINUTE; const ONE_HOUR: u64 = 60 * ONE_MINUTE;
const ONE_DAY: u64 = 24 * ONE_HOUR; const ONE_DAY: u64 = 24 * ONE_HOUR;
#[derive(Debug)]
pub(crate) enum BreakerStrategy {
// Requires a successful response
Require2XX,
// Allows HTTP 2xx-401
Allow401AndBelow,
// Allows HTTP 2xx-404
Allow404AndBelow,
}
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct Breakers { pub(crate) struct Breakers {
inner: Arc<DashMap<String, Breaker>>, inner: Arc<DashMap<String, Breaker>>,
@ -193,6 +203,7 @@ impl Requests {
async fn check_response( async fn check_response(
&self, &self,
parsed_url: &IriString, parsed_url: &IriString,
strategy: BreakerStrategy,
res: Result<reqwest::Response, reqwest_middleware::Error>, res: Result<reqwest::Response, reqwest_middleware::Error>,
) -> Result<reqwest::Response, Error> { ) -> Result<reqwest::Response, Error> {
if res.is_err() { if res.is_err() {
@ -203,7 +214,13 @@ impl Requests {
let status = res.status(); let status = res.status();
if status.is_server_error() { let success = match strategy {
BreakerStrategy::Require2XX => status.is_success(),
BreakerStrategy::Allow401AndBelow => (200..=401).contains(&status.as_u16()),
BreakerStrategy::Allow404AndBelow => (200..=404).contains(&status.as_u16()),
};
if !success {
self.breakers.fail(&parsed_url); self.breakers.fail(&parsed_url);
if let Ok(s) = res.text().await { if let Ok(s) = res.text().await {
@ -215,22 +232,33 @@ impl Requests {
return Err(ErrorKind::Status(parsed_url.to_string(), status).into()); return Err(ErrorKind::Status(parsed_url.to_string(), status).into());
} }
// only actually succeed a breaker on 2xx response
if status.is_success() {
self.last_online.mark_seen(&parsed_url); self.last_online.mark_seen(&parsed_url);
self.breakers.succeed(&parsed_url); self.breakers.succeed(&parsed_url);
}
Ok(res) Ok(res)
} }
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))] #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch_json<T>(&self, url: &IriString) -> Result<T, Error> pub(crate) async fn fetch_json<T>(
&self,
url: &IriString,
strategy: BreakerStrategy,
) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
self.do_fetch(url, "application/json").await self.do_fetch(url, "application/json", strategy).await
} }
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))] #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch_json_msky<T>(&self, url: &IriString) -> Result<T, Error> pub(crate) async fn fetch_json_msky<T>(
&self,
url: &IriString,
strategy: BreakerStrategy,
) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
@ -240,6 +268,7 @@ impl Requests {
&serde_json::json!({}), &serde_json::json!({}),
"application/json", "application/json",
"application/json", "application/json",
strategy,
) )
.await? .await?
.bytes() .bytes()
@ -249,31 +278,50 @@ impl Requests {
} }
#[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))] #[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch<T>(&self, url: &IriString) -> Result<T, Error> pub(crate) async fn fetch<T>(
&self,
url: &IriString,
strategy: BreakerStrategy,
) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
self.do_fetch(url, "application/activity+json").await self.do_fetch(url, "application/activity+json", strategy)
.await
} }
async fn do_fetch<T>(&self, url: &IriString, accept: &str) -> Result<T, Error> async fn do_fetch<T>(
&self,
url: &IriString,
accept: &str,
strategy: BreakerStrategy,
) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
let body = self.do_fetch_response(url, accept).await?.bytes().await?; let body = self
.do_fetch_response(url, accept, strategy)
.await?
.bytes()
.await?;
Ok(serde_json::from_slice(&body)?) Ok(serde_json::from_slice(&body)?)
} }
#[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))] #[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))]
pub(crate) async fn fetch_response(&self, url: &IriString) -> Result<reqwest::Response, Error> { pub(crate) async fn fetch_response(
self.do_fetch_response(url, "*/*").await &self,
url: &IriString,
strategy: BreakerStrategy,
) -> Result<reqwest::Response, Error> {
self.do_fetch_response(url, "*/*", strategy).await
} }
pub(crate) async fn do_fetch_response( pub(crate) async fn do_fetch_response(
&self, &self,
url: &IriString, url: &IriString,
accept: &str, accept: &str,
strategy: BreakerStrategy,
) -> Result<reqwest::Response, Error> { ) -> Result<reqwest::Response, Error> {
if !self.breakers.should_try(url) { if !self.breakers.should_try(url) {
return Err(ErrorKind::Breaker.into()); return Err(ErrorKind::Breaker.into());
@ -295,7 +343,7 @@ impl Requests {
let res = self.client.execute(request).await; let res = self.client.execute(request).await;
let res = self.check_response(url, res).await?; let res = self.check_response(url, strategy, res).await?;
Ok(res) Ok(res)
} }
@ -305,7 +353,12 @@ impl Requests {
skip_all, skip_all,
fields(inbox = inbox.to_string().as_str(), signing_string) fields(inbox = inbox.to_string().as_str(), signing_string)
)] )]
pub(crate) async fn deliver<T>(&self, inbox: &IriString, item: &T) -> Result<(), Error> pub(crate) async fn deliver<T>(
&self,
inbox: &IriString,
item: &T,
strategy: BreakerStrategy,
) -> Result<(), Error>
where where
T: serde::ser::Serialize + std::fmt::Debug, T: serde::ser::Serialize + std::fmt::Debug,
{ {
@ -314,6 +367,7 @@ impl Requests {
item, item,
"application/activity+json", "application/activity+json",
"application/activity+json", "application/activity+json",
strategy,
) )
.await?; .await?;
Ok(()) Ok(())
@ -325,6 +379,7 @@ impl Requests {
item: &T, item: &T,
content_type: &str, content_type: &str,
accept: &str, accept: &str,
strategy: BreakerStrategy,
) -> Result<reqwest::Response, Error> ) -> Result<reqwest::Response, Error>
where where
T: serde::ser::Serialize + std::fmt::Debug, T: serde::ser::Serialize + std::fmt::Debug,
@ -357,7 +412,7 @@ impl Requests {
let res = self.client.execute(request).await; let res = self.client.execute(request).await;
let res = self.check_response(inbox, res).await?; let res = self.check_response(inbox, strategy, res).await?;
Ok(res) Ok(res)
} }

View file

@ -1,4 +1,8 @@
use crate::{data::MediaCache, error::Error, requests::Requests}; use crate::{
data::MediaCache,
error::Error,
requests::{BreakerStrategy, Requests},
};
use actix_web::{body::BodyStream, web, HttpResponse}; use actix_web::{body::BodyStream, web, HttpResponse};
use uuid::Uuid; use uuid::Uuid;
@ -11,7 +15,9 @@ pub(crate) async fn route(
let uuid = uuid.into_inner(); let uuid = uuid.into_inner();
if let Some(url) = media.get_url(uuid).await? { if let Some(url) = media.get_url(uuid).await? {
let res = requests.fetch_response(&url).await?; let res = requests
.fetch_response(&url, BreakerStrategy::Allow404AndBelow)
.await?;
let mut response = HttpResponse::build(res.status()); let mut response = HttpResponse::build(res.status());