Instrument -actix with tracing

This commit is contained in:
Aode (lion) 2021-09-17 19:34:16 -05:00
parent f641335dfd
commit 7b9aff925a
7 changed files with 223 additions and 75 deletions

View file

@ -29,12 +29,15 @@ actix-web = { version = "4.0.0-beta.8", default-features = false }
awc = { version = "3.0.0-beta.7", default-features = false } awc = { version = "3.0.0-beta.7", default-features = false }
base64 = { version = "0.13", optional = true } base64 = { version = "0.13", optional = true }
chrono = "0.4.6" chrono = "0.4.6"
futures = "0.3" futures-util = { version = "0.3", default-features = false }
http-signature-normalization = { version = "0.5.1", path = ".." } http-signature-normalization = { version = "0.5.1", path = ".." }
log = "0.4"
sha2 = { version = "0.9", optional = true } sha2 = { version = "0.9", optional = true }
sha3 = { version = "0.9", optional = true } sha3 = { version = "0.9", optional = true }
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["sync"] }
tracing = "0.1"
tracing-error = "0.1"
tracing-futures = "0.2"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.1.0" actix-rt = "2.1.0"

View file

@ -7,13 +7,18 @@ use actix_web::{
http::{header::HeaderValue, StatusCode}, http::{header::HeaderValue, StatusCode},
web, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError, web, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
}; };
use futures::{channel::mpsc, Stream, StreamExt}; use futures_util::{
use log::{debug, warn}; future::LocalBoxFuture,
stream::{Stream, StreamExt},
};
use std::{ use std::{
future::{ready, Future, Ready}, future::{ready, Ready},
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::sync::mpsc;
use tracing::{debug, Span};
use tracing_error::SpanTrace;
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
/// A type implementing FromRequest that can be used in route handler to guard for verified /// A type implementing FromRequest that can be used in route handler to guard for verified
@ -44,7 +49,47 @@ pub struct VerifyMiddleware<T, S>(S, bool, T);
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[error("Error verifying digest")] #[error("Error verifying digest")]
#[doc(hidden)] #[doc(hidden)]
pub struct VerifyError; pub struct VerifyError {
context: SpanTrace,
kind: VerifyErrorKind,
}
impl VerifyError {
fn new(span: &Span, kind: VerifyErrorKind) -> Self {
span.in_scope(|| VerifyError {
context: SpanTrace::capture(),
kind,
})
}
}
#[derive(Debug, thiserror::Error)]
enum VerifyErrorKind {
#[error("Missing request extension")]
Extension,
#[error("Digest header missing")]
MissingDigest,
#[error("Digest header is empty")]
Empty,
#[error("Failed to verify digest")]
Verify,
#[error("Payload dropped. If this was unexpected, it could be that the payload isn't required in the route this middleware is guarding")]
Dropped,
}
struct RxStream<T>(mpsc::Receiver<T>);
impl<T> Stream for RxStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_recv(cx)
}
}
impl<T> VerifyDigest<T> impl<T> VerifyDigest<T>
where where
@ -70,7 +115,11 @@ impl FromRequest for DigestVerified {
type Config = (); type Config = ();
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
let res = req.extensions().get::<Self>().copied().ok_or(VerifyError); let res = req
.extensions()
.get::<Self>()
.copied()
.ok_or_else(|| VerifyError::new(&Span::current(), VerifyErrorKind::Extension));
if res.is_err() { if res.is_err() {
debug!("Failed to fetch DigestVerified from request"); debug!("Failed to fetch DigestVerified from request");
@ -98,8 +147,6 @@ where
} }
} }
type FutResult<T, E> = dyn Future<Output = Result<T, E>>;
impl<T, S, B> Service<ServiceRequest> for VerifyMiddleware<T, S> impl<T, S, B> Service<ServiceRequest> for VerifyMiddleware<T, S>
where where
T: DigestVerify + Clone + Send + 'static, T: DigestVerify + Clone + Send + 'static,
@ -109,50 +156,61 @@ where
{ {
type Response = ServiceResponse<B>; type Response = ServiceResponse<B>;
type Error = actix_web::Error; type Error = actix_web::Error;
type Future = Pin<Box<FutResult<Self::Response, Self::Error>>>; type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&self, cx: &mut Context) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx) self.0.poll_ready(cx)
} }
fn call(&self, mut req: ServiceRequest) -> Self::Future { fn call(&self, mut req: ServiceRequest) -> Self::Future {
let span = tracing::info_span!(
"Verify digest",
digest.required = tracing::field::display(&self.1),
);
if let Some(digest) = req.headers().get("Digest") { if let Some(digest) = req.headers().get("Digest") {
let vec = match parse_digest(digest) { let vec = match parse_digest(digest) {
Some(vec) => vec, Some(vec) => vec,
None => { None => {
warn!("Digest header could not be parsed"); return Box::pin(ready(Err(
return Box::pin(ready(Err(VerifyError.into()))); VerifyError::new(&span, VerifyErrorKind::Empty).into()
)));
} }
}; };
let payload = req.take_payload(); let payload = req.take_payload();
let (tx, rx) = mpsc::channel(1); let (tx, rx) = mpsc::channel(1);
let f1 = verify_payload(vec, self.2.clone(), payload, tx); let f1 = span.in_scope(|| verify_payload(vec, self.2.clone(), payload, tx));
let payload: Pin<Box<dyn Stream<Item = Result<web::Bytes, PayloadError>> + 'static>> = let payload: Pin<Box<dyn Stream<Item = Result<web::Bytes, PayloadError>> + 'static>> =
Box::pin(rx.map(Ok)); Box::pin(RxStream(rx).map(Ok));
req.set_payload(payload.into()); req.set_payload(payload.into());
req.extensions_mut().insert(DigestVerified); req.extensions_mut().insert(DigestVerified);
let f2 = self.0.call(req); let f2 = self.0.call(req);
Box::pin(async move { Box::pin(async move {
f1.await?; let (_, res) = futures_util::future::join(f1, f2).await;
f2.await res
}) })
} else if self.1 { } else if self.1 {
Box::pin(ready(Err(VerifyError.into()))) Box::pin(ready(Err(VerifyError::new(
&span,
VerifyErrorKind::MissingDigest,
)
.into())))
} else { } else {
Box::pin(self.0.call(req)) Box::pin(self.0.call(req))
} }
} }
} }
#[tracing::instrument(name = "Verify Payload", skip(verify_digest, payload, tx))]
async fn verify_payload<T>( async fn verify_payload<T>(
vec: Vec<DigestPart>, vec: Vec<DigestPart>,
mut verify_digest: T, mut verify_digest: T,
mut payload: Payload, mut payload: Payload,
mut tx: mpsc::Sender<web::Bytes>, tx: mpsc::Sender<web::Bytes>,
) -> Result<(), actix_web::Error> ) -> Result<(), actix_web::Error>
where where
T: DigestVerify + Clone + Send + 'static, T: DigestVerify + Clone + Send + 'static,
@ -161,16 +219,14 @@ where
let bytes = res?; let bytes = res?;
let bytes2 = bytes.clone(); let bytes2 = bytes.clone();
verify_digest = web::block(move || { verify_digest = web::block(move || {
verify_digest.update(&bytes2.as_ref()); verify_digest.update(bytes2.as_ref());
Ok(verify_digest) as Result<T, VerifyError> Ok(verify_digest) as Result<T, VerifyError>
}) })
.await??; .await??;
if tx.is_closed() { tx.send(bytes)
warn!("Payload dropped. If this was unexpected, it could be that the payload isn't required in the route this middleware is guarding"); .await
} .map_err(|_| VerifyError::new(&Span::current(), VerifyErrorKind::Dropped))?;
tx.try_send(bytes).map_err(|_| VerifyError)?;
} }
let verified = let verified =
@ -179,8 +235,7 @@ where
if verified { if verified {
Ok(()) Ok(())
} else { } else {
warn!("Digest could not be verified"); Err(VerifyError::new(&Span::current(), VerifyErrorKind::Verify).into())
Err(VerifyError.into())
} }
} }

