relay/src/middleware/payload.rs

91 lines
2.5 KiB
Rust
Raw Normal View History

2020-07-10 20:07:47 +00:00
use actix_web::{
2021-04-17 17:36:47 +00:00
dev::{Body, Payload, Service, ServiceRequest, Transform},
http::{Method, StatusCode},
web::BytesMut,
2021-04-17 17:36:47 +00:00
BaseHttpResponse, HttpMessage, ResponseError,
2020-07-10 20:07:47 +00:00
};
use futures::{
future::{ok, LocalBoxFuture, Ready, TryFutureExt},
stream::{once, TryStreamExt},
2020-07-10 20:07:47 +00:00
};
2020-07-10 20:34:18 +00:00
use log::{error, info};
2020-07-10 20:07:47 +00:00
use std::task::{Context, Poll};
#[derive(Clone, Debug)]
2021-02-10 04:17:20 +00:00
pub(crate) struct DebugPayload(pub bool);
2020-07-10 20:07:47 +00:00
#[doc(hidden)]
#[derive(Clone, Debug)]
2021-02-10 04:17:20 +00:00
pub(crate) struct DebugPayloadMiddleware<S>(bool, S);
2020-07-10 20:07:47 +00:00
#[derive(Clone, Debug, thiserror::Error)]
#[error("Failed to read payload")]
2021-02-10 04:17:20 +00:00
pub(crate) struct DebugError;
2020-07-10 20:07:47 +00:00
impl ResponseError for DebugError {
fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST
}
2021-04-17 17:36:47 +00:00
fn error_response(&self) -> BaseHttpResponse<Body> {
BaseHttpResponse::new(self.status_code())
2020-07-10 20:07:47 +00:00
}
}
2021-02-11 00:00:11 +00:00
impl<S> Transform<S, ServiceRequest> for DebugPayload
2020-07-10 20:07:47 +00:00
where
2021-02-11 00:00:11 +00:00
S: Service<ServiceRequest, Error = actix_web::Error>,
2020-07-10 20:07:47 +00:00
S::Future: 'static,
S::Error: 'static,
{
type Response = S::Response;
type Error = S::Error;
type InitError = ();
type Transform = DebugPayloadMiddleware<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(DebugPayloadMiddleware(self.0, service))
}
}
2021-02-11 00:00:11 +00:00
impl<S> Service<ServiceRequest> for DebugPayloadMiddleware<S>
2020-07-10 20:07:47 +00:00
where
2021-02-11 00:00:11 +00:00
S: Service<ServiceRequest, Error = actix_web::Error>,
2020-07-10 20:07:47 +00:00
S::Future: 'static,
S::Error: 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = LocalBoxFuture<'static, Result<S::Response, S::Error>>;
2021-02-11 00:00:11 +00:00
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
2020-07-10 20:07:47 +00:00
self.1.poll_ready(cx)
}
2021-02-11 00:00:11 +00:00
fn call(&self, mut req: ServiceRequest) -> Self::Future {
if self.0 && req.method() == Method::POST {
let pl = req.take_payload();
req.set_payload(Payload::Stream(Box::pin(once(
pl.try_fold(BytesMut::new(), |mut acc, bytes| async {
acc.extend(bytes);
Ok(acc)
})
.map_ok(|bytes| {
let bytes = bytes.freeze();
info!("{}", String::from_utf8_lossy(&bytes));
bytes
}),
))));
2020-07-10 20:07:47 +00:00
let fut = self.1.call(req);
Box::pin(async move { fut.await })
2020-07-10 20:34:18 +00:00
} else {
let fut = self.1.call(req);
2020-07-10 20:07:47 +00:00
2020-07-10 20:34:18 +00:00
Box::pin(async move { fut.await })
}
2020-07-10 20:07:47 +00:00
}
}