pict-rs/src/queue/process.rs

214 lines
5.7 KiB
Rust
Raw Normal View History

2023-09-06 01:45:07 +00:00
use reqwest_middleware::ClientWithMiddleware;
use time::Instant;
2023-09-06 01:45:07 +00:00
use crate::{
2023-07-22 16:15:30 +00:00
concurrent_processor::ProcessMap,
config::Configuration,
error::{Error, UploadError},
formats::InputProcessableFormat,
2023-09-05 02:51:27 +00:00
future::LocalBoxFuture,
ingest::Session,
2023-09-05 02:51:27 +00:00
queue::Process,
repo::{Alias, ArcRepo, UploadId, UploadResult},
serde_str::Serde,
store::Store,
};
use std::{path::PathBuf, sync::Arc};
pub(super) fn perform<'a, S>(
repo: &'a ArcRepo,
store: &'a S,
2023-09-06 01:45:07 +00:00
client: &'a ClientWithMiddleware,
2023-07-22 16:15:30 +00:00
process_map: &'a ProcessMap,
config: &'a Configuration,
2023-09-03 17:47:06 +00:00
job: serde_json::Value,
) -> LocalBoxFuture<'a, Result<(), Error>>
where
S: Store + 'static,
{
Box::pin(async move {
2023-09-03 17:47:06 +00:00
match serde_json::from_value(job) {
Ok(job) => match job {
Process::Ingest {
identifier,
upload_id,
declared_alias,
} => {
process_ingest(
repo,
store,
2023-09-06 01:45:07 +00:00
client,
Arc::from(identifier),
Serde::into_inner(upload_id),
declared_alias.map(Serde::into_inner),
&config.media,
)
.await?
}
Process::Generate {
target_format,
source,
process_path,
process_args,
} => {
generate(
repo,
store,
2023-07-22 16:15:30 +00:00
process_map,
target_format,
Serde::into_inner(source),
process_path,
process_args,
config,
)
.await?
}
},
Err(e) => {
2023-01-29 17:57:59 +00:00
tracing::warn!("Invalid job: {}", format!("{e}"));
}
}
Ok(())
})
}
/// Drop guard that tracks whether a background upload ingest ran to completion.
///
/// On drop it records ingest metrics and, if it was never disarmed, logs a
/// warning naming the affected upload.
struct UploadGuard {
    /// `true` until `disarm` is called; an armed guard warns when dropped.
    armed: bool,
    /// Instant at which the guard was created, used to report the ingest duration.
    start: Instant,
    /// Identifier of the upload being processed, included in the drop warning.
    upload_id: UploadId,
}
impl UploadGuard {
    /// Creates an armed guard for `upload_id`, recording the current instant
    /// as the start of the ingest.
    fn guard(upload_id: UploadId) -> Self {
        let start = Instant::now();

        Self {
            armed: true,
            start,
            upload_id,
        }
    }

    /// Marks the upload as completed and consumes the guard.
    ///
    /// The guard is dropped here, so its `Drop` implementation runs with
    /// `armed == false`.
    fn disarm(mut self) {
        self.armed = false;
        drop(self);
    }
}
impl Drop for UploadGuard {
    /// Records the ingest counter and duration histogram, labelled with
    /// whether the guard was disarmed, and warns if it was still armed.
    fn drop(&mut self) {
        // A disarmed guard means the ingest ran all the way through.
        let completed = !self.armed;

        metrics::increment_counter!("pict-rs.background.upload.ingest", "completed" => completed.to_string());
        metrics::histogram!("pict-rs.background.upload.ingest.duration", self.start.elapsed().as_seconds_f64(), "completed" => completed.to_string());

        if !completed {
            tracing::warn!(
                "Upload future for {} dropped before completion! This can cause clients to wait forever",
                self.upload_id,
            );
        }
    }
}
2023-09-06 01:45:07 +00:00
#[tracing::instrument(skip(repo, store, client, media))]
async fn process_ingest<S>(
repo: &ArcRepo,
store: &S,
2023-09-06 01:45:07 +00:00
client: &ClientWithMiddleware,
unprocessed_identifier: Arc<str>,
upload_id: UploadId,
declared_alias: Option<Alias>,
media: &crate::config::Media,
) -> Result<(), Error>
where
S: Store + 'static,
{
let guard = UploadGuard::guard(upload_id);
let fut = async {
let ident = unprocessed_identifier.clone();
let store2 = store.clone();
let repo = repo.clone();
2023-09-06 01:45:07 +00:00
let client = client.clone();
let media = media.clone();
let error_boundary = crate::sync::spawn(async move {
let stream = crate::stream::from_err(store2.to_stream(&ident, None, None).await?);
let session =
2023-09-06 01:45:07 +00:00
crate::ingest::ingest(&repo, &store2, &client, stream, declared_alias, &media)
.await?;
Ok(session) as Result<Session, Error>
})
.await;
2022-04-03 02:15:39 +00:00
store.remove(&unprocessed_identifier).await?;
error_boundary.map_err(|_| UploadError::Canceled)?
};
let result = match fut.await {
Ok(session) => {
let alias = session.alias().take().expect("Alias should exist").clone();
let token = session.disarm();
2023-07-27 03:53:41 +00:00
UploadResult::Success { alias, token }
}
Err(e) => {
2023-07-10 22:15:43 +00:00
tracing::warn!("Failed to ingest\n{}\n{}", format!("{e}"), format!("{e:?}"));
UploadResult::Failure {
2023-07-17 02:51:14 +00:00
message: e.root_cause().to_string(),
2023-09-02 01:50:10 +00:00
code: e.error_code().into_owned(),
}
}
};
repo.complete_upload(upload_id, result).await?;
guard.disarm();
Ok(())
}
2023-07-22 16:15:30 +00:00
#[allow(clippy::too_many_arguments)]
2023-09-03 23:21:46 +00:00
#[tracing::instrument(skip(repo, store, process_map, process_path, process_args, config))]
async fn generate<S: Store + 'static>(
repo: &ArcRepo,
store: &S,
2023-07-22 16:15:30 +00:00
process_map: &ProcessMap,
target_format: InputProcessableFormat,
source: Alias,
process_path: PathBuf,
process_args: Vec<String>,
config: &Configuration,
) -> Result<(), Error> {
let Some(hash) = repo.hash(&source).await? else {
// Nothing to do
return Ok(());
};
let path_string = process_path.to_string_lossy().to_string();
let identifier_opt = repo.variant_identifier(hash.clone(), path_string).await?;
if identifier_opt.is_some() {
return Ok(());
}
let original_details = crate::ensure_details(repo, store, config, &source).await?;
crate::generate::generate(
repo,
store,
2023-07-22 16:15:30 +00:00
process_map,
target_format,
source,
process_path,
process_args,
original_details.video_format(),
None,
&config.media,
hash,
)
.await?;
Ok(())
}