mirror of
https://git.asonix.dog/asonix/pict-rs.git
synced 2025-02-16 05:15:15 +00:00
Log for completed requests
This commit is contained in:
parent
8cf8b2bc05
commit
286bc8b97a
4 changed files with 197 additions and 12 deletions
|
@ -96,4 +96,4 @@ webpki-roots = "0.26.0"
|
||||||
[dependencies.tracing-actix-web]
|
[dependencies.tracing-actix-web]
|
||||||
version = "0.7.10"
|
version = "0.7.10"
|
||||||
default-features = false
|
default-features = false
|
||||||
features = ["emit_event_on_error", "opentelemetry_0_22"]
|
features = ["opentelemetry_0_22"]
|
||||||
|
|
20
src/lib.rs
20
src/lib.rs
|
@ -42,18 +42,12 @@ use actix_web::{
|
||||||
http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES},
|
http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES},
|
||||||
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
|
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
|
||||||
};
|
};
|
||||||
use details::{ApiDetails, HumanDate};
|
|
||||||
use future::{WithPollTimer, WithTimeout};
|
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use magick::ArcPolicyDir;
|
|
||||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||||
use middleware::{Metrics, Payload};
|
|
||||||
use repo::ArcRepo;
|
|
||||||
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
|
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
|
||||||
use reqwest_tracing::TracingMiddleware;
|
use reqwest_tracing::TracingMiddleware;
|
||||||
use rustls_channel_resolver::ChannelSender;
|
use rustls_channel_resolver::ChannelSender;
|
||||||
use rusty_s3::UrlStyle;
|
use rusty_s3::UrlStyle;
|
||||||
use state::State;
|
|
||||||
use std::{
|
use std::{
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
path::Path,
|
path::Path,
|
||||||
|
@ -62,8 +56,6 @@ use std::{
|
||||||
time::{Duration, SystemTime},
|
time::{Duration, SystemTime},
|
||||||
};
|
};
|
||||||
use streem::IntoStreamer;
|
use streem::IntoStreamer;
|
||||||
use sync::DropHandle;
|
|
||||||
use tmp_file::{ArcTmpDir, TmpDir};
|
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
use tracing::Instrument;
|
use tracing::Instrument;
|
||||||
use tracing_actix_web::TracingLogger;
|
use tracing_actix_web::TracingLogger;
|
||||||
|
@ -71,20 +63,25 @@ use tracing_actix_web::TracingLogger;
|
||||||
use self::{
|
use self::{
|
||||||
backgrounded::Backgrounded,
|
backgrounded::Backgrounded,
|
||||||
config::{Configuration, Operation},
|
config::{Configuration, Operation},
|
||||||
details::Details,
|
details::{ApiDetails, Details, HumanDate},
|
||||||
either::Either,
|
either::Either,
|
||||||
error::{Error, UploadError},
|
error::{Error, UploadError},
|
||||||
formats::InputProcessableFormat,
|
formats::InputProcessableFormat,
|
||||||
|
future::{WithPollTimer, WithTimeout},
|
||||||
ingest::Session,
|
ingest::Session,
|
||||||
init_tracing::init_tracing,
|
init_tracing::init_tracing,
|
||||||
middleware::{Deadline, Internal},
|
magick::ArcPolicyDir,
|
||||||
|
middleware::{Deadline, Internal, Log, Metrics, Payload},
|
||||||
migrate_store::migrate_store,
|
migrate_store::migrate_store,
|
||||||
queue::queue_generate,
|
queue::queue_generate,
|
||||||
repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult},
|
repo::{sled::SledRepo, Alias, ArcRepo, DeleteToken, Hash, Repo, UploadId, UploadResult},
|
||||||
serde_str::Serde,
|
serde_str::Serde,
|
||||||
|
state::State,
|
||||||
store::{file_store::FileStore, object_store::ObjectStore, Store},
|
store::{file_store::FileStore, object_store::ObjectStore, Store},
|
||||||
stream::empty,
|
stream::empty,
|
||||||
|
sync::DropHandle,
|
||||||
tls::Tls,
|
tls::Tls,
|
||||||
|
tmp_file::{ArcTmpDir, TmpDir},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use self::config::{ConfigSource, PictRsConfiguration};
|
pub use self::config::{ConfigSource, PictRsConfiguration};
|
||||||
|
@ -1744,6 +1741,7 @@ async fn launch<
|
||||||
spawn_workers(state.clone());
|
spawn_workers(state.clone());
|
||||||
|
|
||||||
App::new()
|
App::new()
|
||||||
|
.wrap(Log)
|
||||||
.wrap(TracingLogger::default())
|
.wrap(TracingLogger::default())
|
||||||
.wrap(Deadline)
|
.wrap(Deadline)
|
||||||
.wrap(Metrics)
|
.wrap(Metrics)
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
mod deadline;
|
mod deadline;
|
||||||
mod internal;
|
mod internal;
|
||||||
|
mod log;
|
||||||
mod metrics;
|
mod metrics;
|
||||||
mod payload;
|
mod payload;
|
||||||
|
|
||||||
pub(crate) use self::deadline::Deadline;
|
pub(crate) use self::deadline::Deadline;
|
||||||
pub(crate) use self::internal::Internal;
|
pub(crate) use self::internal::Internal;
|
||||||
|
pub(crate) use self::log::Log;
|
||||||
pub(crate) use self::metrics::Metrics;
|
pub(crate) use self::metrics::Metrics;
|
||||||
pub(crate) use self::payload::Payload;
|
pub(crate) use self::payload::Payload;
|
||||||
|
|
185
src/middleware/log.rs
Normal file
185
src/middleware/log.rs
Normal file
|
@ -0,0 +1,185 @@
|
||||||
|
use std::future::{ready, Future, Ready};
|
||||||
|
|
||||||
|
use actix_web::{
|
||||||
|
body::MessageBody,
|
||||||
|
dev::{Service, ServiceRequest, ServiceResponse, Transform},
|
||||||
|
http::StatusCode,
|
||||||
|
ResponseError,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub(crate) struct Log;
|
||||||
|
pub(crate) struct LogMiddleware<S> {
|
||||||
|
inner: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) struct LogError(actix_web::Error);
|
||||||
|
|
||||||
|
pin_project_lite::pin_project! {
|
||||||
|
pub(crate) struct LogFuture<F> {
|
||||||
|
#[pin]
|
||||||
|
inner: F,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pin_project_lite::pin_project! {
|
||||||
|
pub(crate) struct LogBody<B> {
|
||||||
|
status: Option<StatusCode>,
|
||||||
|
|
||||||
|
#[pin]
|
||||||
|
inner: B,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, B> Transform<S, ServiceRequest> for Log
|
||||||
|
where
|
||||||
|
B: MessageBody,
|
||||||
|
S: Service<ServiceRequest, Response = ServiceResponse<B>>,
|
||||||
|
S::Future: 'static,
|
||||||
|
S::Error: Into<actix_web::Error>,
|
||||||
|
{
|
||||||
|
type Response = ServiceResponse<LogBody<B>>;
|
||||||
|
type Error = actix_web::Error;
|
||||||
|
type InitError = ();
|
||||||
|
type Transform = LogMiddleware<S>;
|
||||||
|
type Future = Ready<Result<Self::Transform, Self::InitError>>;
|
||||||
|
|
||||||
|
fn new_transform(&self, service: S) -> Self::Future {
|
||||||
|
ready(Ok(LogMiddleware { inner: service }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, B> Service<ServiceRequest> for LogMiddleware<S>
|
||||||
|
where
|
||||||
|
B: MessageBody,
|
||||||
|
S: Service<ServiceRequest, Response = ServiceResponse<B>>,
|
||||||
|
S::Future: 'static,
|
||||||
|
S::Error: Into<actix_web::Error>,
|
||||||
|
{
|
||||||
|
type Response = ServiceResponse<LogBody<B>>;
|
||||||
|
type Error = actix_web::Error;
|
||||||
|
type Future = LogFuture<S::Future>;
|
||||||
|
|
||||||
|
fn poll_ready(
|
||||||
|
&self,
|
||||||
|
ctx: &mut core::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Result<(), Self::Error>> {
|
||||||
|
self.inner
|
||||||
|
.poll_ready(ctx)
|
||||||
|
.map(|res| res.map_err(|e| LogError(e.into()).into()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&self, req: ServiceRequest) -> Self::Future {
|
||||||
|
LogFuture {
|
||||||
|
inner: self.inner.call(req),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F, B, E> Future for LogFuture<F>
|
||||||
|
where
|
||||||
|
B: MessageBody,
|
||||||
|
F: Future<Output = Result<ServiceResponse<B>, E>>,
|
||||||
|
E: Into<actix_web::Error>,
|
||||||
|
{
|
||||||
|
type Output = Result<ServiceResponse<LogBody<B>>, actix_web::Error>;
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Self::Output> {
|
||||||
|
let this = self.project();
|
||||||
|
|
||||||
|
std::task::Poll::Ready(match std::task::ready!(this.inner.poll(cx)) {
|
||||||
|
Ok(response) => {
|
||||||
|
let status = response.status();
|
||||||
|
|
||||||
|
let status = if response.response().body().size().is_eof() {
|
||||||
|
emit(status);
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(status)
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(response.map_body(|_, inner| LogBody { status, inner }))
|
||||||
|
}
|
||||||
|
Err(e) => Err(LogError(e.into()).into()),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B> MessageBody for LogBody<B>
|
||||||
|
where
|
||||||
|
B: MessageBody,
|
||||||
|
{
|
||||||
|
type Error = B::Error;
|
||||||
|
|
||||||
|
fn size(&self) -> actix_web::body::BodySize {
|
||||||
|
self.inner.size()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_next(
|
||||||
|
self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> std::task::Poll<Option<Result<actix_web::web::Bytes, Self::Error>>> {
|
||||||
|
let this = self.project();
|
||||||
|
|
||||||
|
let opt = std::task::ready!(this.inner.poll_next(cx));
|
||||||
|
|
||||||
|
if opt.is_none() {
|
||||||
|
if let Some(status) = this.status.take() {
|
||||||
|
emit(status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::task::Poll::Ready(opt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for LogError {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
self.0.fmt(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for LogError {
|
||||||
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||||
|
self.0.source()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ResponseError for LogError {
|
||||||
|
fn status_code(&self) -> actix_web::http::StatusCode {
|
||||||
|
self.0.as_response_error().status_code()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn error_response(&self) -> actix_web::HttpResponse<actix_web::body::BoxBody> {
|
||||||
|
let response = self.0.error_response();
|
||||||
|
let status = response.status();
|
||||||
|
|
||||||
|
if response.body().size().is_eof() {
|
||||||
|
emit(status);
|
||||||
|
response
|
||||||
|
} else {
|
||||||
|
response.map_body(|_, inner| {
|
||||||
|
LogBody {
|
||||||
|
status: Some(status),
|
||||||
|
inner,
|
||||||
|
}
|
||||||
|
.boxed()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn emit(status: StatusCode) {
|
||||||
|
if status.is_server_error() {
|
||||||
|
tracing::error!("server error");
|
||||||
|
} else if status.is_client_error() {
|
||||||
|
tracing::warn!("client error");
|
||||||
|
} else if status.is_redirection() {
|
||||||
|
tracing::info!("redirected");
|
||||||
|
} else {
|
||||||
|
tracing::info!("completed");
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue