In Progress: process jobs

- Is this a good idea? it definitely will make interacting with pict-rs harder. Maybe it's best not to do this
This commit is contained in:
Aode (lion) 2022-04-01 16:51:12 -05:00
parent c0d8e0e8e3
commit 6ed592c432
10 changed files with 329 additions and 74 deletions

View file

@ -30,7 +30,7 @@ impl Details {
}
#[tracing::instrument("Details from store")]
pub(crate) async fn from_store<S: Store>(
pub(crate) async fn from_store<S: Store + 'static>(
store: S,
identifier: S::Identifier,
expected_format: Option<ValidInputType>,

View file

@ -140,7 +140,7 @@ pub(crate) async fn details_bytes(
}
#[tracing::instrument(skip(store))]
pub(crate) async fn details_store<S: Store>(
pub(crate) async fn details_store<S: Store + 'static>(
store: S,
identifier: S::Identifier,
hint: Option<ValidInputType>,
@ -255,7 +255,7 @@ pub(crate) async fn input_type_bytes(input: Bytes) -> Result<ValidInputType, Err
}
#[instrument(name = "Spawning process command")]
pub(crate) fn process_image_store_read<S: Store>(
pub(crate) fn process_image_store_read<S: Store + 'static>(
store: S,
identifier: S::Identifier,
args: Vec<String>,

View file

@ -11,7 +11,6 @@ use futures_util::{
};
use once_cell::sync::Lazy;
use std::{
collections::BTreeSet,
future::ready,
path::PathBuf,
sync::atomic::{AtomicU64, Ordering},
@ -142,7 +141,7 @@ struct UrlQuery {
/// download an image from a URL
#[instrument(name = "Downloading file", skip(client, manager))]
async fn download<S: Store>(
async fn download<S: Store + 'static>(
client: web::Data<Client>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
@ -214,7 +213,6 @@ type ProcessQuery = Vec<(String, String)>;
fn prepare_process(
query: web::Query<ProcessQuery>,
ext: &str,
filters: &BTreeSet<String>,
) -> Result<(ImageFormat, Alias, PathBuf, Vec<String>), Error> {
let (alias, operations) =
query
@ -237,7 +235,7 @@ fn prepare_process(
let operations = operations
.into_iter()
.filter(|(k, _)| filters.contains(&k.to_lowercase()))
.filter(|(k, _)| CONFIG.media.filters.contains(&k.to_lowercase()))
.collect::<Vec<_>>();
let format = ext
@ -251,14 +249,13 @@ fn prepare_process(
Ok((format, alias, thumbnail_path, thumbnail_args))
}
#[instrument(name = "Fetching derived details", skip(manager, filters))]
#[instrument(name = "Fetching derived details", skip(manager))]
async fn process_details<S: Store>(
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
manager: web::Data<UploadManager>,
filters: web::Data<BTreeSet<String>>,
) -> Result<HttpResponse, Error> {
let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str(), &filters)?;
let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str())?;
let identifier = manager
.variant_identifier::<S>(&alias, &thumbnail_path)
@ -273,17 +270,15 @@ async fn process_details<S: Store>(
}
/// Process files
#[instrument(name = "Serving processed image", skip(manager, filters))]
#[instrument(name = "Serving processed image", skip(manager))]
async fn process<S: Store + 'static>(
range: Option<web::Header<Range>>,
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
filters: web::Data<BTreeSet<String>>,
) -> Result<HttpResponse, Error> {
let (format, alias, thumbnail_path, thumbnail_args) =
prepare_process(query, ext.as_str(), &filters)?;
let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?;
let identifier_opt = manager
.variant_identifier::<S>(&alias, &thumbnail_path)
@ -376,7 +371,7 @@ async fn process<S: Store + 'static>(
/// Fetch file details
#[instrument(name = "Fetching details", skip(manager))]
async fn details<S: Store>(
async fn details<S: Store + 'static>(
alias: web::Path<String>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
@ -402,7 +397,7 @@ async fn details<S: Store>(
/// Serve files
#[instrument(name = "Serving file", skip(manager))]
async fn serve<S: Store>(
async fn serve<S: Store + 'static>(
range: Option<web::Header<Range>>,
alias: web::Path<String>,
manager: web::Data<UploadManager>,
@ -426,7 +421,7 @@ async fn serve<S: Store>(
ranged_file_resp(&**store, identifier, range, details).await
}
async fn ranged_file_resp<S: Store>(
async fn ranged_file_resp<S: Store + 'static>(
store: &S,
identifier: S::Identifier,
range: Option<web::Header<Range>>,
@ -652,7 +647,12 @@ async fn launch<S: Store + Clone + 'static>(
let manager = manager.clone();
let store = store.clone();
actix_rt::spawn(queue::process_jobs(
actix_rt::spawn(queue::process_cleanup(
manager.repo().clone(),
store.clone(),
next_worker_id(),
));
actix_rt::spawn(queue::process_images(
manager.repo().clone(),
store.clone(),
next_worker_id(),
@ -664,7 +664,6 @@ async fn launch<S: Store + Clone + 'static>(
.app_data(web::Data::new(store))
.app_data(web::Data::new(manager))
.app_data(web::Data::new(build_client()))
.app_data(web::Data::new(CONFIG.media.filters.clone()))
.service(
web::scope("/image")
.service(

View file

@ -144,7 +144,7 @@ impl Process {
})
}
pub(crate) fn store_read<S: Store>(
pub(crate) fn store_read<S: Store + 'static>(
mut self,
store: S,
identifier: S::Identifier,

View file

@ -1,83 +1,135 @@
use crate::{
config::ImageFormat,
error::Error,
repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo},
repo::{Alias, AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo},
serde_str::Serde,
store::Store,
};
use std::{future::Future, path::PathBuf, pin::Pin};
use uuid::Uuid;
mod cleanup;
mod process;
const CLEANUP_QUEUE: &str = "cleanup";
const PROCESS_QUEUE: &str = "process";
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Job {
enum Cleanup {
CleanupHash { hash: Vec<u8> },
CleanupIdentifier { identifier: Vec<u8> },
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Process {
Ingest {
identifier: Vec<u8>,
upload_id: Uuid,
declared_alias: Option<Serde<Alias>>,
should_validate: bool,
},
Generate {
target_format: ImageFormat,
source: Serde<Alias>,
process_path: PathBuf,
process_args: Vec<String>,
},
}
pub(crate) async fn queue_cleanup<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
let job = serde_json::to_vec(&Job::CleanupHash {
let job = serde_json::to_vec(&Cleanup::CleanupHash {
hash: hash.as_ref().to_vec(),
})?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn process_jobs<S: Store>(repo: Repo, store: S, worker_id: String) {
pub(crate) async fn queue_ingest<R: QueueRepo>(
repo: &R,
identifier: Vec<u8>,
upload_id: Uuid,
declared_alias: Option<Alias>,
should_validate: bool,
) -> Result<(), Error> {
let job = serde_json::to_vec(&Process::Ingest {
identifier,
declared_alias: declared_alias.map(Serde::new),
upload_id,
should_validate,
})?;
repo.push(PROCESS_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn queue_generate<R: QueueRepo>(
repo: &R,
target_format: ImageFormat,
source: Alias,
process_path: PathBuf,
process_args: Vec<String>,
) -> Result<(), Error> {
let job = serde_json::to_vec(&Process::Generate {
target_format,
source: Serde::new(source),
process_path,
process_args,
})?;
repo.push(PROCESS_QUEUE, job.into()).await?;
Ok(())
}
pub(crate) async fn process_cleanup<S: Store>(repo: Repo, store: S, worker_id: String) {
match repo {
Repo::Sled(ref repo) => {
if let Ok(Some(job)) = repo.in_progress(worker_id.as_bytes().to_vec()).await {
if let Err(e) = run_job(repo, &store, &job).await {
tracing::warn!("Failed to run previously dropped job: {}", e);
tracing::warn!("{:?}", e);
}
}
loop {
let res = job_loop(repo, &store, worker_id.clone()).await;
if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", e);
tracing::warn!("{:?}", e);
continue;
}
break;
}
}
Repo::Sled(repo) => process_jobs(&repo, &store, worker_id, cleanup::perform).await,
}
}
async fn job_loop<R, S>(repo: &R, store: &S, worker_id: String) -> Result<(), Error>
pub(crate) async fn process_images<S: Store>(repo: Repo, store: S, worker_id: String) {
match repo {
Repo::Sled(repo) => process_jobs(&repo, &store, worker_id, process::perform).await,
}
}
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
async fn process_jobs<R, S, F>(repo: &R, store: &S, worker_id: String, callback: F)
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
for<'a> F: Fn(&'a R, &'a S, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
{
if let Ok(Some(job)) = repo.in_progress(worker_id.as_bytes().to_vec()).await {
if let Err(e) = (callback)(repo, store, job.as_ref()).await {
tracing::warn!("Failed to run previously dropped job: {}", e);
tracing::warn!("{:?}", e);
}
}
loop {
let res = job_loop(repo, store, worker_id.clone(), callback).await;
if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", e);
tracing::warn!("{:?}", e);
continue;
}
break;
}
}
async fn job_loop<R, S, F>(repo: &R, store: &S, worker_id: String, callback: F) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
for<'a> F: Fn(&'a R, &'a S, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
{
loop {
let bytes = repo
.pop(CLEANUP_QUEUE, worker_id.as_bytes().to_vec())
.await?;
run_job(repo, store, bytes.as_ref()).await?;
(callback)(repo, store, bytes.as_ref()).await?;
}
}
async fn run_job<R, S>(repo: &R, store: &S, job: &[u8]) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
{
match serde_json::from_slice(job) {
Ok(job) => match job {
Job::CleanupHash { hash } => cleanup::hash::<R, S>(repo, hash).await?,
Job::CleanupIdentifier { identifier } => {
cleanup::identifier(repo, store, identifier).await?
}
},
Err(e) => {
tracing::warn!("Invalid job: {}", e);
}
}
Ok(())
}

View file

@ -1,13 +1,40 @@
use crate::{
error::Error,
queue::{Job, CLEANUP_QUEUE},
queue::{Cleanup, LocalBoxFuture, CLEANUP_QUEUE},
repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo},
store::{Identifier, Store},
};
use tracing::error;
pub(super) fn perform<'a, R, S>(
repo: &'a R,
store: &'a S,
job: &'a [u8],
) -> LocalBoxFuture<'a, Result<(), Error>>
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
{
Box::pin(async move {
match serde_json::from_slice(job) {
Ok(job) => match job {
Cleanup::CleanupHash { hash: in_hash } => hash::<R, S>(repo, in_hash).await?,
Cleanup::CleanupIdentifier {
identifier: in_identifier,
} => identifier(repo, &store, in_identifier).await?,
},
Err(e) => {
tracing::warn!("Invalid job: {}", e);
}
}
Ok(())
})
}
#[tracing::instrument(skip(repo, store))]
pub(super) async fn identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
async fn identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo,
R::Bytes: Clone,
@ -38,7 +65,7 @@ where
}
#[tracing::instrument(skip(repo))]
pub(super) async fn hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
async fn hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
where
R: QueueRepo + AliasRepo + HashRepo + IdentifierRepo,
R::Bytes: Clone,
@ -63,7 +90,7 @@ where
for identifier in idents {
if let Ok(identifier) = identifier.to_bytes() {
let job = serde_json::to_vec(&Job::CleanupIdentifier { identifier })?;
let job = serde_json::to_vec(&Cleanup::CleanupIdentifier { identifier })?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
}
}

87
src/queue/process.rs Normal file
View file

@ -0,0 +1,87 @@
use crate::{
config::ImageFormat,
error::Error,
queue::{LocalBoxFuture, Process},
repo::{Alias, AliasRepo, HashRepo, IdentifierRepo, QueueRepo},
serde_str::Serde,
store::Store,
};
use std::path::PathBuf;
use uuid::Uuid;
pub(super) fn perform<'a, R, S>(
repo: &'a R,
store: &'a S,
job: &'a [u8],
) -> LocalBoxFuture<'a, Result<(), Error>>
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
{
Box::pin(async move {
match serde_json::from_slice(job) {
Ok(job) => match job {
Process::Ingest {
identifier,
upload_id,
declared_alias,
should_validate,
} => {
ingest(
repo,
store,
identifier,
upload_id,
declared_alias.map(Serde::into_inner),
should_validate,
)
.await?
}
Process::Generate {
target_format,
source,
process_path,
process_args,
} => {
generate(
repo,
store,
target_format,
Serde::into_inner(source),
process_path,
process_args,
)
.await?
}
},
Err(e) => {
tracing::warn!("Invalid job: {}", e);
}
}
Ok(())
})
}
async fn ingest<R, S>(
repo: &R,
store: &S,
identifier: Vec<u8>,
upload_id: Uuid,
declared_alias: Option<Alias>,
should_validate: bool,
) -> Result<(), Error> {
unimplemented!("do this")
}
async fn generate<R, S>(
repo: &R,
store: &S,
target_format: ImageFormat,
source: Alias,
process_path: PathBuf,
process_args: Vec<String>,
) -> Result<(), Error> {
unimplemented!("do this")
}

View file

@ -30,10 +30,29 @@ pub(crate) struct DeleteToken {
pub(crate) struct AlreadyExists;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct UploadId {
id: Uuid,
}
pub(crate) enum UploadResult {
Success { alias: Alias, token: DeleteToken },
Failure { message: String },
}
pub(crate) trait BaseRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait UploadRepo: BaseRepo {
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error>;
async fn claim(&self, upload_id: UploadId) -> Result<(), Error>;
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error>;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait QueueRepo: BaseRepo {
async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error>;
@ -362,6 +381,21 @@ impl DeleteToken {
}
}
impl UploadId {
pub(crate) fn generate() -> Self {
Self { id: Uuid::new_v4() }
}
pub(crate) fn as_bytes(&self) -> &[u8] {
&self.id.as_bytes()[..]
}
pub(crate) fn from_bytes(&self, bytes: &[u8]) -> Option<Self> {
let id = Uuid::from_slice(bytes).ok()?;
Some(Self { id })
}
}
impl std::fmt::Display for MaybeUuid {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@ -377,6 +411,14 @@ impl std::fmt::Display for DeleteToken {
}
}
impl std::str::FromStr for Alias {
type Err = std::convert::Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Alias::from_existing(s))
}
}
impl std::fmt::Display for Alias {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(ext) = self.extension() {

View file

@ -16,9 +16,9 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug {
}
#[async_trait::async_trait(?Send)]
pub(crate) trait Store: Send + Sync + Clone + Debug + 'static {
type Identifier: Identifier;
type Stream: Stream<Item = std::io::Result<Bytes>>;
pub(crate) trait Store: Send + Sync + Clone + Debug {
type Identifier: Identifier + 'static;
type Stream: Stream<Item = std::io::Result<Bytes>> + 'static;
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
where
@ -45,3 +45,51 @@ pub(crate) trait Store: Send + Sync + Clone + Debug + 'static {
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error>;
}
#[async_trait::async_trait(?Send)]
impl<'a, T> Store for &'a T
where
T: Store,
{
type Identifier = T::Identifier;
type Stream = T::Stream;
async fn save_async_read<Reader>(&self, reader: &mut Reader) -> Result<Self::Identifier, Error>
where
Reader: AsyncRead + Unpin,
{
T::save_async_read(self, reader).await
}
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
T::save_bytes(self, bytes).await
}
async fn to_stream(
&self,
identifier: &Self::Identifier,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<Self::Stream, Error> {
T::to_stream(self, identifier, from_start, len).await
}
async fn read_into<Writer>(
&self,
identifier: &Self::Identifier,
writer: &mut Writer,
) -> Result<(), std::io::Error>
where
Writer: AsyncWrite + Send + Unpin,
{
T::read_into(self, identifier, writer).await
}
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
T::len(self, identifier).await
}
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
T::remove(self, identifier).await
}
}

View file

@ -61,7 +61,7 @@ impl UploadManager {
}
}
pub(crate) async fn still_identifier_from_alias<S: Store + Clone>(
pub(crate) async fn still_identifier_from_alias<S: Store + Clone + 'static>(
&self,
store: S,
alias: &Alias,