From 7b9aff925afaa00a948fd6b11460b661027178d9 Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Fri, 17 Sep 2021 19:34:16 -0500 Subject: [PATCH] Instrument -actix with tracing --- http-signature-normalization-actix/Cargo.toml | 7 +- .../src/digest/middleware.rs | 103 ++++++++--- .../src/digest/mod.rs | 1 + .../src/digest/sha2.rs | 2 +- .../src/digest/sha3.rs | 2 +- http-signature-normalization-actix/src/lib.rs | 10 +- .../src/middleware.rs | 173 +++++++++++++----- 7 files changed, 223 insertions(+), 75 deletions(-) diff --git a/http-signature-normalization-actix/Cargo.toml b/http-signature-normalization-actix/Cargo.toml index 1356d40..c305d16 100644 --- a/http-signature-normalization-actix/Cargo.toml +++ b/http-signature-normalization-actix/Cargo.toml @@ -29,12 +29,15 @@ actix-web = { version = "4.0.0-beta.8", default-features = false } awc = { version = "3.0.0-beta.7", default-features = false } base64 = { version = "0.13", optional = true } chrono = "0.4.6" -futures = "0.3" +futures-util = { version = "0.3", default-features = false } http-signature-normalization = { version = "0.5.1", path = ".." } -log = "0.4" sha2 = { version = "0.9", optional = true } sha3 = { version = "0.9", optional = true } thiserror = "1.0" +tokio = { version = "1", default-features = false, features = ["sync"] } +tracing = "0.1" +tracing-error = "0.1" +tracing-futures = "0.2" [dev-dependencies] actix-rt = "2.1.0" diff --git a/http-signature-normalization-actix/src/digest/middleware.rs b/http-signature-normalization-actix/src/digest/middleware.rs index ea8b58f..8342918 100644 --- a/http-signature-normalization-actix/src/digest/middleware.rs +++ b/http-signature-normalization-actix/src/digest/middleware.rs @@ -7,13 +7,18 @@ use actix_web::{ http::{header::HeaderValue, StatusCode}, web, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError, }; -use futures::{channel::mpsc, Stream, StreamExt}; -use log::{debug, warn}; +use futures_util::{ + future::LocalBoxFuture, + stream::{Stream, StreamExt}, +}; use std::{ - future::{ready, Future, Ready}, + future::{ready, Ready}, pin::Pin, task::{Context, Poll}, }; +use tokio::sync::mpsc; +use tracing::{debug, Span}; +use tracing_error::SpanTrace; #[derive(Copy, Clone, Debug)] /// A type implementing FromRequest that can be used in route handler to guard for verified @@ -44,7 +49,47 @@ pub struct VerifyMiddleware(S, bool, T); #[derive(Debug, thiserror::Error)] #[error("Error verifying digest")] #[doc(hidden)] -pub struct VerifyError; +pub struct VerifyError { + context: SpanTrace, + kind: VerifyErrorKind, +} + +impl VerifyError { + fn new(span: &Span, kind: VerifyErrorKind) -> Self { + span.in_scope(|| VerifyError { + context: SpanTrace::capture(), + kind, + }) + } +} + +#[derive(Debug, thiserror::Error)] +enum VerifyErrorKind { + #[error("Missing request extension")] + Extension, + + #[error("Digest header missing")] + MissingDigest, + + #[error("Digest header is empty")] + Empty, + + #[error("Failed to verify digest")] + Verify, + + #[error("Payload dropped. If this was unexpected, it could be that the payload isn't required in the route this middleware is guarding")] + Dropped, +} + +struct RxStream(mpsc::Receiver); + +impl Stream for RxStream { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_recv(cx) + } +} impl VerifyDigest where @@ -70,7 +115,11 @@ impl FromRequest for DigestVerified { type Config = (); fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { - let res = req.extensions().get::().copied().ok_or(VerifyError); + let res = req + .extensions() + .get::() + .copied() + .ok_or_else(|| VerifyError::new(&Span::current(), VerifyErrorKind::Extension)); if res.is_err() { debug!("Failed to fetch DigestVerified from request"); @@ -98,8 +147,6 @@ where } } -type FutResult = dyn Future>; - impl Service for VerifyMiddleware where T: DigestVerify + Clone + Send + 'static, @@ -109,50 +156,61 @@ where { type Response = ServiceResponse; type Error = actix_web::Error; - type Future = Pin>>; + type Future = LocalBoxFuture<'static, Result>; fn poll_ready(&self, cx: &mut Context) -> Poll> { self.0.poll_ready(cx) } fn call(&self, mut req: ServiceRequest) -> Self::Future { + let span = tracing::info_span!( + "Verify digest", + digest.required = tracing::field::display(&self.1), + ); + if let Some(digest) = req.headers().get("Digest") { let vec = match parse_digest(digest) { Some(vec) => vec, None => { - warn!("Digest header could not be parsed"); - return Box::pin(ready(Err(VerifyError.into()))); + return Box::pin(ready(Err( + VerifyError::new(&span, VerifyErrorKind::Empty).into() + ))); } }; let payload = req.take_payload(); let (tx, rx) = mpsc::channel(1); - let f1 = verify_payload(vec, self.2.clone(), payload, tx); + let f1 = span.in_scope(|| verify_payload(vec, self.2.clone(), payload, tx)); let payload: Pin> + 'static>> = - Box::pin(rx.map(Ok)); + Box::pin(RxStream(rx).map(Ok)); req.set_payload(payload.into()); req.extensions_mut().insert(DigestVerified); let f2 = self.0.call(req); Box::pin(async move { - f1.await?; - f2.await + let (_, res) = futures_util::future::join(f1, f2).await; + res }) } else if self.1 { - Box::pin(ready(Err(VerifyError.into()))) + Box::pin(ready(Err(VerifyError::new( + &span, + VerifyErrorKind::MissingDigest, + ) + .into()))) } else { Box::pin(self.0.call(req)) } } } +#[tracing::instrument(name = "Verify Payload", skip(verify_digest, payload, tx))] async fn verify_payload( vec: Vec, mut verify_digest: T, mut payload: Payload, - mut tx: mpsc::Sender, + tx: mpsc::Sender, ) -> Result<(), actix_web::Error> where T: DigestVerify + Clone + Send + 'static, @@ -161,16 +219,14 @@ where let bytes = res?; let bytes2 = bytes.clone(); verify_digest = web::block(move || { - verify_digest.update(&bytes2.as_ref()); + verify_digest.update(bytes2.as_ref()); Ok(verify_digest) as Result }) .await??; - if tx.is_closed() { - warn!("Payload dropped. If this was unexpected, it could be that the payload isn't required in the route this middleware is guarding"); - } - - tx.try_send(bytes).map_err(|_| VerifyError)?; + tx.send(bytes) + .await + .map_err(|_| VerifyError::new(&Span::current(), VerifyErrorKind::Dropped))?; } let verified = @@ -179,8 +235,7 @@ where if verified { Ok(()) } else { - warn!("Digest could not be verified"); - Err(VerifyError.into()) + Err(VerifyError::new(&Span::current(), VerifyErrorKind::Verify).into()) } } diff --git a/http-signature-normalization-actix/src/digest/mod.rs b/http-signature-normalization-actix/src/digest/mod.rs index e16f328..bb750bd 100644 --- a/http-signature-normalization-actix/src/digest/mod.rs +++ b/http-signature-normalization-actix/src/digest/mod.rs @@ -87,6 +87,7 @@ pub trait SignExt: Sign { } /// A parsed digest from the request +#[derive(Debug)] pub struct DigestPart { /// The alrogithm used to produce the digest pub algorithm: String, diff --git a/http-signature-normalization-actix/src/digest/sha2.rs b/http-signature-normalization-actix/src/digest/sha2.rs index d26960f..48bf3f3 100644 --- a/http-signature-normalization-actix/src/digest/sha2.rs +++ b/http-signature-normalization-actix/src/digest/sha2.rs @@ -1,5 +1,5 @@ -use log::{debug, warn}; use sha2::{Sha224, Sha256, Sha384, Sha512, Sha512Trunc224, Sha512Trunc256}; +use tracing::{debug, warn}; use super::{DigestCreate, DigestPart, DigestVerify}; diff --git a/http-signature-normalization-actix/src/digest/sha3.rs b/http-signature-normalization-actix/src/digest/sha3.rs index d27437d..decff3f 100644 --- a/http-signature-normalization-actix/src/digest/sha3.rs +++ b/http-signature-normalization-actix/src/digest/sha3.rs @@ -1,8 +1,8 @@ -use log::{debug, warn}; use sha3::{ Keccak224, Keccak256, Keccak256Full, Keccak384, Keccak512, Sha3_224, Sha3_256, Sha3_384, Sha3_512, }; +use tracing::{debug, warn}; use super::{DigestCreate, DigestPart, DigestVerify}; diff --git a/http-signature-normalization-actix/src/lib.rs b/http-signature-normalization-actix/src/lib.rs index c51dcea..c68ae8b 100644 --- a/http-signature-normalization-actix/src/lib.rs +++ b/http-signature-normalization-actix/src/lib.rs @@ -9,9 +9,9 @@ //! ### Use it in a server //! ```rust,ignore //! use actix_web::{http::StatusCode, web, App, HttpResponse, HttpServer, ResponseError}; -//! use futures::future::{err, ok, Ready}; //! use http_signature_normalization_actix::prelude::*; //! use sha2::{Digest, Sha256}; +//! use std::future::{ready, Ready}; //! //! #[derive(Clone, Debug)] //! struct MyVerify; @@ -29,19 +29,19 @@ //! ) -> Self::Future { //! match algorithm { //! Some(Algorithm::Hs2019) => (), -//! _ => return err(MyError::Algorithm), +//! _ => return ready(Err(MyError::Algorithm)), //! }; //! //! if key_id != "my-key-id" { -//! return err(MyError::Key); +//! return ready(Err(MyError::Key)); //! } //! //! let decoded = match base64::decode(&signature) { //! Ok(decoded) => decoded, -//! Err(_) => return err(MyError::Decode), +//! Err(_) => return ready(Err(MyError::Decode)), //! }; //! -//! ok(decoded == signing_string.as_bytes()) +//! ready(Ok(decoded == signing_string.as_bytes())) //! } //! } //! diff --git a/http-signature-normalization-actix/src/middleware.rs b/http-signature-normalization-actix/src/middleware.rs index 9ad1629..62f1307 100644 --- a/http-signature-normalization-actix/src/middleware.rs +++ b/http-signature-normalization-actix/src/middleware.rs @@ -6,12 +6,15 @@ use actix_web::{ http::StatusCode, Error, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError, }; -use log::{debug, warn}; +use futures_util::future::LocalBoxFuture; use std::{ - future::{ready, Future, Ready}, - pin::Pin, + collections::HashSet, + future::{ready, Ready}, task::{Context, Poll}, }; +use tracing::{debug, Span}; +use tracing_error::SpanTrace; +use tracing_futures::Instrument; #[derive(Clone, Debug)] /// A marker type that can be used to guard routes when the signature middleware is set to @@ -55,10 +58,59 @@ enum HeaderKind { Signature, } +impl std::fmt::Display for HeaderKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Authorization => { + write!(f, "Authorization") + } + Self::Signature => { + write!(f, "Signature") + } + } + } +} + #[derive(Clone, Debug, thiserror::Error)] -#[error("Failed to verify http signature")] +#[error("Failed to verify http signature, {kind}")] #[doc(hidden)] -pub struct VerifyError; +pub struct VerifyError { + context: SpanTrace, + kind: VerifyErrorKind, +} + +#[derive(Clone, Debug, thiserror::Error)] +enum VerifyErrorKind { + #[error("Signature or Authorization header is missing")] + MissingSignature, + + #[error("Signature is expired")] + ExpiredSignature, + + #[error("Signature field could not be parsed")] + ParseField(&'static str), + + #[error("Signature is not a valid string")] + ParseSignature, + + #[error("Signature is invalid")] + Validate, + + #[error("Request extension not present")] + Extension, + + #[error("Required headers are missing")] + MissingHeader(HashSet), +} + +impl VerifyError { + fn new(span: &Span, kind: VerifyErrorKind) -> Self { + span.in_scope(|| VerifyError { + context: SpanTrace::capture(), + kind, + }) + } +} impl VerifySignature where @@ -97,8 +149,9 @@ where { fn handle( &self, + span: Span, req: ServiceRequest, - ) -> Pin, Error>>>> { + ) -> LocalBoxFuture<'static, Result, Error>> { let res = self.1.begin_verify( req.method(), req.uri().path_and_query(), @@ -108,24 +161,39 @@ where let unverified = match res { Ok(unverified) => unverified, Err(PrepareVerifyError::Expired) => { - warn!("Header is expired"); - return Box::pin(ready(Err(VerifyError.into()))); + return Box::pin(ready(Err(VerifyError::new( + &span, + VerifyErrorKind::ExpiredSignature, + ) + .into()))); } Err(PrepareVerifyError::Missing) => { - debug!("Header is missing"); - return Box::pin(ready(Err(VerifyError.into()))); + return Box::pin(ready(Err(VerifyError::new( + &span, + VerifyErrorKind::MissingSignature, + ) + .into()))); } Err(PrepareVerifyError::ParseField(field)) => { - debug!("Failed to parse field {}", field); - return Box::pin(ready(Err(VerifyError.into()))); + return Box::pin(ready(Err(VerifyError::new( + &span, + VerifyErrorKind::ParseField(field), + ) + .into()))); } - Err(PrepareVerifyError::Header(e)) => { - debug!("Failed to parse header {}", e); - return Box::pin(ready(Err(VerifyError.into()))); + Err(PrepareVerifyError::Header(_)) => { + return Box::pin(ready(Err(VerifyError::new( + &span, + VerifyErrorKind::ParseSignature, + ) + .into()))); } - Err(PrepareVerifyError::Required(req)) => { - debug!("Missing required headers, {:?}", req); - return Box::pin(ready(Err(VerifyError.into()))); + Err(PrepareVerifyError::Required(mut req)) => { + return Box::pin(ready(Err(VerifyError::new( + &span, + VerifyErrorKind::MissingHeader(req.take_headers()), + ) + .into()))); } }; @@ -133,12 +201,16 @@ where let key_id = unverified.key_id().to_owned(); let f1 = unverified.verify(|signature, signing_string| { - self.4.clone().signature_verify( - algorithm, - key_id.clone(), - signature.to_string(), - signing_string.to_string(), - ) + let fut = span.in_scope(|| { + self.4.clone().signature_verify( + algorithm, + key_id.clone(), + signature.to_string(), + signing_string.to_string(), + ) + }); + + fut.instrument(span.clone()) }); req.extensions_mut().insert(SignatureVerified(key_id)); @@ -146,11 +218,12 @@ where let f2 = self.0.call(req); Box::pin(async move { + let span = span; + if f1.await? { f2.await } else { - warn!("Signature is invalid"); - Err(VerifyError.into()) + Err(VerifyError::new(&span, VerifyErrorKind::Validate).into()) } }) } @@ -172,7 +245,11 @@ impl FromRequest for SignatureVerified { type Config = (); fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { - let res = req.extensions().get::().cloned().ok_or(VerifyError); + let res = req + .extensions() + .get::() + .cloned() + .ok_or_else(|| VerifyError::new(&Span::current(), VerifyErrorKind::Extension)); if res.is_err() { debug!("Failed to fetch SignatureVerified from request"); @@ -206,7 +283,6 @@ where } } -type FutResult = dyn Future>; impl Service for VerifyMiddleware where T: SignatureVerify + Clone + 'static, @@ -216,34 +292,47 @@ where { type Response = ServiceResponse; type Error = actix_web::Error; - type Future = Pin>>; + type Future = LocalBoxFuture<'static, Result>; fn poll_ready(&self, cx: &mut Context) -> Poll> { self.0.poll_ready(cx) } fn call(&self, req: ServiceRequest) -> Self::Future { + let span = tracing::info_span!( + "Signature Verification", + signature.kind = tracing::field::Empty, + signature.expected_kind = tracing::field::display(&self.2), + signature.optional = tracing::field::display(&self.3), + ); let authorization = req.headers().get("Authorization").is_some(); let signature = req.headers().get("Signature").is_some(); - if authorization || signature { - if self.2.is_authorization() && authorization { - return self.handle(req); - } + if authorization { + span.record("signature.kind", &tracing::field::display("Authorization")); - if self.2.is_signature() && signature { - return self.handle(req); + if self.2.is_authorization() { + return self.handle(span, req); } + } else if signature { + span.record("signature.kind", &tracing::field::display("Signature")); - debug!("Authorization or Signature headers are missing"); - Box::pin(ready(Err(VerifyError.into()))) - } else if self.3 { - debug!("Headers are missing but Optional is true, continuing"); - Box::pin(self.0.call(req)) + if self.2.is_signature() { + return self.handle(span, req); + } } else { - debug!("Authorization or Signature headers are missing"); - Box::pin(ready(Err(VerifyError.into()))) + span.record("signature.kind", &tracing::field::display("None")); + + if self.3 { + return Box::pin(self.0.call(req)); + } } + + Box::pin(ready(Err(VerifyError::new( + &span, + VerifyErrorKind::MissingSignature, + ) + .into()))) } }