From 810831ca738f6db5f0c16f03af1f97f0ffe6ba5f Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Sun, 5 Sep 2021 17:15:30 -0500 Subject: [PATCH] Don't process images that are already being processed --- Cargo.lock | 13 +++- Cargo.toml | 3 +- src/main.rs | 194 +++++++++++++++++++++++++++++++++++++--------------- 3 files changed, 153 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37ff9af..afd7559 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -461,6 +461,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "dashmap" +version = "4.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" +dependencies = [ + "cfg-if", + "num_cpus", +] + [[package]] name = "derive_more" version = "0.99.16" @@ -960,7 +970,7 @@ dependencies = [ [[package]] name = "pict-rs" -version = "0.3.0-alpha.24" +version = "0.3.0-alpha.25" dependencies = [ "actix-form-data", "actix-rt", @@ -968,6 +978,7 @@ dependencies = [ "anyhow", "awc", "base64", + "dashmap", "futures-core", "mime", "num_cpus", diff --git a/Cargo.toml b/Cargo.toml index cdffe68..81d633a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "pict-rs" description = "A simple image hosting service" -version = "0.3.0-alpha.24" +version = "0.3.0-alpha.25" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" @@ -17,6 +17,7 @@ actix-web = { version = "4.0.0-beta.8", default-features = false } anyhow = "1.0" awc = { version = "3.0.0-beta.7", default-features = false } base64 = "0.13.0" +dashmap = "4.0.2" futures-core = "0.3.17" mime = "0.3.1" num_cpus = "1.13" diff --git a/src/main.rs b/src/main.rs index 3d48de7..53f59fd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,11 +6,15 @@ use actix_web::{ web, App, HttpResponse, HttpResponseBuilder, HttpServer, }; use awc::Client; +use dashmap::{mapref::entry::Entry, DashMap}; use futures_core::stream::Stream; use once_cell::sync::{Lazy, 
OnceCell}; -use std::{collections::HashSet, future::ready, path::PathBuf, time::SystemTime}; +use std::{collections::HashSet, future::{Future, ready}, path::PathBuf, time::SystemTime, task::{Context, Poll}, pin::Pin}; use structopt::StructOpt; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::oneshot::{Sender, Receiver}, +}; use tracing::{debug, error, info, instrument, Span}; use tracing_subscriber::EnvFilter; @@ -60,12 +64,82 @@ static TMP_DIR: Lazy = Lazy::new(|| { }); static CONFIG: Lazy<Config> = Lazy::new(Config::from_args); static PROCESS_SEMAPHORE: OnceCell<tokio::sync::Semaphore> = OnceCell::new(); +static PROCESS_MAP: Lazy<DashMap<PathBuf, Vec<Sender<(Details, web::Bytes)>>>> = + Lazy::new(DashMap::new); fn process_semaphore() -> &'static tokio::sync::Semaphore { PROCESS_SEMAPHORE .get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) } +struct CancelSafeProcessor<F> { + path: PathBuf, + receiver: Option<Receiver<(Details, web::Bytes)>>, + fut: F, +} + +impl<F> CancelSafeProcessor<F> +where + F: Future<Output = Result<(Details, web::Bytes), UploadError>> + Unpin, +{ + pub(crate) fn new(path: PathBuf, fut: F) -> Self { + let entry = PROCESS_MAP.entry(path.clone()); + + let receiver = match entry { + Entry::Vacant(vacant) => { + vacant.insert(Vec::new()); + None + } + Entry::Occupied(mut occupied) => { + let (tx, rx) = tokio::sync::oneshot::channel(); + occupied.get_mut().push(tx); + Some(rx) + } + }; + + CancelSafeProcessor { path, receiver, fut } + } +} + +impl<F> Future for CancelSafeProcessor<F> +where + F: Future<Output = Result<(Details, web::Bytes), UploadError>> + Unpin, +{ + type Output = Result<(Details, web::Bytes), UploadError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if let Some(ref mut rx) = self.receiver { + Pin::new(rx).poll(cx).map(|res| res.map_err(|_| UploadError::Canceled)) + } else { + match Pin::new(&mut self.fut).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(res) => { + let opt = PROCESS_MAP.remove(&self.path); + match res { + Err(e) => Poll::Ready(Err(e)), + Ok(tup) => { + if let Some((_, vec)) = opt { + for sender in vec { + let _ 
= sender.send(tup.clone()); + } + } + Poll::Ready(Ok(tup)) + } + } + } + } + } + } +} + +impl<F> Drop for CancelSafeProcessor<F> { + fn drop(&mut self) { + if self.receiver.is_none() { + PROCESS_MAP.remove(&self.path); + } + } +} + // try moving a file #[instrument] async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), UploadError> { @@ -389,68 +463,78 @@ async fn process( let mut original_path = manager.image_dir(); original_path.push(name.clone()); - // Create and save a JPG for motion images (gif, mp4) - if let Some((updated_path, exists)) = - self::processor::prepare_image(original_path.clone()).await? - { - original_path = updated_path.clone(); + let thumbnail_path2 = thumbnail_path.clone(); + let process_fut = async { + let thumbnail_path = thumbnail_path2; + // Create and save a JPG for motion images (gif, mp4) + if let Some((updated_path, exists)) = + self::processor::prepare_image(original_path.clone()).await? + { + original_path = updated_path.clone(); - if exists.is_new() { - // Save the transcoded file in another task - debug!("Spawning storage task"); - let span = Span::current(); - let manager2 = manager.clone(); - let name = name.clone(); - actix_rt::spawn(async move { - let entered = span.enter(); - if let Err(e) = manager2.store_variant(updated_path, name).await { - error!("Error storing variant, {}", e); - return; - } - drop(entered); - }); + if exists.is_new() { + // Save the transcoded file in another task + debug!("Spawning storage task"); + let span = Span::current(); + let manager2 = manager.clone(); + let name = name.clone(); + actix_rt::spawn(async move { + let entered = span.enter(); + if let Err(e) = manager2.store_variant(updated_path, name).await { + error!("Error storing variant, {}", e); + return; + } + drop(entered); + }); + } } - } - let permit = process_semaphore().acquire().await?; - let file = tokio::fs::File::open(original_path.clone()).await?; + let permit = process_semaphore().acquire().await?; - let mut processed_reader 
= - crate::magick::process_image_write_read(file, thumbnail_args, format)?; + let file = tokio::fs::File::open(original_path.clone()).await?; - let mut vec = Vec::new(); - processed_reader.read_to_end(&mut vec).await?; - drop(permit); + let mut processed_reader = + crate::magick::process_image_write_read(file, thumbnail_args, format)?; - let bytes = web::Bytes::from(vec); + let mut vec = Vec::new(); + processed_reader.read_to_end(&mut vec).await?; + let bytes = web::Bytes::from(vec); - let details = if let Some(details) = details { - details - } else { - Details::from_bytes(bytes.clone()).await? + drop(permit); + + let details = if let Some(details) = details { + details + } else { + Details::from_bytes(bytes.clone()).await? + }; + + let span = tracing::Span::current(); + let details2 = details.clone(); + let bytes2 = bytes.clone(); + actix_rt::spawn(async move { + let entered = span.enter(); + if let Err(e) = safe_save_file(thumbnail_path.clone(), bytes2).await { + tracing::warn!("Error saving thumbnail: {}", e); + return; + } + if let Err(e) = manager + .store_variant_details(thumbnail_path.clone(), name.clone(), &details2) + .await + { + tracing::warn!("Error saving variant details: {}", e); + return; + } + if let Err(e) = manager.store_variant(thumbnail_path, name.clone()).await { + tracing::warn!("Error saving variant info: {}", e); + } + drop(entered); + }); + + Ok((details, bytes)) as Result<(Details, web::Bytes), UploadError> }; - let span = tracing::Span::current(); - let details2 = details.clone(); - let bytes2 = bytes.clone(); - actix_rt::spawn(async move { - let entered = span.enter(); - if let Err(e) = safe_save_file(thumbnail_path.clone(), bytes2).await { - tracing::warn!("Error saving thumbnail: {}", e); - return; - } - if let Err(e) = manager - .store_variant_details(thumbnail_path.clone(), name.clone(), &details2) - .await - { - tracing::warn!("Error saving variant details: {}", e); - return; - } - if let Err(e) = 
manager.store_variant(thumbnail_path, name.clone()).await { - tracing::warn!("Error saving variant info: {}", e); - } - drop(entered); - }); + let (details, bytes) = CancelSafeProcessor::new(thumbnail_path.clone(), Box::pin(process_fut)).await?; + return Ok(srv_response( HttpResponse::Ok(),