diff --git a/src/generate.rs b/src/generate.rs index a17d22c..79129b7 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -8,7 +8,8 @@ use crate::{ formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat}, future::{WithMetrics, WithTimeout}, magick::PolicyDir, - repo::{ArcRepo, Hash, VariantAlreadyExists}, + repo::{Hash, VariantAlreadyExists}, + state::State, store::Store, tmp_file::TmpDir, }; @@ -49,47 +50,43 @@ impl Drop for MetricsGuard { } } -#[allow(clippy::too_many_arguments)] -#[tracing::instrument(skip(tmp_dir, policy_dir, repo, store, hash, process_map, config))] +#[tracing::instrument(skip(state, process_map, hash))] pub(crate) async fn generate( - tmp_dir: &TmpDir, - policy_dir: &PolicyDir, - repo: &ArcRepo, - store: &S, + state: &State, process_map: &ProcessMap, format: InputProcessableFormat, thumbnail_path: PathBuf, thumbnail_args: Vec, original_details: &Details, - config: &crate::config::Configuration, hash: Hash, ) -> Result<(Details, Bytes), Error> { - if config.server.danger_dummy_mode { - let identifier = repo + if state.config.server.danger_dummy_mode { + let identifier = state + .repo .identifier(hash) .await? .ok_or(UploadError::MissingIdentifier)?; - let bytes = store.to_bytes(&identifier, None, None).await?.into_bytes(); + let bytes = state + .store + .to_bytes(&identifier, None, None) + .await? + .into_bytes(); Ok((original_details.clone(), bytes)) } else { let process_fut = process( - tmp_dir, - policy_dir, - repo, - store, + state, format, thumbnail_path.clone(), thumbnail_args, original_details, - config, hash.clone(), ); let (details, bytes) = process_map .process(hash, thumbnail_path, process_fut) - .with_timeout(Duration::from_secs(config.media.process_timeout * 4)) + .with_timeout(Duration::from_secs(state.config.media.process_timeout * 4)) .with_metrics("pict-rs.generate.process") .await .map_err(|_| UploadError::ProcessTimeout)??; @@ -99,37 +96,21 @@ pub(crate) async fn generate( } #[allow(clippy::too_many_arguments)] -#[tracing::instrument(skip(tmp_dir, policy_dir, repo, store, hash, config))] +#[tracing::instrument(skip(state, hash))] async fn process( - tmp_dir: &TmpDir, - policy_dir: &PolicyDir, - repo: &ArcRepo, - store: &S, + state: &State, output_format: InputProcessableFormat, thumbnail_path: PathBuf, thumbnail_args: Vec, original_details: &Details, - config: &crate::config::Configuration, hash: Hash, ) -> Result<(Details, Bytes), Error> { let guard = MetricsGuard::guard(); let permit = crate::process_semaphore().acquire().await?; - let identifier = input_identifier( - tmp_dir, - policy_dir, - repo, - store, - output_format, - hash.clone(), - original_details, - &config.media, - ) - .await?; + let identifier = input_identifier(state, output_format, hash.clone(), original_details).await?; - let input_details = - crate::ensure_details_identifier(tmp_dir, policy_dir, repo, store, config, &identifier) - .await?; + let input_details = crate::ensure_details_identifier(state, &identifier).await?; let input_format = input_details .internal_format() @@ -139,21 +120,19 @@ async fn process( let format = input_format.process_to(output_format); let quality = match format { - ProcessableFormat::Image(format) => config.media.image.quality_for(format), - ProcessableFormat::Animation(format) => config.media.animation.quality_for(format), + ProcessableFormat::Image(format) => state.config.media.image.quality_for(format), + ProcessableFormat::Animation(format) => state.config.media.animation.quality_for(format), }; - let stream = 
store.to_stream(&identifier, None, None).await?; + let stream = state.store.to_stream(&identifier, None, None).await?; let vec = crate::magick::process_image_stream_read( - tmp_dir, - policy_dir, + state, stream, thumbnail_args, input_format, format, quality, - config.media.process_timeout, ) .await? .into_vec() @@ -165,18 +144,20 @@ async fn process( drop(permit); let details = Details::from_bytes( - tmp_dir, - policy_dir, - config.media.process_timeout, + &state.tmp_dir, + &state.policy_dir, + &state.config.media.process_timeout, bytes.clone(), ) .await?; - let identifier = store + let identifier = state + .store .save_bytes(bytes.clone(), details.media_type()) .await?; - if let Err(VariantAlreadyExists) = repo + if let Err(VariantAlreadyExists) = state + .repo .relate_variant_identifier( hash, thumbnail_path.to_string_lossy().to_string(), @@ -184,10 +165,10 @@ async fn process( ) .await? { - store.remove(&identifier).await?; + state.store.remove(&identifier).await?; } - repo.relate_details(&identifier, &details).await?; + state.repo.relate_details(&identifier, &details).await?; guard.disarm(); @@ -197,14 +178,10 @@ async fn process( #[allow(clippy::too_many_arguments)] #[tracing::instrument(skip_all)] async fn input_identifier( - tmp_dir: &TmpDir, - policy_dir: &PolicyDir, - repo: &ArcRepo, - store: &S, + state: &State, output_format: InputProcessableFormat, hash: Hash, original_details: &Details, - media: &crate::config::Media, ) -> Result, Error> where S: Store + 'static, @@ -220,11 +197,12 @@ where }; if should_thumbnail { - if let Some(identifier) = repo.motion_identifier(hash.clone()).await? { + if let Some(identifier) = state.repo.motion_identifier(hash.clone()).await? { return Ok(identifier); }; - let identifier = repo + let identifier = state + .repo .identifier(hash.clone()) .await? 
        .ok_or(UploadError::MissingIdentifier)?;
@@ -232,24 +210,22 @@ where
     let (reader, media_type) = if let Some(processable_format) =
         original_details.internal_format().processable_format()
     {
-        let thumbnail_format = media.image.format.unwrap_or(ImageFormat::Webp);
+        let thumbnail_format = state.config.media.image.format.unwrap_or(ImageFormat::Webp);

-        let stream = store.to_stream(&identifier, None, None).await?;
+        let stream = state.store.to_stream(&identifier, None, None).await?;

         let reader = magick::thumbnail(
-            tmp_dir,
-            policy_dir,
+            state,
             stream,
             processable_format,
             ProcessableFormat::Image(thumbnail_format),
-            media.image.quality_for(thumbnail_format),
-            media.process_timeout,
+            state.config.media.image.quality_for(thumbnail_format),
         )
         .await?;

         (reader, thumbnail_format.media_type())
     } else {
-        let thumbnail_format = match media.image.format {
+        let thumbnail_format = match state.config.media.image.format {
             Some(ImageFormat::Webp | ImageFormat::Avif | ImageFormat::Jxl) => {
                 ffmpeg::ThumbnailFormat::Webp
             }
@@ -258,14 +234,14 @@
         };

         let reader = ffmpeg::thumbnail(
-            tmp_dir,
-            store.clone(),
+            &state.tmp_dir,
+            state.store.clone(),
             identifier,
             original_details
                 .video_format()
                 .unwrap_or(InternalVideoFormat::Mp4),
             thumbnail_format,
-            media.process_timeout,
+            state.config.media.process_timeout,
         )
         .await?;

         (reader, thumbnail_format.media_type())
     };

     let motion_identifier = reader
-        .with_stdout(|stdout| async { store.save_async_read(stdout, media_type).await })
+        .with_stdout(|stdout| async { state.store.save_async_read(stdout, media_type).await })
         .await??;

-    repo.relate_motion_identifier(hash, &motion_identifier)
+    state
+        .repo
+        .relate_motion_identifier(hash, &motion_identifier)
         .await?;

     return Ok(motion_identifier);
 }

-    repo.identifier(hash)
+    state
+        .repo
+        .identifier(hash)
         .await?
         .ok_or(UploadError::MissingIdentifier)
         .map_err(From::from)
diff --git a/src/generate/magick.rs b/src/generate/magick.rs
index e0b1799..d722d57 100644
--- a/src/generate/magick.rs
+++ b/src/generate/magick.rs
@@ -6,29 +6,29 @@ use crate::{
     formats::ProcessableFormat,
     magick::{MagickError, PolicyDir, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
     process::{Process, ProcessRead},
+    state::State,
     stream::LocalBoxStream,
     tmp_file::TmpDir,
 };

-async fn thumbnail_animation(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
+async fn thumbnail_animation(
+    state: &State,
     input_format: ProcessableFormat,
     format: ProcessableFormat,
     quality: Option,
-    timeout: u64,
     write_file: F,
 ) -> Result
 where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future>,
 {
-    let temporary_path = tmp_dir
+    let temporary_path = state
+        .tmp_dir
         .tmp_folder()
         .await
         .map_err(MagickError::CreateTemporaryDirectory)?;

-    let input_file = tmp_dir.tmp_file(None);
+    let input_file = state.tmp_dir.tmp_file(None);
     crate::store::file_store::safe_create_parent(&input_file)
         .await
         .map_err(MagickError::CreateDir)?;
@@ -62,10 +62,10 @@ where
     let envs = [
         (MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
-        (MAGICK_CONFIGURE_PATH, policy_dir.as_os_str()),
+        (MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
     ];

-    let reader = Process::run("magick", &args, &envs, timeout)?
+    let reader = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
.read() .add_extras(input_file) .add_extras(temporary_path); @@ -73,22 +73,18 @@ where Ok(reader) } -pub(super) async fn thumbnail( - tmp_dir: &TmpDir, - policy_dir: &PolicyDir, +pub(super) async fn thumbnail( + state: &State, stream: LocalBoxStream<'static, std::io::Result>, input_format: ProcessableFormat, format: ProcessableFormat, quality: Option, - timeout: u64, ) -> Result { thumbnail_animation( - tmp_dir, - policy_dir, + state, input_format, format, quality, - timeout, |mut tmp_file| async move { tmp_file .write_from_stream(stream) diff --git a/src/ingest.rs b/src/ingest.rs index 7e4358c..d140fe7 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -8,6 +8,7 @@ use crate::{ future::WithMetrics, magick::PolicyDir, repo::{Alias, ArcRepo, DeleteToken, Hash}, + state::State, store::Store, tmp_file::TmpDir, }; @@ -19,7 +20,7 @@ use streem::IntoStreamer; use tracing::{Instrument, Span}; mod hasher; -use hasher::{Hasher, State}; +use hasher::Hasher; #[derive(Debug)] pub(crate) struct Session { @@ -50,12 +51,17 @@ where } async fn process_ingest( - tmp_dir: &TmpDir, - policy_dir: &PolicyDir, - store: &S, + state: &State, stream: impl Stream> + 'static, - media: &crate::config::Media, -) -> Result<(InternalFormat, Arc, Details, Rc>), Error> +) -> Result< + ( + InternalFormat, + Arc, + Details, + Rc>, + ), + Error, +> where S: Store, { @@ -65,43 +71,30 @@ where let permit = crate::process_semaphore().acquire().await?; - let prescribed = Validations { - image: &media.image, - animation: &media.animation, - video: &media.video, - }; - tracing::trace!("Validating bytes"); - let (input_type, process_read) = crate::validate::validate_bytes( - tmp_dir, - policy_dir, - bytes, - prescribed, - media.process_timeout, - ) - .await?; + let (input_type, process_read) = crate::validate::validate_bytes(state, bytes).await?; - let process_read = if let Some(operations) = media.preprocess_steps() { + let process_read = if let Some(operations) = state.config.media.preprocess_steps() { if let Some(format) = input_type.processable_format() { let (_, magick_args) = crate::processor::build_chain(operations, format.file_extension())?; let quality = match format { - crate::formats::ProcessableFormat::Image(format) => media.image.quality_for(format), + crate::formats::ProcessableFormat::Image(format) => { + state.config.media.image.quality_for(format) + } crate::formats::ProcessableFormat::Animation(format) => { - media.animation.quality_for(format) + state.config.media.animation.quality_for(format) } }; crate::magick::process_image_process_read( - tmp_dir, - policy_dir, + state, process_read, magick_args, format, format, quality, - media.process_timeout, ) .await? 
     } else {
@@ -111,7 +104,7 @@ where
         process_read
     };

-    let (state, identifier) = process_read
+    let (hash_state, identifier) = process_read
         .with_stdout(|stdout| async move {
             let hasher_reader = Hasher::new(stdout);
-            let state = hasher_reader.state();
+            let hash_state = hasher_reader.state();
@@ -119,11 +112,11 @@ where
-            store
+            state
+                .store
                 .save_async_read(hasher_reader, input_type.media_type())
                 .await
-                .map(move |identifier| (state, identifier))
+                .map(move |identifier| (hash_state, identifier))
         })
         .await??;

-    let bytes_stream = store.to_bytes(&identifier, None, None).await?;
+    let bytes_stream = state.store.to_bytes(&identifier, None, None).await?;

     let details = Details::from_bytes(
-        tmp_dir,
-        policy_dir,
+        &state.tmp_dir,
+        &state.policy_dir,
@@ -134,13 +127,21 @@ where
     drop(permit);

-    Ok((input_type, identifier, details, state))
+    Ok((input_type, identifier, details, hash_state))
 }

 async fn dummy_ingest(
-    store: &S,
+    state: &State,
     stream: impl Stream> + 'static,
-) -> Result<(InternalFormat, Arc, Details, Rc>), Error>
+) -> Result<
+    (
+        InternalFormat,
+        Arc,
+        Details,
+        Rc>,
+    ),
+    Error,
+>
 where
     S: Store,
 {
@@ -156,7 +157,8 @@ where
     let input_type = InternalFormat::Image(crate::formats::ImageFormat::Png);

-    let identifier = store
+    let identifier = state
+        .store
         .save_async_read(hasher_reader, input_type.media_type())
         .await?;

@@ -166,41 +168,37 @@ where
 }

 #[allow(clippy::too_many_arguments)]
-#[tracing::instrument(skip(tmp_dir, policy_dir, repo, store, client, stream, config))]
+#[tracing::instrument(skip(state, stream))]
 pub(crate) async fn ingest(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
-    repo: &ArcRepo,
-    store: &S,
-    client: &ClientWithMiddleware,
+    state: &State,
     stream: impl Stream> + 'static,
     declared_alias: Option,
-    config: &crate::config::Configuration,
 ) -> Result
 where
     S: Store,
 {
-    let (input_type, identifier, details, state) = if config.server.danger_dummy_mode {
-        dummy_ingest(store, stream).await?
+    let (input_type, identifier, details, hash_state) = if state.config.server.danger_dummy_mode {
+        dummy_ingest(state, stream).await?
     } else {
-        process_ingest(tmp_dir, policy_dir, store, stream, &config.media).await?
+        process_ingest(state, stream).await?
     };

     let mut session = Session {
-        repo: repo.clone(),
+        repo: state.repo.clone(),
         delete_token: DeleteToken::generate(),
         hash: None,
         alias: None,
         identifier: Some(identifier.clone()),
     };

-    if let Some(endpoint) = &config.media.external_validation {
+    if let Some(endpoint) = &state.config.media.external_validation {
-        let stream = store.to_stream(&identifier, None, None).await?;
+        let stream = state.store.to_stream(&identifier, None, None).await?;

-        let response = client
+        let response = state
+            .client
             .post(endpoint.as_str())
             .timeout(Duration::from_secs(
-                config.media.external_validation_timeout,
+                state.config.media.external_validation_timeout,
             ))
             .header("Content-Type", input_type.media_type().as_ref())
             .body(Body::wrap_stream(crate::stream::make_send(stream)))
@@ -214,13 +212,13 @@ where
         }
     }

-    let (hash, size) = state.borrow_mut().finalize_reset();
+    let (hash, size) = hash_state.borrow_mut().finalize_reset();
     let hash = Hash::new(hash, size, input_type);

-    save_upload(&mut session, repo, store, hash.clone(), &identifier).await?;
+    save_upload(&mut session, state, hash.clone(), &identifier).await?;

-    repo.relate_details(&identifier, &details).await?;
+    state.repo.relate_details(&identifier, &details).await?;

     if let Some(alias) = declared_alias {
         session.add_existing_alias(hash, alias).await?
@@ -234,17 +232,21 @@ where #[tracing::instrument(level = "trace", skip_all)] async fn save_upload( session: &mut Session, - repo: &ArcRepo, - store: &S, + state: &State, hash: Hash, identifier: &Arc, ) -> Result<(), Error> where S: Store, { - if repo.create_hash(hash.clone(), identifier).await?.is_err() { + if state + .repo + .create_hash(hash.clone(), identifier) + .await? + .is_err() + { // duplicate upload - store.remove(identifier).await?; + state.store.remove(identifier).await?; session.identifier.take(); return Ok(()); } diff --git a/src/lib.rs b/src/lib.rs index c47717f..2afa6be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ mod read; mod repo; mod repo_04; mod serde_str; +mod state; mod store; mod stream; mod sync; @@ -49,6 +50,7 @@ use repo::ArcRepo; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_tracing::TracingMiddleware; use rusty_s3::UrlStyle; +use state::State; use std::{ marker::PhantomData, path::Path, @@ -106,53 +108,45 @@ fn process_semaphore() -> &'static Semaphore { } async fn ensure_details( - tmp_dir: &TmpDir, - policy_dir: &PolicyDir, - repo: &ArcRepo, - store: &S, - config: &Configuration, + state: &State, alias: &Alias, ) -> Result { - let Some(identifier) = repo.identifier_from_alias(alias).await? else { + let Some(identifier) = state.repo.identifier_from_alias(alias).await? else { return Err(UploadError::MissingAlias.into()); }; - ensure_details_identifier(tmp_dir, policy_dir, repo, store, config, &identifier).await + ensure_details_identifier(state, &identifier).await } async fn ensure_details_identifier( - tmp_dir: &TmpDir, - policy_dir: &PolicyDir, - repo: &ArcRepo, - store: &S, - config: &Configuration, + state: &State, identifier: &Arc, ) -> Result { - let details = repo.details(identifier).await?; + let details = state.repo.details(identifier).await?; if let Some(details) = details { tracing::debug!("details exist"); Ok(details) } else { - if config.server.read_only { + if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); - } else if config.server.danger_dummy_mode { + } else if state.config.server.danger_dummy_mode { return Ok(Details::danger_dummy(formats::InternalFormat::Image( formats::ImageFormat::Png, ))); } tracing::debug!("generating new details from {:?}", identifier); - let bytes_stream = store.to_bytes(identifier, None, None).await?; + let bytes_stream = state.store.to_bytes(identifier, None, None).await?; let new_details = Details::from_bytes( - tmp_dir, - policy_dir, - config.media.process_timeout, + &state.tmp_dir, + &state.policy_dir, + state.config.media.process_timeout, bytes_stream.into_bytes(), ) .await?; tracing::debug!("storing details for {:?}", identifier); - repo.relate_details(identifier, &new_details).await?; + state.repo.relate_details(identifier, &new_details).await?; tracing::debug!("stored"); Ok(new_details) } @@ -165,47 +159,22 @@ impl FormData for Upload { type Error = Error; fn form(req: &HttpRequest) -> Form { - let tmp_dir = req - .app_data::>() - .expect("No TmpDir in request") - .clone(); - let policy_dir = req - .app_data::>() - .expect("No TmpDir in request") - .clone(); - let repo = req - .app_data::>() - .expect("No repo in request") - .clone(); - let store = req - .app_data::>() - .expect("No store in request") - .clone(); - let client = req - .app_data::>() - .expect("No client in request") - .clone(); - let config = req - .app_data::>() - .expect("No configuration in request") + let state = req + .app_data::>>() + .expect("No state in request") 
.clone(); // Create a new Multipart Form validator // // This form is expecting a single array field, 'images' with at most 10 files in it Form::new() - .max_files(config.server.max_file_count) - .max_file_size(config.media.max_file_size * MEGABYTES) + .max_files(state.config.server.max_file_count) + .max_file_size(state.config.media.max_file_size * MEGABYTES) .transform_error(transform_error) .field( "images", Field::array(Field::file(move |filename, _, stream| { - let tmp_dir = tmp_dir.clone(); - let policy_dir = policy_dir.clone(); - let repo = repo.clone(); - let store = store.clone(); - let client = client.clone(); - let config = config.clone(); + let state = state.clone(); metrics::counter!("pict-rs.files", "upload" => "inline").increment(1); @@ -213,23 +182,13 @@ impl FormData for Upload { Box::pin( async move { - if config.server.read_only { + if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); } let stream = crate::stream::from_err(stream); - ingest::ingest( - &tmp_dir, - &policy_dir, - &repo, - &**store, - &client, - stream, - None, - &config, - ) - .await + ingest::ingest(&state, stream, None).await } .instrument(span), ) @@ -249,47 +208,22 @@ impl FormData for Import { type Error = Error; fn form(req: &actix_web::HttpRequest) -> Form { - let tmp_dir = req - .app_data::>() - .expect("No TmpDir in request") - .clone(); - let policy_dir = req - .app_data::>() - .expect("No TmpDir in request") - .clone(); - let repo = req - .app_data::>() - .expect("No repo in request") - .clone(); - let store = req - .app_data::>() - .expect("No store in request") - .clone(); - let client = req - .app_data::() - .expect("No client in request") - .clone(); - let config = req - .app_data::>() - .expect("No configuration in request") + let state = req + .app_data::>>() + .expect("No state in request") .clone(); // Create a new Multipart Form validator for internal imports // // This form is expecting a single array field, 'images' with at most 10 files in it Form::new() - .max_files(config.server.max_file_count) - .max_file_size(config.media.max_file_size * MEGABYTES) + .max_files(state.config.server.max_file_count) + .max_file_size(state.config.media.max_file_size * MEGABYTES) .transform_error(transform_error) .field( "images", Field::array(Field::file(move |filename, _, stream| { - let tmp_dir = tmp_dir.clone(); - let policy_dir = policy_dir.clone(); - let repo = repo.clone(); - let store = store.clone(); - let client = client.clone(); - let config = config.clone(); + let state = state.clone(); metrics::counter!("pict-rs.files", "import" => "inline").increment(1); @@ -297,23 +231,14 @@ impl FormData for Import { Box::pin( async move { - if config.server.read_only { + if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); } let stream = crate::stream::from_err(stream); - ingest::ingest( - &tmp_dir, - &policy_dir, - &repo, - &**store, - &client, - stream, - Some(Alias::from_existing(&filename)), - &config, - ) - .await + ingest::ingest(&state, stream, Some(Alias::from_existing(&filename))) + .await } .instrument(span), ) @@ -330,49 +255,28 @@ impl FormData for Import { } /// Handle responding to successful uploads -#[tracing::instrument( - name = "Uploaded files", - skip(value, tmp_dir, policy_dir, repo, store, config) -)] +#[tracing::instrument(name = "Uploaded files", skip(value, state))] async fn upload( Multipart(Upload(value, _)): Multipart>, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, + 
state: web::Data>, ) -> Result { - handle_upload(value, tmp_dir, policy_dir, repo, store, config).await + handle_upload(value, state).await } /// Handle responding to successful uploads -#[tracing::instrument( - name = "Imported files", - skip(value, tmp_dir, policy_dir, repo, store, config) -)] +#[tracing::instrument(name = "Imported files", skip(value, state))] async fn import( Multipart(Import(value, _)): Multipart>, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { - handle_upload(value, tmp_dir, policy_dir, repo, store, config).await + handle_upload(value, state).await } /// Handle responding to successful uploads -#[tracing::instrument( - name = "Uploaded files", - skip(value, tmp_dir, policy_dir, repo, store, config) -)] +#[tracing::instrument(name = "Uploaded files", skip(value, state))] async fn handle_upload( value: Value, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { let images = value .map() @@ -391,8 +295,7 @@ async fn handle_upload( tracing::debug!("Uploaded {} as {:?}", image.filename, alias); let delete_token = image.result.delete_token(); - let details = - ensure_details(&tmp_dir, &policy_dir, &repo, &store, &config, alias).await?; + let details = ensure_details(&state, alias).await?; files.push(serde_json::json!({ "file": alias.to_string(), @@ -422,30 +325,19 @@ impl FormData for BackgroundedUpload { // Create a new Multipart Form validator for backgrounded uploads // // This form is expecting a single array field, 'images' with at most 10 files in it - let repo = req - .app_data::>() - .expect("No repo in request") + let state = req + .app_data::>>() + .expect("No state in request") .clone(); - let store = req - .app_data::>() - .expect("No store in request") - .clone(); - let config = req - .app_data::>() - .expect("No configuration in request") - .clone(); - - let read_only = config.server.read_only; Form::new() - .max_files(config.server.max_file_count) - .max_file_size(config.media.max_file_size * MEGABYTES) + .max_files(state.config.server.max_file_count) + .max_file_size(state.config.media.max_file_size * MEGABYTES) .transform_error(transform_error) .field( "images", Field::array(Field::file(move |filename, _, stream| { - let repo = (**repo).clone(); - let store = (**store).clone(); + let state = state.clone(); metrics::counter!("pict-rs.files", "upload" => "background").increment(1); @@ -453,13 +345,13 @@ impl FormData for BackgroundedUpload { Box::pin( async move { - if read_only { + if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); } let stream = crate::stream::from_err(stream); - Backgrounded::proxy(repo, store, stream).await + Backgrounded::proxy(&state.repo, &state.store, stream).await } .instrument(span), ) @@ -475,10 +367,10 @@ impl FormData for BackgroundedUpload { } } -#[tracing::instrument(name = "Uploaded files", skip(value, repo))] +#[tracing::instrument(name = "Uploaded files", skip(value, state))] async fn upload_backgrounded( Multipart(BackgroundedUpload(value, _)): Multipart>, - repo: web::Data, + state: web::Data>, ) -> Result { let images = value .map() @@ -496,7 +388,7 @@ async fn upload_backgrounded( let upload_id = image.result.upload_id().expect("Upload ID exists"); let identifier = image.result.identifier().expect("Identifier exists"); - queue::queue_ingest(&repo, identifier, upload_id, None).await?; + 
queue::queue_ingest(&state.repo, identifier, upload_id, None).await?; files.push(serde_json::json!({ "upload_id": upload_id.to_string(), @@ -521,30 +413,25 @@ struct ClaimQuery { /// Claim a backgrounded upload #[tracing::instrument(name = "Waiting on upload", skip_all)] async fn claim_upload( - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, query: web::Query, + state: web::Data>, ) -> Result { let upload_id = Serde::into_inner(query.into_inner().upload_id); - match repo + match state + .repo .wait(upload_id) .with_timeout(Duration::from_secs(10)) .await { Ok(wait_res) => { let upload_result = wait_res?; - repo.claim(upload_id).await?; + state.repo.claim(upload_id).await?; metrics::counter!("pict-rs.background.upload.claim").increment(1); match upload_result { UploadResult::Success { alias, token } => { - let details = - ensure_details(&tmp_dir, &policy_dir, &repo, &store, &config, &alias) - .await?; + let details = ensure_details(&state, &alias).await?; Ok(HttpResponse::Ok().json(&serde_json::json!({ "msg": "ok", @@ -576,21 +463,13 @@ struct UrlQuery { async fn ingest_inline( stream: impl Stream> + 'static, - tmp_dir: &TmpDir, - policy_dir: &PolicyDir, - repo: &ArcRepo, - store: &S, - client: &ClientWithMiddleware, - config: &Configuration, + state: &State, ) -> Result<(Alias, DeleteToken, Details), Error> { - let session = ingest::ingest( - tmp_dir, policy_dir, repo, store, client, stream, None, config, - ) - .await?; + let session = ingest::ingest(state, stream, None).await?; let alias = session.alias().expect("alias should exist").to_owned(); - let details = ensure_details(tmp_dir, policy_dir, repo, store, config, &alias).await?; + let details = ensure_details(state, &alias).await?; let delete_token = session.disarm(); @@ -598,68 +477,50 @@ async fn ingest_inline( } /// download an image from a URL -#[tracing::instrument( - name = "Downloading file", - skip(client, tmp_dir, policy_dir, repo, store, config) -)] +#[tracing::instrument(name = "Downloading file", skip(state))] async fn download( - client: web::Data, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, query: web::Query, + state: web::Data>, ) -> Result { - let stream = download_stream(&client, &query.url, &config).await?; + let stream = download_stream(&query.url, &state).await?; if query.backgrounded { - do_download_backgrounded(stream, repo, store).await + do_download_backgrounded(stream, state).await } else { - do_download_inline(stream, &tmp_dir, &policy_dir, repo, store, &client, config).await + do_download_inline(stream, &state).await } } -async fn download_stream( - client: &ClientWithMiddleware, +async fn download_stream( url: &str, - config: &Configuration, + state: &State, ) -> Result> + 'static, Error> { - if config.server.read_only { + if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); } - let res = client.get(url).send().await?; + let res = state.client.get(url).send().await?; if !res.status().is_success() { return Err(UploadError::Download(res.status()).into()); } let stream = crate::stream::limit( - config.media.max_file_size * MEGABYTES, + state.config.media.max_file_size * MEGABYTES, crate::stream::from_err(res.bytes_stream()), ); Ok(stream) } -#[tracing::instrument( - name = "Downloading file inline", - skip(stream, tmp_dir, policy_dir, repo, store, client, config) -)] +#[tracing::instrument(name = "Downloading file inline", skip(stream, state))] async fn 
do_download_inline( stream: impl Stream> + 'static, - tmp_dir: &TmpDir, - policy_dir: &PolicyDir, - repo: web::Data, - store: web::Data, - client: &ClientWithMiddleware, - config: web::Data, + state: &State, ) -> Result { metrics::counter!("pict-rs.files", "download" => "inline").increment(1); - let (alias, delete_token, details) = - ingest_inline(stream, tmp_dir, policy_dir, &repo, &store, client, &config).await?; + let (alias, delete_token, details) = ingest_inline(stream, state).await?; Ok(HttpResponse::Created().json(&serde_json::json!({ "msg": "ok", @@ -671,11 +532,10 @@ async fn do_download_inline( }))) } -#[tracing::instrument(name = "Downloading file in background", skip(stream, repo, store))] +#[tracing::instrument(name = "Downloading file in background", skip(stream, state))] async fn do_download_backgrounded( stream: impl Stream> + 'static, - repo: web::Data, - store: web::Data, + state: web::Data>, ) -> Result { metrics::counter!("pict-rs.files", "download" => "background").increment(1); @@ -727,9 +587,9 @@ struct HashJson { } /// Get a page of hashes -#[tracing::instrument(name = "Hash Page", skip(repo))] -async fn page( - repo: web::Data, +#[tracing::instrument(name = "Hash Page", skip(state))] +async fn page( + state: web::Data>, web::Query(PageQuery { slug, timestamp, @@ -739,9 +599,12 @@ async fn page( let limit = limit.unwrap_or(20); let page = if let Some(timestamp) = timestamp { - repo.hash_page_by_date(timestamp.timestamp, limit).await? + state + .repo + .hash_page_by_date(timestamp.timestamp, limit) + .await? } else { - repo.hash_page(slug, limit).await? + state.repo.hash_page(slug, limit).await? }; let mut hashes = Vec::with_capacity(page.hashes.len()); @@ -755,9 +618,11 @@ async fn page( .map(|a| a.to_string()) .collect(); - let identifier = repo.identifier(hash.clone()).await?; + let identifier = state.repo.identifier(hash.clone()).await?; let details = if let Some(identifier) = identifier { - repo.details(&identifier) + state + .repo + .details(&identifier) .await? .map(|d| d.into_api_details()) } else { @@ -786,13 +651,12 @@ async fn page( } /// Delete aliases and files -#[tracing::instrument(name = "Deleting file", skip(repo, config))] -async fn delete( - repo: web::Data, - config: web::Data, +#[tracing::instrument(name = "Deleting file", skip(state))] +async fn delete( + state: web::Data>, path_entries: web::Path<(String, String)>, ) -> Result { - if config.server.read_only { + if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); } @@ -802,7 +666,7 @@ async fn delete( let alias = Alias::from_existing(&alias); // delete alias inline - queue::cleanup::alias(&repo, alias, token).await?; + queue::cleanup::alias(&state.repo, alias, token).await?; Ok(HttpResponse::NoContent().finish()) } @@ -844,19 +708,18 @@ fn prepare_process( Ok((format, thumbnail_path, thumbnail_args)) } -#[tracing::instrument(name = "Fetching derived details", skip(repo, config))] +#[tracing::instrument(name = "Fetching derived details", skip(state))] async fn process_details( web::Query(ProcessQuery { source, operations }): web::Query, ext: web::Path, - repo: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { let alias = match source { ProcessSource::Alias { alias } | ProcessSource::Source { src: alias } => { Serde::into_inner(alias) } ProcessSource::Proxy { proxy } => { - let Some(alias) = repo.related(proxy).await? else { + let Some(alias) = state.repo.related(proxy).await? 
else { return Ok(HttpResponse::NotFound().json(&serde_json::json!({ "msg": "No images associated with provided proxy url" }))); @@ -865,9 +728,9 @@ async fn process_details( } }; - let (_, thumbnail_path, _) = prepare_process(&config, operations, ext.as_str())?; + let (_, thumbnail_path, _) = prepare_process(&state.config, operations, ext.as_str())?; - let Some(hash) = repo.hash(&alias).await? else { + let Some(hash) = state.repo.hash(&alias).await? else { // Invalid alias return Ok(HttpResponse::NotFound().json(&serde_json::json!({ "msg": "No images associated with provided alias", @@ -876,17 +739,20 @@ async fn process_details( let thumbnail_string = thumbnail_path.to_string_lossy().to_string(); - if !config.server.read_only { - repo.accessed_variant(hash.clone(), thumbnail_string.clone()) + if !state.config.server.read_only { + state + .repo + .accessed_variant(hash.clone(), thumbnail_string.clone()) .await?; } - let identifier = repo + let identifier = state + .repo .variant_identifier(hash, thumbnail_string) .await? .ok_or(UploadError::MissingAlias)?; - let details = repo.details(&identifier).await?; + let details = state.repo.details(&identifier).await?; let details = details.ok_or(UploadError::NoFiles)?; @@ -913,20 +779,12 @@ async fn not_found_hash(repo: &ArcRepo) -> Result, Error> /// Process files #[allow(clippy::too_many_arguments)] -#[tracing::instrument( - name = "Serving processed image", - skip(tmp_dir, policy_dir, repo, store, client, config, process_map) -)] +#[tracing::instrument(name = "Serving processed image", skip(state, process_map))] async fn process( range: Option>, web::Query(ProcessQuery { source, operations }): web::Query, ext: web::Path, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - client: web::Data, - config: web::Data, + state: web::Data>, process_map: web::Data, ) -> Result { let alias = match source { @@ -934,31 +792,22 @@ async fn process( Serde::into_inner(alias) } ProcessSource::Proxy { proxy } => { - let alias = if let Some(alias) = repo.related(proxy.clone()).await? { + let alias = if let Some(alias) = state.repo.related(proxy.clone()).await? { alias } else if !config.server.read_only { - let stream = download_stream(&client, proxy.as_str(), &config).await?; + let stream = download_stream(proxy.as_str(), &state).await?; - let (alias, _, _) = ingest_inline( - stream, - &tmp_dir, - &policy_dir, - &repo, - &store, - &client, - &config, - ) - .await?; + let (alias, _, _) = ingest_inline(stream, &state).await?; - repo.relate_url(proxy, alias.clone()).await?; + state.repo.relate_url(proxy, alias.clone()).await?; alias } else { return Err(UploadError::ReadOnly.into()); }; - if !config.server.read_only { - repo.accessed_alias(alias.clone()).await?; + if !state.config.server.read_only { + state.repo.accessed_alias(alias.clone()).await?; } alias @@ -966,59 +815,54 @@ async fn process( }; let (format, thumbnail_path, thumbnail_args) = - prepare_process(&config, operations, ext.as_str())?; + prepare_process(&state.config, operations, ext.as_str())?; let path_string = thumbnail_path.to_string_lossy().to_string(); - let (hash, alias, not_found) = if let Some(hash) = repo.hash(&alias).await? { + let (hash, alias, not_found) = if let Some(hash) = state.repo.hash(&alias).await? { (hash, alias, false) } else { - let Some((alias, hash)) = not_found_hash(&repo).await? else { + let Some((alias, hash)) = not_found_hash(&state.repo).await? 
else { return Ok(HttpResponse::NotFound().finish()); }; (hash, alias, true) }; - if !config.server.read_only { - repo.accessed_variant(hash.clone(), path_string.clone()) + if !state.config.server.read_only { + state + .repo + .accessed_variant(hash.clone(), path_string.clone()) .await?; } let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?; if let Some(identifier) = identifier_opt { - let details = - ensure_details_identifier(&tmp_dir, &policy_dir, &repo, &store, &config, &identifier) - .await?; + let details = ensure_details_identifier(&state, &identifier).await?; - if let Some(public_url) = store.public_url(&identifier) { + if let Some(public_url) = state.store.public_url(&identifier) { return Ok(HttpResponse::SeeOther() .insert_header((actix_web::http::header::LOCATION, public_url.as_str())) .finish()); } - return ranged_file_resp(&store, identifier, range, details, not_found).await; + return ranged_file_resp(&state.store, identifier, range, details, not_found).await; } if config.server.read_only { return Err(UploadError::ReadOnly.into()); } - let original_details = - ensure_details(&tmp_dir, &policy_dir, &repo, &store, &config, &alias).await?; + let original_details = ensure_details(&state, &alias).await?; let (details, bytes) = generate::generate( - &tmp_dir, - &policy_dir, - &repo, - &store, + &state, &process_map, format, thumbnail_path, thumbnail_args, &original_details, - &config, hash, ) .await?; @@ -1058,78 +902,71 @@ async fn process( } #[allow(clippy::too_many_arguments)] -#[tracing::instrument( - name = "Serving processed image headers", - skip(tmp_dir, policy_dir, repo, store, config) -)] +#[tracing::instrument(name = "Serving processed image headers", skip(state))] async fn process_head( range: Option>, web::Query(ProcessQuery { source, operations }): web::Query, ext: web::Path, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { let alias = match source { ProcessSource::Alias { alias } | ProcessSource::Source { src: alias } => { Serde::into_inner(alias) } ProcessSource::Proxy { proxy } => { - let Some(alias) = repo.related(proxy).await? else { + let Some(alias) = state.repo.related(proxy).await? else { return Ok(HttpResponse::NotFound().finish()); }; alias } }; - let (_, thumbnail_path, _) = prepare_process(&config, operations, ext.as_str())?; + let (_, thumbnail_path, _) = prepare_process(&state.config, operations, ext.as_str())?; let path_string = thumbnail_path.to_string_lossy().to_string(); - let Some(hash) = repo.hash(&alias).await? else { + let Some(hash) = state.repo.hash(&alias).await? 
else { // Invalid alias return Ok(HttpResponse::NotFound().finish()); }; - if !config.server.read_only { + if !state.config.server.read_only { repo.accessed_variant(hash.clone(), path_string.clone()) .await?; } - let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?; + let identifier_opt = state + .repo + .variant_identifier(hash.clone(), path_string) + .await?; if let Some(identifier) = identifier_opt { - let details = - ensure_details_identifier(&tmp_dir, &policy_dir, &repo, &store, &config, &identifier) - .await?; + let details = ensure_details_identifier(&state, &identifier).await?; - if let Some(public_url) = store.public_url(&identifier) { + if let Some(public_url) = state.store.public_url(&identifier) { return Ok(HttpResponse::SeeOther() .insert_header((actix_web::http::header::LOCATION, public_url.as_str())) .finish()); } - return ranged_file_head_resp(&store, identifier, range, details).await; + return ranged_file_head_resp(&state.store, identifier, range, details).await; } Ok(HttpResponse::NotFound().finish()) } /// Process files -#[tracing::instrument(name = "Spawning image process", skip(repo))] +#[tracing::instrument(name = "Spawning image process", skip(state))] async fn process_backgrounded( web::Query(ProcessQuery { source, operations }): web::Query, ext: web::Path, - repo: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { let source = match source { ProcessSource::Alias { alias } | ProcessSource::Source { src: alias } => { Serde::into_inner(alias) } ProcessSource::Proxy { proxy } => { - let Some(alias) = repo.related(proxy).await? else { + let Some(alias) = state.repo.related(proxy).await? else { return Ok(HttpResponse::NotFound().finish()); }; alias @@ -1137,46 +974,49 @@ async fn process_backgrounded( }; let (target_format, process_path, process_args) = - prepare_process(&config, operations, ext.as_str())?; + prepare_process(&state.config, operations, ext.as_str())?; let path_string = process_path.to_string_lossy().to_string(); - let Some(hash) = repo.hash(&source).await? else { + let Some(hash) = state.repo.hash(&source).await? else { // Invalid alias return Ok(HttpResponse::BadRequest().finish()); }; - let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?; + let identifier_opt = state + .repo + .variant_identifier(hash.clone(), path_string) + .await?; if identifier_opt.is_some() { return Ok(HttpResponse::Accepted().finish()); } - if config.server.read_only { + if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); } - queue_generate(&repo, target_format, source, process_path, process_args).await?; + queue_generate( + &state.repo, + target_format, + source, + process_path, + process_args, + ) + .await?; Ok(HttpResponse::Accepted().finish()) } /// Fetch file details -#[tracing::instrument( - name = "Fetching query details", - skip(tmp_dir, policy_dir, repo, store, config) -)] +#[tracing::instrument(name = "Fetching query details", skip(state))] async fn details_query( web::Query(alias_query): web::Query, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { let alias = match alias_query { AliasQuery::Alias { alias } => Serde::into_inner(alias), AliasQuery::Proxy { proxy } => { - let Some(alias) = repo.related(proxy).await? else { + let Some(alias) = state.repo.related(proxy).await? 
else { return Ok(HttpResponse::NotFound().json(&serde_json::json!({ "msg": "Provided proxy URL has not been cached", }))); @@ -1185,232 +1025,147 @@ async fn details_query( } }; - do_details(alias, tmp_dir, policy_dir, repo, store, config).await + let details = ensure_details(&state, &alias).await?; + + Ok(HttpResponse::Ok().json(&details.into_api_details())) } /// Fetch file details -#[tracing::instrument( - name = "Fetching details", - skip(tmp_dir, policy_dir, repo, store, config) -)] +#[tracing::instrument(name = "Fetching details", skip(state))] async fn details( alias: web::Path>, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { - do_details( - Serde::into_inner(alias.into_inner()), - tmp_dir, - policy_dir, - repo, - store, - config, - ) - .await -} - -async fn do_details( - alias: Alias, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, -) -> Result { - let details = ensure_details(&tmp_dir, &policy_dir, &repo, &store, &config, &alias).await?; + let details = ensure_details(&state, &alias).await?; Ok(HttpResponse::Ok().json(&details.into_api_details())) } /// Serve files based on alias query #[allow(clippy::too_many_arguments)] -#[tracing::instrument( - name = "Serving file query", - skip(tmp_dir, policy_dir, repo, store, client, config) -)] +#[tracing::instrument(name = "Serving file query", skip(state))] async fn serve_query( range: Option>, web::Query(alias_query): web::Query, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - client: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { let alias = match alias_query { AliasQuery::Alias { alias } => Serde::into_inner(alias), AliasQuery::Proxy { proxy } => { - let alias = if let Some(alias) = repo.related(proxy.clone()).await? { + let alias = if let Some(alias) = state.repo.related(proxy.clone()).await? 
{ alias - } else if !config.server.read_only { - let stream = download_stream(&client, proxy.as_str(), &config).await?; + } else if !state.config.server.read_only { + let stream = download_stream(proxy.as_str(), &state).await?; - let (alias, _, _) = ingest_inline( - stream, - &tmp_dir, - &policy_dir, - &repo, - &store, - &client, - &config, - ) - .await?; + let (alias, _, _) = ingest_inline(stream, &state).await?; - repo.relate_url(proxy, alias.clone()).await?; + state.repo.relate_url(proxy, alias.clone()).await?; alias } else { return Err(UploadError::ReadOnly.into()); }; - if !config.server.read_only { - repo.accessed_alias(alias.clone()).await?; + if !state.config.server.read_only { + state.repo.accessed_alias(alias.clone()).await?; } alias } }; - do_serve(range, alias, tmp_dir, policy_dir, repo, store, config).await + do_serve(range, alias, state).await } /// Serve files -#[tracing::instrument(name = "Serving file", skip(tmp_dir, policy_dir, repo, store, config))] +#[tracing::instrument(name = "Serving file", skip(state))] async fn serve( range: Option>, alias: web::Path>, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { - do_serve( - range, - Serde::into_inner(alias.into_inner()), - tmp_dir, - policy_dir, - repo, - store, - config, - ) - .await + do_serve(range, Serde::into_inner(alias.into_inner()), state).await } async fn do_serve( range: Option>, alias: Alias, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { - let (hash, alias, not_found) = if let Some(hash) = repo.hash(&alias).await? { + let (hash, alias, not_found) = if let Some(hash) = state.repo.hash(&alias).await? { (hash, alias, false) } else { - let Some((alias, hash)) = not_found_hash(&repo).await? else { + let Some((alias, hash)) = not_found_hash(&state.repo).await? else { return Ok(HttpResponse::NotFound().finish()); }; (hash, alias, true) }; - let Some(identifier) = repo.identifier(hash.clone()).await? else { + let Some(identifier) = state.repo.identifier(hash.clone()).await? else { tracing::warn!("Original File identifier for hash {hash:?} is missing, queue cleanup task",); - crate::queue::cleanup_hash(&repo, hash).await?; + crate::queue::cleanup_hash(&state.repo, hash).await?; return Ok(HttpResponse::NotFound().finish()); }; - let details = ensure_details(&tmp_dir, &policy_dir, &repo, &store, &config, &alias).await?; + let details = ensure_details(&state, &alias).await?; - if let Some(public_url) = store.public_url(&identifier) { + if let Some(public_url) = state.store.public_url(&identifier) { return Ok(HttpResponse::SeeOther() .insert_header((actix_web::http::header::LOCATION, public_url.as_str())) .finish()); } - ranged_file_resp(&store, identifier, range, details, not_found).await + ranged_file_resp(&state.store, identifier, range, details, not_found).await } -#[tracing::instrument( - name = "Serving query file headers", - skip(repo, tmp_dir, policy_dir, store, config) -)] +#[tracing::instrument(name = "Serving query file headers", skip(state))] async fn serve_query_head( range: Option>, web::Query(alias_query): web::Query, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { let alias = match alias_query { AliasQuery::Alias { alias } => Serde::into_inner(alias), AliasQuery::Proxy { proxy } => { - let Some(alias) = repo.related(proxy).await? 
else { + let Some(alias) = state.repo.related(proxy).await? else { return Ok(HttpResponse::NotFound().finish()); }; alias } }; - do_serve_head(range, alias, tmp_dir, policy_dir, repo, store, config).await + do_serve_head(range, alias, state).await } -#[tracing::instrument( - name = "Serving file headers", - skip(tmp_dir, policy_dir, repo, store, config) -)] +#[tracing::instrument(name = "Serving file headers", skip(state))] async fn serve_head( range: Option>, alias: web::Path>, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { - do_serve_head( - range, - Serde::into_inner(alias.into_inner()), - tmp_dir, - policy_dir, - repo, - store, - config, - ) - .await + do_serve_head(range, Serde::into_inner(alias.into_inner()), state).await } async fn do_serve_head( range: Option>, alias: Alias, - tmp_dir: web::Data, - policy_dir: web::Data, - repo: web::Data, - store: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { - let Some(identifier) = repo.identifier_from_alias(&alias).await? else { + let Some(identifier) = state.repo.identifier_from_alias(&alias).await? else { // Invalid alias return Ok(HttpResponse::NotFound().finish()); }; - let details = ensure_details(&tmp_dir, &policy_dir, &repo, &store, &config, &alias).await?; + let details = ensure_details(&state, &alias).await?; - if let Some(public_url) = store.public_url(&identifier) { + if let Some(public_url) = state.store.public_url(&identifier) { return Ok(HttpResponse::SeeOther() .insert_header((actix_web::http::header::LOCATION, public_url.as_str())) .finish()); } - ranged_file_head_resp(&store, identifier, range, details).await + ranged_file_head_resp(&state.store, identifier, range, details).await } async fn ranged_file_head_resp( @@ -1562,14 +1317,18 @@ struct PruneQuery { force: bool, } -#[tracing::instrument(name = "Prune missing identifiers", skip(repo))] -async fn prune_missing( - repo: web::Data, +#[tracing::instrument(name = "Prune missing identifiers", skip(state))] +async fn prune_missing( + state: web::Data>, query: Option>, ) -> Result { - let total = repo.size().await?; + if state.config.server.read_only { + return Err(UploadError::ReadOnly.into()); + } - let progress = if let Some(progress) = repo.get("prune-missing-queued").await? { + let total = state.repo.size().await?; + + let progress = if let Some(progress) = state.repo.get("prune-missing-queued").await? 
{ progress .as_ref() .try_into() @@ -1579,12 +1338,12 @@ async fn prune_missing( 0 }; - let complete = repo.get("prune-missing-complete").await?.is_some(); + let complete = state.repo.get("prune-missing-complete").await?.is_some(); - let started = repo.get("prune-missing-started").await?.is_some(); + let started = state.repo.get("prune-missing-started").await?.is_some(); if !started || query.is_some_and(|q| q.force) { - queue::prune_missing(&repo).await?; + queue::prune_missing(&state.repo).await?; } Ok(HttpResponse::Ok().json(PruneResponse { @@ -1594,16 +1353,13 @@ async fn prune_missing( })) } -#[tracing::instrument(name = "Spawning variant cleanup", skip(repo, config))] -async fn clean_variants( - repo: web::Data, - config: web::Data, -) -> Result { - if config.server.read_only { +#[tracing::instrument(name = "Spawning variant cleanup", skip(state))] +async fn clean_variants(state: web::Data>) -> Result { + if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); } - queue::cleanup_all_variants(&repo).await?; + queue::cleanup_all_variants(&state.repo).await?; Ok(HttpResponse::NoContent().finish()) } @@ -1614,14 +1370,12 @@ enum AliasQuery { Alias { alias: Serde }, } -#[tracing::instrument(name = "Setting 404 Image", skip(repo, config))] -async fn set_not_found( +#[tracing::instrument(name = "Setting 404 Image", skip(state))] +async fn set_not_found( json: web::Json, - repo: web::Data, - client: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { - if config.server.read_only { + if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); } @@ -1634,47 +1388,49 @@ async fn set_not_found( } }; - if repo.hash(&alias).await?.is_none() { + if state.repo.hash(&alias).await?.is_none() { return Ok(HttpResponse::BadRequest().json(serde_json::json!({ "msg": "No hash associated with provided alias" }))); } - repo.set(NOT_FOUND_KEY, alias.to_bytes().into()).await?; + state + .repo + .set(NOT_FOUND_KEY, alias.to_bytes().into()) + .await?; Ok(HttpResponse::Created().json(serde_json::json!({ "msg": "ok", }))) } -#[tracing::instrument(name = "Purging file", skip(repo, config))] -async fn purge( +#[tracing::instrument(name = "Purging file", skip(state))] +async fn purge( web::Query(alias_query): web::Query, - repo: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { - if config.server.read_only { + if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); } let alias = match alias_query { AliasQuery::Alias { alias } => Serde::into_inner(alias), AliasQuery::Proxy { proxy } => { - let Some(alias) = repo.related(proxy).await? else { + let Some(alias) = state.repo.related(proxy).await? else { return Ok(HttpResponse::NotFound().finish()); }; alias } }; - let aliases = repo.aliases_from_alias(&alias).await?; + let aliases = state.repo.aliases_from_alias(&alias).await?; - let Some(hash) = repo.hash(&alias).await? else { + let Some(hash) = state.repo.hash(&alias).await? 
else { return Ok(HttpResponse::BadRequest().json(&serde_json::json!({ "msg": "No images associated with provided alias", }))); }; - queue::cleanup_hash(&repo, hash).await?; + queue::cleanup_hash(&state.repo, hash).await?; Ok(HttpResponse::Ok().json(&serde_json::json!({ "msg": "ok", @@ -1682,28 +1438,27 @@ async fn purge( }))) } -#[tracing::instrument(name = "Deleting alias", skip(repo, config))] -async fn delete_alias( +#[tracing::instrument(name = "Deleting alias", skip(state))] +async fn delete_alias( web::Query(alias_query): web::Query, - repo: web::Data, - config: web::Data, + state: web::Data>, ) -> Result { - if config.server.read_only { + if state.config.server.read_only { return Err(UploadError::ReadOnly.into()); } let alias = match alias_query { AliasQuery::Alias { alias } => Serde::into_inner(alias), AliasQuery::Proxy { proxy } => { - let Some(alias) = repo.related(proxy).await? else { + let Some(alias) = state.repo.related(proxy).await? else { return Ok(HttpResponse::NotFound().finish()); }; alias } }; - if let Some(token) = repo.delete_token(&alias).await? { - queue::cleanup_alias(&repo, alias, token).await?; + if let Some(token) = state.repo.delete_token(&alias).await? { + queue::cleanup_alias(&state.repo, alias, token).await?; } else { return Ok(HttpResponse::NotFound().finish()); } @@ -1713,22 +1468,22 @@ async fn delete_alias( }))) } -#[tracing::instrument(name = "Fetching aliases", skip(repo))] -async fn aliases( +#[tracing::instrument(name = "Fetching aliases", skip(state))] +async fn aliases( web::Query(alias_query): web::Query, - repo: web::Data, + state: web::Data>, ) -> Result { let alias = match alias_query { AliasQuery::Alias { alias } => Serde::into_inner(alias), AliasQuery::Proxy { proxy } => { - let Some(alias) = repo.related(proxy).await? else { + let Some(alias) = state.repo.related(proxy).await? else { return Ok(HttpResponse::NotFound().finish()); }; alias } }; - let aliases = repo.aliases_from_alias(&alias).await?; + let aliases = state.repo.aliases_from_alias(&alias).await?; Ok(HttpResponse::Ok().json(&serde_json::json!({ "msg": "ok", @@ -1736,22 +1491,22 @@ async fn aliases( }))) } -#[tracing::instrument(name = "Fetching identifier", skip(repo))] -async fn identifier( +#[tracing::instrument(name = "Fetching identifier", skip(state))] +async fn identifier( web::Query(alias_query): web::Query, - repo: web::Data, + state: web::Data>, ) -> Result { let alias = match alias_query { AliasQuery::Alias { alias } => Serde::into_inner(alias), AliasQuery::Proxy { proxy } => { - let Some(alias) = repo.related(proxy).await? else { + let Some(alias) = state.repo.related(proxy).await? else { return Ok(HttpResponse::NotFound().finish()); }; alias } }; - let Some(identifier) = repo.identifier_from_alias(&alias).await? else { + let Some(identifier) = state.repo.identifier_from_alias(&alias).await? 
else { // Invalid alias return Ok(HttpResponse::NotFound().json(serde_json::json!({ "msg": "No identifiers associated with provided alias" @@ -1764,13 +1519,10 @@ async fn identifier( }))) } -#[tracing::instrument(skip(repo, store))] -async fn healthz( - repo: web::Data, - store: web::Data, -) -> Result { - repo.health_check().await?; - store.health_check().await?; +#[tracing::instrument(skip(state))] +async fn healthz(state: web::Data>) -> Result { + state.repo.health_check().await?; + state.store.health_check().await?; Ok(HttpResponse::Ok().finish()) } @@ -1794,17 +1546,11 @@ fn build_client() -> Result { fn configure_endpoints( config: &mut web::ServiceConfig, - repo: ArcRepo, - store: S, - configuration: Configuration, - client: ClientWithMiddleware, + state: State, extra_config: F, ) { config - .app_data(web::Data::new(repo)) - .app_data(web::Data::new(store)) - .app_data(web::Data::new(client)) - .app_data(web::Data::new(configuration.clone())) + .app_data(web::Data::new(state)) .route("/healthz", web::get().to(healthz::)) .service( web::scope("/image") @@ -1825,8 +1571,8 @@ fn configure_endpoints( .service(web::resource("/download").route(web::get().to(download::))) .service( web::resource("/delete/{delete_token}/{filename}") - .route(web::delete().to(delete)) - .route(web::get().to(delete)), + .route(web::delete().to(delete::)) + .route(web::get().to(delete::)), ) .service( web::scope("/original") @@ -1868,23 +1614,23 @@ fn configure_endpoints( .service( web::scope("/internal") .wrap(Internal( - configuration.server.api_key.as_ref().map(|s| s.to_owned()), + state.config.server.api_key.as_ref().map(|s| s.to_owned()), )) .service(web::resource("/import").route(web::post().to(import::))) - .service(web::resource("/variants").route(web::delete().to(clean_variants))) - .service(web::resource("/purge").route(web::post().to(purge))) - .service(web::resource("/delete").route(web::post().to(delete_alias))) - .service(web::resource("/aliases").route(web::get().to(aliases))) + .service(web::resource("/variants").route(web::delete().to(clean_variants::))) + .service(web::resource("/purge").route(web::post().to(purge::))) + .service(web::resource("/delete").route(web::post().to(delete_alias::))) + .service(web::resource("/aliases").route(web::get().to(aliases::))) .service(web::resource("/identifier").route(web::get().to(identifier::))) - .service(web::resource("/set_not_found").route(web::post().to(set_not_found))) + .service(web::resource("/set_not_found").route(web::post().to(set_not_found::))) .service(web::resource("/hashes").route(web::get().to(page))) - .service(web::resource("/prune_missing").route(web::post().to(prune_missing))) + .service(web::resource("/prune_missing").route(web::post().to(prune_missing::))) .configure(extra_config), ); } -fn spawn_cleanup(repo: ArcRepo, config: &Configuration) { - if config.server.read_only { +fn spawn_cleanup(state: State) { + if state.config.server.read_only { return; } @@ -1896,14 +1642,14 @@ fn spawn_cleanup(repo: ArcRepo, config: &Configuration) { interval.tick().await; - if let Err(e) = queue::cleanup_outdated_variants(&repo).await { + if let Err(e) = queue::cleanup_outdated_variants(&state.repo).await { tracing::warn!( "Failed to spawn cleanup for outdated variants:{}", format!("\n{e}\n{e:?}") ); } - if let Err(e) = queue::cleanup_outdated_proxies(&repo).await { + if let Err(e) = queue::cleanup_outdated_proxies(&state.repo).await { tracing::warn!( "Failed to spawn cleanup for outdated proxies:{}", format!("\n{e}\n{e:?}") @@ -1913,33 +1659,12 
@@ -1913,33 +1659,12 @@ fn spawn_cleanup(repo: ArcRepo, config: &Configuration) {
     });
 }
 
-fn spawn_workers<S>(
-    tmp_dir: ArcTmpDir,
-    policy_dir: ArcPolicyDir,
-    repo: ArcRepo,
-    store: S,
-    client: ClientWithMiddleware,
-    config: Configuration,
-    process_map: ProcessMap,
-) where
+fn spawn_workers<S>(state: State<S>, process_map: ProcessMap)
+where
     S: Store + 'static,
 {
-    crate::sync::spawn(
-        "cleanup-worker",
-        queue::process_cleanup(repo.clone(), store.clone(), config.clone()),
-    );
-    crate::sync::spawn(
-        "process-worker",
-        queue::process_images(
-            tmp_dir,
-            policy_dir,
-            repo,
-            store,
-            client,
-            process_map,
-            config,
-        ),
-    );
+    crate::sync::spawn("cleanup-worker", queue::process_cleanup(state.clone()));
+    crate::sync::spawn("process-worker", queue::process_images(state, process_map));
 }
 
 async fn launch_file_store(
@@ -1955,38 +1680,32 @@ async fn launch_file_store(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
+async fn process_image<S, F, Fut>(
+    state: &State<S>,
     process_args: Vec<String>,
     input_format: ProcessableFormat,
     format: ProcessableFormat,
     quality: Option<u8>,
-    timeout: u64,
     write_file: F,
 ) -> Result<ProcessRead, MagickError>
 where
     F: FnOnce(crate::file::File) -> Fut,
     Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
 {
-    let temporary_path = tmp_dir
+    let temporary_path = state
+        .tmp_dir
         .tmp_folder()
         .await
         .map_err(MagickError::CreateTemporaryDirectory)?;
 
-    let input_file = tmp_dir.tmp_file(None);
+    let input_file = state.tmp_dir.tmp_file(None);
     crate::store::file_store::safe_create_parent(&input_file)
         .await
         .map_err(MagickError::CreateDir)?;
@@ -143,10 +141,10 @@ where
     let envs = [
         (MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
-        (MAGICK_CONFIGURE_PATH, policy_dir.as_os_str()),
+        (MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
     ];
 
-    let reader = Process::run("magick", &args, &envs, timeout)?
+    let reader = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
         .read()
         .add_extras(input_file)
         .add_extras(temporary_path);
@@ -154,25 +152,20 @@ where
     Ok(reader)
 }
 
-#[allow(clippy::too_many_arguments)]
-pub(crate) async fn process_image_stream_read(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
+pub(crate) async fn process_image_stream_read<S>(
+    state: &State<S>,
     stream: LocalBoxStream<'static, std::io::Result<Bytes>>,
     args: Vec<String>,
     input_format: ProcessableFormat,
     format: ProcessableFormat,
     quality: Option<u8>,
-    timeout: u64,
 ) -> Result<ProcessRead, MagickError> {
     process_image(
-        tmp_dir,
-        policy_dir,
+        state,
         args,
         input_format,
         format,
         quality,
-        timeout,
         |mut tmp_file| async move {
             tmp_file
                 .write_from_stream(stream)
@@ -184,25 +177,20 @@ pub(crate) async fn process_image_stream_read(
     .await
 }
 
-#[allow(clippy::too_many_arguments)]
 pub(crate) async fn process_image_process_read<S>(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
+    state: &State<S>,
     process_read: ProcessRead,
     args: Vec<String>,
     input_format: ProcessableFormat,
     format: ProcessableFormat,
     quality: Option<u8>,
-    timeout: u64,
 ) -> Result<ProcessRead, MagickError> {
     process_image(
-        tmp_dir,
-        policy_dir,
+        state,
         args,
         input_format,
         format,
         quality,
-        timeout,
         |mut tmp_file| async move {
             process_read
                 .with_stdout(|stdout| async {
diff --git a/src/queue.rs b/src/queue.rs
index 9107c5e..5c906e0 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -188,35 +188,12 @@ pub(crate) async fn queue_generate(
     Ok(())
 }
 
-pub(crate) async fn process_cleanup<S: Store>(
-    repo: ArcRepo,
-    store: S,
-    config: Configuration,
-) {
-    process_jobs(&repo, &store, &config, CLEANUP_QUEUE, cleanup::perform).await
+pub(crate) async fn process_cleanup<S: Store>(state: State<S>) {
+    process_jobs(state, CLEANUP_QUEUE, cleanup::perform).await
 }
 
-pub(crate) async fn process_images<S: Store + 'static>(
-    tmp_dir: ArcTmpDir,
-    policy_dir: ArcPolicyDir,
-    repo: ArcRepo,
-    store: S,
-    client: ClientWithMiddleware,
-    process_map: ProcessMap,
-    config: Configuration,
-) {
-    process_image_jobs(
-        &tmp_dir,
-        &policy_dir,
-        &repo,
-        &store,
-        &client,
-        &process_map,
-        &config,
-        PROCESS_QUEUE,
-        process::perform,
-    )
-    .await
+pub(crate) async fn process_images<S: Store + 'static>(state: State<S>, process_map: ProcessMap) {
+    process_image_jobs(state, process_map, PROCESS_QUEUE, process::perform).await
 }
 
 struct MetricsGuard {
@@ -250,21 +227,10 @@ impl Drop for MetricsGuard {
     }
 }
 
-async fn process_jobs<S, F>(
-    repo: &ArcRepo,
-    store: &S,
-    config: &Configuration,
-    queue: &'static str,
-    callback: F,
-) where
+async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
+where
     S: Store,
-    for<'a> F: Fn(
-            &'a ArcRepo,
-            &'a S,
-            &'a Configuration,
-            serde_json::Value,
-        ) -> LocalBoxFuture<'a, Result<(), Error>>
-        + Copy,
+    for<'a> F: Fn(&'a State<S>, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
 {
     let worker_id = uuid::Uuid::new_v4();
 
@@ -273,7 +239,7 @@ async fn process_jobs(
         tokio::task::yield_now().await;
 
-        let res = job_loop(repo, store, config, worker_id, queue, callback).await;
+        let res = job_loop(&state, worker_id, queue, callback).await;
 
         if let Err(e) = res {
             tracing::warn!("Error processing jobs: {}", format!("{e}"));
@@ -291,22 +257,14 @@ async fn process_jobs(
 }
 
 async fn job_loop<S, F>(
-    repo: &ArcRepo,
-    store: &S,
-    config: &Configuration,
+    state: &State<S>,
     worker_id: uuid::Uuid,
     queue: &'static str,
     callback: F,
 ) -> Result<(), Error>
 where
     S: Store,
-    for<'a> F: Fn(
-            &'a ArcRepo,
-            &'a S,
-            &'a Configuration,
-            serde_json::Value,
-        ) -> LocalBoxFuture<'a, Result<(), Error>>
-        + Copy,
+    for<'a> F: Fn(&'a State<S>, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
 {
     loop {
         tracing::trace!("job_loop: looping");
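Not part of this patch — a sketch of what the slimmed-down callback bound above (`for<'a> F: Fn(&'a State<S>, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> + Copy`) expects from a job handler. `noop_perform` is a toy stand-in for `cleanup::perform`; the import paths for `Error` and `LocalBoxFuture` are assumptions:

    use futures_core::future::LocalBoxFuture;

    use crate::{error::Error, state::State};

    // Toy callback with the same shape as cleanup::perform: borrow the shared
    // state for 'a, take the job payload by value, hand back a boxed local future.
    // Because it is a plain `fn` item it is Copy, which is what lets process_jobs
    // re-invoke it on every iteration of its loop.
    fn noop_perform<'a, S>(
        _state: &'a State<S>,
        _job: serde_json::Value,
    ) -> LocalBoxFuture<'a, Result<(), Error>> {
        Box::pin(async move {
            // A real callback deserializes the job and dispatches on it.
            Ok(()) as Result<(), Error>
        })
    }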
@@ -314,20 +272,20 @@ where
         tokio::task::yield_now().await;
 
         async {
-            let (job_id, job) = repo.pop(queue, worker_id).await?;
+            let (job_id, job) = state.repo.pop(queue, worker_id).await?;
 
             let guard = MetricsGuard::guard(worker_id, queue);
 
             let res = heartbeat(
-                repo,
+                &state.repo,
                 queue,
                 worker_id,
                 job_id,
-                (callback)(repo, store, config, job),
+                (callback)(state, job),
             )
             .await;
 
-            repo.complete_job(queue, worker_id, job_id).await?;
+            state.repo.complete_job(queue, worker_id, job_id).await?;
 
             res?;
 
@@ -340,29 +298,14 @@ where
     }
 }
 
-#[allow(clippy::too_many_arguments)]
 async fn process_image_jobs<S, F>(
-    tmp_dir: &ArcTmpDir,
-    policy_dir: &ArcPolicyDir,
-    repo: &ArcRepo,
-    store: &S,
-    client: &ClientWithMiddleware,
-    process_map: &ProcessMap,
-    config: &Configuration,
+    state: State<S>,
+    process_map: ProcessMap,
     queue: &'static str,
     callback: F,
 ) where
     S: Store,
-    for<'a> F: Fn(
-            &'a ArcTmpDir,
-            &'a ArcPolicyDir,
-            &'a ArcRepo,
-            &'a S,
-            &'a ClientWithMiddleware,
-            &'a ProcessMap,
-            &'a Configuration,
-            serde_json::Value,
-        ) -> LocalBoxFuture<'a, Result<(), Error>>
+    for<'a> F: Fn(&'a State<S>, &'a ProcessMap, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>>
         + Copy,
 {
     let worker_id = uuid::Uuid::new_v4();
 
@@ -372,19 +315,7 @@ async fn process_image_jobs(
         tokio::task::yield_now().await;
 
-        let res = image_job_loop(
-            tmp_dir,
-            policy_dir,
-            repo,
-            store,
-            client,
-            process_map,
-            config,
-            worker_id,
-            queue,
-            callback,
-        )
-        .await;
+        let res = image_job_loop(&state, &process_map, worker_id, queue, callback).await;
 
         if let Err(e) = res {
             tracing::warn!("Error processing jobs: {}", format!("{e}"));
@@ -403,29 +334,15 @@ async fn process_image_jobs(
 
 #[allow(clippy::too_many_arguments)]
 async fn image_job_loop<S, F>(
-    tmp_dir: &ArcTmpDir,
-    policy_dir: &ArcPolicyDir,
-    repo: &ArcRepo,
-    store: &S,
-    client: &ClientWithMiddleware,
+    state: &State<S>,
     process_map: &ProcessMap,
-    config: &Configuration,
     worker_id: uuid::Uuid,
     queue: &'static str,
     callback: F,
 ) -> Result<(), Error>
 where
     S: Store,
-    for<'a> F: Fn(
-            &'a ArcTmpDir,
-            &'a ArcPolicyDir,
-            &'a ArcRepo,
-            &'a S,
-            &'a ClientWithMiddleware,
-            &'a ProcessMap,
-            &'a Configuration,
-            serde_json::Value,
-        ) -> LocalBoxFuture<'a, Result<(), Error>>
+    for<'a> F: Fn(&'a State<S>, &'a ProcessMap, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>>
         + Copy,
 {
     loop {
@@ -434,29 +351,20 @@ where
         tokio::task::yield_now().await;
 
         async {
-            let (job_id, job) = repo.pop(queue, worker_id).await?;
+            let (job_id, job) = state.repo.pop(queue, worker_id).await?;
 
             let guard = MetricsGuard::guard(worker_id, queue);
 
             let res = heartbeat(
-                repo,
+                &state.repo,
                 queue,
                 worker_id,
                 job_id,
-                (callback)(
-                    tmp_dir,
-                    policy_dir,
-                    repo,
-                    store,
-                    client,
-                    process_map,
-                    config,
-                    job,
-                ),
+                (callback)(state, process_map, job),
             )
             .await;
 
-            repo.complete_job(queue, worker_id, job_id).await?;
+            state.repo.complete_job(queue, worker_id, job_id).await?;
 
             res?;
 
diff --git a/src/queue/cleanup.rs b/src/queue/cleanup.rs
index e99cad0..473b37c 100644
--- a/src/queue/cleanup.rs
+++ b/src/queue/cleanup.rs
@@ -10,13 +10,12 @@ use crate::{
     queue::Cleanup,
     repo::{Alias, ArcRepo, DeleteToken, Hash},
     serde_str::Serde,
+    state::State,
     store::Store,
 };
 
 pub(super) fn perform<'a, S>(
-    repo: &'a ArcRepo,
-    store: &'a S,
-    configuration: &'a Configuration,
+    state: &'a State<S>,
     job: serde_json::Value,
 ) -> LocalBoxFuture<'a, Result<(), Error>>
 where
@@ -25,26 +24,28 @@ where
     Box::pin(async move {
         match serde_json::from_value(job) {
             Ok(job) => match job {
-                Cleanup::Hash { hash: in_hash } => hash(repo, in_hash).await?,
+                Cleanup::Hash { hash: in_hash } => hash(&state.repo, in_hash).await?,
                 Cleanup::Identifier {
                     identifier: in_identifier,
-                } => identifier(repo, store, Arc::from(in_identifier)).await?,
+                } => identifier(&state.repo, &state.store, Arc::from(in_identifier)).await?,
                 Cleanup::Alias {
                     alias: stored_alias,
                     token,
                 } => {
                     alias(
-                        repo,
+                        &state.repo,
                         Serde::into_inner(stored_alias),
                         Serde::into_inner(token),
                     )
                     .await?
                 }
-                Cleanup::Variant { hash, variant } => hash_variant(repo, hash, variant).await?,
-                Cleanup::AllVariants => all_variants(repo).await?,
-                Cleanup::OutdatedVariants => outdated_variants(repo, configuration).await?,
-                Cleanup::OutdatedProxies => outdated_proxies(repo, configuration).await?,
-                Cleanup::Prune => prune(repo, store).await?,
+                Cleanup::Variant { hash, variant } => {
+                    hash_variant(&state.repo, hash, variant).await?
+                }
+                Cleanup::AllVariants => all_variants(&state.repo).await?,
+                Cleanup::OutdatedVariants => outdated_variants(&state.repo, &state.config).await?,
+                Cleanup::OutdatedProxies => outdated_proxies(&state.repo, &state.config).await?,
+                Cleanup::Prune => prune(&state.repo, &state.store).await?,
             },
             Err(e) => {
                 tracing::warn!("Invalid job: {}", format!("{e}"));
diff --git a/src/queue/process.rs b/src/queue/process.rs
index 9cfbed7..e2d2c9f 100644
--- a/src/queue/process.rs
+++ b/src/queue/process.rs
@@ -13,20 +13,15 @@ use crate::{
     queue::Process,
     repo::{Alias, ArcRepo, UploadId, UploadResult},
     serde_str::Serde,
+    state::State,
     store::Store,
     tmp_file::{ArcTmpDir, TmpDir},
 };
 
 use std::{path::PathBuf, sync::Arc};
 
-#[allow(clippy::too_many_arguments)]
 pub(super) fn perform<'a, S>(
-    tmp_dir: &'a ArcTmpDir,
-    policy_dir: &'a ArcPolicyDir,
-    repo: &'a ArcRepo,
-    store: &'a S,
-    client: &'a ClientWithMiddleware,
+    state: &'a State<S>,
     process_map: &'a ProcessMap,
-    config: &'a Configuration,
     job: serde_json::Value,
 ) -> LocalBoxFuture<'a, Result<(), Error>>
 where
@@ -41,15 +36,10 @@ where
                     declared_alias,
                 } => {
                     process_ingest(
-                        tmp_dir,
-                        policy_dir,
-                        repo,
-                        store,
-                        client,
+                        state,
                         Arc::from(identifier),
                         Serde::into_inner(upload_id),
                         declared_alias.map(Serde::into_inner),
-                        config,
                     )
                     .await?
                 }
@@ -60,16 +50,12 @@ where
                     process_args,
                 } => {
                     generate(
-                        tmp_dir,
-                        policy_dir,
-                        repo,
-                        store,
+                        state,
                         process_map,
                         target_format,
                         Serde::into_inner(source),
                         process_path,
                         process_args,
-                        config,
                     )
                     .await?
                 }
@@ -117,18 +103,12 @@ impl Drop for UploadGuard {
     }
 }
 
-#[allow(clippy::too_many_arguments)]
-#[tracing::instrument(skip(tmp_dir, policy_dir, repo, store, client, config))]
+#[tracing::instrument(skip(state))]
 async fn process_ingest<S>(
-    tmp_dir: &ArcTmpDir,
-    policy_dir: &ArcPolicyDir,
-    repo: &ArcRepo,
-    store: &S,
-    client: &ClientWithMiddleware,
+    state: &State<S>,
     unprocessed_identifier: Arc<str>,
     upload_id: UploadId,
     declared_alias: Option<Alias>,
-    config: &Configuration,
 ) -> Result<(), Error>
 where
     S: Store + 'static,
@@ -136,33 +116,18 @@ where
     let guard = UploadGuard::guard(upload_id);
 
     let fut = async {
-        let tmp_dir = tmp_dir.clone();
-        let policy_dir = policy_dir.clone();
         let ident = unprocessed_identifier.clone();
-        let store2 = store.clone();
-        let repo = repo.clone();
-        let client = client.clone();
+        let state2 = state.clone();
 
         let current_span = Span::current();
         let span = tracing::info_span!(parent: current_span, "error_boundary");
 
-        let config = config.clone();
         let error_boundary = crate::sync::abort_on_drop(crate::sync::spawn(
             "ingest-media",
             async move {
-                let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?);
+                let stream =
+                    crate::stream::from_err(state2.store.to_stream(&ident, None, None).await?);
 
-                let session = crate::ingest::ingest(
-                    &tmp_dir,
-                    &policy_dir,
-                    &repo,
-                    &store2,
-                    &client,
-                    stream,
-                    declared_alias,
-                    &config,
-                )
-                .await?;
+                let session = crate::ingest::ingest(&state2, stream, declared_alias).await?;
 
                 Ok(session) as Result<Session, Error>
             }
@@ -170,7 +135,7 @@ where
         ))
         .await;
 
-        store.remove(&unprocessed_identifier).await?;
+        state.store.remove(&unprocessed_identifier).await?;
 
         error_boundary.map_err(|_| UploadError::Canceled)?
     };
@@ -191,62 +156,46 @@ where
         }
     };
 
-    repo.complete_upload(upload_id, result).await?;
+    state.repo.complete_upload(upload_id, result).await?;
 
     guard.disarm();
 
     Ok(())
 }
 
-#[allow(clippy::too_many_arguments)]
-#[tracing::instrument(skip(
-    tmp_dir,
-    policy_dir,
-    repo,
-    store,
-    process_map,
-    process_path,
-    process_args,
-    config
-))]
+#[tracing::instrument(skip(state, process_map, process_path, process_args))]
 async fn generate<S: Store + 'static>(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
-    repo: &ArcRepo,
-    store: &S,
+    state: &State<S>,
     process_map: &ProcessMap,
     target_format: InputProcessableFormat,
     source: Alias,
     process_path: PathBuf,
     process_args: Vec<String>,
-    config: &Configuration,
 ) -> Result<(), Error> {
-    let Some(hash) = repo.hash(&source).await? else {
+    let Some(hash) = state.repo.hash(&source).await? else {
         // Nothing to do
         return Ok(());
     };
 
     let path_string = process_path.to_string_lossy().to_string();
 
-    let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?;
+    let identifier_opt = state
+        .repo
+        .variant_identifier(hash.clone(), path_string)
+        .await?;
 
     if identifier_opt.is_some() {
         return Ok(());
     }
 
-    let original_details =
-        crate::ensure_details(tmp_dir, policy_dir, repo, store, config, &source).await?;
+    let original_details = crate::ensure_details(state, &source).await?;
 
     crate::generate::generate(
-        tmp_dir,
-        policy_dir,
-        repo,
-        store,
+        state,
         process_map,
         target_format,
         process_path,
         process_args,
         &original_details,
-        config,
         hash,
     )
     .await?;
 
diff --git a/src/state.rs b/src/state.rs
new file mode 100644
index 0000000..1319e09
--- /dev/null
+++ b/src/state.rs
@@ -0,0 +1,13 @@
+use reqwest_middleware::ClientWithMiddleware;
+
+use crate::{config::Configuration, magick::ArcPolicyDir, repo::ArcRepo, tmp_file::ArcTmpDir};
+
+#[derive(Clone)]
+pub(crate) struct State<S> {
+    pub(super) config: Configuration,
+    pub(super) tmp_dir: ArcTmpDir,
+    pub(super) policy_dir: ArcPolicyDir,
+    pub(super) repo: ArcRepo,
+    pub(super) store: S,
+    pub(super) client: ClientWithMiddleware,
+}
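Not part of this patch — a sketch of how the new `State<S>` gets assembled once at startup and fanned out to the pieces patched above. The `build_state` helper and its argument list are invented for illustration; only the field names come from `src/state.rs`, and the helper has to live inside this crate where the `pub(super)` fields are visible:

    fn build_state<S: Store>(
        config: Configuration,
        tmp_dir: ArcTmpDir,
        policy_dir: ArcPolicyDir,
        repo: ArcRepo,
        store: S,
        client: ClientWithMiddleware,
    ) -> State<S> {
        State {
            config,
            tmp_dir,
            policy_dir,
            repo,
            store,
            client,
        }
    }

    // Hypothetical wiring: one clone per consumer. Every field is clonable
    // (Arcs, the store handle, the reqwest client, the parsed Configuration),
    // so #[derive(Clone)] on State is what makes the fan-out easy to express:
    //
    //     spawn_cleanup(state.clone());
    //     spawn_workers(state.clone(), process_map);
    //     configure_endpoints(service_config, state, extra_config);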
diff --git a/src/validate.rs b/src/validate.rs
index 14c9a21..bed66e7 100644
--- a/src/validate.rs
+++ b/src/validate.rs
@@ -8,10 +8,11 @@ use crate::{
     error_code::ErrorCode,
     formats::{
         AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat,
-        InternalFormat, Validations,
+        InternalFormat,
     },
     magick::PolicyDir,
     process::ProcessRead,
+    state::State,
     tmp_file::TmpDir,
 };
 use actix_web::web::Bytes;
@@ -57,12 +58,9 @@ impl ValidationError {
 const MEGABYTES: usize = 1024 * 1024;
 
 #[tracing::instrument(skip_all)]
-pub(crate) async fn validate_bytes(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
+pub(crate) async fn validate_bytes<S>(
+    state: &State<S>,
     bytes: Bytes,
-    validations: Validations<'_>,
-    timeout: u64,
 ) -> Result<(InternalFormat, ProcessRead), Error> {
     if bytes.is_empty() {
         return Err(ValidationError::Empty.into());
@@ -77,66 +75,35 @@ pub(crate) async fn validate_bytes(
 
     match &input {
         InputFile::Image(input) => {
-            let (format, process_read) = process_image(
-                tmp_dir,
-                policy_dir,
-                bytes,
-                *input,
-                width,
-                height,
-                validations.image,
-                timeout,
-            )
-            .await?;
+            let (format, process_read) = process_image(state, bytes, *input, width, height).await?;
 
             Ok((format, process_read))
         }
         InputFile::Animation(input) => {
-            let (format, process_read) = process_animation(
-                tmp_dir,
-                policy_dir,
-                bytes,
-                *input,
-                width,
-                height,
-                frames.unwrap_or(1),
-                validations.animation,
-                timeout,
-            )
-            .await?;
+            let (format, process_read) =
+                process_animation(state, bytes, *input, width, height, frames.unwrap_or(1)).await?;
 
             Ok((format, process_read))
         }
         InputFile::Video(input) => {
-            let (format, process_read) = process_video(
-                tmp_dir,
-                bytes,
-                *input,
-                width,
-                height,
-                frames.unwrap_or(1),
-                validations.video,
-                timeout,
-            )
-            .await?;
+            let (format, process_read) =
+                process_video(state, bytes, *input, width, height, frames.unwrap_or(1)).await?;
 
             Ok((format, process_read))
        }
    }
 }
 
-#[allow(clippy::too_many_arguments)]
-#[tracing::instrument(skip(tmp_dir, policy_dir, bytes, validations))]
-async fn process_image(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
+#[tracing::instrument(skip(state, bytes))]
+async fn process_image<S>(
+    state: &State<S>,
     bytes: Bytes,
     input: ImageInput,
     width: u16,
     height: u16,
-    validations: &crate::config::Image,
-    timeout: u64,
 ) -> Result<(InternalFormat, ProcessRead), Error> {
+    let validations = &state.config.media.image;
+
     if width > validations.max_width {
         return Err(ValidationError::Width.into());
     }
@@ -158,16 +125,7 @@ async fn process_image(
     let process_read = if needs_transcode {
         let quality = validations.quality_for(format);
 
-        magick::convert_image(
-            tmp_dir,
-            policy_dir,
-            input.format,
-            format,
-            quality,
-            timeout,
-            bytes,
-        )
-        .await?
+        magick::convert_image(state, input.format, format, quality, bytes).await?
     } else {
         exiftool::clear_metadata_bytes_read(bytes, timeout)?
     };
@@ -201,19 +159,17 @@ fn validate_animation(
     Ok(())
 }
 
-#[allow(clippy::too_many_arguments)]
-#[tracing::instrument(skip(tmp_dir, policy_dir, bytes, validations))]
+#[tracing::instrument(skip(state, bytes))]
 async fn process_animation<S>(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
+    state: &State<S>,
     bytes: Bytes,
     input: AnimationFormat,
     width: u16,
     height: u16,
     frames: u32,
-    validations: &crate::config::Animation,
-    timeout: u64,
 ) -> Result<(InternalFormat, ProcessRead), Error> {
+    let validations = &state.config.media.animation;
+
     validate_animation(bytes.len(), width, height, frames, validations)?;
 
     let AnimationOutput {
@@ -224,10 +180,9 @@ async fn process_animation(
     let process_read = if needs_transcode {
         let quality = validations.quality_for(format);
 
-        magick::convert_animation(tmp_dir, policy_dir, input, format, quality, timeout, bytes)
-            .await?
+        magick::convert_animation(state, input, format, quality, bytes).await?
     } else {
-        exiftool::clear_metadata_bytes_read(bytes, timeout)?
+        exiftool::clear_metadata_bytes_read(bytes, state.config.media.process_timeout)?
     };
 
     Ok((InternalFormat::Animation(format), process_read))
@@ -262,18 +217,17 @@ fn validate_video(
     Ok(())
 }
 
-#[allow(clippy::too_many_arguments)]
-#[tracing::instrument(skip(tmp_dir, bytes, validations))]
-async fn process_video(
-    tmp_dir: &TmpDir,
+#[tracing::instrument(skip(state, bytes))]
+async fn process_video<S>(
+    state: &State<S>,
     bytes: Bytes,
     input: InputVideoFormat,
     width: u16,
     height: u16,
     frames: u32,
-    validations: &crate::config::Video,
-    timeout: u64,
 ) -> Result<(InternalFormat, ProcessRead), Error> {
+    let validations = &state.config.media.video;
+
     validate_video(bytes.len(), width, height, frames, validations)?;
 
     let output = input.build_output(
@@ -284,7 +238,15 @@ async fn process_video(
 
     let crf = validations.crf_for(width, height);
 
-    let process_read = ffmpeg::transcode_bytes(tmp_dir, input, output, crf, timeout, bytes).await?;
+    let process_read = ffmpeg::transcode_bytes(
+        &state.tmp_dir,
+        input,
+        output,
+        crf,
+        state.config.media.process_timeout,
+        bytes,
+    )
+    .await?;
 
     Ok((
         InternalFormat::Video(output.format.internal_format()),
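Not part of this patch — the call shape the validate.rs changes above leave behind. The wrapper function is hypothetical; `validate_bytes` and its two-argument signature come from the hunk above, and the per-kind limits plus the process timeout are now read internally from `state.config.media` rather than being threaded through as `Validations` and `timeout`:

    use actix_web::web::Bytes;

    use crate::{
        error::Error, formats::InternalFormat, process::ProcessRead, state::State,
        validate::validate_bytes,
    };

    // Hypothetical ingest-side caller: no tmp_dir/policy_dir/Validations/timeout
    // arguments survive the refactor.
    async fn sniff_and_validate<S>(
        state: &State<S>,
        bytes: Bytes,
    ) -> Result<(InternalFormat, ProcessRead), Error> {
        validate_bytes(state, bytes).await
    }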
diff --git a/src/validate/magick.rs b/src/validate/magick.rs
index c1d6131..edb4bbe 100644
--- a/src/validate/magick.rs
+++ b/src/validate/magick.rs
@@ -9,67 +9,57 @@ use crate::{
     tmp_file::TmpDir,
 };
 
-pub(super) async fn convert_image(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
+pub(super) async fn convert_image<S>(
+    state: &State<S>,
     input: ImageFormat,
     output: ImageFormat,
     quality: Option<u8>,
-    timeout: u64,
     bytes: Bytes,
 ) -> Result<ProcessRead, MagickError> {
     convert(
-        tmp_dir,
-        policy_dir,
+        state,
         input.magick_format(),
         output.magick_format(),
         false,
         quality,
-        timeout,
         bytes,
     )
     .await
 }
 
-pub(super) async fn convert_animation(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
+pub(super) async fn convert_animation<S>(
+    state: &State<S>,
     input: AnimationFormat,
     output: AnimationFormat,
     quality: Option<u8>,
-    timeout: u64,
     bytes: Bytes,
 ) -> Result<ProcessRead, MagickError> {
     convert(
-        tmp_dir,
-        policy_dir,
+        state,
         input.magick_format(),
         output.magick_format(),
         true,
         quality,
-        timeout,
         bytes,
     )
     .await
 }
 
-#[allow(clippy::too_many_arguments)]
 async fn convert<S>(
-    tmp_dir: &TmpDir,
-    policy_dir: &PolicyDir,
+    state: &State<S>,
     input: &'static str,
     output: &'static str,
     coalesce: bool,
     quality: Option<u8>,
-    timeout: u64,
     bytes: Bytes,
 ) -> Result<ProcessRead, MagickError> {
-    let temporary_path = tmp_dir
+    let temporary_path = state
+        .tmp_dir
         .tmp_folder()
         .await
         .map_err(MagickError::CreateTemporaryDirectory)?;
 
-    let input_file = tmp_dir.tmp_file(None);
+    let input_file = state.tmp_dir.tmp_file(None);
     crate::store::file_store::safe_create_parent(&input_file)
         .await
@@ -104,10 +94,10 @@ async fn convert(
     let envs = [
         (MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
-        (MAGICK_CONFIGURE_PATH, policy_dir.as_os_str()),
+        (MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
     ];
 
-    let reader = Process::run("magick", &args, &envs, timeout)?.read();
+    let reader = Process::run("magick", &args, &envs, state.config.media.process_timeout)?.read();
 
     let clean_reader = reader.add_extras(input_file).add_extras(temporary_path);