Replace global config with passed-around config

This will enable spinning up pict-rs multiple times in the same process with different configurations
This commit is contained in:
asonix 2023-07-22 12:31:01 -05:00
parent e1262a5fda
commit 3ca994ee31
7 changed files with 312 additions and 232 deletions

View file

@ -54,10 +54,15 @@ impl ConfigSource<PathBuf, ()> {
} }
} }
pub struct PictRsConfiguration {
pub(crate) config: Configuration,
pub(crate) operation: Operation,
}
pub(crate) fn configure_without_clap<P: AsRef<Path>, T: serde::Serialize, Q: AsRef<Path>>( pub(crate) fn configure_without_clap<P: AsRef<Path>, T: serde::Serialize, Q: AsRef<Path>>(
source: ConfigSource<P, T>, source: ConfigSource<P, T>,
save_to: Option<Q>, save_to: Option<Q>,
) -> color_eyre::Result<(Configuration, Operation)> { ) -> color_eyre::Result<PictRsConfiguration> {
let config = Config::builder().add_source(config::Config::try_from(&Defaults::default())?); let config = Config::builder().add_source(config::Config::try_from(&Defaults::default())?);
let config = match source { let config = match source {
@ -83,10 +88,10 @@ pub(crate) fn configure_without_clap<P: AsRef<Path>, T: serde::Serialize, Q: AsR
std::fs::write(save_to, output)?; std::fs::write(save_to, output)?;
} }
Ok((config, operation)) Ok(PictRsConfiguration { config, operation })
} }
pub(crate) fn configure() -> color_eyre::Result<(Configuration, Operation)> { pub(crate) fn configure() -> color_eyre::Result<PictRsConfiguration> {
let Output { let Output {
config_format, config_format,
operation, operation,
@ -118,5 +123,5 @@ pub(crate) fn configure() -> color_eyre::Result<(Configuration, Operation)> {
std::fs::write(save_to, output)?; std::fs::write(save_to, output)?;
} }
Ok((config, operation)) Ok(PictRsConfiguration { config, operation })
} }

View file

@ -24,7 +24,7 @@ pub(crate) async fn generate<R: FullRepo, S: Store + 'static>(
thumbnail_args: Vec<String>, thumbnail_args: Vec<String>,
input_format: Option<InternalVideoFormat>, input_format: Option<InternalVideoFormat>,
thumbnail_format: Option<ThumbnailFormat>, thumbnail_format: Option<ThumbnailFormat>,
media: &'static crate::config::Media, media: &crate::config::Media,
hash: R::Bytes, hash: R::Bytes,
) -> Result<(Details, Bytes), Error> { ) -> Result<(Details, Bytes), Error> {
let process_fut = process( let process_fut = process(
@ -58,7 +58,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
thumbnail_args: Vec<String>, thumbnail_args: Vec<String>,
input_format: Option<InternalVideoFormat>, input_format: Option<InternalVideoFormat>,
thumbnail_format: Option<ThumbnailFormat>, thumbnail_format: Option<ThumbnailFormat>,
media: &'static crate::config::Media, media: &crate::config::Media,
hash: R::Bytes, hash: R::Bytes,
) -> Result<(Details, Bytes), Error> { ) -> Result<(Details, Bytes), Error> {
let permit = crate::PROCESS_SEMAPHORE.acquire().await; let permit = crate::PROCESS_SEMAPHORE.acquire().await;

View file

@ -46,7 +46,7 @@ pub(crate) async fn ingest<R, S>(
store: &S, store: &S,
stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static, stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
declared_alias: Option<Alias>, declared_alias: Option<Alias>,
media: &'static crate::config::Media, media: &crate::config::Media,
) -> Result<Session<R, S>, Error> ) -> Result<Session<R, S>, Error>
where where
R: FullRepo + 'static, R: FullRepo + 'static,

View file

@ -39,7 +39,7 @@ use futures_util::{
stream::{empty, once}, stream::{empty, once},
Stream, StreamExt, TryStreamExt, Stream, StreamExt, TryStreamExt,
}; };
use once_cell::sync::{Lazy, OnceCell}; use once_cell::sync::Lazy;
use repo::sled::SledRepo; use repo::sled::SledRepo;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_tracing::TracingMiddleware; use reqwest_tracing::TracingMiddleware;
@ -79,7 +79,7 @@ use self::{
stream::{StreamLimit, StreamTimeout}, stream::{StreamLimit, StreamTimeout},
}; };
pub use self::config::ConfigSource; pub use self::config::{ConfigSource, PictRsConfiguration};
const MEGABYTES: usize = 1024 * 1024; const MEGABYTES: usize = 1024 * 1024;
const MINUTES: u32 = 60; const MINUTES: u32 = 60;
@ -88,21 +88,6 @@ const DAYS: u32 = 24 * HOURS;
const NOT_FOUND_KEY: &str = "404-alias"; const NOT_FOUND_KEY: &str = "404-alias";
static DO_CONFIG: OnceCell<(Configuration, Operation)> = OnceCell::new();
static CONFIG: Lazy<Configuration> = Lazy::new(|| {
DO_CONFIG
.get_or_try_init(config::configure)
.expect("Failed to configure")
.0
.clone()
});
static OPERATION: Lazy<Operation> = Lazy::new(|| {
DO_CONFIG
.get_or_try_init(config::configure)
.expect("Failed to configure")
.1
.clone()
});
static PROCESS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| { static PROCESS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| {
tracing::trace_span!(parent: None, "Initialize semaphore") tracing::trace_span!(parent: None, "Initialize semaphore")
.in_scope(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1))) .in_scope(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1)))
@ -111,6 +96,7 @@ static PROCESS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| {
async fn ensure_details<R: FullRepo, S: Store + 'static>( async fn ensure_details<R: FullRepo, S: Store + 'static>(
repo: &R, repo: &R,
store: &S, store: &S,
config: &Configuration,
alias: &Alias, alias: &Alias,
) -> Result<Details, Error> { ) -> Result<Details, Error> {
let Some(identifier) = repo.identifier_from_alias::<S::Identifier>(alias).await? else { let Some(identifier) = repo.identifier_from_alias::<S::Identifier>(alias).await? else {
@ -129,7 +115,7 @@ async fn ensure_details<R: FullRepo, S: Store + 'static>(
tracing::debug!("details exist"); tracing::debug!("details exist");
Ok(details) Ok(details)
} else { } else {
if CONFIG.server.read_only { if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
@ -160,16 +146,21 @@ impl<R: FullRepo, S: Store + 'static> FormData for Upload<R, S> {
.app_data::<web::Data<S>>() .app_data::<web::Data<S>>()
.expect("No store in request") .expect("No store in request")
.clone(); .clone();
let config = req
.app_data::<web::Data<Configuration>>()
.expect("No configuration in request")
.clone();
Form::new() Form::new()
.max_files(CONFIG.server.max_file_count) .max_files(config.server.max_file_count)
.max_file_size(CONFIG.media.max_file_size * MEGABYTES) .max_file_size(config.media.max_file_size * MEGABYTES)
.transform_error(transform_error) .transform_error(transform_error)
.field( .field(
"images", "images",
Field::array(Field::file(move |filename, _, stream| { Field::array(Field::file(move |filename, _, stream| {
let repo = repo.clone(); let repo = repo.clone();
let store = store.clone(); let store = store.clone();
let config = config.clone();
let span = tracing::info_span!("file-upload", ?filename); let span = tracing::info_span!("file-upload", ?filename);
@ -177,11 +168,11 @@ impl<R: FullRepo, S: Store + 'static> FormData for Upload<R, S> {
Box::pin( Box::pin(
async move { async move {
if CONFIG.server.read_only { if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
ingest::ingest(&**repo, &**store, stream, None, &CONFIG.media).await ingest::ingest(&**repo, &**store, stream, None, &config.media).await
} }
.instrument(span), .instrument(span),
) )
@ -209,19 +200,24 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
.app_data::<web::Data<S>>() .app_data::<web::Data<S>>()
.expect("No store in request") .expect("No store in request")
.clone(); .clone();
let config = req
.app_data::<web::Data<Configuration>>()
.expect("No configuration in request")
.clone();
// Create a new Multipart Form validator for internal imports // Create a new Multipart Form validator for internal imports
// //
// This form is expecting a single array field, 'images' with at most 10 files in it // This form is expecting a single array field, 'images' with at most 10 files in it
Form::new() Form::new()
.max_files(CONFIG.server.max_file_count) .max_files(config.server.max_file_count)
.max_file_size(CONFIG.media.max_file_size * MEGABYTES) .max_file_size(config.media.max_file_size * MEGABYTES)
.transform_error(transform_error) .transform_error(transform_error)
.field( .field(
"images", "images",
Field::array(Field::file(move |filename, _, stream| { Field::array(Field::file(move |filename, _, stream| {
let repo = repo.clone(); let repo = repo.clone();
let store = store.clone(); let store = store.clone();
let config = config.clone();
let span = tracing::info_span!("file-import", ?filename); let span = tracing::info_span!("file-import", ?filename);
@ -229,7 +225,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
Box::pin( Box::pin(
async move { async move {
if CONFIG.server.read_only { if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
@ -238,7 +234,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
&**store, &**store,
stream, stream,
Some(Alias::from_existing(&filename)), Some(Alias::from_existing(&filename)),
&CONFIG.media, &config.media,
) )
.await .await
} }
@ -257,31 +253,34 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
} }
/// Handle responding to successful uploads /// Handle responding to successful uploads
#[tracing::instrument(name = "Uploaded files", skip(value, repo, store))] #[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))]
async fn upload<R: FullRepo, S: Store + 'static>( async fn upload<R: FullRepo, S: Store + 'static>(
Multipart(Upload(value)): Multipart<Upload<R, S>>, Multipart(Upload(value)): Multipart<Upload<R, S>>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
handle_upload(value, repo, store).await handle_upload(value, repo, store, config).await
} }
/// Handle responding to successful uploads /// Handle responding to successful uploads
#[tracing::instrument(name = "Imported files", skip(value, repo, store))] #[tracing::instrument(name = "Imported files", skip(value, repo, store, config))]
async fn import<R: FullRepo, S: Store + 'static>( async fn import<R: FullRepo, S: Store + 'static>(
Multipart(Import(value)): Multipart<Import<R, S>>, Multipart(Import(value)): Multipart<Import<R, S>>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
handle_upload(value, repo, store).await handle_upload(value, repo, store, config).await
} }
/// Handle responding to successful uploads /// Handle responding to successful uploads
#[tracing::instrument(name = "Uploaded files", skip(value, repo, store))] #[tracing::instrument(name = "Uploaded files", skip(value, repo, store, config))]
async fn handle_upload<R: FullRepo, S: Store + 'static>( async fn handle_upload<R: FullRepo, S: Store + 'static>(
value: Value<Session<R, S>>, value: Value<Session<R, S>>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let images = value let images = value
.map() .map()
@ -300,7 +299,7 @@ async fn handle_upload<R: FullRepo, S: Store + 'static>(
tracing::debug!("Uploaded {} as {:?}", image.filename, alias); tracing::debug!("Uploaded {} as {:?}", image.filename, alias);
let delete_token = image.result.delete_token().await?; let delete_token = image.result.delete_token().await?;
let details = ensure_details(&repo, &store, alias).await?; let details = ensure_details(&repo, &store, &config, alias).await?;
files.push(serde_json::json!({ files.push(serde_json::json!({
"file": alias.to_string(), "file": alias.to_string(),
@ -338,10 +337,16 @@ impl<R: FullRepo, S: Store + 'static> FormData for BackgroundedUpload<R, S> {
.app_data::<web::Data<S>>() .app_data::<web::Data<S>>()
.expect("No store in request") .expect("No store in request")
.clone(); .clone();
let config = req
.app_data::<web::Data<Configuration>>()
.expect("No configuration in request")
.clone();
let read_only = config.server.read_only;
Form::new() Form::new()
.max_files(CONFIG.server.max_file_count) .max_files(config.server.max_file_count)
.max_file_size(CONFIG.media.max_file_size * MEGABYTES) .max_file_size(config.media.max_file_size * MEGABYTES)
.transform_error(transform_error) .transform_error(transform_error)
.field( .field(
"images", "images",
@ -355,7 +360,7 @@ impl<R: FullRepo, S: Store + 'static> FormData for BackgroundedUpload<R, S> {
Box::pin( Box::pin(
async move { async move {
if CONFIG.server.read_only { if read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
@ -427,6 +432,7 @@ struct ClaimQuery {
async fn claim_upload<R: FullRepo, S: Store + 'static>( async fn claim_upload<R: FullRepo, S: Store + 'static>(
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>,
query: web::Query<ClaimQuery>, query: web::Query<ClaimQuery>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let upload_id = Serde::into_inner(query.into_inner().upload_id); let upload_id = Serde::into_inner(query.into_inner().upload_id);
@ -438,7 +444,7 @@ async fn claim_upload<R: FullRepo, S: Store + 'static>(
match upload_result { match upload_result {
UploadResult::Success { alias, token } => { UploadResult::Success { alias, token } => {
let details = ensure_details(&repo, &store, &alias).await?; let details = ensure_details(&repo, &store, &config, &alias).await?;
Ok(HttpResponse::Ok().json(&serde_json::json!({ Ok(HttpResponse::Ok().json(&serde_json::json!({
"msg": "ok", "msg": "ok",
@ -469,14 +475,15 @@ struct UrlQuery {
} }
/// download an image from a URL /// download an image from a URL
#[tracing::instrument(name = "Downloading file", skip(client, repo, store))] #[tracing::instrument(name = "Downloading file", skip(client, repo, store, config))]
async fn download<R: FullRepo + 'static, S: Store + 'static>( async fn download<R: FullRepo + 'static, S: Store + 'static>(
client: web::Data<ClientWithMiddleware>, client: web::Data<ClientWithMiddleware>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>,
query: web::Query<UrlQuery>, query: web::Query<UrlQuery>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
if CONFIG.server.read_only { if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
@ -489,27 +496,28 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
let stream = res let stream = res
.bytes_stream() .bytes_stream()
.map_err(Error::from) .map_err(Error::from)
.limit((CONFIG.media.max_file_size * MEGABYTES) as u64); .limit((config.media.max_file_size * MEGABYTES) as u64);
if query.backgrounded { if query.backgrounded {
do_download_backgrounded(stream, repo, store).await do_download_backgrounded(stream, repo, store).await
} else { } else {
do_download_inline(stream, repo, store).await do_download_inline(stream, repo, store, config).await
} }
} }
#[tracing::instrument(name = "Downloading file inline", skip(stream, repo, store))] #[tracing::instrument(name = "Downloading file inline", skip(stream, repo, store, config))]
async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>( async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static, stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let mut session = ingest::ingest(&repo, &store, stream, None, &CONFIG.media).await?; let mut session = ingest::ingest(&repo, &store, stream, None, &config.media).await?;
let alias = session.alias().expect("alias should exist").to_owned(); let alias = session.alias().expect("alias should exist").to_owned();
let delete_token = session.delete_token().await?; let delete_token = session.delete_token().await?;
let details = ensure_details(&repo, &store, &alias).await?; let details = ensure_details(&repo, &store, &config, &alias).await?;
session.disarm(); session.disarm();
@ -553,9 +561,10 @@ async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
#[tracing::instrument(name = "Deleting file", skip(repo))] #[tracing::instrument(name = "Deleting file", skip(repo))]
async fn delete<R: FullRepo>( async fn delete<R: FullRepo>(
repo: web::Data<R>, repo: web::Data<R>,
config: web::Data<Configuration>,
path_entries: web::Path<(String, String)>, path_entries: web::Path<(String, String)>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
if CONFIG.server.read_only { if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
@ -572,6 +581,7 @@ async fn delete<R: FullRepo>(
type ProcessQuery = Vec<(String, String)>; type ProcessQuery = Vec<(String, String)>;
fn prepare_process( fn prepare_process(
config: &Configuration,
query: web::Query<ProcessQuery>, query: web::Query<ProcessQuery>,
ext: &str, ext: &str,
) -> Result<(InputProcessableFormat, Alias, PathBuf, Vec<String>), Error> { ) -> Result<(InputProcessableFormat, Alias, PathBuf, Vec<String>), Error> {
@ -596,7 +606,7 @@ fn prepare_process(
let operations = operations let operations = operations
.into_iter() .into_iter()
.filter(|(k, _)| CONFIG.media.filters.contains(&k.to_lowercase())) .filter(|(k, _)| config.media.filters.contains(&k.to_lowercase()))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let format = ext let format = ext
@ -609,13 +619,14 @@ fn prepare_process(
Ok((format, alias, thumbnail_path, thumbnail_args)) Ok((format, alias, thumbnail_path, thumbnail_args))
} }
#[tracing::instrument(name = "Fetching derived details", skip(repo))] #[tracing::instrument(name = "Fetching derived details", skip(repo, config))]
async fn process_details<R: FullRepo, S: Store>( async fn process_details<R: FullRepo, S: Store>(
query: web::Query<ProcessQuery>, query: web::Query<ProcessQuery>,
ext: web::Path<String>, ext: web::Path<String>,
repo: web::Data<R>, repo: web::Data<R>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str())?; let (_, alias, thumbnail_path, _) = prepare_process(&config, query, ext.as_str())?;
let Some(hash) = repo.hash(&alias).await? else { let Some(hash) = repo.hash(&alias).await? else {
// Invalid alias // Invalid alias
@ -655,16 +666,21 @@ async fn not_found_hash<R: FullRepo>(repo: &R) -> Result<Option<(Alias, R::Bytes
} }
/// Process files /// Process files
#[tracing::instrument(name = "Serving processed image", skip(repo, store))] #[tracing::instrument(
name = "Serving processed image",
skip(repo, store, config, process_map)
)]
async fn process<R: FullRepo, S: Store + 'static>( async fn process<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>, range: Option<web::Header<Range>>,
query: web::Query<ProcessQuery>, query: web::Query<ProcessQuery>,
ext: web::Path<String>, ext: web::Path<String>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>,
process_map: web::Data<ProcessMap>, process_map: web::Data<ProcessMap>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?; let (format, alias, thumbnail_path, thumbnail_args) =
prepare_process(&config, query, ext.as_str())?;
let path_string = thumbnail_path.to_string_lossy().to_string(); let path_string = thumbnail_path.to_string_lossy().to_string();
@ -695,7 +711,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
tracing::debug!("details exist"); tracing::debug!("details exist");
details details
} else { } else {
if CONFIG.server.read_only { if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
@ -716,11 +732,11 @@ async fn process<R: FullRepo, S: Store + 'static>(
return ranged_file_resp(&store, identifier, range, details, not_found).await; return ranged_file_resp(&store, identifier, range, details, not_found).await;
} }
if CONFIG.server.read_only { if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
let original_details = ensure_details(&repo, &store, &alias).await?; let original_details = ensure_details(&repo, &store, &config, &alias).await?;
let (details, bytes) = generate::generate( let (details, bytes) = generate::generate(
&repo, &repo,
@ -732,7 +748,7 @@ async fn process<R: FullRepo, S: Store + 'static>(
thumbnail_args, thumbnail_args,
original_details.video_format(), original_details.video_format(),
None, None,
&CONFIG.media, &config.media,
hash, hash,
) )
.await?; .await?;
@ -774,15 +790,16 @@ async fn process<R: FullRepo, S: Store + 'static>(
)) ))
} }
#[tracing::instrument(name = "Serving processed image headers", skip(repo, store))] #[tracing::instrument(name = "Serving processed image headers", skip(repo, store, config))]
async fn process_head<R: FullRepo, S: Store + 'static>( async fn process_head<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>, range: Option<web::Header<Range>>,
query: web::Query<ProcessQuery>, query: web::Query<ProcessQuery>,
ext: web::Path<String>, ext: web::Path<String>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str())?; let (_, alias, thumbnail_path, _) = prepare_process(&config, query, ext.as_str())?;
let path_string = thumbnail_path.to_string_lossy().to_string(); let path_string = thumbnail_path.to_string_lossy().to_string();
let Some(hash) = repo.hash(&alias).await? else { let Some(hash) = repo.hash(&alias).await? else {
@ -807,7 +824,7 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
tracing::debug!("details exist"); tracing::debug!("details exist");
details details
} else { } else {
if CONFIG.server.read_only { if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
@ -837,8 +854,10 @@ async fn process_backgrounded<R: FullRepo, S: Store>(
query: web::Query<ProcessQuery>, query: web::Query<ProcessQuery>,
ext: web::Path<String>, ext: web::Path<String>,
repo: web::Data<R>, repo: web::Data<R>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let (target_format, source, process_path, process_args) = prepare_process(query, ext.as_str())?; let (target_format, source, process_path, process_args) =
prepare_process(&config, query, ext.as_str())?;
let path_string = process_path.to_string_lossy().to_string(); let path_string = process_path.to_string_lossy().to_string();
let Some(hash) = repo.hash(&source).await? else { let Some(hash) = repo.hash(&source).await? else {
@ -854,7 +873,7 @@ async fn process_backgrounded<R: FullRepo, S: Store>(
return Ok(HttpResponse::Accepted().finish()); return Ok(HttpResponse::Accepted().finish());
} }
if CONFIG.server.read_only { if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
@ -864,26 +883,28 @@ async fn process_backgrounded<R: FullRepo, S: Store>(
} }
/// Fetch file details /// Fetch file details
#[tracing::instrument(name = "Fetching details", skip(repo, store))] #[tracing::instrument(name = "Fetching details", skip(repo, store, config))]
async fn details<R: FullRepo, S: Store + 'static>( async fn details<R: FullRepo, S: Store + 'static>(
alias: web::Path<Serde<Alias>>, alias: web::Path<Serde<Alias>>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let alias = alias.into_inner(); let alias = alias.into_inner();
let details = ensure_details(&repo, &store, &alias).await?; let details = ensure_details(&repo, &store, &config, &alias).await?;
Ok(HttpResponse::Ok().json(&details)) Ok(HttpResponse::Ok().json(&details))
} }
/// Serve files /// Serve files
#[tracing::instrument(name = "Serving file", skip(repo, store))] #[tracing::instrument(name = "Serving file", skip(repo, store, config))]
async fn serve<R: FullRepo, S: Store + 'static>( async fn serve<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>, range: Option<web::Header<Range>>,
alias: web::Path<Serde<Alias>>, alias: web::Path<Serde<Alias>>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let alias = alias.into_inner(); let alias = alias.into_inner();
@ -906,7 +927,7 @@ async fn serve<R: FullRepo, S: Store + 'static>(
return Ok(HttpResponse::NotFound().finish()); return Ok(HttpResponse::NotFound().finish());
}; };
let details = ensure_details(&repo, &store, &alias).await?; let details = ensure_details(&repo, &store, &config, &alias).await?;
if let Some(public_url) = store.public_url(&identifier) { if let Some(public_url) = store.public_url(&identifier) {
return Ok(HttpResponse::SeeOther() return Ok(HttpResponse::SeeOther()
@ -917,12 +938,13 @@ async fn serve<R: FullRepo, S: Store + 'static>(
ranged_file_resp(&store, identifier, range, details, not_found).await ranged_file_resp(&store, identifier, range, details, not_found).await
} }
#[tracing::instrument(name = "Serving file headers", skip(repo, store))] #[tracing::instrument(name = "Serving file headers", skip(repo, store, config))]
async fn serve_head<R: FullRepo, S: Store + 'static>( async fn serve_head<R: FullRepo, S: Store + 'static>(
range: Option<web::Header<Range>>, range: Option<web::Header<Range>>,
alias: web::Path<Serde<Alias>>, alias: web::Path<Serde<Alias>>,
repo: web::Data<R>, repo: web::Data<R>,
store: web::Data<S>, store: web::Data<S>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let alias = alias.into_inner(); let alias = alias.into_inner();
@ -931,7 +953,7 @@ async fn serve_head<R: FullRepo, S: Store + 'static>(
return Ok(HttpResponse::NotFound().finish()); return Ok(HttpResponse::NotFound().finish());
}; };
let details = ensure_details(&repo, &store, &alias).await?; let details = ensure_details(&repo, &store, &config, &alias).await?;
if let Some(public_url) = store.public_url(&identifier) { if let Some(public_url) = store.public_url(&identifier) {
return Ok(HttpResponse::SeeOther() return Ok(HttpResponse::SeeOther()
@ -1074,9 +1096,12 @@ fn srv_head(
builder builder
} }
#[tracing::instrument(name = "Spawning variant cleanup", skip(repo))] #[tracing::instrument(name = "Spawning variant cleanup", skip(repo, config))]
async fn clean_variants<R: FullRepo>(repo: web::Data<R>) -> Result<HttpResponse, Error> { async fn clean_variants<R: FullRepo>(
if CONFIG.server.read_only { repo: web::Data<R>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> {
if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
@ -1089,12 +1114,13 @@ struct AliasQuery {
alias: Serde<Alias>, alias: Serde<Alias>,
} }
#[tracing::instrument(name = "Setting 404 Image", skip(repo))] #[tracing::instrument(name = "Setting 404 Image", skip(repo, config))]
async fn set_not_found<R: FullRepo>( async fn set_not_found<R: FullRepo>(
json: web::Json<AliasQuery>, json: web::Json<AliasQuery>,
repo: web::Data<R>, repo: web::Data<R>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
if CONFIG.server.read_only { if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
@ -1113,12 +1139,13 @@ async fn set_not_found<R: FullRepo>(
}))) })))
} }
#[tracing::instrument(name = "Purging file", skip(repo))] #[tracing::instrument(name = "Purging file", skip(repo, config))]
async fn purge<R: FullRepo>( async fn purge<R: FullRepo>(
query: web::Query<AliasQuery>, query: web::Query<AliasQuery>,
repo: web::Data<R>, repo: web::Data<R>,
config: web::Data<Configuration>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
if CONFIG.server.read_only { if config.server.read_only {
return Err(UploadError::ReadOnly.into()); return Err(UploadError::ReadOnly.into());
} }
@ -1186,11 +1213,11 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error {
error error
} }
fn build_client() -> Result<ClientWithMiddleware, Error> { fn build_client(config: &Configuration) -> Result<ClientWithMiddleware, Error> {
let client = reqwest::Client::builder() let client = reqwest::Client::builder()
.user_agent("pict-rs v0.5.0-main") .user_agent("pict-rs v0.5.0-main")
.use_rustls_tls() .use_rustls_tls()
.pool_max_idle_per_host(CONFIG.client.pool_size) .pool_max_idle_per_host(config.client.pool_size)
.build() .build()
.map_err(UploadError::BuildClient)?; .map_err(UploadError::BuildClient)?;
@ -1199,12 +1226,12 @@ fn build_client() -> Result<ClientWithMiddleware, Error> {
.build()) .build())
} }
fn next_worker_id() -> String { fn next_worker_id(config: &Configuration) -> String {
static WORKER_ID: AtomicU64 = AtomicU64::new(0); static WORKER_ID: AtomicU64 = AtomicU64::new(0);
let next_id = WORKER_ID.fetch_add(1, Ordering::Relaxed); let next_id = WORKER_ID.fetch_add(1, Ordering::Relaxed);
format!("{}-{}", CONFIG.server.worker_id, next_id) format!("{}-{}", config.server.worker_id, next_id)
} }
fn configure_endpoints< fn configure_endpoints<
@ -1215,6 +1242,7 @@ fn configure_endpoints<
config: &mut web::ServiceConfig, config: &mut web::ServiceConfig,
repo: R, repo: R,
store: S, store: S,
configuration: Configuration,
client: ClientWithMiddleware, client: ClientWithMiddleware,
extra_config: F, extra_config: F,
) { ) {
@ -1222,6 +1250,7 @@ fn configure_endpoints<
.app_data(web::Data::new(repo)) .app_data(web::Data::new(repo))
.app_data(web::Data::new(store)) .app_data(web::Data::new(store))
.app_data(web::Data::new(client)) .app_data(web::Data::new(client))
.app_data(web::Data::new(configuration.clone()))
.route("/healthz", web::get().to(healthz::<R, S>)) .route("/healthz", web::get().to(healthz::<R, S>))
.service( .service(
web::scope("/image") web::scope("/image")
@ -1276,7 +1305,7 @@ fn configure_endpoints<
.service( .service(
web::scope("/internal") web::scope("/internal")
.wrap(Internal( .wrap(Internal(
CONFIG.server.api_key.as_ref().map(|s| s.to_owned()), configuration.server.api_key.as_ref().map(|s| s.to_owned()),
)) ))
.service(web::resource("/import").route(web::post().to(import::<R, S>))) .service(web::resource("/import").route(web::post().to(import::<R, S>)))
.service(web::resource("/variants").route(web::delete().to(clean_variants::<R>))) .service(web::resource("/variants").route(web::delete().to(clean_variants::<R>)))
@ -1288,7 +1317,7 @@ fn configure_endpoints<
); );
} }
fn spawn_workers<R, S>(repo: R, store: S, process_map: ProcessMap) fn spawn_workers<R, S>(repo: R, store: S, config: &Configuration, process_map: ProcessMap)
where where
R: FullRepo + 'static, R: FullRepo + 'static,
S: Store + 'static, S: Store + 'static,
@ -1297,7 +1326,7 @@ where
actix_rt::spawn(queue::process_cleanup( actix_rt::spawn(queue::process_cleanup(
repo.clone(), repo.clone(),
store.clone(), store.clone(),
next_worker_id(), next_worker_id(config),
)) ))
}); });
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
@ -1305,7 +1334,8 @@ where
repo, repo,
store, store,
process_map, process_map,
next_worker_id(), config.clone(),
next_worker_id(config),
)) ))
}); });
} }
@ -1314,26 +1344,29 @@ async fn launch_file_store<R: FullRepo + 'static, F: Fn(&mut web::ServiceConfig)
repo: R, repo: R,
store: FileStore, store: FileStore,
client: ClientWithMiddleware, client: ClientWithMiddleware,
config: Configuration,
extra_config: F, extra_config: F,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
let process_map = ProcessMap::new(); let process_map = ProcessMap::new();
let address = config.server.address;
HttpServer::new(move || { HttpServer::new(move || {
let client = client.clone(); let client = client.clone();
let store = store.clone(); let store = store.clone();
let repo = repo.clone(); let repo = repo.clone();
let config = config.clone();
let extra_config = extra_config.clone(); let extra_config = extra_config.clone();
spawn_workers(repo.clone(), store.clone(), process_map.clone()); spawn_workers(repo.clone(), store.clone(), &config, process_map.clone());
App::new() App::new()
.wrap(TracingLogger::default()) .wrap(TracingLogger::default())
.wrap(Deadline) .wrap(Deadline)
.app_data(web::Data::new(process_map.clone())) .app_data(web::Data::new(process_map.clone()))
.configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config)) .configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config))
}) })
.bind(CONFIG.server.address)? .bind(address)?
.run() .run()
.await .await
} }
@ -1345,26 +1378,29 @@ async fn launch_object_store<
repo: R, repo: R,
store_config: ObjectStoreConfig, store_config: ObjectStoreConfig,
client: ClientWithMiddleware, client: ClientWithMiddleware,
config: Configuration,
extra_config: F, extra_config: F,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
let process_map = ProcessMap::new(); let process_map = ProcessMap::new();
let address = config.server.address;
HttpServer::new(move || { HttpServer::new(move || {
let client = client.clone(); let client = client.clone();
let store = store_config.clone().build(client.clone()); let store = store_config.clone().build(client.clone());
let repo = repo.clone(); let repo = repo.clone();
let config = config.clone();
let extra_config = extra_config.clone(); let extra_config = extra_config.clone();
spawn_workers(repo.clone(), store.clone(), process_map.clone()); spawn_workers(repo.clone(), store.clone(), &config, process_map.clone());
App::new() App::new()
.wrap(TracingLogger::default()) .wrap(TracingLogger::default())
.wrap(Deadline) .wrap(Deadline)
.app_data(web::Data::new(process_map.clone())) .app_data(web::Data::new(process_map.clone()))
.configure(move |sc| configure_endpoints(sc, repo, store, client, extra_config)) .configure(move |sc| configure_endpoints(sc, repo, store, config, client, extra_config))
}) })
.bind(CONFIG.server.address)? .bind(address)?
.run() .run()
.await .await
} }
@ -1463,25 +1499,14 @@ impl<P: AsRef<Path>, T: serde::Serialize> ConfigSource<P, T> {
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
pub fn init<Q: AsRef<Path>>(self, save_to: Option<Q>) -> color_eyre::Result<()> { pub fn init<Q: AsRef<Path>>(
let (config, operation) = config::configure_without_clap(self, save_to)?; self,
save_to: Option<Q>,
DO_CONFIG ) -> color_eyre::Result<PictRsConfiguration> {
.set((config, operation)) config::configure_without_clap(self, save_to)
.unwrap_or_else(|_| panic!("CONFIG cannot be initialized more than once"));
Ok(())
} }
} }
/// Install the default pict-rs tracer
///
/// This is probably not useful for 3rd party applications that install their own tracing
/// subscribers.
pub fn install_tracing() -> color_eyre::Result<()> {
init_tracing(&CONFIG.tracing)
}
async fn export_handler(repo: web::Data<SledRepo>) -> Result<HttpResponse, Error> { async fn export_handler(repo: web::Data<SledRepo>) -> Result<HttpResponse, Error> {
repo.export().await?; repo.export().await?;
@ -1494,106 +1519,111 @@ fn sled_extra_config(sc: &mut web::ServiceConfig) {
sc.service(web::resource("/export").route(web::post().to(export_handler))); sc.service(web::resource("/export").route(web::post().to(export_handler)));
} }
/// Run the pict-rs application impl PictRsConfiguration {
/// /// Build the pict-rs configuration from commandline arguments
/// This must be called after `init_config`, or else the default configuration builder will run and ///
/// fail. /// This is probably not useful for 3rd party applications that handle their own commandline
pub async fn run() -> color_eyre::Result<()> { pub fn build_default() -> color_eyre::Result<Self> {
let repo = Repo::open(CONFIG.repo.clone())?; config::configure()
repo.migrate_from_db(CONFIG.old_db.path.clone()).await?; }
let client = build_client()?;
match (*OPERATION).clone() { /// Install the default pict-rs tracer
Operation::Run => (), ///
Operation::MigrateStore { /// This is probably not useful for 3rd party applications that install their own tracing
skip_missing_files, /// subscribers.
from, pub fn install_tracing(&self) -> color_eyre::Result<()> {
to, init_tracing(&self.config.tracing)
} => { }
match from {
config::primitives::Store::Filesystem(config::Filesystem { path }) => { /// Run the pict-rs application
let from = FileStore::build(path.clone(), repo.clone()).await?; ///
migrate_inner(repo, client, from, to, skip_missing_files).await?; /// This must be called after `init_config`, or else the default configuration builder will run and
} /// fail.
config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage { pub async fn run(self) -> color_eyre::Result<()> {
endpoint, let PictRsConfiguration { config, operation } = self;
bucket_name,
use_path_style, let repo = Repo::open(config.repo.clone())?;
region, repo.migrate_from_db(config.old_db.path.clone()).await?;
access_key, let client = build_client(&config)?;
secret_key,
session_token, match operation {
signature_duration, Operation::Run => (),
client_timeout, Operation::MigrateStore {
public_endpoint, skip_missing_files,
}) => { from,
let from = ObjectStore::build( to,
endpoint, } => {
bucket_name, match from {
if use_path_style { config::primitives::Store::Filesystem(config::Filesystem { path }) => {
UrlStyle::Path let from = FileStore::build(path.clone(), repo.clone()).await?;
} else { migrate_inner(repo, client, from, to, skip_missing_files).await?;
UrlStyle::VirtualHost }
config::primitives::Store::ObjectStorage(
config::primitives::ObjectStorage {
endpoint,
bucket_name,
use_path_style,
region,
access_key,
secret_key,
session_token,
signature_duration,
client_timeout,
public_endpoint,
}, },
region, ) => {
access_key, let from = ObjectStore::build(
secret_key, endpoint,
session_token, bucket_name,
signature_duration.unwrap_or(15), if use_path_style {
client_timeout.unwrap_or(30), UrlStyle::Path
public_endpoint, } else {
repo.clone(), UrlStyle::VirtualHost
) },
.await? region,
.build(client.clone()); access_key,
secret_key,
session_token,
signature_duration.unwrap_or(15),
client_timeout.unwrap_or(30),
public_endpoint,
repo.clone(),
)
.await?
.build(client.clone());
migrate_inner(repo, client, from, to, skip_missing_files).await?; migrate_inner(repo, client, from, to, skip_missing_files).await?;
}
} }
}
return Ok(()); return Ok(());
}
}
if CONFIG.server.read_only {
tracing::warn!("Launching in READ ONLY mode");
}
match CONFIG.store.clone() {
config::Store::Filesystem(config::Filesystem { path }) => {
repo.migrate_identifiers().await?;
let store = FileStore::build(path, repo.clone()).await?;
match repo {
Repo::Sled(sled_repo) => {
sled_repo
.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
.await?;
launch_file_store(sled_repo, store, client, sled_extra_config).await?;
}
} }
} }
config::Store::ObjectStorage(config::ObjectStorage {
endpoint, if config.server.read_only {
bucket_name, tracing::warn!("Launching in READ ONLY mode");
use_path_style, }
region,
access_key, match config.store.clone() {
secret_key, config::Store::Filesystem(config::Filesystem { path }) => {
session_token, repo.migrate_identifiers().await?;
signature_duration,
client_timeout, let store = FileStore::build(path, repo.clone()).await?;
public_endpoint, match repo {
}) => { Repo::Sled(sled_repo) => {
let store = ObjectStore::build( sled_repo
.requeue_in_progress(config.server.worker_id.as_bytes().to_vec())
.await?;
launch_file_store(sled_repo, store, client, config, sled_extra_config)
.await?;
}
}
}
config::Store::ObjectStorage(config::ObjectStorage {
endpoint, endpoint,
bucket_name, bucket_name,
if use_path_style { use_path_style,
UrlStyle::Path
} else {
UrlStyle::VirtualHost
},
region, region,
access_key, access_key,
secret_key, secret_key,
@ -1601,23 +1631,41 @@ pub async fn run() -> color_eyre::Result<()> {
signature_duration, signature_duration,
client_timeout, client_timeout,
public_endpoint, public_endpoint,
repo.clone(), }) => {
) let store = ObjectStore::build(
.await?; endpoint,
bucket_name,
if use_path_style {
UrlStyle::Path
} else {
UrlStyle::VirtualHost
},
region,
access_key,
secret_key,
session_token,
signature_duration,
client_timeout,
public_endpoint,
repo.clone(),
)
.await?;
match repo { match repo {
Repo::Sled(sled_repo) => { Repo::Sled(sled_repo) => {
sled_repo sled_repo
.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec()) .requeue_in_progress(config.server.worker_id.as_bytes().to_vec())
.await?; .await?;
launch_object_store(sled_repo, store, client, sled_extra_config).await?; launch_object_store(sled_repo, store, client, config, sled_extra_config)
.await?;
}
} }
} }
} }
self::tmp_file::remove_tmp_dir().await?;
Ok(())
} }
self::tmp_file::remove_tmp_dir().await?;
Ok(())
} }

View file

@ -1,6 +1,6 @@
#[actix_rt::main] #[actix_rt::main]
async fn main() -> color_eyre::Result<()> { async fn main() -> color_eyre::Result<()> {
pict_rs::install_tracing()?; let pict_rs = pict_rs::PictRsConfiguration::build_default()?;
pict_rs.install_tracing()?;
pict_rs::run().await pict_rs.run().await
} }

View file

@ -1,5 +1,6 @@
use crate::{ use crate::{
concurrent_processor::ProcessMap, concurrent_processor::ProcessMap,
config::Configuration,
error::Error, error::Error,
formats::InputProcessableFormat, formats::InputProcessableFormat,
repo::{ repo::{
@ -163,12 +164,14 @@ pub(crate) async fn process_images<R: FullRepo + 'static, S: Store + 'static>(
repo: R, repo: R,
store: S, store: S,
process_map: ProcessMap, process_map: ProcessMap,
config: Configuration,
worker_id: String, worker_id: String,
) { ) {
process_image_jobs( process_image_jobs(
&repo, &repo,
&store, &store,
&process_map, &process_map,
&config,
worker_id, worker_id,
PROCESS_QUEUE, PROCESS_QUEUE,
process::perform, process::perform,
@ -231,6 +234,7 @@ async fn process_image_jobs<R, S, F>(
repo: &R, repo: &R,
store: &S, store: &S,
process_map: &ProcessMap, process_map: &ProcessMap,
config: &Configuration,
worker_id: String, worker_id: String,
queue: &'static str, queue: &'static str,
callback: F, callback: F,
@ -238,12 +242,26 @@ async fn process_image_jobs<R, S, F>(
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone, R::Bytes: Clone,
S: Store, S: Store,
for<'a> F: for<'a> F: Fn(
Fn(&'a R, &'a S, &'a ProcessMap, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, &'a R,
&'a S,
&'a ProcessMap,
&'a Configuration,
&'a [u8],
) -> LocalBoxFuture<'a, Result<(), Error>>
+ Copy,
{ {
loop { loop {
let res = let res = image_job_loop(
image_job_loop(repo, store, process_map, worker_id.clone(), queue, callback).await; repo,
store,
process_map,
config,
worker_id.clone(),
queue,
callback,
)
.await;
if let Err(e) = res { if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", format!("{e}")); tracing::warn!("Error processing jobs: {}", format!("{e}"));
@ -259,6 +277,7 @@ async fn image_job_loop<R, S, F>(
repo: &R, repo: &R,
store: &S, store: &S,
process_map: &ProcessMap, process_map: &ProcessMap,
config: &Configuration,
worker_id: String, worker_id: String,
queue: &'static str, queue: &'static str,
callback: F, callback: F,
@ -267,15 +286,21 @@ where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone, R::Bytes: Clone,
S: Store, S: Store,
for<'a> F: for<'a> F: Fn(
Fn(&'a R, &'a S, &'a ProcessMap, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy, &'a R,
&'a S,
&'a ProcessMap,
&'a Configuration,
&'a [u8],
) -> LocalBoxFuture<'a, Result<(), Error>>
+ Copy,
{ {
loop { loop {
let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?; let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?;
let span = tracing::info_span!("Running Job", worker_id = ?worker_id); let span = tracing::info_span!("Running Job", worker_id = ?worker_id);
span.in_scope(|| (callback)(repo, store, process_map, bytes.as_ref())) span.in_scope(|| (callback)(repo, store, process_map, config, bytes.as_ref()))
.instrument(span) .instrument(span)
.await?; .await?;
} }

View file

@ -1,5 +1,6 @@
use crate::{ use crate::{
concurrent_processor::ProcessMap, concurrent_processor::ProcessMap,
config::Configuration,
error::{Error, UploadError}, error::{Error, UploadError},
formats::InputProcessableFormat, formats::InputProcessableFormat,
ingest::Session, ingest::Session,
@ -7,7 +8,6 @@ use crate::{
repo::{Alias, DeleteToken, FullRepo, UploadId, UploadResult}, repo::{Alias, DeleteToken, FullRepo, UploadId, UploadResult},
serde_str::Serde, serde_str::Serde,
store::{Identifier, Store}, store::{Identifier, Store},
CONFIG,
}; };
use futures_util::TryStreamExt; use futures_util::TryStreamExt;
use std::path::PathBuf; use std::path::PathBuf;
@ -16,6 +16,7 @@ pub(super) fn perform<'a, R, S>(
repo: &'a R, repo: &'a R,
store: &'a S, store: &'a S,
process_map: &'a ProcessMap, process_map: &'a ProcessMap,
config: &'a Configuration,
job: &'a [u8], job: &'a [u8],
) -> LocalBoxFuture<'a, Result<(), Error>> ) -> LocalBoxFuture<'a, Result<(), Error>>
where where
@ -36,7 +37,7 @@ where
identifier, identifier,
Serde::into_inner(upload_id), Serde::into_inner(upload_id),
declared_alias.map(Serde::into_inner), declared_alias.map(Serde::into_inner),
&CONFIG.media, &config.media,
) )
.await? .await?
} }
@ -54,7 +55,7 @@ where
Serde::into_inner(source), Serde::into_inner(source),
process_path, process_path,
process_args, process_args,
&CONFIG.media, config,
) )
.await? .await?
} }
@ -75,7 +76,7 @@ async fn process_ingest<R, S>(
unprocessed_identifier: Vec<u8>, unprocessed_identifier: Vec<u8>,
upload_id: UploadId, upload_id: UploadId,
declared_alias: Option<Alias>, declared_alias: Option<Alias>,
media: &'static crate::config::Media, media: &crate::config::Media,
) -> Result<(), Error> ) -> Result<(), Error>
where where
R: FullRepo + 'static, R: FullRepo + 'static,
@ -88,6 +89,7 @@ where
let store2 = store.clone(); let store2 = store.clone();
let repo = repo.clone(); let repo = repo.clone();
let media = media.clone();
let error_boundary = actix_rt::spawn(async move { let error_boundary = actix_rt::spawn(async move {
let stream = store2 let stream = store2
.to_stream(&ident, None, None) .to_stream(&ident, None, None)
@ -95,7 +97,7 @@ where
.map_err(Error::from); .map_err(Error::from);
let session = let session =
crate::ingest::ingest(&repo, &store2, stream, declared_alias, media).await?; crate::ingest::ingest(&repo, &store2, stream, declared_alias, &media).await?;
let token = session.delete_token().await?; let token = session.delete_token().await?;
@ -139,7 +141,7 @@ async fn generate<R: FullRepo, S: Store + 'static>(
source: Alias, source: Alias,
process_path: PathBuf, process_path: PathBuf,
process_args: Vec<String>, process_args: Vec<String>,
meida: &'static crate::config::Media, config: &Configuration,
) -> Result<(), Error> { ) -> Result<(), Error> {
let Some(hash) = repo.hash(&source).await? else { let Some(hash) = repo.hash(&source).await? else {
// Nothing to do // Nothing to do
@ -155,7 +157,7 @@ async fn generate<R: FullRepo, S: Store + 'static>(
return Ok(()); return Ok(());
} }
let original_details = crate::ensure_details(repo, store, &source).await?; let original_details = crate::ensure_details(repo, store, config, &source).await?;
crate::generate::generate( crate::generate::generate(
repo, repo,
@ -167,7 +169,7 @@ async fn generate<R: FullRepo, S: Store + 'static>(
process_args, process_args,
original_details.video_format(), original_details.video_format(),
None, None,
meida, &config.media,
hash, hash,
) )
.await?; .await?;