Join upstream service with payload fut

This commit is contained in:
asonix 2020-07-10 15:34:18 -05:00
parent fbe8baaee9
commit b7e8d2e465
2 changed files with 24 additions and 12 deletions

View file

@ -132,9 +132,9 @@ async fn main() -> Result<(), anyhow::Error> {
.service(web::resource("/media/{path}").route(web::get().to(routes::media)))
.service(
web::resource("/inbox")
.wrap(DebugPayload(config.debug()))
.wrap(config.digest_middleware())
.wrap(config.signature_middleware(state.requests(), actors.clone()))
.wrap(DebugPayload(config.debug()))
.route(web::post().to(inbox)),
)
.service(web::resource("/actor").route(web::get().to(actor)))

View file

@ -5,10 +5,10 @@ use actix_web::{
};
use bytes::BytesMut;
use futures::{
future::{ok, LocalBoxFuture, Ready},
future::{ok, try_join, LocalBoxFuture, Ready},
stream::StreamExt,
};
use log::info;
use log::{error, info};
use std::task::{Context, Poll};
use tokio::sync::mpsc::channel;
@ -75,24 +75,36 @@ where
let fut = self.1.call(req);
return Box::pin(async move {
let payload_fut = async move {
let mut bytes = BytesMut::new();
while let Some(res) = pl.next().await {
let b = res.map_err(|_| DebugError)?;
let b = res.map_err(|e| {
error!("Payload error, {}", e);
DebugError
})?;
bytes.extend(b);
}
info!("{}", String::from_utf8_lossy(bytes.as_ref()));
tx.send(Ok(bytes.freeze())).await.map_err(|_| DebugError)?;
tx.send(Ok(bytes.freeze())).await.map_err(|e| {
error!("Error sending bytes, {}", e);
DebugError
})?;
fut.await
});
}
Ok(()) as Result<(), actix_web::Error>
};
Box::pin(async move {
let (res, _) = try_join(fut, payload_fut).await?;
Ok(res)
})
} else {
let fut = self.1.call(req);
Box::pin(async move { fut.await })
}
}
}