View file

@ -87,6 +87,7 @@ pub trait SignExt: Sign {
} }
/// A parsed digest from the request /// A parsed digest from the request
#[derive(Debug)]
pub struct DigestPart { pub struct DigestPart {
/// The alrogithm used to produce the digest /// The alrogithm used to produce the digest
pub algorithm: String, pub algorithm: String,

View file

@ -1,5 +1,5 @@
use log::{debug, warn};
use sha2::{Sha224, Sha256, Sha384, Sha512, Sha512Trunc224, Sha512Trunc256}; use sha2::{Sha224, Sha256, Sha384, Sha512, Sha512Trunc224, Sha512Trunc256};
use tracing::{debug, warn};
use super::{DigestCreate, DigestPart, DigestVerify}; use super::{DigestCreate, DigestPart, DigestVerify};

View file

@ -1,8 +1,8 @@
use log::{debug, warn};
use sha3::{ use sha3::{
Keccak224, Keccak256, Keccak256Full, Keccak384, Keccak512, Sha3_224, Sha3_256, Sha3_384, Keccak224, Keccak256, Keccak256Full, Keccak384, Keccak512, Sha3_224, Sha3_256, Sha3_384,
Sha3_512, Sha3_512,
}; };
use tracing::{debug, warn};
use super::{DigestCreate, DigestPart, DigestVerify}; use super::{DigestCreate, DigestPart, DigestVerify};

View file

@ -9,9 +9,9 @@
//! ### Use it in a server //! ### Use it in a server
//! ```rust,ignore //! ```rust,ignore
//! use actix_web::{http::StatusCode, web, App, HttpResponse, HttpServer, ResponseError}; //! use actix_web::{http::StatusCode, web, App, HttpResponse, HttpServer, ResponseError};
//! use futures::future::{err, ok, Ready};
//! use http_signature_normalization_actix::prelude::*; //! use http_signature_normalization_actix::prelude::*;
//! use sha2::{Digest, Sha256}; //! use sha2::{Digest, Sha256};
//! use std::future::{ready, Ready};
//! //!
//! #[derive(Clone, Debug)] //! #[derive(Clone, Debug)]
//! struct MyVerify; //! struct MyVerify;
@ -29,19 +29,19 @@
//! ) -> Self::Future { //! ) -> Self::Future {
//! match algorithm { //! match algorithm {
//! Some(Algorithm::Hs2019) => (), //! Some(Algorithm::Hs2019) => (),
//! _ => return err(MyError::Algorithm), //! _ => return ready(Err(MyError::Algorithm)),
//! }; //! };
//! //!
//! if key_id != "my-key-id" { //! if key_id != "my-key-id" {
//! return err(MyError::Key); //! return ready(Err(MyError::Key));
//! } //! }
//! //!
//! let decoded = match base64::decode(&signature) { //! let decoded = match base64::decode(&signature) {
//! Ok(decoded) => decoded, //! Ok(decoded) => decoded,
//! Err(_) => return err(MyError::Decode), //! Err(_) => return ready(Err(MyError::Decode)),
//! }; //! };
//! //!
//! ok(decoded == signing_string.as_bytes()) //! ready(Ok(decoded == signing_string.as_bytes()))
//! } //! }
//! } //! }
//! //!

View file

@ -6,12 +6,15 @@ use actix_web::{
http::StatusCode, http::StatusCode,
Error, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError, Error, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
}; };
use log::{debug, warn}; use futures_util::future::LocalBoxFuture;
use std::{ use std::{
future::{ready, Future, Ready}, collections::HashSet,
pin::Pin, future::{ready, Ready},
task::{Context, Poll}, task::{Context, Poll},
}; };
use tracing::{debug, Span};
use tracing_error::SpanTrace;
use tracing_futures::Instrument;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
/// A marker type that can be used to guard routes when the signature middleware is set to /// A marker type that can be used to guard routes when the signature middleware is set to
@ -55,10 +58,59 @@ enum HeaderKind {
Signature, Signature,
} }
impl std::fmt::Display for HeaderKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Authorization => {
write!(f, "Authorization")
}
Self::Signature => {
write!(f, "Signature")
}
}
}
}
#[derive(Clone, Debug, thiserror::Error)] #[derive(Clone, Debug, thiserror::Error)]
#[error("Failed to verify http signature")] #[error("Failed to verify http signature, {kind}")]
#[doc(hidden)] #[doc(hidden)]
pub struct VerifyError; pub struct VerifyError {
context: SpanTrace,
kind: VerifyErrorKind,
}
#[derive(Clone, Debug, thiserror::Error)]
enum VerifyErrorKind {
#[error("Signature or Authorization header is missing")]
MissingSignature,
#[error("Signature is expired")]
ExpiredSignature,
#[error("Signature field could not be parsed")]
ParseField(&'static str),
#[error("Signature is not a valid string")]
ParseSignature,
#[error("Signature is invalid")]
Validate,
#[error("Request extension not present")]
Extension,
#[error("Required headers are missing")]
MissingHeader(HashSet<String>),
}
impl VerifyError {
fn new(span: &Span, kind: VerifyErrorKind) -> Self {
span.in_scope(|| VerifyError {
context: SpanTrace::capture(),
kind,
})
}
}
impl<T> VerifySignature<T> impl<T> VerifySignature<T>
where where
@ -97,8 +149,9 @@ where
{ {
fn handle( fn handle(
&self, &self,
span: Span,
req: ServiceRequest, req: ServiceRequest,
) -> Pin<Box<dyn Future<Output = Result<ServiceResponse<B>, Error>>>> { ) -> LocalBoxFuture<'static, Result<ServiceResponse<B>, Error>> {
let res = self.1.begin_verify( let res = self.1.begin_verify(
req.method(), req.method(),
req.uri().path_and_query(), req.uri().path_and_query(),
@ -108,24 +161,39 @@ where
let unverified = match res { let unverified = match res {
Ok(unverified) => unverified, Ok(unverified) => unverified,
Err(PrepareVerifyError::Expired) => { Err(PrepareVerifyError::Expired) => {
warn!("Header is expired"); return Box::pin(ready(Err(VerifyError::new(
return Box::pin(ready(Err(VerifyError.into()))); &span,
VerifyErrorKind::ExpiredSignature,
)
.into())));
} }
Err(PrepareVerifyError::Missing) => { Err(PrepareVerifyError::Missing) => {
debug!("Header is missing"); return Box::pin(ready(Err(VerifyError::new(
return Box::pin(ready(Err(VerifyError.into()))); &span,
VerifyErrorKind::MissingSignature,
)
.into())));
} }
Err(PrepareVerifyError::ParseField(field)) => { Err(PrepareVerifyError::ParseField(field)) => {
debug!("Failed to parse field {}", field); return Box::pin(ready(Err(VerifyError::new(
return Box::pin(ready(Err(VerifyError.into()))); &span,
VerifyErrorKind::ParseField(field),
)
.into())));
} }
Err(PrepareVerifyError::Header(e)) => { Err(PrepareVerifyError::Header(_)) => {
debug!("Failed to parse header {}", e); return Box::pin(ready(Err(VerifyError::new(
return Box::pin(ready(Err(VerifyError.into()))); &span,
VerifyErrorKind::ParseSignature,
)
.into())));
} }
Err(PrepareVerifyError::Required(req)) => { Err(PrepareVerifyError::Required(mut req)) => {
debug!("Missing required headers, {:?}", req); return Box::pin(ready(Err(VerifyError::new(
return Box::pin(ready(Err(VerifyError.into()))); &span,
VerifyErrorKind::MissingHeader(req.take_headers()),
)
.into())));
} }
}; };
@ -133,12 +201,16 @@ where
let key_id = unverified.key_id().to_owned(); let key_id = unverified.key_id().to_owned();
let f1 = unverified.verify(|signature, signing_string| { let f1 = unverified.verify(|signature, signing_string| {
self.4.clone().signature_verify( let fut = span.in_scope(|| {
algorithm, self.4.clone().signature_verify(
key_id.clone(), algorithm,
signature.to_string(), key_id.clone(),
signing_string.to_string(), signature.to_string(),
) signing_string.to_string(),
)
});
fut.instrument(span.clone())
}); });
req.extensions_mut().insert(SignatureVerified(key_id)); req.extensions_mut().insert(SignatureVerified(key_id));
@ -146,11 +218,12 @@ where
let f2 = self.0.call(req); let f2 = self.0.call(req);
Box::pin(async move { Box::pin(async move {
let span = span;
if f1.await? { if f1.await? {
f2.await f2.await
} else { } else {
warn!("Signature is invalid"); Err(VerifyError::new(&span, VerifyErrorKind::Validate).into())
Err(VerifyError.into())
} }
}) })
} }
@ -172,7 +245,11 @@ impl FromRequest for SignatureVerified {
type Config = (); type Config = ();
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
let res = req.extensions().get::<Self>().cloned().ok_or(VerifyError); let res = req
.extensions()
.get::<Self>()
.cloned()
.ok_or_else(|| VerifyError::new(&Span::current(), VerifyErrorKind::Extension));
if res.is_err() { if res.is_err() {
debug!("Failed to fetch SignatureVerified from request"); debug!("Failed to fetch SignatureVerified from request");
@ -206,7 +283,6 @@ where
} }
} }
type FutResult<T, E> = dyn Future<Output = Result<T, E>>;
impl<T, S, B> Service<ServiceRequest> for VerifyMiddleware<T, S> impl<T, S, B> Service<ServiceRequest> for VerifyMiddleware<T, S>
where where
T: SignatureVerify + Clone + 'static, T: SignatureVerify + Clone + 'static,
@ -216,34 +292,47 @@ where
{ {
type Response = ServiceResponse<B>; type Response = ServiceResponse<B>;
type Error = actix_web::Error; type Error = actix_web::Error;
type Future = Pin<Box<FutResult<Self::Response, Self::Error>>>; type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&self, cx: &mut Context) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx) self.0.poll_ready(cx)
} }
fn call(&self, req: ServiceRequest) -> Self::Future { fn call(&self, req: ServiceRequest) -> Self::Future {
let span = tracing::info_span!(
"Signature Verification",
signature.kind = tracing::field::Empty,
signature.expected_kind = tracing::field::display(&self.2),
signature.optional = tracing::field::display(&self.3),
);
let authorization = req.headers().get("Authorization").is_some(); let authorization = req.headers().get("Authorization").is_some();
let signature = req.headers().get("Signature").is_some(); let signature = req.headers().get("Signature").is_some();
if authorization || signature { if authorization {
if self.2.is_authorization() && authorization { span.record("signature.kind", &tracing::field::display("Authorization"));
return self.handle(req);
}
if self.2.is_signature() && signature { if self.2.is_authorization() {
return self.handle(req); return self.handle(span, req);
} }
} else if signature {
span.record("signature.kind", &tracing::field::display("Signature"));
debug!("Authorization or Signature headers are missing"); if self.2.is_signature() {
Box::pin(ready(Err(VerifyError.into()))) return self.handle(span, req);
} else if self.3 { }
debug!("Headers are missing but Optional is true, continuing");
Box::pin(self.0.call(req))
} else { } else {
debug!("Authorization or Signature headers are missing"); span.record("signature.kind", &tracing::field::display("None"));
Box::pin(ready(Err(VerifyError.into())))
if self.3 {
return Box::pin(self.0.call(req));
}
} }
Box::pin(ready(Err(VerifyError::new(
&span,
VerifyErrorKind::MissingSignature,
)
.into())))
} }
} }