diff --git a/src/error.rs b/src/error.rs index 3ca22f4..0b7716b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -100,6 +100,9 @@ pub(crate) enum ErrorKind { #[error("Couldn't sign digest")] Signature(#[from] signature::Error), + #[error("Server {0} failed to validate our signature")] + SignedDelivery(String), + #[error("Couldn't parse the signature header")] HeaderValidation(#[from] actix_web::http::header::InvalidHeaderValue), diff --git a/src/requests.rs b/src/requests.rs index e417c03..2de835f 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -1,7 +1,7 @@ use crate::error::{Error, ErrorKind}; use activitystreams::iri_string::types::IriString; use actix_web::{http::header::Date, web::Bytes}; -use awc::Client; +use awc::{Client, ClientResponse}; use dashmap::DashMap; use http_signature_normalization_actix::prelude::*; use rand::thread_rng; @@ -53,6 +53,9 @@ impl Breakers { let should_write = { if let Some(mut breaker) = self.inner.get_mut(authority) { breaker.fail(); + if !breaker.should_try() { + tracing::warn!("Failed breaker for {}", authority); + } false } else { true @@ -101,17 +104,12 @@ struct Breaker { } impl Breaker { - const fn failure_threshold() -> usize { - 10 - } - - fn failure_wait() -> Duration { - Duration::from_secs(ONE_DAY) - } + const FAILURE_WAIT: Duration = Duration::from_secs(ONE_DAY); + const FAILURE_THRESHOLD: usize = 10; fn should_try(&self) -> bool { - self.failures < Self::failure_threshold() - || self.last_attempt + Self::failure_wait() < SystemTime::now() + self.failures < Self::FAILURE_THRESHOLD + || self.last_attempt + Self::FAILURE_WAIT < SystemTime::now() } fn fail(&mut self) { @@ -192,7 +190,7 @@ impl Requests { fn count_err(&self) { let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed); if count + 1 >= self.error_limit { - tracing::warn!("{} consecutive errors, rebuilding http client", count); + tracing::warn!("{} consecutive errors, rebuilding http client", count + 1); *self.client.borrow_mut() = build_client(&self.user_agent); self.reset_err(); } @@ -202,7 +200,36 @@ impl Requests { self.consecutive_errors.swap(0, Ordering::Relaxed); } - #[tracing::instrument(name = "Fetch Json", skip(self))] + async fn check_response( + &self, + parsed_url: &IriString, + res: &mut ClientResponse, + ) -> Result<(), Error> { + self.reset_err(); + + if !res.status().is_success() { + self.breakers.fail(&parsed_url); + + if let Ok(bytes) = res.body().await { + if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { + if s.to_lowercase().contains("http signature") { + return Err(ErrorKind::SignedDelivery(parsed_url.to_string()).into()); + } + if !s.is_empty() { + tracing::warn!("Response from {}, {}", parsed_url, s); + } + } + } + + return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into()); + } + + self.breakers.succeed(&parsed_url); + + Ok(()) + } + + #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))] pub(crate) async fn fetch_json(&self, url: &str) -> Result where T: serde::de::DeserializeOwned, @@ -210,7 +237,7 @@ impl Requests { self.do_fetch(url, "application/json").await } - #[tracing::instrument(name = "Fetch Activity+Json", skip(self))] + #[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))] pub(crate) async fn fetch(&self, url: &str) -> Result where T: serde::de::DeserializeOwned, @@ -229,6 +256,7 @@ impl Requests { } let signer = self.signer(); + let span = tracing::Span::current(); let client: Client = self.client.borrow().clone(); let res = client @@ -238,7 +266,10 @@ impl Requests { .signature( self.config.clone(), self.key_id.clone(), - move |signing_string| signer.sign(signing_string), + move |signing_string| { + span.record("signing_string", signing_string); + span.in_scope(|| signer.sign(signing_string)) + }, ) .await? .send() @@ -251,23 +282,7 @@ impl Requests { let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?; - self.reset_err(); - - if !res.status().is_success() { - if let Ok(bytes) = res.body().await { - if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { - if !s.is_empty() { - tracing::warn!("Response from {}, {}", url, s); - } - } - } - - self.breakers.fail(&parsed_url); - - return Err(ErrorKind::Status(url.to_string(), res.status()).into()); - } - - self.breakers.succeed(&parsed_url); + self.check_response(&parsed_url, &mut res).await?; let body = res .body() @@ -277,7 +292,7 @@ impl Requests { Ok(serde_json::from_slice(body.as_ref())?) } - #[tracing::instrument(name = "Fetch Bytes", skip(self))] + #[tracing::instrument(name = "Fetch Bytes", skip(self), fields(signing_string))] pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> { let parsed_url = url.parse::()?; @@ -285,8 +300,8 @@ impl Requests { return Err(ErrorKind::Breaker.into()); } - tracing::info!("Fetching bytes for {}", url); let signer = self.signer(); + let span = tracing::Span::current(); let client: Client = self.client.borrow().clone(); let res = client @@ -296,7 +311,10 @@ impl Requests { .signature( self.config.clone(), self.key_id.clone(), - move |signing_string| signer.sign(signing_string), + move |signing_string| { + span.record("signing_string", signing_string); + span.in_scope(|| signer.sign(signing_string)) + }, ) .await? .send() @@ -309,7 +327,7 @@ impl Requests { let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?; - self.reset_err(); + self.check_response(&parsed_url, &mut res).await?; let content_type = if let Some(content_type) = res.headers().get("content-type") { if let Ok(s) = content_type.to_str() { @@ -321,22 +339,6 @@ impl Requests { return Err(ErrorKind::ContentType.into()); }; - if !res.status().is_success() { - if let Ok(bytes) = res.body().await { - if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { - if !s.is_empty() { - tracing::warn!("Response from {}, {}", url, s); - } - } - } - - self.breakers.fail(&parsed_url); - - return Err(ErrorKind::Status(url.to_string(), res.status()).into()); - } - - self.breakers.succeed(&parsed_url); - let bytes = match res.body().limit(1024 * 1024 * 4).await { Err(e) => { return Err(ErrorKind::ReceiveResponse(url.to_string(), e.to_string()).into()); @@ -350,7 +352,7 @@ impl Requests { #[tracing::instrument( "Deliver to Inbox", skip_all, - fields(inbox = inbox.to_string().as_str()) + fields(inbox = inbox.to_string().as_str(), signing_string) )] pub(crate) async fn deliver(&self, inbox: IriString, item: &T) -> Result<(), Error> where @@ -361,6 +363,7 @@ impl Requests { } let signer = self.signer(); + let span = tracing::Span::current(); let item_string = serde_json::to_string(item)?; let client: Client = self.client.borrow().clone(); @@ -374,7 +377,10 @@ impl Requests { self.key_id.clone(), Sha256::new(), item_string, - move |signing_string| signer.sign(signing_string), + move |signing_string| { + span.record("signing_string", signing_string); + span.in_scope(|| signer.sign(signing_string)) + }, ) .await? .split(); @@ -388,22 +394,7 @@ impl Requests { let mut res = res.map_err(|e| ErrorKind::SendRequest(inbox.to_string(), e.to_string()))?; - self.reset_err(); - - if !res.status().is_success() { - if let Ok(bytes) = res.body().await { - if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { - if !s.is_empty() { - tracing::warn!("Response from {}, {}", inbox.as_str(), s); - } - } - } - - self.breakers.fail(&inbox); - return Err(ErrorKind::Status(inbox.to_string(), res.status()).into()); - } - - self.breakers.succeed(&inbox); + self.check_response(&inbox, &mut res).await?; Ok(()) }