Check for signature errors, record signing strings

This commit is contained in:
asonix 2022-11-16 11:23:36 -06:00
parent 7e01cbfc41
commit f55ea0a550
2 changed files with 63 additions and 69 deletions

View file

@ -100,6 +100,9 @@ pub(crate) enum ErrorKind {
#[error("Couldn't sign digest")] #[error("Couldn't sign digest")]
Signature(#[from] signature::Error), Signature(#[from] signature::Error),
#[error("Server {0} failed to validate our signature")]
SignedDelivery(String),
#[error("Couldn't parse the signature header")] #[error("Couldn't parse the signature header")]
HeaderValidation(#[from] actix_web::http::header::InvalidHeaderValue), HeaderValidation(#[from] actix_web::http::header::InvalidHeaderValue),

View file

@ -1,7 +1,7 @@
use crate::error::{Error, ErrorKind}; use crate::error::{Error, ErrorKind};
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_web::{http::header::Date, web::Bytes}; use actix_web::{http::header::Date, web::Bytes};
use awc::Client; use awc::{Client, ClientResponse};
use dashmap::DashMap; use dashmap::DashMap;
use http_signature_normalization_actix::prelude::*; use http_signature_normalization_actix::prelude::*;
use rand::thread_rng; use rand::thread_rng;
@ -53,6 +53,9 @@ impl Breakers {
let should_write = { let should_write = {
if let Some(mut breaker) = self.inner.get_mut(authority) { if let Some(mut breaker) = self.inner.get_mut(authority) {
breaker.fail(); breaker.fail();
if !breaker.should_try() {
tracing::warn!("Failed breaker for {}", authority);
}
false false
} else { } else {
true true
@ -101,17 +104,12 @@ struct Breaker {
} }
impl Breaker { impl Breaker {
const fn failure_threshold() -> usize { const FAILURE_WAIT: Duration = Duration::from_secs(ONE_DAY);
10 const FAILURE_THRESHOLD: usize = 10;
}
fn failure_wait() -> Duration {
Duration::from_secs(ONE_DAY)
}
fn should_try(&self) -> bool { fn should_try(&self) -> bool {
self.failures < Self::failure_threshold() self.failures < Self::FAILURE_THRESHOLD
|| self.last_attempt + Self::failure_wait() < SystemTime::now() || self.last_attempt + Self::FAILURE_WAIT < SystemTime::now()
} }
fn fail(&mut self) { fn fail(&mut self) {
@ -192,7 +190,7 @@ impl Requests {
fn count_err(&self) { fn count_err(&self) {
let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed); let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
if count + 1 >= self.error_limit { if count + 1 >= self.error_limit {
tracing::warn!("{} consecutive errors, rebuilding http client", count); tracing::warn!("{} consecutive errors, rebuilding http client", count + 1);
*self.client.borrow_mut() = build_client(&self.user_agent); *self.client.borrow_mut() = build_client(&self.user_agent);
self.reset_err(); self.reset_err();
} }
@ -202,7 +200,36 @@ impl Requests {
self.consecutive_errors.swap(0, Ordering::Relaxed); self.consecutive_errors.swap(0, Ordering::Relaxed);
} }
#[tracing::instrument(name = "Fetch Json", skip(self))] async fn check_response(
&self,
parsed_url: &IriString,
res: &mut ClientResponse,
) -> Result<(), Error> {
self.reset_err();
if !res.status().is_success() {
self.breakers.fail(&parsed_url);
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if s.to_lowercase().contains("http signature") {
return Err(ErrorKind::SignedDelivery(parsed_url.to_string()).into());
}
if !s.is_empty() {
tracing::warn!("Response from {}, {}", parsed_url, s);
}
}
}
return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into());
}
self.breakers.succeed(&parsed_url);
Ok(())
}
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error> pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
@ -210,7 +237,7 @@ impl Requests {
self.do_fetch(url, "application/json").await self.do_fetch(url, "application/json").await
} }
#[tracing::instrument(name = "Fetch Activity+Json", skip(self))] #[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error> pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
@ -229,6 +256,7 @@ impl Requests {
} }
let signer = self.signer(); let signer = self.signer();
let span = tracing::Span::current();
let client: Client = self.client.borrow().clone(); let client: Client = self.client.borrow().clone();
let res = client let res = client
@ -238,7 +266,10 @@ impl Requests {
.signature( .signature(
self.config.clone(), self.config.clone(),
self.key_id.clone(), self.key_id.clone(),
move |signing_string| signer.sign(signing_string), move |signing_string| {
span.record("signing_string", signing_string);
span.in_scope(|| signer.sign(signing_string))
},
) )
.await? .await?
.send() .send()
@ -251,23 +282,7 @@ impl Requests {
let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?; let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?;
self.reset_err(); self.check_response(&parsed_url, &mut res).await?;
if !res.status().is_success() {
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
tracing::warn!("Response from {}, {}", url, s);
}
}
}
self.breakers.fail(&parsed_url);
return Err(ErrorKind::Status(url.to_string(), res.status()).into());
}
self.breakers.succeed(&parsed_url);
let body = res let body = res
.body() .body()
@ -277,7 +292,7 @@ impl Requests {
Ok(serde_json::from_slice(body.as_ref())?) Ok(serde_json::from_slice(body.as_ref())?)
} }
#[tracing::instrument(name = "Fetch Bytes", skip(self))] #[tracing::instrument(name = "Fetch Bytes", skip(self), fields(signing_string))]
pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> { pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> {
let parsed_url = url.parse::<IriString>()?; let parsed_url = url.parse::<IriString>()?;
@ -285,8 +300,8 @@ impl Requests {
return Err(ErrorKind::Breaker.into()); return Err(ErrorKind::Breaker.into());
} }
tracing::info!("Fetching bytes for {}", url);
let signer = self.signer(); let signer = self.signer();
let span = tracing::Span::current();
let client: Client = self.client.borrow().clone(); let client: Client = self.client.borrow().clone();
let res = client let res = client
@ -296,7 +311,10 @@ impl Requests {
.signature( .signature(
self.config.clone(), self.config.clone(),
self.key_id.clone(), self.key_id.clone(),
move |signing_string| signer.sign(signing_string), move |signing_string| {
span.record("signing_string", signing_string);
span.in_scope(|| signer.sign(signing_string))
},
) )
.await? .await?
.send() .send()
@ -309,7 +327,7 @@ impl Requests {
let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?; let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?;
self.reset_err(); self.check_response(&parsed_url, &mut res).await?;
let content_type = if let Some(content_type) = res.headers().get("content-type") { let content_type = if let Some(content_type) = res.headers().get("content-type") {
if let Ok(s) = content_type.to_str() { if let Ok(s) = content_type.to_str() {
@ -321,22 +339,6 @@ impl Requests {
return Err(ErrorKind::ContentType.into()); return Err(ErrorKind::ContentType.into());
}; };
if !res.status().is_success() {
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
tracing::warn!("Response from {}, {}", url, s);
}
}
}
self.breakers.fail(&parsed_url);
return Err(ErrorKind::Status(url.to_string(), res.status()).into());
}
self.breakers.succeed(&parsed_url);
let bytes = match res.body().limit(1024 * 1024 * 4).await { let bytes = match res.body().limit(1024 * 1024 * 4).await {
Err(e) => { Err(e) => {
return Err(ErrorKind::ReceiveResponse(url.to_string(), e.to_string()).into()); return Err(ErrorKind::ReceiveResponse(url.to_string(), e.to_string()).into());
@ -350,7 +352,7 @@ impl Requests {
#[tracing::instrument( #[tracing::instrument(
"Deliver to Inbox", "Deliver to Inbox",
skip_all, skip_all,
fields(inbox = inbox.to_string().as_str()) fields(inbox = inbox.to_string().as_str(), signing_string)
)] )]
pub(crate) async fn deliver<T>(&self, inbox: IriString, item: &T) -> Result<(), Error> pub(crate) async fn deliver<T>(&self, inbox: IriString, item: &T) -> Result<(), Error>
where where
@ -361,6 +363,7 @@ impl Requests {
} }
let signer = self.signer(); let signer = self.signer();
let span = tracing::Span::current();
let item_string = serde_json::to_string(item)?; let item_string = serde_json::to_string(item)?;
let client: Client = self.client.borrow().clone(); let client: Client = self.client.borrow().clone();
@ -374,7 +377,10 @@ impl Requests {
self.key_id.clone(), self.key_id.clone(),
Sha256::new(), Sha256::new(),
item_string, item_string,
move |signing_string| signer.sign(signing_string), move |signing_string| {
span.record("signing_string", signing_string);
span.in_scope(|| signer.sign(signing_string))
},
) )
.await? .await?
.split(); .split();
@ -388,22 +394,7 @@ impl Requests {
let mut res = res.map_err(|e| ErrorKind::SendRequest(inbox.to_string(), e.to_string()))?; let mut res = res.map_err(|e| ErrorKind::SendRequest(inbox.to_string(), e.to_string()))?;
self.reset_err(); self.check_response(&inbox, &mut res).await?;
if !res.status().is_success() {
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
tracing::warn!("Response from {}, {}", inbox.as_str(), s);
}
}
}
self.breakers.fail(&inbox);
return Err(ErrorKind::Status(inbox.to_string(), res.status()).into());
}
self.breakers.succeed(&inbox);
Ok(()) Ok(())
} }