2020-07-10 20:07:47 +00:00
|
|
|
use actix_web::{
|
|
|
|
dev::{Payload, Service, ServiceRequest, Transform},
|
|
|
|
http::StatusCode,
|
2020-12-29 17:27:14 +00:00
|
|
|
web::BytesMut,
|
2020-07-10 20:07:47 +00:00
|
|
|
HttpMessage, HttpResponse, ResponseError,
|
|
|
|
};
|
|
|
|
use futures::{
|
2020-12-29 17:27:14 +00:00
|
|
|
channel::mpsc::channel,
|
2020-07-10 20:34:18 +00:00
|
|
|
future::{ok, try_join, LocalBoxFuture, Ready},
|
2020-12-29 17:27:14 +00:00
|
|
|
sink::SinkExt,
|
2020-07-10 20:07:47 +00:00
|
|
|
stream::StreamExt,
|
|
|
|
};
|
2020-07-10 20:34:18 +00:00
|
|
|
use log::{error, info};
|
2020-07-10 20:07:47 +00:00
|
|
|
use std::task::{Context, Poll};
|
|
|
|
|
|
|
|
#[derive(Clone, Debug)]
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) struct DebugPayload(pub bool);
|
2020-07-10 20:07:47 +00:00
|
|
|
|
|
|
|
#[doc(hidden)]
|
|
|
|
#[derive(Clone, Debug)]
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) struct DebugPayloadMiddleware<S>(bool, S);
|
2020-07-10 20:07:47 +00:00
|
|
|
|
|
|
|
#[derive(Clone, Debug, thiserror::Error)]
|
|
|
|
#[error("Failed to read payload")]
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) struct DebugError;
|
2020-07-10 20:07:47 +00:00
|
|
|
|
|
|
|
impl ResponseError for DebugError {
|
|
|
|
fn status_code(&self) -> StatusCode {
|
|
|
|
StatusCode::BAD_REQUEST
|
|
|
|
}
|
|
|
|
|
|
|
|
fn error_response(&self) -> HttpResponse {
|
|
|
|
HttpResponse::new(self.status_code())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<S> Transform<S> for DebugPayload
|
|
|
|
where
|
|
|
|
S: Service<Request = ServiceRequest, Error = actix_web::Error>,
|
|
|
|
S::Future: 'static,
|
|
|
|
S::Error: 'static,
|
|
|
|
{
|
|
|
|
type Request = S::Request;
|
|
|
|
type Response = S::Response;
|
|
|
|
type Error = S::Error;
|
|
|
|
type InitError = ();
|
|
|
|
type Transform = DebugPayloadMiddleware<S>;
|
|
|
|
type Future = Ready<Result<Self::Transform, Self::InitError>>;
|
|
|
|
|
|
|
|
fn new_transform(&self, service: S) -> Self::Future {
|
|
|
|
ok(DebugPayloadMiddleware(self.0, service))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<S> Service for DebugPayloadMiddleware<S>
|
|
|
|
where
|
|
|
|
S: Service<Request = ServiceRequest, Error = actix_web::Error>,
|
|
|
|
S::Future: 'static,
|
|
|
|
S::Error: 'static,
|
|
|
|
{
|
|
|
|
type Request = S::Request;
|
|
|
|
type Response = S::Response;
|
|
|
|
type Error = S::Error;
|
|
|
|
type Future = LocalBoxFuture<'static, Result<S::Response, S::Error>>;
|
|
|
|
|
|
|
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
|
|
self.1.poll_ready(cx)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn call(&mut self, mut req: S::Request) -> Self::Future {
|
|
|
|
if self.0 {
|
2020-12-29 17:27:14 +00:00
|
|
|
let (mut tx, rx) = channel(0);
|
2020-07-10 20:07:47 +00:00
|
|
|
|
|
|
|
let mut pl = req.take_payload();
|
|
|
|
req.set_payload(Payload::Stream(Box::pin(rx)));
|
|
|
|
|
|
|
|
let fut = self.1.call(req);
|
|
|
|
|
2020-07-10 20:34:18 +00:00
|
|
|
let payload_fut = async move {
|
2020-07-10 20:07:47 +00:00
|
|
|
let mut bytes = BytesMut::new();
|
|
|
|
|
|
|
|
while let Some(res) = pl.next().await {
|
2020-07-10 20:34:18 +00:00
|
|
|
let b = res.map_err(|e| {
|
|
|
|
error!("Payload error, {}", e);
|
|
|
|
DebugError
|
|
|
|
})?;
|
2020-07-10 20:07:47 +00:00
|
|
|
bytes.extend(b);
|
|
|
|
}
|
|
|
|
|
|
|
|
info!("{}", String::from_utf8_lossy(bytes.as_ref()));
|
|
|
|
|
2020-07-10 20:34:18 +00:00
|
|
|
tx.send(Ok(bytes.freeze())).await.map_err(|e| {
|
|
|
|
error!("Error sending bytes, {}", e);
|
|
|
|
DebugError
|
|
|
|
})?;
|
2020-07-10 20:07:47 +00:00
|
|
|
|
2020-07-10 20:34:18 +00:00
|
|
|
Ok(()) as Result<(), actix_web::Error>
|
|
|
|
};
|
|
|
|
|
|
|
|
Box::pin(async move {
|
|
|
|
let (res, _) = try_join(fut, payload_fut).await?;
|
2020-07-10 20:07:47 +00:00
|
|
|
|
2020-07-10 20:34:18 +00:00
|
|
|
Ok(res)
|
|
|
|
})
|
|
|
|
} else {
|
|
|
|
let fut = self.1.call(req);
|
2020-07-10 20:07:47 +00:00
|
|
|
|
2020-07-10 20:34:18 +00:00
|
|
|
Box::pin(async move { fut.await })
|
|
|
|
}
|
2020-07-10 20:07:47 +00:00
|
|
|
}
|
|
|
|
}
|