diff --git a/Cargo.toml b/Cargo.toml index 4bd5e9d..87f9a22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,4 +96,4 @@ webpki-roots = "0.26.0" [dependencies.tracing-actix-web] version = "0.7.10" default-features = false -features = ["emit_event_on_error", "opentelemetry_0_22"] +features = ["opentelemetry_0_22"] diff --git a/src/lib.rs b/src/lib.rs index afb5bc0..dbf54fe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,18 +42,12 @@ use actix_web::{ http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES}, web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer, }; -use details::{ApiDetails, HumanDate}; -use future::{WithPollTimer, WithTimeout}; use futures_core::Stream; -use magick::ArcPolicyDir; use metrics_exporter_prometheus::PrometheusBuilder; -use middleware::{Metrics, Payload}; -use repo::ArcRepo; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_tracing::TracingMiddleware; use rustls_channel_resolver::ChannelSender; use rusty_s3::UrlStyle; -use state::State; use std::{ marker::PhantomData, path::Path, @@ -62,8 +56,6 @@ use std::{ time::{Duration, SystemTime}, }; use streem::IntoStreamer; -use sync::DropHandle; -use tmp_file::{ArcTmpDir, TmpDir}; use tokio::sync::Semaphore; use tracing::Instrument; use tracing_actix_web::TracingLogger; @@ -71,20 +63,25 @@ use tracing_actix_web::TracingLogger; use self::{ backgrounded::Backgrounded, config::{Configuration, Operation}, - details::Details, + details::{ApiDetails, Details, HumanDate}, either::Either, error::{Error, UploadError}, formats::InputProcessableFormat, + future::{WithPollTimer, WithTimeout}, ingest::Session, init_tracing::init_tracing, - middleware::{Deadline, Internal}, + magick::ArcPolicyDir, + middleware::{Deadline, Internal, Log, Metrics, Payload}, migrate_store::migrate_store, queue::queue_generate, - repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult}, + repo::{sled::SledRepo, Alias, ArcRepo, DeleteToken, Hash, Repo, UploadId, UploadResult}, serde_str::Serde, + state::State, store::{file_store::FileStore, object_store::ObjectStore, Store}, stream::empty, + sync::DropHandle, tls::Tls, + tmp_file::{ArcTmpDir, TmpDir}, }; pub use self::config::{ConfigSource, PictRsConfiguration}; @@ -1744,6 +1741,7 @@ async fn launch< spawn_workers(state.clone()); App::new() + .wrap(Log) .wrap(TracingLogger::default()) .wrap(Deadline) .wrap(Metrics) diff --git a/src/middleware.rs b/src/middleware.rs index 9d3d179..37a644d 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -1,9 +1,11 @@ mod deadline; mod internal; +mod log; mod metrics; mod payload; pub(crate) use self::deadline::Deadline; pub(crate) use self::internal::Internal; +pub(crate) use self::log::Log; pub(crate) use self::metrics::Metrics; pub(crate) use self::payload::Payload; diff --git a/src/middleware/log.rs b/src/middleware/log.rs new file mode 100644 index 0000000..bfd4b92 --- /dev/null +++ b/src/middleware/log.rs @@ -0,0 +1,185 @@ +use std::future::{ready, Future, Ready}; + +use actix_web::{ + body::MessageBody, + dev::{Service, ServiceRequest, ServiceResponse, Transform}, + http::StatusCode, + ResponseError, +}; + +pub(crate) struct Log; +pub(crate) struct LogMiddleware { + inner: S, +} + +#[derive(Debug)] +pub(crate) struct LogError(actix_web::Error); + +pin_project_lite::pin_project! { + pub(crate) struct LogFuture { + #[pin] + inner: F, + } +} + +pin_project_lite::pin_project! { + pub(crate) struct LogBody { + status: Option, + + #[pin] + inner: B, + } +} + +impl Transform for Log +where + B: MessageBody, + S: Service>, + S::Future: 'static, + S::Error: Into, +{ + type Response = ServiceResponse>; + type Error = actix_web::Error; + type InitError = (); + type Transform = LogMiddleware; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(LogMiddleware { inner: service })) + } +} + +impl Service for LogMiddleware +where + B: MessageBody, + S: Service>, + S::Future: 'static, + S::Error: Into, +{ + type Response = ServiceResponse>; + type Error = actix_web::Error; + type Future = LogFuture; + + fn poll_ready( + &self, + ctx: &mut core::task::Context<'_>, + ) -> std::task::Poll> { + self.inner + .poll_ready(ctx) + .map(|res| res.map_err(|e| LogError(e.into()).into())) + } + + fn call(&self, req: ServiceRequest) -> Self::Future { + LogFuture { + inner: self.inner.call(req), + } + } +} + +impl Future for LogFuture +where + B: MessageBody, + F: Future, E>>, + E: Into, +{ + type Output = Result>, actix_web::Error>; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = self.project(); + + std::task::Poll::Ready(match std::task::ready!(this.inner.poll(cx)) { + Ok(response) => { + let status = response.status(); + + let status = if response.response().body().size().is_eof() { + emit(status); + None + } else { + Some(status) + }; + + Ok(response.map_body(|_, inner| LogBody { status, inner })) + } + Err(e) => Err(LogError(e.into()).into()), + }) + } +} + +impl MessageBody for LogBody +where + B: MessageBody, +{ + type Error = B::Error; + + fn size(&self) -> actix_web::body::BodySize { + self.inner.size() + } + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll>> { + let this = self.project(); + + let opt = std::task::ready!(this.inner.poll_next(cx)); + + if opt.is_none() { + if let Some(status) = this.status.take() { + emit(status); + } + } + + std::task::Poll::Ready(opt) + } +} + +impl std::fmt::Display for LogError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl std::error::Error for LogError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.0.source() + } +} + +impl ResponseError for LogError { + fn status_code(&self) -> actix_web::http::StatusCode { + self.0.as_response_error().status_code() + } + + fn error_response(&self) -> actix_web::HttpResponse { + let response = self.0.error_response(); + let status = response.status(); + + if response.body().size().is_eof() { + emit(status); + response + } else { + response.map_body(|_, inner| { + LogBody { + status: Some(status), + inner, + } + .boxed() + }) + } + } +} + +fn emit(status: StatusCode) { + if status.is_server_error() { + tracing::error!("server error"); + } else if status.is_client_error() { + tracing::warn!("client error"); + } else if status.is_redirection() { + tracing::info!("redirected"); + } else { + tracing::info!("completed"); + } +}