pict-rs/src/queue/process.rs
2023-07-16 21:51:14 -05:00

158 lines
4.1 KiB
Rust

use crate::{
error::Error,
formats::InputProcessableFormat,
ingest::Session,
queue::{Base64Bytes, LocalBoxFuture, Process},
repo::{Alias, DeleteToken, FullRepo, UploadId, UploadResult},
serde_str::Serde,
store::{Identifier, Store},
CONFIG,
};
use futures_util::TryStreamExt;
use std::path::PathBuf;
pub(super) fn perform<'a, R, S>(
repo: &'a R,
store: &'a S,
job: &'a [u8],
) -> LocalBoxFuture<'a, Result<(), Error>>
where
R: FullRepo + 'static,
S: Store + 'static,
{
Box::pin(async move {
match serde_json::from_slice(job) {
Ok(job) => match job {
Process::Ingest {
identifier: Base64Bytes(identifier),
upload_id,
declared_alias,
} => {
process_ingest(
repo,
store,
identifier,
Serde::into_inner(upload_id),
declared_alias.map(Serde::into_inner),
&CONFIG.media,
)
.await?
}
Process::Generate {
target_format,
source,
process_path,
process_args,
} => {
generate(
repo,
store,
target_format,
Serde::into_inner(source),
process_path,
process_args,
)
.await?
}
},
Err(e) => {
tracing::warn!("Invalid job: {}", format!("{e}"));
}
}
Ok(())
})
}
#[tracing::instrument(skip_all)]
async fn process_ingest<R, S>(
repo: &R,
store: &S,
unprocessed_identifier: Vec<u8>,
upload_id: UploadId,
declared_alias: Option<Alias>,
media: &crate::config::Media,
) -> Result<(), Error>
where
R: FullRepo + 'static,
S: Store,
{
let fut = async {
let unprocessed_identifier = S::Identifier::from_bytes(unprocessed_identifier)?;
let stream = store
.to_stream(&unprocessed_identifier, None, None)
.await?
.map_err(Error::from);
let session = crate::ingest::ingest(repo, store, stream, declared_alias, media).await?;
let token = session.delete_token().await?;
store.remove(&unprocessed_identifier).await?;
Ok((session, token)) as Result<(Session<R, S>, DeleteToken), Error>
};
let result = match fut.await {
Ok((mut session, token)) => {
let alias = session.alias().take().expect("Alias should exist").clone();
let result = UploadResult::Success { alias, token };
session.disarm();
result
}
Err(e) => {
tracing::warn!("Failed to ingest\n{}\n{}", format!("{e}"), format!("{e:?}"));
UploadResult::Failure {
message: e.root_cause().to_string(),
}
}
};
repo.complete(upload_id, result).await?;
Ok(())
}
#[tracing::instrument(skip_all)]
async fn generate<R: FullRepo, S: Store + 'static>(
repo: &R,
store: &S,
target_format: InputProcessableFormat,
source: Alias,
process_path: PathBuf,
process_args: Vec<String>,
) -> Result<(), Error> {
let Some(hash) = repo.hash(&source).await? else {
// Nothing to do
return Ok(());
};
let path_string = process_path.to_string_lossy().to_string();
let identifier_opt = repo
.variant_identifier::<S::Identifier>(hash.clone(), path_string)
.await?;
if identifier_opt.is_some() {
return Ok(());
}
let original_details = crate::ensure_details(repo, store, &source).await?;
crate::generate::generate(
repo,
store,
target_format,
source,
process_path,
process_args,
original_details.video_format(),
None,
hash,
)
.await?;
Ok(())
}