Make CancelSafeProcessor impl more concise, format code

This commit is contained in:
Aode (Lion) 2021-09-05 19:05:58 -05:00
parent c53ae967d5
commit 80044616f9

View file

@ -9,11 +9,18 @@ use awc::Client;
use dashmap::{mapref::entry::Entry, DashMap}; use dashmap::{mapref::entry::Entry, DashMap};
use futures_core::stream::Stream; use futures_core::stream::Stream;
use once_cell::sync::{Lazy, OnceCell}; use once_cell::sync::{Lazy, OnceCell};
use std::{collections::HashSet, future::{Future, ready}, path::PathBuf, time::SystemTime, task::{Context, Poll}, pin::Pin}; use std::{
collections::HashSet,
future::{ready, Future},
path::PathBuf,
pin::Pin,
task::{Context, Poll},
time::SystemTime,
};
use structopt::StructOpt; use structopt::StructOpt;
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
sync::oneshot::{Sender, Receiver}, sync::oneshot::{Receiver, Sender},
}; };
use tracing::{debug, error, info, instrument, Span}; use tracing::{debug, error, info, instrument, Span};
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
@ -97,7 +104,11 @@ where
} }
}; };
CancelSafeProcessor { path, receiver, fut } CancelSafeProcessor {
path,
receiver,
fut,
}
} }
} }
@ -109,25 +120,21 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(ref mut rx) = self.receiver { if let Some(ref mut rx) = self.receiver {
Pin::new(rx).poll(cx).map(|res| res.map_err(|_| UploadError::Canceled)) Pin::new(rx)
.poll(cx)
.map(|res| res.map_err(|_| UploadError::Canceled))
} else { } else {
match Pin::new(&mut self.fut).poll(cx) { Pin::new(&mut self.fut).poll(cx).map(|res| {
Poll::Pending => Poll::Pending,
Poll::Ready(res) => {
let opt = PROCESS_MAP.remove(&self.path); let opt = PROCESS_MAP.remove(&self.path);
match res { res.map(|tup| {
Err(e) => Poll::Ready(Err(e)),
Ok(tup) => {
if let Some((_, vec)) = opt { if let Some((_, vec)) = opt {
for sender in vec { for sender in vec {
let _ = sender.send(tup.clone()); let _ = sender.send(tup.clone());
} }
} }
Poll::Ready(Ok(tup)) tup
} })
} })
}
}
} }
} }
} }
@ -533,8 +540,8 @@ async fn process(
Ok((details, bytes)) as Result<(Details, web::Bytes), UploadError> Ok((details, bytes)) as Result<(Details, web::Bytes), UploadError>
}; };
let (details, bytes) = CancelSafeProcessor::new(thumbnail_path.clone(), Box::pin(process_fut)).await?; let (details, bytes) =
CancelSafeProcessor::new(thumbnail_path.clone(), Box::pin(process_fut)).await?;
return Ok(srv_response( return Ok(srv_response(
HttpResponse::Ok(), HttpResponse::Ok(),