mirror of
https://git.asonix.dog/asonix/pict-rs.git
synced 2025-01-04 00:28:43 +00:00
Pass a State value around rather than a bunch of arguments
This commit is contained in:
parent
c3e322f2c9
commit
cf7c753e65
11 changed files with 548 additions and 1046 deletions
120
src/generate.rs
120
src/generate.rs
|
@ -8,7 +8,8 @@ use crate::{
|
|||
formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat},
|
||||
future::{WithMetrics, WithTimeout},
|
||||
magick::PolicyDir,
|
||||
repo::{ArcRepo, Hash, VariantAlreadyExists},
|
||||
repo::{Hash, VariantAlreadyExists},
|
||||
state::State,
|
||||
store::Store,
|
||||
tmp_file::TmpDir,
|
||||
};
|
||||
|
@ -49,47 +50,43 @@ impl Drop for MetricsGuard {
|
|||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, policy_dir, repo, store, hash, process_map, config))]
|
||||
#[tracing::instrument(skip(state, process_map, hash))]
|
||||
pub(crate) async fn generate<S: Store + 'static>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
state: &State<S>,
|
||||
process_map: &ProcessMap,
|
||||
format: InputProcessableFormat,
|
||||
thumbnail_path: PathBuf,
|
||||
thumbnail_args: Vec<String>,
|
||||
original_details: &Details,
|
||||
config: &crate::config::Configuration,
|
||||
hash: Hash,
|
||||
) -> Result<(Details, Bytes), Error> {
|
||||
if config.server.danger_dummy_mode {
|
||||
let identifier = repo
|
||||
if state.config.server.danger_dummy_mode {
|
||||
let identifier = state
|
||||
.repo
|
||||
.identifier(hash)
|
||||
.await?
|
||||
.ok_or(UploadError::MissingIdentifier)?;
|
||||
|
||||
let bytes = store.to_bytes(&identifier, None, None).await?.into_bytes();
|
||||
let bytes = state
|
||||
.store
|
||||
.to_bytes(&identifier, None, None)
|
||||
.await?
|
||||
.into_bytes();
|
||||
|
||||
Ok((original_details.clone(), bytes))
|
||||
} else {
|
||||
let process_fut = process(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
state,
|
||||
format,
|
||||
thumbnail_path.clone(),
|
||||
thumbnail_args,
|
||||
original_details,
|
||||
config,
|
||||
hash.clone(),
|
||||
);
|
||||
|
||||
let (details, bytes) = process_map
|
||||
.process(hash, thumbnail_path, process_fut)
|
||||
.with_timeout(Duration::from_secs(config.media.process_timeout * 4))
|
||||
.with_timeout(Duration::from_secs(state.config.media.process_timeout * 4))
|
||||
.with_metrics("pict-rs.generate.process")
|
||||
.await
|
||||
.map_err(|_| UploadError::ProcessTimeout)??;
|
||||
|
@ -99,37 +96,21 @@ pub(crate) async fn generate<S: Store + 'static>(
|
|||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, policy_dir, repo, store, hash, config))]
|
||||
#[tracing::instrument(skip(state, hash))]
|
||||
async fn process<S: Store + 'static>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
state: &State<S>,
|
||||
output_format: InputProcessableFormat,
|
||||
thumbnail_path: PathBuf,
|
||||
thumbnail_args: Vec<String>,
|
||||
original_details: &Details,
|
||||
config: &crate::config::Configuration,
|
||||
hash: Hash,
|
||||
) -> Result<(Details, Bytes), Error> {
|
||||
let guard = MetricsGuard::guard();
|
||||
let permit = crate::process_semaphore().acquire().await?;
|
||||
|
||||
let identifier = input_identifier(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
output_format,
|
||||
hash.clone(),
|
||||
original_details,
|
||||
&config.media,
|
||||
)
|
||||
.await?;
|
||||
let identifier = input_identifier(state, output_format, hash.clone(), original_details).await?;
|
||||
|
||||
let input_details =
|
||||
crate::ensure_details_identifier(tmp_dir, policy_dir, repo, store, config, &identifier)
|
||||
.await?;
|
||||
let input_details = crate::ensure_details_identifier(state, &identifier).await?;
|
||||
|
||||
let input_format = input_details
|
||||
.internal_format()
|
||||
|
@ -139,21 +120,19 @@ async fn process<S: Store + 'static>(
|
|||
let format = input_format.process_to(output_format);
|
||||
|
||||
let quality = match format {
|
||||
ProcessableFormat::Image(format) => config.media.image.quality_for(format),
|
||||
ProcessableFormat::Animation(format) => config.media.animation.quality_for(format),
|
||||
ProcessableFormat::Image(format) => state.config.media.image.quality_for(format),
|
||||
ProcessableFormat::Animation(format) => state.config.media.animation.quality_for(format),
|
||||
};
|
||||
|
||||
let stream = store.to_stream(&identifier, None, None).await?;
|
||||
let stream = state.store.to_stream(&identifier, None, None).await?;
|
||||
|
||||
let vec = crate::magick::process_image_stream_read(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
state,
|
||||
stream,
|
||||
thumbnail_args,
|
||||
input_format,
|
||||
format,
|
||||
quality,
|
||||
config.media.process_timeout,
|
||||
)
|
||||
.await?
|
||||
.into_vec()
|
||||
|
@ -165,18 +144,20 @@ async fn process<S: Store + 'static>(
|
|||
drop(permit);
|
||||
|
||||
let details = Details::from_bytes(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
config.media.process_timeout,
|
||||
&state.tmp_dir,
|
||||
&state.policy_dir,
|
||||
&state.config.media.process_timeout,
|
||||
bytes.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let identifier = store
|
||||
let identifier = state
|
||||
.store
|
||||
.save_bytes(bytes.clone(), details.media_type())
|
||||
.await?;
|
||||
|
||||
if let Err(VariantAlreadyExists) = repo
|
||||
if let Err(VariantAlreadyExists) = state
|
||||
.repo
|
||||
.relate_variant_identifier(
|
||||
hash,
|
||||
thumbnail_path.to_string_lossy().to_string(),
|
||||
|
@ -184,10 +165,10 @@ async fn process<S: Store + 'static>(
|
|||
)
|
||||
.await?
|
||||
{
|
||||
store.remove(&identifier).await?;
|
||||
state.store.remove(&identifier).await?;
|
||||
}
|
||||
|
||||
repo.relate_details(&identifier, &details).await?;
|
||||
state.repo.relate_details(&identifier, &details).await?;
|
||||
|
||||
guard.disarm();
|
||||
|
||||
|
@ -197,14 +178,10 @@ async fn process<S: Store + 'static>(
|
|||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn input_identifier<S>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
state: &State<S>,
|
||||
output_format: InputProcessableFormat,
|
||||
hash: Hash,
|
||||
original_details: &Details,
|
||||
media: &crate::config::Media,
|
||||
) -> Result<Arc<str>, Error>
|
||||
where
|
||||
S: Store + 'static,
|
||||
|
@ -220,11 +197,12 @@ where
|
|||
};
|
||||
|
||||
if should_thumbnail {
|
||||
if let Some(identifier) = repo.motion_identifier(hash.clone()).await? {
|
||||
if let Some(identifier) = state.repo.motion_identifier(hash.clone()).await? {
|
||||
return Ok(identifier);
|
||||
};
|
||||
|
||||
let identifier = repo
|
||||
let identifier = state
|
||||
.repo
|
||||
.identifier(hash.clone())
|
||||
.await?
|
||||
.ok_or(UploadError::MissingIdentifier)?;
|
||||
|
@ -232,24 +210,22 @@ where
|
|||
let (reader, media_type) = if let Some(processable_format) =
|
||||
original_details.internal_format().processable_format()
|
||||
{
|
||||
let thumbnail_format = media.image.format.unwrap_or(ImageFormat::Webp);
|
||||
let thumbnail_format = state.config.media.image.format.unwrap_or(ImageFormat::Webp);
|
||||
|
||||
let stream = store.to_stream(&identifier, None, None).await?;
|
||||
let stream = state.store.to_stream(&identifier, None, None).await?;
|
||||
|
||||
let reader = magick::thumbnail(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
state,
|
||||
stream,
|
||||
processable_format,
|
||||
ProcessableFormat::Image(thumbnail_format),
|
||||
media.image.quality_for(thumbnail_format),
|
||||
media.process_timeout,
|
||||
config.media.image.quality_for(thumbnail_format),
|
||||
)
|
||||
.await?;
|
||||
|
||||
(reader, thumbnail_format.media_type())
|
||||
} else {
|
||||
let thumbnail_format = match media.image.format {
|
||||
let thumbnail_format = match state.config.media.image.format {
|
||||
Some(ImageFormat::Webp | ImageFormat::Avif | ImageFormat::Jxl) => {
|
||||
ffmpeg::ThumbnailFormat::Webp
|
||||
}
|
||||
|
@ -258,14 +234,14 @@ where
|
|||
};
|
||||
|
||||
let reader = ffmpeg::thumbnail(
|
||||
tmp_dir,
|
||||
store.clone(),
|
||||
state.tmp_dir,
|
||||
state.store.clone(),
|
||||
identifier,
|
||||
original_details
|
||||
.video_format()
|
||||
.unwrap_or(InternalVideoFormat::Mp4),
|
||||
thumbnail_format,
|
||||
media.process_timeout,
|
||||
state.config.media.process_timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -273,16 +249,20 @@ where
|
|||
};
|
||||
|
||||
let motion_identifier = reader
|
||||
.with_stdout(|stdout| async { store.save_async_read(stdout, media_type).await })
|
||||
.with_stdout(|stdout| async { state.store.save_async_read(stdout, media_type).await })
|
||||
.await??;
|
||||
|
||||
repo.relate_motion_identifier(hash, &motion_identifier)
|
||||
state
|
||||
.repo
|
||||
.relate_motion_identifier(hash, &motion_identifier)
|
||||
.await?;
|
||||
|
||||
return Ok(motion_identifier);
|
||||
}
|
||||
|
||||
repo.identifier(hash)
|
||||
state
|
||||
.repo
|
||||
.identifier(hash)
|
||||
.await?
|
||||
.ok_or(UploadError::MissingIdentifier)
|
||||
.map_err(From::from)
|
||||
|
|
|
@ -6,29 +6,29 @@ use crate::{
|
|||
formats::ProcessableFormat,
|
||||
magick::{MagickError, PolicyDir, MAGICK_CONFIGURE_PATH, MAGICK_TEMPORARY_PATH},
|
||||
process::{Process, ProcessRead},
|
||||
state::State,
|
||||
stream::LocalBoxStream,
|
||||
tmp_file::TmpDir,
|
||||
};
|
||||
|
||||
async fn thumbnail_animation<F, Fut>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
async fn thumbnail_animation<S, F, Fut>(
|
||||
state: &State<S>,
|
||||
input_format: ProcessableFormat,
|
||||
format: ProcessableFormat,
|
||||
quality: Option<u8>,
|
||||
timeout: u64,
|
||||
write_file: F,
|
||||
) -> Result<ProcessRead, MagickError>
|
||||
where
|
||||
F: FnOnce(crate::file::File) -> Fut,
|
||||
Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
|
||||
{
|
||||
let temporary_path = tmp_dir
|
||||
let temporary_path = state
|
||||
.tmp_dir
|
||||
.tmp_folder()
|
||||
.await
|
||||
.map_err(MagickError::CreateTemporaryDirectory)?;
|
||||
|
||||
let input_file = tmp_dir.tmp_file(None);
|
||||
let input_file = state.tmp_dir.tmp_file(None);
|
||||
crate::store::file_store::safe_create_parent(&input_file)
|
||||
.await
|
||||
.map_err(MagickError::CreateDir)?;
|
||||
|
@ -62,10 +62,10 @@ where
|
|||
|
||||
let envs = [
|
||||
(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
|
||||
(MAGICK_CONFIGURE_PATH, policy_dir.as_os_str()),
|
||||
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
|
||||
];
|
||||
|
||||
let reader = Process::run("magick", &args, &envs, timeout)?
|
||||
let reader = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
|
||||
.read()
|
||||
.add_extras(input_file)
|
||||
.add_extras(temporary_path);
|
||||
|
@ -73,22 +73,18 @@ where
|
|||
Ok(reader)
|
||||
}
|
||||
|
||||
pub(super) async fn thumbnail(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
pub(super) async fn thumbnail<S>(
|
||||
state: &State<S>,
|
||||
stream: LocalBoxStream<'static, std::io::Result<Bytes>>,
|
||||
input_format: ProcessableFormat,
|
||||
format: ProcessableFormat,
|
||||
quality: Option<u8>,
|
||||
timeout: u64,
|
||||
) -> Result<ProcessRead, MagickError> {
|
||||
thumbnail_animation(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
state,
|
||||
input_format,
|
||||
format,
|
||||
quality,
|
||||
timeout,
|
||||
|mut tmp_file| async move {
|
||||
tmp_file
|
||||
.write_from_stream(stream)
|
||||
|
|
110
src/ingest.rs
110
src/ingest.rs
|
@ -8,6 +8,7 @@ use crate::{
|
|||
future::WithMetrics,
|
||||
magick::PolicyDir,
|
||||
repo::{Alias, ArcRepo, DeleteToken, Hash},
|
||||
state::State,
|
||||
store::Store,
|
||||
tmp_file::TmpDir,
|
||||
};
|
||||
|
@ -19,7 +20,7 @@ use streem::IntoStreamer;
|
|||
use tracing::{Instrument, Span};
|
||||
|
||||
mod hasher;
|
||||
use hasher::{Hasher, State};
|
||||
use hasher::Hasher;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Session {
|
||||
|
@ -50,12 +51,17 @@ where
|
|||
}
|
||||
|
||||
async fn process_ingest<S>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
store: &S,
|
||||
state: &State<S>,
|
||||
stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
|
||||
media: &crate::config::Media,
|
||||
) -> Result<(InternalFormat, Arc<str>, Details, Rc<RefCell<State>>), Error>
|
||||
) -> Result<
|
||||
(
|
||||
InternalFormat,
|
||||
Arc<str>,
|
||||
Details,
|
||||
Rc<RefCell<hasher::State>>,
|
||||
),
|
||||
Error,
|
||||
>
|
||||
where
|
||||
S: Store,
|
||||
{
|
||||
|
@ -65,43 +71,30 @@ where
|
|||
|
||||
let permit = crate::process_semaphore().acquire().await?;
|
||||
|
||||
let prescribed = Validations {
|
||||
image: &media.image,
|
||||
animation: &media.animation,
|
||||
video: &media.video,
|
||||
};
|
||||
|
||||
tracing::trace!("Validating bytes");
|
||||
let (input_type, process_read) = crate::validate::validate_bytes(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
bytes,
|
||||
prescribed,
|
||||
media.process_timeout,
|
||||
)
|
||||
.await?;
|
||||
let (input_type, process_read) = crate::validate::validate_bytes(state, bytes).await?;
|
||||
|
||||
let process_read = if let Some(operations) = media.preprocess_steps() {
|
||||
let process_read = if let Some(operations) = state.config.media.preprocess_steps() {
|
||||
if let Some(format) = input_type.processable_format() {
|
||||
let (_, magick_args) =
|
||||
crate::processor::build_chain(operations, format.file_extension())?;
|
||||
|
||||
let quality = match format {
|
||||
crate::formats::ProcessableFormat::Image(format) => media.image.quality_for(format),
|
||||
crate::formats::ProcessableFormat::Image(format) => {
|
||||
state.config.media.image.quality_for(format)
|
||||
}
|
||||
crate::formats::ProcessableFormat::Animation(format) => {
|
||||
media.animation.quality_for(format)
|
||||
state.config.media.animation.quality_for(format)
|
||||
}
|
||||
};
|
||||
|
||||
crate::magick::process_image_process_read(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
state,
|
||||
process_read,
|
||||
magick_args,
|
||||
format,
|
||||
format,
|
||||
quality,
|
||||
media.process_timeout,
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
|
@ -111,7 +104,7 @@ where
|
|||
process_read
|
||||
};
|
||||
|
||||
let (state, identifier) = process_read
|
||||
let (hash_state, identifier) = process_read
|
||||
.with_stdout(|stdout| async move {
|
||||
let hasher_reader = Hasher::new(stdout);
|
||||
let state = hasher_reader.state();
|
||||
|
@ -119,11 +112,11 @@ where
|
|||
store
|
||||
.save_async_read(hasher_reader, input_type.media_type())
|
||||
.await
|
||||
.map(move |identifier| (state, identifier))
|
||||
.map(move |identifier| (hash_state, identifier))
|
||||
})
|
||||
.await??;
|
||||
|
||||
let bytes_stream = store.to_bytes(&identifier, None, None).await?;
|
||||
let bytes_stream = state.store.to_bytes(&identifier, None, None).await?;
|
||||
let details = Details::from_bytes(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
|
@ -134,13 +127,21 @@ where
|
|||
|
||||
drop(permit);
|
||||
|
||||
Ok((input_type, identifier, details, state))
|
||||
Ok((input_type, identifier, details, hash_state))
|
||||
}
|
||||
|
||||
async fn dummy_ingest<S>(
|
||||
store: &S,
|
||||
state: &State<S>,
|
||||
stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
|
||||
) -> Result<(InternalFormat, Arc<str>, Details, Rc<RefCell<State>>), Error>
|
||||
) -> Result<
|
||||
(
|
||||
InternalFormat,
|
||||
Arc<str>,
|
||||
Details,
|
||||
Rc<RefCell<hasher::State>>,
|
||||
),
|
||||
Error,
|
||||
>
|
||||
where
|
||||
S: Store,
|
||||
{
|
||||
|
@ -156,7 +157,8 @@ where
|
|||
|
||||
let input_type = InternalFormat::Image(crate::formats::ImageFormat::Png);
|
||||
|
||||
let identifier = store
|
||||
let identifier = state
|
||||
.store
|
||||
.save_async_read(hasher_reader, input_type.media_type())
|
||||
.await?;
|
||||
|
||||
|
@ -166,41 +168,37 @@ where
|
|||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, policy_dir, repo, store, client, stream, config))]
|
||||
#[tracing::instrument(skip(state, stream))]
|
||||
pub(crate) async fn ingest<S>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
client: &ClientWithMiddleware,
|
||||
state: &State<S>,
|
||||
stream: impl Stream<Item = Result<Bytes, Error>> + 'static,
|
||||
declared_alias: Option<Alias>,
|
||||
config: &crate::config::Configuration,
|
||||
) -> Result<Session, Error>
|
||||
where
|
||||
S: Store,
|
||||
{
|
||||
let (input_type, identifier, details, state) = if config.server.danger_dummy_mode {
|
||||
dummy_ingest(store, stream).await?
|
||||
let (input_type, identifier, details, hash_state) = if state.config.server.danger_dummy_mode {
|
||||
dummy_ingest(state, stream).await?
|
||||
} else {
|
||||
process_ingest(tmp_dir, policy_dir, store, stream, &config.media).await?
|
||||
process_ingest(state, stream).await?
|
||||
};
|
||||
|
||||
let mut session = Session {
|
||||
repo: repo.clone(),
|
||||
repo: state.repo.clone(),
|
||||
delete_token: DeleteToken::generate(),
|
||||
hash: None,
|
||||
alias: None,
|
||||
identifier: Some(identifier.clone()),
|
||||
};
|
||||
|
||||
if let Some(endpoint) = &config.media.external_validation {
|
||||
if let Some(endpoint) = &state.config.media.external_validation {
|
||||
let stream = store.to_stream(&identifier, None, None).await?;
|
||||
|
||||
let response = client
|
||||
let response = state
|
||||
.client
|
||||
.post(endpoint.as_str())
|
||||
.timeout(Duration::from_secs(
|
||||
config.media.external_validation_timeout,
|
||||
state.config.media.external_validation_timeout,
|
||||
))
|
||||
.header("Content-Type", input_type.media_type().as_ref())
|
||||
.body(Body::wrap_stream(crate::stream::make_send(stream)))
|
||||
|
@ -214,13 +212,13 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
let (hash, size) = state.borrow_mut().finalize_reset();
|
||||
let (hash, size) = hash_state.borrow_mut().finalize_reset();
|
||||
|
||||
let hash = Hash::new(hash, size, input_type);
|
||||
|
||||
save_upload(&mut session, repo, store, hash.clone(), &identifier).await?;
|
||||
save_upload(&mut session, state, hash.clone(), &identifier).await?;
|
||||
|
||||
repo.relate_details(&identifier, &details).await?;
|
||||
state.repo.relate_details(&identifier, &details).await?;
|
||||
|
||||
if let Some(alias) = declared_alias {
|
||||
session.add_existing_alias(hash, alias).await?
|
||||
|
@ -234,17 +232,21 @@ where
|
|||
#[tracing::instrument(level = "trace", skip_all)]
|
||||
async fn save_upload<S>(
|
||||
session: &mut Session,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
state: &State<S>,
|
||||
hash: Hash,
|
||||
identifier: &Arc<str>,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: Store,
|
||||
{
|
||||
if repo.create_hash(hash.clone(), identifier).await?.is_err() {
|
||||
if state
|
||||
.repo
|
||||
.create_hash(hash.clone(), identifier)
|
||||
.await?
|
||||
.is_err()
|
||||
{
|
||||
// duplicate upload
|
||||
store.remove(identifier).await?;
|
||||
state.store.remove(identifier).await?;
|
||||
session.identifier.take();
|
||||
return Ok(());
|
||||
}
|
||||
|
|
891
src/lib.rs
891
src/lib.rs
File diff suppressed because it is too large
Load diff
|
@ -85,27 +85,25 @@ impl MagickError {
|
|||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn process_image<F, Fut>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
async fn process_image<S, F, Fut>(
|
||||
state: &State<S>,
|
||||
process_args: Vec<String>,
|
||||
input_format: ProcessableFormat,
|
||||
format: ProcessableFormat,
|
||||
quality: Option<u8>,
|
||||
timeout: u64,
|
||||
write_file: F,
|
||||
) -> Result<ProcessRead, MagickError>
|
||||
where
|
||||
F: FnOnce(crate::file::File) -> Fut,
|
||||
Fut: std::future::Future<Output = Result<crate::file::File, MagickError>>,
|
||||
{
|
||||
let temporary_path = tmp_dir
|
||||
let temporary_path = state
|
||||
.tmp_dir
|
||||
.tmp_folder()
|
||||
.await
|
||||
.map_err(MagickError::CreateTemporaryDirectory)?;
|
||||
|
||||
let input_file = tmp_dir.tmp_file(None);
|
||||
let input_file = state.tmp_dir.tmp_file(None);
|
||||
crate::store::file_store::safe_create_parent(&input_file)
|
||||
.await
|
||||
.map_err(MagickError::CreateDir)?;
|
||||
|
@ -143,10 +141,10 @@ where
|
|||
|
||||
let envs = [
|
||||
(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
|
||||
(MAGICK_CONFIGURE_PATH, policy_dir.as_os_str()),
|
||||
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
|
||||
];
|
||||
|
||||
let reader = Process::run("magick", &args, &envs, timeout)?
|
||||
let reader = Process::run("magick", &args, &envs, state.config.media.process_timeout)?
|
||||
.read()
|
||||
.add_extras(input_file)
|
||||
.add_extras(temporary_path);
|
||||
|
@ -154,25 +152,20 @@ where
|
|||
Ok(reader)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn process_image_stream_read(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
pub(crate) async fn process_image_stream_read<S>(
|
||||
state: &State<S>,
|
||||
stream: LocalBoxStream<'static, std::io::Result<Bytes>>,
|
||||
args: Vec<String>,
|
||||
input_format: ProcessableFormat,
|
||||
format: ProcessableFormat,
|
||||
quality: Option<u8>,
|
||||
timeout: u64,
|
||||
) -> Result<ProcessRead, MagickError> {
|
||||
process_image(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
state,
|
||||
args,
|
||||
input_format,
|
||||
format,
|
||||
quality,
|
||||
timeout,
|
||||
|mut tmp_file| async move {
|
||||
tmp_file
|
||||
.write_from_stream(stream)
|
||||
|
@ -184,25 +177,20 @@ pub(crate) async fn process_image_stream_read(
|
|||
.await
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn process_image_process_read(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
state: &State<S>,
|
||||
process_read: ProcessRead,
|
||||
args: Vec<String>,
|
||||
input_format: ProcessableFormat,
|
||||
format: ProcessableFormat,
|
||||
quality: Option<u8>,
|
||||
timeout: u64,
|
||||
) -> Result<ProcessRead, MagickError> {
|
||||
process_image(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
state,
|
||||
args,
|
||||
input_format,
|
||||
format,
|
||||
quality,
|
||||
timeout,
|
||||
|mut tmp_file| async move {
|
||||
process_read
|
||||
.with_stdout(|stdout| async {
|
||||
|
|
140
src/queue.rs
140
src/queue.rs
|
@ -188,35 +188,12 @@ pub(crate) async fn queue_generate(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn process_cleanup<S: Store + 'static>(
|
||||
repo: ArcRepo,
|
||||
store: S,
|
||||
config: Configuration,
|
||||
) {
|
||||
process_jobs(&repo, &store, &config, CLEANUP_QUEUE, cleanup::perform).await
|
||||
pub(crate) async fn process_cleanup<S: Store + 'static>(state: State<S>) {
|
||||
process_jobs(state, CLEANUP_QUEUE, cleanup::perform).await
|
||||
}
|
||||
|
||||
pub(crate) async fn process_images<S: Store + 'static>(
|
||||
tmp_dir: ArcTmpDir,
|
||||
policy_dir: ArcPolicyDir,
|
||||
repo: ArcRepo,
|
||||
store: S,
|
||||
client: ClientWithMiddleware,
|
||||
process_map: ProcessMap,
|
||||
config: Configuration,
|
||||
) {
|
||||
process_image_jobs(
|
||||
&tmp_dir,
|
||||
&policy_dir,
|
||||
&repo,
|
||||
&store,
|
||||
&client,
|
||||
&process_map,
|
||||
&config,
|
||||
PROCESS_QUEUE,
|
||||
process::perform,
|
||||
)
|
||||
.await
|
||||
pub(crate) async fn process_images<S: Store + 'static>(state: State<S>, process_map: ProcessMap) {
|
||||
process_image_jobs(state, process_map, PROCESS_QUEUE, process::perform).await
|
||||
}
|
||||
|
||||
struct MetricsGuard {
|
||||
|
@ -250,21 +227,10 @@ impl Drop for MetricsGuard {
|
|||
}
|
||||
}
|
||||
|
||||
async fn process_jobs<S, F>(
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
config: &Configuration,
|
||||
queue: &'static str,
|
||||
callback: F,
|
||||
) where
|
||||
async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
|
||||
where
|
||||
S: Store,
|
||||
for<'a> F: Fn(
|
||||
&'a ArcRepo,
|
||||
&'a S,
|
||||
&'a Configuration,
|
||||
serde_json::Value,
|
||||
) -> LocalBoxFuture<'a, Result<(), Error>>
|
||||
+ Copy,
|
||||
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
|
||||
{
|
||||
let worker_id = uuid::Uuid::new_v4();
|
||||
|
||||
|
@ -273,7 +239,7 @@ async fn process_jobs<S, F>(
|
|||
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
let res = job_loop(repo, store, config, worker_id, queue, callback).await;
|
||||
let res = job_loop(&state, worker_id, queue, callback).await;
|
||||
|
||||
if let Err(e) = res {
|
||||
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
||||
|
@ -291,22 +257,14 @@ async fn process_jobs<S, F>(
|
|||
}
|
||||
|
||||
async fn job_loop<S, F>(
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
config: &Configuration,
|
||||
state: &State<S>,
|
||||
worker_id: uuid::Uuid,
|
||||
queue: &'static str,
|
||||
callback: F,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: Store,
|
||||
for<'a> F: Fn(
|
||||
&'a ArcRepo,
|
||||
&'a S,
|
||||
&'a Configuration,
|
||||
serde_json::Value,
|
||||
) -> LocalBoxFuture<'a, Result<(), Error>>
|
||||
+ Copy,
|
||||
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
|
||||
{
|
||||
loop {
|
||||
tracing::trace!("job_loop: looping");
|
||||
|
@ -314,20 +272,20 @@ where
|
|||
tokio::task::yield_now().await;
|
||||
|
||||
async {
|
||||
let (job_id, job) = repo.pop(queue, worker_id).await?;
|
||||
let (job_id, job) = state.repo.pop(queue, worker_id).await?;
|
||||
|
||||
let guard = MetricsGuard::guard(worker_id, queue);
|
||||
|
||||
let res = heartbeat(
|
||||
repo,
|
||||
&state.repo,
|
||||
queue,
|
||||
worker_id,
|
||||
job_id,
|
||||
(callback)(repo, store, config, job),
|
||||
(callback)(state, job),
|
||||
)
|
||||
.await;
|
||||
|
||||
repo.complete_job(queue, worker_id, job_id).await?;
|
||||
state.repo.complete_job(queue, worker_id, job_id).await?;
|
||||
|
||||
res?;
|
||||
|
||||
|
@ -340,29 +298,14 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn process_image_jobs<S, F>(
|
||||
tmp_dir: &ArcTmpDir,
|
||||
policy_dir: &ArcPolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
client: &ClientWithMiddleware,
|
||||
process_map: &ProcessMap,
|
||||
config: &Configuration,
|
||||
state: State<S>,
|
||||
process_map: ProcessMap,
|
||||
queue: &'static str,
|
||||
callback: F,
|
||||
) where
|
||||
S: Store,
|
||||
for<'a> F: Fn(
|
||||
&'a ArcTmpDir,
|
||||
&'a ArcPolicyDir,
|
||||
&'a ArcRepo,
|
||||
&'a S,
|
||||
&'a ClientWithMiddleware,
|
||||
&'a ProcessMap,
|
||||
&'a Configuration,
|
||||
serde_json::Value,
|
||||
) -> LocalBoxFuture<'a, Result<(), Error>>
|
||||
for<'a> F: Fn(&'a State<S>, &'a ProcessMap, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>>
|
||||
+ Copy,
|
||||
{
|
||||
let worker_id = uuid::Uuid::new_v4();
|
||||
|
@ -372,19 +315,7 @@ async fn process_image_jobs<S, F>(
|
|||
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
let res = image_job_loop(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
client,
|
||||
process_map,
|
||||
config,
|
||||
worker_id,
|
||||
queue,
|
||||
callback,
|
||||
)
|
||||
.await;
|
||||
let res = image_job_loop(&state, &process_map, worker_id, queue, callback).await;
|
||||
|
||||
if let Err(e) = res {
|
||||
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
||||
|
@ -403,29 +334,15 @@ async fn process_image_jobs<S, F>(
|
|||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn image_job_loop<S, F>(
|
||||
tmp_dir: &ArcTmpDir,
|
||||
policy_dir: &ArcPolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
client: &ClientWithMiddleware,
|
||||
state: &State<S>,
|
||||
process_map: &ProcessMap,
|
||||
config: &Configuration,
|
||||
worker_id: uuid::Uuid,
|
||||
queue: &'static str,
|
||||
callback: F,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: Store,
|
||||
for<'a> F: Fn(
|
||||
&'a ArcTmpDir,
|
||||
&'a ArcPolicyDir,
|
||||
&'a ArcRepo,
|
||||
&'a S,
|
||||
&'a ClientWithMiddleware,
|
||||
&'a ProcessMap,
|
||||
&'a Configuration,
|
||||
serde_json::Value,
|
||||
) -> LocalBoxFuture<'a, Result<(), Error>>
|
||||
for<'a> F: Fn(&'a State<S>, &'a ProcessMap, serde_json::Value) -> LocalBoxFuture<'a, Result<(), Error>>
|
||||
+ Copy,
|
||||
{
|
||||
loop {
|
||||
|
@ -434,29 +351,20 @@ where
|
|||
tokio::task::yield_now().await;
|
||||
|
||||
async {
|
||||
let (job_id, job) = repo.pop(queue, worker_id).await?;
|
||||
let (job_id, job) = state.repo.pop(queue, worker_id).await?;
|
||||
|
||||
let guard = MetricsGuard::guard(worker_id, queue);
|
||||
|
||||
let res = heartbeat(
|
||||
repo,
|
||||
&state.repo,
|
||||
queue,
|
||||
worker_id,
|
||||
job_id,
|
||||
(callback)(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
client,
|
||||
process_map,
|
||||
config,
|
||||
job,
|
||||
),
|
||||
(callback)(state, process_map, job),
|
||||
)
|
||||
.await;
|
||||
|
||||
repo.complete_job(queue, worker_id, job_id).await?;
|
||||
state.repo.complete_job(queue, worker_id, job_id).await?;
|
||||
|
||||
res?;
|
||||
|
||||
|
|
|
@ -10,13 +10,12 @@ use crate::{
|
|||
queue::Cleanup,
|
||||
repo::{Alias, ArcRepo, DeleteToken, Hash},
|
||||
serde_str::Serde,
|
||||
state::State,
|
||||
store::Store,
|
||||
};
|
||||
|
||||
pub(super) fn perform<'a, S>(
|
||||
repo: &'a ArcRepo,
|
||||
store: &'a S,
|
||||
configuration: &'a Configuration,
|
||||
state: &'a State<S>,
|
||||
job: serde_json::Value,
|
||||
) -> LocalBoxFuture<'a, Result<(), Error>>
|
||||
where
|
||||
|
@ -25,26 +24,28 @@ where
|
|||
Box::pin(async move {
|
||||
match serde_json::from_value(job) {
|
||||
Ok(job) => match job {
|
||||
Cleanup::Hash { hash: in_hash } => hash(repo, in_hash).await?,
|
||||
Cleanup::Hash { hash: in_hash } => hash(&state.repo, in_hash).await?,
|
||||
Cleanup::Identifier {
|
||||
identifier: in_identifier,
|
||||
} => identifier(repo, store, Arc::from(in_identifier)).await?,
|
||||
} => identifier(&state.repo, &state.store, Arc::from(in_identifier)).await?,
|
||||
Cleanup::Alias {
|
||||
alias: stored_alias,
|
||||
token,
|
||||
} => {
|
||||
alias(
|
||||
repo,
|
||||
&state.repo,
|
||||
Serde::into_inner(stored_alias),
|
||||
Serde::into_inner(token),
|
||||
)
|
||||
.await?
|
||||
}
|
||||
Cleanup::Variant { hash, variant } => hash_variant(repo, hash, variant).await?,
|
||||
Cleanup::AllVariants => all_variants(repo).await?,
|
||||
Cleanup::OutdatedVariants => outdated_variants(repo, configuration).await?,
|
||||
Cleanup::OutdatedProxies => outdated_proxies(repo, configuration).await?,
|
||||
Cleanup::Prune => prune(repo, store).await?,
|
||||
Cleanup::Variant { hash, variant } => {
|
||||
hash_variant(&state.repo, hash, variant).await?
|
||||
}
|
||||
Cleanup::AllVariants => all_variants(&state.repo).await?,
|
||||
Cleanup::OutdatedVariants => outdated_variants(&state.repo, &state.config).await?,
|
||||
Cleanup::OutdatedProxies => outdated_proxies(&state.repo, &state.config).await?,
|
||||
Cleanup::Prune => prune(&state.repo, &state.store).await?,
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::warn!("Invalid job: {}", format!("{e}"));
|
||||
|
|
|
@ -13,20 +13,15 @@ use crate::{
|
|||
queue::Process,
|
||||
repo::{Alias, ArcRepo, UploadId, UploadResult},
|
||||
serde_str::Serde,
|
||||
state::State,
|
||||
store::Store,
|
||||
tmp_file::{ArcTmpDir, TmpDir},
|
||||
};
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn perform<'a, S>(
|
||||
tmp_dir: &'a ArcTmpDir,
|
||||
policy_dir: &'a ArcPolicyDir,
|
||||
repo: &'a ArcRepo,
|
||||
store: &'a S,
|
||||
client: &'a ClientWithMiddleware,
|
||||
state: &'a State<S>,
|
||||
process_map: &'a ProcessMap,
|
||||
config: &'a Configuration,
|
||||
job: serde_json::Value,
|
||||
) -> LocalBoxFuture<'a, Result<(), Error>>
|
||||
where
|
||||
|
@ -41,15 +36,10 @@ where
|
|||
declared_alias,
|
||||
} => {
|
||||
process_ingest(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
client,
|
||||
state,
|
||||
Arc::from(identifier),
|
||||
Serde::into_inner(upload_id),
|
||||
declared_alias.map(Serde::into_inner),
|
||||
config,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
|
@ -60,16 +50,12 @@ where
|
|||
process_args,
|
||||
} => {
|
||||
generate(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
state,
|
||||
process_map,
|
||||
target_format,
|
||||
Serde::into_inner(source),
|
||||
process_path,
|
||||
process_args,
|
||||
config,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
|
@ -117,18 +103,12 @@ impl Drop for UploadGuard {
|
|||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, policy_dir, repo, store, client, config))]
|
||||
#[tracing::instrument(skip(state))]
|
||||
async fn process_ingest<S>(
|
||||
tmp_dir: &ArcTmpDir,
|
||||
policy_dir: &ArcPolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
client: &ClientWithMiddleware,
|
||||
state: &State<S>,
|
||||
unprocessed_identifier: Arc<str>,
|
||||
upload_id: UploadId,
|
||||
declared_alias: Option<Alias>,
|
||||
config: &Configuration,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: Store + 'static,
|
||||
|
@ -136,33 +116,18 @@ where
|
|||
let guard = UploadGuard::guard(upload_id);
|
||||
|
||||
let fut = async {
|
||||
let tmp_dir = tmp_dir.clone();
|
||||
let policy_dir = policy_dir.clone();
|
||||
let ident = unprocessed_identifier.clone();
|
||||
let store2 = store.clone();
|
||||
let repo = repo.clone();
|
||||
let client = client.clone();
|
||||
let state2 = state.clone();
|
||||
|
||||
let current_span = Span::current();
|
||||
let span = tracing::info_span!(parent: current_span, "error_boundary");
|
||||
|
||||
let config = config.clone();
|
||||
let error_boundary = crate::sync::abort_on_drop(crate::sync::spawn(
|
||||
"ingest-media",
|
||||
async move {
|
||||
let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?);
|
||||
let stream =
|
||||
crate::stream::from_err(state2.store.to_stream(&ident, None, None).await?);
|
||||
|
||||
let session = crate::ingest::ingest(
|
||||
&tmp_dir,
|
||||
&policy_dir,
|
||||
&repo,
|
||||
&store2,
|
||||
&client,
|
||||
stream,
|
||||
declared_alias,
|
||||
&config,
|
||||
)
|
||||
.await?;
|
||||
let session = crate::ingest::ingest(&state2, stream, declared_alias).await?;
|
||||
|
||||
Ok(session) as Result<Session, Error>
|
||||
}
|
||||
|
@ -170,7 +135,7 @@ where
|
|||
))
|
||||
.await;
|
||||
|
||||
store.remove(&unprocessed_identifier).await?;
|
||||
state.store.remove(&unprocessed_identifier).await?;
|
||||
|
||||
error_boundary.map_err(|_| UploadError::Canceled)?
|
||||
};
|
||||
|
@ -191,62 +156,46 @@ where
|
|||
}
|
||||
};
|
||||
|
||||
repo.complete_upload(upload_id, result).await?;
|
||||
state.repo.complete_upload(upload_id, result).await?;
|
||||
|
||||
guard.disarm();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
process_map,
|
||||
process_path,
|
||||
process_args,
|
||||
config
|
||||
))]
|
||||
#[tracing::instrument(skip(state, process_map, process_path, process_args))]
|
||||
async fn generate<S: Store + 'static>(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
repo: &ArcRepo,
|
||||
store: &S,
|
||||
state: &State<S>,
|
||||
process_map: &ProcessMap,
|
||||
target_format: InputProcessableFormat,
|
||||
source: Alias,
|
||||
process_path: PathBuf,
|
||||
process_args: Vec<String>,
|
||||
config: &Configuration,
|
||||
) -> Result<(), Error> {
|
||||
let Some(hash) = repo.hash(&source).await? else {
|
||||
let Some(hash) = state.repo.hash(&source).await? else {
|
||||
// Nothing to do
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let path_string = process_path.to_string_lossy().to_string();
|
||||
let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?;
|
||||
let identifier_opt = state
|
||||
.repo
|
||||
.variant_identifier(hash.clone(), path_string)
|
||||
.await?;
|
||||
|
||||
if identifier_opt.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let original_details =
|
||||
crate::ensure_details(tmp_dir, policy_dir, repo, store, config, &source).await?;
|
||||
let original_details = crate::ensure_details(state, &source).await?;
|
||||
|
||||
crate::generate::generate(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
repo,
|
||||
store,
|
||||
state,
|
||||
process_map,
|
||||
target_format,
|
||||
process_path,
|
||||
process_args,
|
||||
&original_details,
|
||||
config,
|
||||
hash,
|
||||
)
|
||||
.await?;
|
||||
|
|
13
src/state.rs
Normal file
13
src/state.rs
Normal file
|
@ -0,0 +1,13 @@
|
|||
use reqwest_middleware::ClientWithMiddleware;
|
||||
|
||||
use crate::{config::Configuration, magick::ArcPolicyDir, repo::ArcRepo, tmp_file::ArcTmpDir};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct State<S> {
|
||||
pub(super) config: Configuration,
|
||||
pub(super) tmp_dir: ArcTmpDir,
|
||||
pub(super) policy_dir: ArcPolicyDir,
|
||||
pub(super) repo: ArcRepo,
|
||||
pub(super) store: S,
|
||||
pub(super) client: ClientWithMiddleware,
|
||||
}
|
108
src/validate.rs
108
src/validate.rs
|
@ -8,10 +8,11 @@ use crate::{
|
|||
error_code::ErrorCode,
|
||||
formats::{
|
||||
AnimationFormat, AnimationOutput, ImageInput, ImageOutput, InputFile, InputVideoFormat,
|
||||
InternalFormat, Validations,
|
||||
InternalFormat,
|
||||
},
|
||||
magick::PolicyDir,
|
||||
process::ProcessRead,
|
||||
state::State,
|
||||
tmp_file::TmpDir,
|
||||
};
|
||||
use actix_web::web::Bytes;
|
||||
|
@ -57,12 +58,9 @@ impl ValidationError {
|
|||
const MEGABYTES: usize = 1024 * 1024;
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn validate_bytes(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
pub(crate) async fn validate_bytes<S>(
|
||||
state: &State<S>,
|
||||
bytes: Bytes,
|
||||
validations: Validations<'_>,
|
||||
timeout: u64,
|
||||
) -> Result<(InternalFormat, ProcessRead), Error> {
|
||||
if bytes.is_empty() {
|
||||
return Err(ValidationError::Empty.into());
|
||||
|
@ -77,66 +75,35 @@ pub(crate) async fn validate_bytes(
|
|||
|
||||
match &input {
|
||||
InputFile::Image(input) => {
|
||||
let (format, process_read) = process_image(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
bytes,
|
||||
*input,
|
||||
width,
|
||||
height,
|
||||
validations.image,
|
||||
timeout,
|
||||
)
|
||||
.await?;
|
||||
let (format, process_read) = process_image(state, bytes, *input, width, height).await?;
|
||||
|
||||
Ok((format, process_read))
|
||||
}
|
||||
InputFile::Animation(input) => {
|
||||
let (format, process_read) = process_animation(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
bytes,
|
||||
*input,
|
||||
width,
|
||||
height,
|
||||
frames.unwrap_or(1),
|
||||
validations.animation,
|
||||
timeout,
|
||||
)
|
||||
.await?;
|
||||
let (format, process_read) =
|
||||
process_animation(state, bytes, *input, width, height, frames.unwrap_or(1)).await?;
|
||||
|
||||
Ok((format, process_read))
|
||||
}
|
||||
InputFile::Video(input) => {
|
||||
let (format, process_read) = process_video(
|
||||
tmp_dir,
|
||||
bytes,
|
||||
*input,
|
||||
width,
|
||||
height,
|
||||
frames.unwrap_or(1),
|
||||
validations.video,
|
||||
timeout,
|
||||
)
|
||||
.await?;
|
||||
let (format, process_read) =
|
||||
process_video(state, bytes, *input, width, height, frames.unwrap_or(1)).await?;
|
||||
|
||||
Ok((format, process_read))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, policy_dir, bytes, validations))]
|
||||
async fn process_image(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
#[tracing::instrument(skip(state, bytes))]
|
||||
async fn process_image<S>(
|
||||
state: &State<S>,
|
||||
bytes: Bytes,
|
||||
input: ImageInput,
|
||||
width: u16,
|
||||
height: u16,
|
||||
validations: &crate::config::Image,
|
||||
timeout: u64,
|
||||
) -> Result<(InternalFormat, ProcessRead), Error> {
|
||||
let validations = &state.config.media.image;
|
||||
|
||||
if width > validations.max_width {
|
||||
return Err(ValidationError::Width.into());
|
||||
}
|
||||
|
@ -158,16 +125,7 @@ async fn process_image(
|
|||
let process_read = if needs_transcode {
|
||||
let quality = validations.quality_for(format);
|
||||
|
||||
magick::convert_image(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
input.format,
|
||||
format,
|
||||
quality,
|
||||
timeout,
|
||||
bytes,
|
||||
)
|
||||
.await?
|
||||
magick::convert_image(state, input.format, format, quality, bytes).await?
|
||||
} else {
|
||||
exiftool::clear_metadata_bytes_read(bytes, timeout)?
|
||||
};
|
||||
|
@ -201,19 +159,17 @@ fn validate_animation(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, policy_dir, bytes, validations))]
|
||||
#[tracing::instrument(skip(state, bytes))]
|
||||
async fn process_animation(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
state: &State<S>,
|
||||
bytes: Bytes,
|
||||
input: AnimationFormat,
|
||||
width: u16,
|
||||
height: u16,
|
||||
frames: u32,
|
||||
validations: &crate::config::Animation,
|
||||
timeout: u64,
|
||||
) -> Result<(InternalFormat, ProcessRead), Error> {
|
||||
let validations = &state.config.media.animation;
|
||||
|
||||
validate_animation(bytes.len(), width, height, frames, validations)?;
|
||||
|
||||
let AnimationOutput {
|
||||
|
@ -224,10 +180,9 @@ async fn process_animation(
|
|||
let process_read = if needs_transcode {
|
||||
let quality = validations.quality_for(format);
|
||||
|
||||
magick::convert_animation(tmp_dir, policy_dir, input, format, quality, timeout, bytes)
|
||||
.await?
|
||||
magick::convert_animation(state, input, format, quality, bytes).await?
|
||||
} else {
|
||||
exiftool::clear_metadata_bytes_read(bytes, timeout)?
|
||||
exiftool::clear_metadata_bytes_read(bytes, state.config.media.process_timeout)?
|
||||
};
|
||||
|
||||
Ok((InternalFormat::Animation(format), process_read))
|
||||
|
@ -262,18 +217,17 @@ fn validate_video(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(skip(tmp_dir, bytes, validations))]
|
||||
async fn process_video(
|
||||
tmp_dir: &TmpDir,
|
||||
#[tracing::instrument(skip(state, bytes))]
|
||||
async fn process_video<S>(
|
||||
state: &State<S>,
|
||||
bytes: Bytes,
|
||||
input: InputVideoFormat,
|
||||
width: u16,
|
||||
height: u16,
|
||||
frames: u32,
|
||||
validations: &crate::config::Video,
|
||||
timeout: u64,
|
||||
) -> Result<(InternalFormat, ProcessRead), Error> {
|
||||
let validations = &state.config.media.video;
|
||||
|
||||
validate_video(bytes.len(), width, height, frames, validations)?;
|
||||
|
||||
let output = input.build_output(
|
||||
|
@ -284,7 +238,15 @@ async fn process_video(
|
|||
|
||||
let crf = validations.crf_for(width, height);
|
||||
|
||||
let process_read = ffmpeg::transcode_bytes(tmp_dir, input, output, crf, timeout, bytes).await?;
|
||||
let process_read = ffmpeg::transcode_bytes(
|
||||
&state.tmp_dir,
|
||||
input,
|
||||
output,
|
||||
crf,
|
||||
state.config.media.process_timeout,
|
||||
bytes,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok((
|
||||
InternalFormat::Video(output.format.internal_format()),
|
||||
|
|
|
@ -9,67 +9,57 @@ use crate::{
|
|||
tmp_file::TmpDir,
|
||||
};
|
||||
|
||||
pub(super) async fn convert_image(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
pub(super) async fn convert_image<S>(
|
||||
state: &State<S>,
|
||||
input: ImageFormat,
|
||||
output: ImageFormat,
|
||||
quality: Option<u8>,
|
||||
timeout: u64,
|
||||
bytes: Bytes,
|
||||
) -> Result<ProcessRead, MagickError> {
|
||||
convert(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
state,
|
||||
input.magick_format(),
|
||||
output.magick_format(),
|
||||
false,
|
||||
quality,
|
||||
timeout,
|
||||
bytes,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(super) async fn convert_animation(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
pub(super) async fn convert_animation<S>(
|
||||
state: &State<S>,
|
||||
input: AnimationFormat,
|
||||
output: AnimationFormat,
|
||||
quality: Option<u8>,
|
||||
timeout: u64,
|
||||
bytes: Bytes,
|
||||
) -> Result<ProcessRead, MagickError> {
|
||||
convert(
|
||||
tmp_dir,
|
||||
policy_dir,
|
||||
state,
|
||||
input.magick_format(),
|
||||
output.magick_format(),
|
||||
true,
|
||||
quality,
|
||||
timeout,
|
||||
bytes,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn convert(
|
||||
tmp_dir: &TmpDir,
|
||||
policy_dir: &PolicyDir,
|
||||
state: &State<S>,
|
||||
input: &'static str,
|
||||
output: &'static str,
|
||||
coalesce: bool,
|
||||
quality: Option<u8>,
|
||||
timeout: u64,
|
||||
bytes: Bytes,
|
||||
) -> Result<ProcessRead, MagickError> {
|
||||
let temporary_path = tmp_dir
|
||||
let temporary_path = state
|
||||
.tmp_dir
|
||||
.tmp_folder()
|
||||
.await
|
||||
.map_err(MagickError::CreateTemporaryDirectory)?;
|
||||
|
||||
let input_file = tmp_dir.tmp_file(None);
|
||||
let input_file = state.tmp_dir.tmp_file(None);
|
||||
|
||||
crate::store::file_store::safe_create_parent(&input_file)
|
||||
.await
|
||||
|
@ -104,10 +94,10 @@ async fn convert(
|
|||
|
||||
let envs = [
|
||||
(MAGICK_TEMPORARY_PATH, temporary_path.as_os_str()),
|
||||
(MAGICK_CONFIGURE_PATH, policy_dir.as_os_str()),
|
||||
(MAGICK_CONFIGURE_PATH, state.policy_dir.as_os_str()),
|
||||
];
|
||||
|
||||
let reader = Process::run("magick", &args, &envs, timeout)?.read();
|
||||
let reader = Process::run("magick", &args, &envs, state.config.media.process_timeout)?.read();
|
||||
|
||||
let clean_reader = reader.add_extras(input_file).add_extras(temporary_path);
|
||||
|
||||
|
|
Loading…
Reference in a new issue