Enable multi-threaded job processing

This commit is contained in:
Aode (lion) 2022-03-29 13:18:47 -05:00
parent 602d1ea935
commit 8226a3571d
5 changed files with 120 additions and 101 deletions

View file

@ -1,5 +1,6 @@
[server]
address = '0.0.0.0:8080'
worker_id = 'pict-rs-1'
[tracing.logging]
format = 'normal'
targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info'

View file

@ -1,5 +1,6 @@
[server]
address = '0.0.0.0:8080'
worker_id = 'pict-rs-1'
[tracing.logging]
format = 'normal'
targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info'

View file

@ -1,5 +1,6 @@
[server]
address = '0.0.0.0:8080'
worker_id = 'pict-rs-1'
[tracing.logging]
format = 'normal'
targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info'

View file

@ -15,11 +15,12 @@ use std::{
future::ready,
path::PathBuf,
pin::Pin,
sync::atomic::{AtomicU64, Ordering},
task::{Context, Poll},
time::SystemTime,
};
use tokio::{io::AsyncReadExt, sync::Semaphore};
use tracing::{debug, error, info, instrument, Span};
use tracing::{debug, error, info, instrument};
use tracing_actix_web::TracingLogger;
use tracing_awc::Tracing;
use tracing_futures::Instrument;
@ -382,38 +383,11 @@ async fn process<S: Store + 'static>(
let details = Details::from_bytes(bytes.clone(), format.as_hint()).await?;
let save_span = tracing::info_span!(
parent: None,
"Saving variant information",
path = tracing::field::debug(&thumbnail_path),
name = tracing::field::display(&alias),
);
save_span.follows_from(Span::current());
let details2 = details.clone();
let bytes2 = bytes.clone();
let alias2 = alias.clone();
actix_rt::spawn(
async move {
let identifier = match store.save_bytes(bytes2).await {
Ok(identifier) => identifier,
Err(e) => {
tracing::warn!("Failed to generate directory path: {}", e);
return;
}
};
if let Err(e) = manager.store_details(&identifier, &details2).await {
tracing::warn!("Error saving variant details: {}", e);
return;
}
if let Err(e) = manager
.store_variant(&alias2, &thumbnail_path, &identifier)
.await
{
tracing::warn!("Error saving variant info: {}", e);
}
}
.instrument(save_span),
);
let identifier = store.save_bytes(bytes.clone()).await?;
manager.store_details(&identifier, &details).await?;
manager
.store_variant(&alias, &thumbnail_path, &identifier)
.await?;
Ok((details, bytes)) as Result<(Details, web::Bytes), Error>
};
@ -632,18 +606,18 @@ fn build_reqwest_client() -> reqwest::Result<reqwest::Client> {
.build()
}
/// Builds the identifier handed to the next job-processing task.
///
/// The identifier is the configured `server.worker_id`, a `-`, and the value of a
/// process-wide counter that starts at 0 and is bumped by one on every call.
fn next_worker_id() -> String {
    // One counter for the whole process; every call claims the next number.
    static WORKER_ID: AtomicU64 = AtomicU64::new(0);

    let sequence = WORKER_ID.fetch_add(1, Ordering::Relaxed);
    let prefix = &CONFIG.server.worker_id;

    format!("{}-{}", prefix, sequence)
}
async fn launch<S: Store + Clone + 'static>(
manager: UploadManager,
store: S,
) -> color_eyre::Result<()> {
let repo = manager.repo().clone();
actix_rt::spawn(queue::process_jobs(
repo,
store.clone(),
CONFIG.server.worker_id.as_bytes().to_vec(),
));
// Create a new Multipart Form validator
//
// This form is expecting a single array field, 'images' with at most 10 files in it
@ -717,11 +691,20 @@ async fn launch<S: Store + Clone + 'static>(
);
HttpServer::new(move || {
let manager = manager.clone();
let store = store.clone();
actix_rt::spawn(queue::process_jobs(
manager.repo().clone(),
store.clone(),
next_worker_id(),
));
App::new()
.wrap(TracingLogger::default())
.wrap(Deadline)
.app_data(web::Data::new(store.clone()))
.app_data(web::Data::new(manager.clone()))
.app_data(web::Data::new(store))
.app_data(web::Data::new(manager))
.app_data(web::Data::new(build_client()))
.app_data(web::Data::new(CONFIG.media.filters.clone()))
.service(

View file

@ -1,103 +1,100 @@
use crate::{
error::Error,
repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo},
store::Store,
store::{Identifier, Store},
};
use tracing::{debug, error, Span};
use tracing::{debug, error};
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Job {
Cleanup { hash: Vec<u8> },
CleanupHash { hash: Vec<u8> },
CleanupIdentifier { identifier: Vec<u8> },
}
pub(crate) async fn queue_cleanup<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
let job = serde_json::to_vec(&Job::Cleanup {
let job = serde_json::to_vec(&Job::CleanupHash {
hash: hash.as_ref().to_vec(),
})?;
repo.push(job.into()).await?;
Ok(())
}
pub(crate) async fn process_jobs<S: Store>(repo: Repo, store: S, worker_id: Vec<u8>) {
loop {
let res = match repo {
Repo::Sled(ref repo) => do_process_jobs(repo, &store, worker_id.clone()).await,
};
pub(crate) async fn process_jobs<S: Store>(repo: Repo, store: S, worker_id: String) {
match repo {
Repo::Sled(ref repo) => {
if let Ok(Some(job)) = repo.in_progress(worker_id.as_bytes().to_vec()).await {
if let Err(e) = run_job(repo, &store, &job).await {
tracing::warn!("Failed to run previously dropped job: {}", e);
tracing::warn!("{:?}", e);
}
}
loop {
let res = job_loop(repo, &store, worker_id.clone()).await;
if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", e);
tracing::warn!("{:?}", e);
continue;
if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", e);
tracing::warn!("{:?}", e);
continue;
}
break;
}
}
break;
}
}
async fn do_process_jobs<R, S>(repo: &R, store: &S, worker_id: Vec<u8>) -> Result<(), Error>
async fn job_loop<R, S>(repo: &R, store: &S, worker_id: String) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
{
loop {
let bytes = repo.pop(worker_id.clone()).await?;
let bytes = repo.pop(worker_id.as_bytes().to_vec()).await?;
match serde_json::from_slice(bytes.as_ref()) {
Ok(job) => match job {
Job::Cleanup { hash } => cleanup(repo, store, hash).await?,
},
Err(e) => {
tracing::warn!("Invalid job: {}", e);
}
}
run_job(repo, store, bytes.as_ref()).await?;
}
}
#[tracing::instrument(skip(repo, store))]
async fn cleanup<R, S>(repo: &R, store: &S, hash: Vec<u8>) -> Result<(), Error>
async fn run_job<R, S>(repo: &R, store: &S, job: &[u8]) -> Result<(), Error>
where
R: HashRepo + IdentifierRepo + AliasRepo,
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
{
let hash: R::Bytes = hash.into();
let aliases = repo.aliases(hash.clone()).await?;
if !aliases.is_empty() {
return Ok(());
match serde_json::from_slice(job) {
Ok(job) => match job {
Job::CleanupHash { hash } => cleanup_hash::<R, S>(repo, hash).await?,
Job::CleanupIdentifier { identifier } => {
cleanup_identifier(repo, store, identifier).await?
}
},
Err(e) => {
tracing::warn!("Invalid job: {}", e);
}
}
let variant_idents = repo
.variants::<S::Identifier>(hash.clone())
.await?
.into_iter()
.map(|(_, v)| v)
.collect::<Vec<_>>();
let main_ident = repo.identifier(hash.clone()).await?;
let motion_ident = repo.motion_identifier(hash.clone()).await?;
Ok(())
}
HashRepo::cleanup(repo, hash).await?;
let cleanup_span = tracing::info_span!(parent: None, "Cleaning files");
cleanup_span.follows_from(Span::current());
#[tracing::instrument(skip(repo, store))]
async fn cleanup_identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo,
R::Bytes: Clone,
S: Store,
{
let identifier = S::Identifier::from_bytes(identifier)?;
let mut errors = Vec::new();
for identifier in variant_idents
.iter()
.chain(&[main_ident])
.chain(motion_ident.iter())
{
debug!("Deleting {:?}", identifier);
if let Err(e) = store.remove(identifier).await {
errors.push(e);
}
debug!("Deleting {:?}", identifier);
if let Err(e) = store.remove(&identifier).await {
errors.push(e);
}
if let Err(e) = IdentifierRepo::cleanup(repo, identifier).await {
errors.push(e);
}
if let Err(e) = IdentifierRepo::cleanup(repo, &identifier).await {
errors.push(e);
}
if !errors.is_empty() {
@ -111,3 +108,39 @@ where
Ok(())
}
/// Queues removal of every file identifier recorded for `hash`, then removes the hash itself.
///
/// If any alias still refers to `hash`, nothing is done and `Ok(())` is returned. Otherwise a
/// `Job::CleanupIdentifier` is pushed for each variant identifier, the main identifier, and
/// the motion identifier when one exists, after which `HashRepo::cleanup` is called.
///
/// An identifier that cannot be converted to bytes is skipped so the remaining identifiers
/// are still queued; the failure is logged rather than silently dropped.
///
/// # Errors
///
/// Returns an error when a repo lookup, job serialization, queue push, or the final hash
/// cleanup fails.
#[tracing::instrument(skip(repo))]
async fn cleanup_hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
where
    R: QueueRepo + AliasRepo + HashRepo + IdentifierRepo,
    R::Bytes: Clone,
    S: Store,
{
    let hash: R::Bytes = hash.into();

    // A hash that is still aliased is still in use; leave it alone.
    let aliases = repo.aliases(hash.clone()).await?;
    if !aliases.is_empty() {
        return Ok(());
    }

    // Collect everything stored for this hash: variants, the main file, and the motion file.
    let mut idents = repo
        .variants::<S::Identifier>(hash.clone())
        .await?
        .into_iter()
        .map(|(_, v)| v)
        .collect::<Vec<_>>();
    idents.push(repo.identifier(hash.clone()).await?);
    idents.extend(repo.motion_identifier(hash.clone()).await?);

    for identifier in idents {
        match identifier.to_bytes() {
            Ok(identifier) => {
                let job = serde_json::to_vec(&Job::CleanupIdentifier { identifier })?;
                repo.push(job.into()).await?;
            }
            Err(e) => {
                // Best-effort: keep queueing the rest, but leave a trace of what was skipped.
                tracing::warn!("Failed to serialize identifier for cleanup, skipping: {}", e);
            }
        }
    }

    HashRepo::cleanup(repo, hash).await?;

    Ok(())
}