Add per-upload validations and per-upload preprocess steps

This commit is contained in:
asonix 2024-03-27 19:00:54 -05:00
parent 84a882392a
commit 55bc4b64c1
8 changed files with 133 additions and 34 deletions

View file

@ -58,8 +58,8 @@ rustls-channel-resolver = "0.2.0"
rustls-pemfile = "2.0.0" rustls-pemfile = "2.0.0"
rusty-s3 = "0.5.0" rusty-s3 = "0.5.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde-tuple-vec-map = "1.0.1"
serde_json = "1.0" serde_json = "1.0"
serde-tuple-vec-map = "1.0.1"
serde_urlencoded = "0.7.1" serde_urlencoded = "0.7.1"
sha2 = "0.10.0" sha2 = "0.10.0"
sled = { version = "0.34.7" } sled = { version = "0.34.7" }

View file

@ -33,11 +33,13 @@
cargo-outdated cargo-outdated
certstrap certstrap
clippy clippy
curl
diesel-cli diesel-cli
exiftool exiftool
ffmpeg_6-full ffmpeg_6-full
garage garage
imagemagick imagemagick
jq
minio-client minio-client
rust-analyzer rust-analyzer
rustc rustc

View file

@ -111,8 +111,11 @@ pub(crate) enum UploadError {
#[error("Invalid job popped from job queue: {1}")] #[error("Invalid job popped from job queue: {1}")]
InvalidJob(#[source] serde_json::Error, String), InvalidJob(#[source] serde_json::Error, String),
#[error("Error parsing upload query")] #[error("Invalid query supplied")]
InvalidUploadQuery(#[source] actix_web::error::QueryPayloadError), InvalidQuery(#[source] actix_web::error::QueryPayloadError),
#[error("Invalid json supplied")]
InvalidJson(#[source] actix_web::error::JsonPayloadError),
#[error("pict-rs is in read-only mode")] #[error("pict-rs is in read-only mode")]
ReadOnly, ReadOnly,
@ -212,7 +215,8 @@ impl UploadError {
Self::ProcessTimeout => ErrorCode::COMMAND_TIMEOUT, Self::ProcessTimeout => ErrorCode::COMMAND_TIMEOUT,
Self::FailedExternalValidation => ErrorCode::FAILED_EXTERNAL_VALIDATION, Self::FailedExternalValidation => ErrorCode::FAILED_EXTERNAL_VALIDATION,
Self::InvalidJob(_, _) => ErrorCode::INVALID_JOB, Self::InvalidJob(_, _) => ErrorCode::INVALID_JOB,
Self::InvalidUploadQuery(_) => ErrorCode::INVALID_UPLOAD_QUERY, Self::InvalidQuery(_) => ErrorCode::INVALID_QUERY,
Self::InvalidJson(_) => ErrorCode::INVALID_JSON,
#[cfg(feature = "random-errors")] #[cfg(feature = "random-errors")]
Self::RandomError => ErrorCode::RANDOM_ERROR, Self::RandomError => ErrorCode::RANDOM_ERROR,
} }
@ -265,7 +269,8 @@ impl ResponseError for Error {
)) ))
| UploadError::Repo(crate::repo::RepoError::AlreadyClaimed) | UploadError::Repo(crate::repo::RepoError::AlreadyClaimed)
| UploadError::Validation(_) | UploadError::Validation(_)
| UploadError::InvalidUploadQuery(_) | UploadError::InvalidQuery(_)
| UploadError::InvalidJson(_)
| UploadError::UnsupportedProcessExtension | UploadError::UnsupportedProcessExtension
| UploadError::ReadOnly | UploadError::ReadOnly
| UploadError::FailedExternalValidation | UploadError::FailedExternalValidation

View file

@ -100,6 +100,9 @@ impl ErrorCode {
pub(crate) const VIDEO_DISABLED: ErrorCode = ErrorCode { pub(crate) const VIDEO_DISABLED: ErrorCode = ErrorCode {
code: "video-disabled", code: "video-disabled",
}; };
pub(crate) const MEDIA_DISALLOWED: ErrorCode = ErrorCode {
code: "media-disallowed",
};
pub(crate) const HTTP_CLIENT_ERROR: ErrorCode = ErrorCode { pub(crate) const HTTP_CLIENT_ERROR: ErrorCode = ErrorCode {
code: "http-client-error", code: "http-client-error",
}; };
@ -147,8 +150,11 @@ impl ErrorCode {
pub(crate) const INVALID_JOB: ErrorCode = ErrorCode { pub(crate) const INVALID_JOB: ErrorCode = ErrorCode {
code: "invalid-job", code: "invalid-job",
}; };
pub(crate) const INVALID_UPLOAD_QUERY: ErrorCode = ErrorCode { pub(crate) const INVALID_QUERY: ErrorCode = ErrorCode {
code: "invalid-upload-query", code: "invalid-query",
};
pub(crate) const INVALID_JSON: ErrorCode = ErrorCode {
code: "invalid-json",
}; };
#[cfg(feature = "random-errors")] #[cfg(feature = "random-errors")]
pub(crate) const RANDOM_ERROR: ErrorCode = ErrorCode { pub(crate) const RANDOM_ERROR: ErrorCode = ErrorCode {

View file

@ -64,7 +64,13 @@ where
.with_poll_timer("validate-bytes-stream") .with_poll_timer("validate-bytes-stream")
.await?; .await?;
let process_read = if let Some(operations) = state.config.media.preprocess_steps() { let operations = if upload_query.operations.is_empty() {
state.config.media.preprocess_steps()
} else {
Some(upload_query.operations.as_ref())
};
let process_read = if let Some(operations) = operations {
if let Some(format) = input_type.processable_format() { if let Some(format) = input_type.processable_format() {
let (_, magick_args) = let (_, magick_args) =
crate::processor::build_chain(operations, format.file_extension())?; crate::processor::build_chain(operations, format.file_extension())?;

View file

@ -39,6 +39,9 @@ pub(crate) enum ValidationError {
#[error("Video is disabled")] #[error("Video is disabled")]
VideoDisabled, VideoDisabled,
#[error("Media type wasn't allowed for this upload")]
MediaDisallowed,
} }
impl ValidationError { impl ValidationError {
@ -51,6 +54,7 @@ impl ValidationError {
Self::Empty => ErrorCode::VALIDATE_FILE_EMPTY, Self::Empty => ErrorCode::VALIDATE_FILE_EMPTY,
Self::Filesize => ErrorCode::VALIDATE_FILE_SIZE, Self::Filesize => ErrorCode::VALIDATE_FILE_SIZE,
Self::VideoDisabled => ErrorCode::VIDEO_DISABLED, Self::VideoDisabled => ErrorCode::VIDEO_DISABLED,
Self::MediaDisallowed => ErrorCode::MEDIA_DISALLOWED,
} }
} }
} }
@ -76,14 +80,16 @@ pub(crate) async fn validate_bytes_stream<S>(
.with_poll_timer("discover-bytes-stream") .with_poll_timer("discover-bytes-stream")
.await?; .await?;
validate_upload(bytes.len(), width, height, frames, upload_limits)?;
match &input { match &input {
InputFile::Image(input) => { InputFile::Image(input) if *upload_limits.allow_image => {
let (format, process) = let (format, process) =
process_image_command(state, *input, bytes.len(), width, height).await?; process_image_command(state, *input, bytes.len(), width, height).await?;
Ok((format, process.drive_with_stream(bytes.into_io_stream()))) Ok((format, process.drive_with_stream(bytes.into_io_stream())))
} }
InputFile::Animation(input) => { InputFile::Animation(input) if *upload_limits.allow_animation => {
let (format, process) = process_animation_command( let (format, process) = process_animation_command(
state, state,
*input, *input,
@ -96,20 +102,67 @@ pub(crate) async fn validate_bytes_stream<S>(
Ok((format, process.drive_with_stream(bytes.into_io_stream()))) Ok((format, process.drive_with_stream(bytes.into_io_stream())))
} }
InputFile::Video(input) => { InputFile::Video(input) if *upload_limits.allow_video => {
let (format, process_read) = let (format, process_read) =
process_video(state, bytes, *input, width, height, frames.unwrap_or(1)).await?; process_video(state, bytes, *input, width, height, frames.unwrap_or(1)).await?;
Ok((format, process_read)) Ok((format, process_read))
} }
_ => Err(ValidationError::MediaDisallowed.into()),
} }
} }
fn validate_upload(
size: usize,
width: u16,
height: u16,
frames: Option<u32>,
upload_limits: &UploadLimits,
) -> Result<(), ValidationError> {
if upload_limits
.max_width
.is_some_and(|max_width| width > *max_width)
{
return Err(ValidationError::Width);
}
if upload_limits
.max_height
.is_some_and(|max_height| height > *max_height)
{
return Err(ValidationError::Height);
}
if upload_limits
.max_frame_count
.zip(frames)
.is_some_and(|(max_frame_count, frames)| frames > *max_frame_count)
{
return Err(ValidationError::Frames);
}
if upload_limits
.max_area
.is_some_and(|max_area| u32::from(width) * u32::from(height) > *max_area)
{
return Err(ValidationError::Area);
}
if upload_limits
.max_file_size
.is_some_and(|max_file_size| size > *max_file_size * MEGABYTES)
{
return Err(ValidationError::Filesize);
}
Ok(())
}
#[tracing::instrument(skip(state))] #[tracing::instrument(skip(state))]
async fn process_image_command<S>( async fn process_image_command<S>(
state: &State<S>, state: &State<S>,
input: ImageInput, input: ImageInput,
length: usize, size: usize,
width: u16, width: u16,
height: u16, height: u16,
) -> Result<(InternalFormat, Process), Error> { ) -> Result<(InternalFormat, Process), Error> {
@ -124,7 +177,7 @@ async fn process_image_command<S>(
if u32::from(width) * u32::from(height) > validations.max_area { if u32::from(width) * u32::from(height) > validations.max_area {
return Err(ValidationError::Area.into()); return Err(ValidationError::Area.into());
} }
if length > validations.max_file_size * MEGABYTES { if size > validations.max_file_size * MEGABYTES {
return Err(ValidationError::Filesize.into()); return Err(ValidationError::Filesize.into());
} }
@ -174,14 +227,14 @@ fn validate_animation(
async fn process_animation_command<S>( async fn process_animation_command<S>(
state: &State<S>, state: &State<S>,
input: AnimationFormat, input: AnimationFormat,
length: usize, size: usize,
width: u16, width: u16,
height: u16, height: u16,
frames: u32, frames: u32,
) -> Result<(InternalFormat, Process), Error> { ) -> Result<(InternalFormat, Process), Error> {
let validations = &state.config.media.animation; let validations = &state.config.media.animation;
validate_animation(length, width, height, frames, validations)?; validate_animation(size, width, height, frames, validations)?;
let AnimationOutput { let AnimationOutput {
format, format,

View file

@ -150,14 +150,14 @@ async fn ensure_details_identifier<S: Store + 'static>(
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(default)] #[serde(default)]
struct UploadLimits { struct UploadLimits {
max_width: Option<u16>, max_width: Option<Serde<u16>>,
max_height: Option<u16>, max_height: Option<Serde<u16>>,
max_area: Option<u32>, max_area: Option<Serde<u32>>,
max_frame_count: Option<u32>, max_frame_count: Option<Serde<u32>>,
max_file_size: Option<usize>, max_file_size: Option<Serde<usize>>,
allow_image: bool, allow_image: Serde<bool>,
allow_animation: bool, allow_animation: Serde<bool>,
allow_video: bool, allow_video: Serde<bool>,
} }
impl Default for UploadLimits { impl Default for UploadLimits {
@ -168,9 +168,9 @@ impl Default for UploadLimits {
max_area: None, max_area: None,
max_frame_count: None, max_frame_count: None,
max_file_size: None, max_file_size: None,
allow_image: true, allow_image: Serde::new(true),
allow_animation: true, allow_animation: Serde::new(true),
allow_video: true, allow_video: Serde::new(true),
} }
} }
} }
@ -197,7 +197,7 @@ impl<S: Store + 'static> FormData for Upload<S> {
.clone(); .clone();
let web::Query(upload_query) = web::Query::<UploadQuery>::from_query(req.query_string()) let web::Query(upload_query) = web::Query::<UploadQuery>::from_query(req.query_string())
.map_err(UploadError::InvalidUploadQuery)?; .map_err(UploadError::InvalidQuery)?;
let upload_query = Rc::new(upload_query); let upload_query = Rc::new(upload_query);
@ -254,7 +254,7 @@ impl<S: Store + 'static> FormData for Import<S> {
.clone(); .clone();
let web::Query(upload_query) = web::Query::<UploadQuery>::from_query(req.query_string()) let web::Query(upload_query) = web::Query::<UploadQuery>::from_query(req.query_string())
.map_err(UploadError::InvalidUploadQuery)?; .map_err(UploadError::InvalidQuery)?;
let upload_query = Rc::new(upload_query); let upload_query = Rc::new(upload_query);
@ -426,8 +426,10 @@ impl<S: Store + 'static> FormData for BackgroundedUpload<S> {
async fn upload_backgrounded<S: Store>( async fn upload_backgrounded<S: Store>(
Multipart(BackgroundedUpload(value, _)): Multipart<BackgroundedUpload<S>>, Multipart(BackgroundedUpload(value, _)): Multipart<BackgroundedUpload<S>>,
state: web::Data<State<S>>, state: web::Data<State<S>>,
web::Query(upload_query): web::Query<UploadQuery>, upload_query: web::Query<UploadQuery>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let upload_query = upload_query.into_inner();
let images = value let images = value
.map() .map()
.and_then(|mut m| m.remove("images")) .and_then(|mut m| m.remove("images"))
@ -552,12 +554,14 @@ async fn ingest_inline<S: Store + 'static>(
/// download an image from a URL /// download an image from a URL
#[tracing::instrument(name = "Downloading file", skip(state))] #[tracing::instrument(name = "Downloading file", skip(state))]
async fn download<S: Store + 'static>( async fn download<S: Store + 'static>(
web::Query(DownloadQuery { download_query: web::Query<DownloadQuery>,
url_query,
upload_query,
}): web::Query<DownloadQuery>,
state: web::Data<State<S>>, state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let DownloadQuery {
url_query,
upload_query,
} = download_query.into_inner();
let stream = download_stream(&url_query.url, &state).await?; let stream = download_stream(&url_query.url, &state).await?;
if url_query.backgrounded { if url_query.backgrounded {
@ -1574,6 +1578,16 @@ fn build_client() -> Result<ClientWithMiddleware, Error> {
.build()) .build())
} }
fn query_config() -> web::QueryConfig {
web::QueryConfig::default()
.error_handler(|err, _| Error::from(UploadError::InvalidQuery(err)).into())
}
fn json_config() -> web::JsonConfig {
web::JsonConfig::default()
.error_handler(|err, _| Error::from(UploadError::InvalidJson(err)).into())
}
fn configure_endpoints<S: Store + 'static, F: Fn(&mut web::ServiceConfig)>( fn configure_endpoints<S: Store + 'static, F: Fn(&mut web::ServiceConfig)>(
config: &mut web::ServiceConfig, config: &mut web::ServiceConfig,
state: State<S>, state: State<S>,
@ -1581,6 +1595,8 @@ fn configure_endpoints<S: Store + 'static, F: Fn(&mut web::ServiceConfig)>(
extra_config: F, extra_config: F,
) { ) {
config config
.app_data(query_config())
.app_data(json_config())
.app_data(web::Data::new(state.clone())) .app_data(web::Data::new(state.clone()))
.app_data(web::Data::new(process_map.clone())) .app_data(web::Data::new(process_map.clone()))
.route("/healthz", web::get().to(healthz::<S>)) .route("/healthz", web::get().to(healthz::<S>))

View file

@ -3,7 +3,7 @@ use std::{
str::FromStr, str::FromStr,
}; };
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct Serde<T> { pub(crate) struct Serde<T> {
inner: T, inner: T,
} }
@ -44,6 +44,17 @@ impl<T> DerefMut for Serde<T> {
} }
} }
impl<T> Default for Serde<T>
where
T: Default,
{
fn default() -> Self {
Serde {
inner: T::default(),
}
}
}
impl<T> FromStr for Serde<T> impl<T> FromStr for Serde<T>
where where
T: FromStr, T: FromStr,