Better job recovery

This commit is contained in:
Aode (Lion) 2022-04-02 18:53:03 -05:00
parent 77a400c7ca
commit c4d014597e
4 changed files with 81 additions and 25 deletions

View file

@ -127,6 +127,7 @@ async fn upload<R: FullRepo, S: Store + 'static>(
for mut image in images { for mut image in images {
image.result.disarm(); image.result.disarm();
} }
Ok(HttpResponse::Created().json(&serde_json::json!({ Ok(HttpResponse::Created().json(&serde_json::json!({
"msg": "ok", "msg": "ok",
"files": files "files": files
@ -539,6 +540,8 @@ async fn launch<R: FullRepo + Clone + 'static, S: Store + Clone + 'static>(
repo: R, repo: R,
store: S, store: S,
) -> color_eyre::Result<()> { ) -> color_eyre::Result<()> {
repo.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
.await?;
// Create a new Multipart Form validator // Create a new Multipart Form validator
// //
// This form is expecting a single array field, 'images' with at most 10 files in it // This form is expecting a single array field, 'images' with at most 10 files in it

View file

@ -6,6 +6,7 @@ use crate::{
store::{Identifier, Store}, store::{Identifier, Store},
}; };
use std::{future::Future, path::PathBuf, pin::Pin}; use std::{future::Future, path::PathBuf, pin::Pin};
use tracing::Instrument;
use uuid::Uuid; use uuid::Uuid;
mod cleanup; mod cleanup;
@ -111,7 +112,7 @@ pub(crate) async fn queue_generate<R: QueueRepo>(
} }
// Worker entry point for cleanup jobs: hands the repo, store and worker id to
// `process_jobs` with `cleanup::perform` as the per-job callback.
pub(crate) async fn process_cleanup<R: FullRepo, S: Store>(repo: R, store: S, worker_id: String) { pub(crate) async fn process_cleanup<R: FullRepo, S: Store>(repo: R, store: S, worker_id: String) {
// Left (old) column passes no queue name; right (new) column names the queue
// explicitly with `CLEANUP_QUEUE`.
process_jobs(&repo, &store, worker_id, cleanup::perform).await process_jobs(&repo, &store, worker_id, CLEANUP_QUEUE, cleanup::perform).await
} }
// Worker entry point for image-processing jobs: hands the repo, store and
// worker id to `process_jobs` with `process::perform` as the per-job callback.
// (One parameter line of the signature sits behind the hunk header below.)
pub(crate) async fn process_images<R: FullRepo + 'static, S: Store + 'static>( pub(crate) async fn process_images<R: FullRepo + 'static, S: Store + 'static>(
@ -119,26 +120,25 @@ pub(crate) async fn process_images<R: FullRepo + 'static, S: Store + 'static>(
store: S, store: S,
worker_id: String, worker_id: String,
) { ) {
// Left (old) column passes no queue name; right (new) column selects
// `PROCESS_QUEUE`, so this worker no longer depends on whatever queue
// `job_loop` picks on its own.
process_jobs(&repo, &store, worker_id, process::perform).await process_jobs(&repo, &store, worker_id, PROCESS_QUEUE, process::perform).await
} }
// Heap-allocated, pinned, type-erased future yielding `T` and borrowing for
// `'a`. There is no `Send` bound on the trait object, hence "Local".
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>; type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
// Outer supervisor for one worker: calls `job_loop` in an endless `loop` and
// logs any error it returns with `tracing::warn!`.
//
// Parameters (right/new column): `repo` and `store` are borrowed and passed to
// every job, `worker_id` identifies this worker to the repo, `queue` is the
// queue name forwarded to `job_loop`, and `callback` is the per-job handler.
// The left/old column has no `queue` parameter.
async fn process_jobs<R, S, F>(repo: &R, store: &S, worker_id: String, callback: F) async fn process_jobs<R, S, F>(
where repo: &R,
store: &S,
worker_id: String,
queue: &'static str,
callback: F,
) where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone, R::Bytes: Clone,
S: Store, S: Store,
// `callback` must work for any borrow lifetime of repo/store/job bytes and be
// `Copy`, because it is passed by value into `job_loop` on every iteration.
for<'a> F: Fn(&'a R, &'a S, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
{ {
// Old column only (removed by this commit): on startup, fetch the job recorded
// as in progress for this worker and run it once, logging (not propagating) a
// failure. The new column has no equivalent here; recovery is instead done via
// `requeue_in_progress`, which the `launch` hunk calls before workers start.
if let Ok(Some(job)) = repo.in_progress(worker_id.as_bytes().to_vec()).await {
if let Err(e) = (callback)(repo, store, job.as_ref()).await {
tracing::warn!("Failed to run previously dropped job: {}", e);
tracing::warn!("{:?}", e);
}
}
loop { loop {
// `worker_id` is cloned because `job_loop` takes it by value each time round.
let res = job_loop(repo, store, worker_id.clone(), callback).await; let res = job_loop(repo, store, worker_id.clone(), queue, callback).await;
if let Err(e) = res { if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", e); tracing::warn!("Error processing jobs: {}", e);
// The remainder of this error branch is hidden behind the hunk header below.
@ -150,7 +150,13 @@ where
} }
} }
// Inner job loop: repeatedly pops a job for `worker_id` from the repo and
// awaits `callback` on its bytes. The first error from either `pop` or the
// callback is returned to the caller through `?`; there is no other exit.
//
// The right/new column adds the `queue` parameter naming the queue to pop from.
async fn job_loop<R, S, F>(repo: &R, store: &S, worker_id: String, callback: F) -> Result<(), Error> async fn job_loop<R, S, F>(
repo: &R,
store: &S,
worker_id: String,
queue: &'static str,
callback: F,
) -> Result<(), Error>
where where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo, R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone, R::Bytes: Clone,
@ -158,10 +164,12 @@ where
for<'a> F: Fn(&'a R, &'a S, &'a [u8]) -> LocalBoxFuture<'a, Result<(), Error>> + Copy,
{ {
loop { loop {
// Old column always popped from `CLEANUP_QUEUE`, even when the caller was
// `process_images`; the new column pops from the caller-supplied `queue`.
let bytes = repo let bytes = repo.pop(queue, worker_id.as_bytes().to_vec()).await?;
.pop(CLEANUP_QUEUE, worker_id.as_bytes().to_vec())
.await?;
// New column: the callback future is created while the "Running Job" span is
// entered (`in_scope`) and is then polled inside the same span (`instrument`),
// so both construction and execution are attributed to this worker.
(callback)(repo, store, bytes.as_ref()).await?; let span = tracing::info_span!("Running Job", worker_id = ?worker_id);
span.in_scope(|| (callback)(repo, store, bytes.as_ref()))
.instrument(span)
.await?;
} }
} }

View file

@ -97,7 +97,7 @@ pub(crate) trait UploadRepo: BaseRepo {
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait QueueRepo: BaseRepo { pub(crate) trait QueueRepo: BaseRepo {
async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error>; async fn requeue_in_progress(&self, worker_prefix: Vec<u8>) -> Result<(), Error>;
async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error>; async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error>;

View file

@ -155,20 +155,60 @@ impl UploadRepo for SledRepo {
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl QueueRepo for SledRepo { impl QueueRepo for SledRepo {
// Left/old column: `in_progress` looked up the single value stored under
// `worker_id` in `in_progress_queue` and returned it.
//
// Right/new column: `requeue_in_progress` collects every in-progress entry
// whose key starts with `worker_prefix` and inserts each job back into the
// `queue` tree under a freshly generated key.
//
// Values are expected to be `<queue name> 0x00 <job bytes>`, which is the
// layout `pop` writes in the last hunk of this diff.
//
// NOTE(review): read errors during the scan are dropped by
// `filter_map(Result::ok)`, so an unreadable entry is silently skipped.
// NOTE(review): nothing in this function removes the scanned entries from
// `in_progress_queue`, and no queue notifier is signalled after re-inserting;
// confirm both are handled elsewhere or are intentional.
async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error> { #[tracing::instrument(skip_all, fields(worker_id = %String::from_utf8_lossy(&worker_prefix)))]
let opt = b!(self.in_progress_queue, in_progress_queue.get(worker_id)); async fn requeue_in_progress(&self, worker_prefix: Vec<u8>) -> Result<(), Error> {
let vec: Vec<(String, IVec)> = b!(self.in_progress_queue, {
let vec = in_progress_queue
.scan_prefix(worker_prefix)
.values()
.filter_map(Result::ok)
.filter_map(|ivec| {
// Position of the first zero byte, i.e. the separator between queue name and
// job; entries without one are skipped via `?`.
let index = ivec.as_ref().iter().enumerate().find_map(|(index, byte)| {
if *byte == 0 {
Some(index)
} else {
None
}
})?;
Ok(opt) let (queue, job) = ivec.split_at(index);
// `job` still starts with the separator here, so `len() <= 1` means there is
// no payload; entries with an empty queue name or empty payload are skipped.
if queue.is_empty() || job.len() <= 1 {
return None;
}
// Drop the leading separator byte.
let job = &job[1..];
Some((String::from_utf8_lossy(queue).to_string(), IVec::from(job)))
})
.collect::<Vec<(String, IVec)>>();
Ok(vec) as Result<_, Error>
});
// Cloned so the id generator can be moved into the second `b!` block.
let db = self.db.clone();
b!(self.queue, {
for (queue_name, job) in vec {
// Same key layout as `push`: queue name bytes followed by the big-endian id.
let id = db.generate_id()?;
let mut key = queue_name.as_bytes().to_vec();
key.extend(id.to_be_bytes());
queue.insert(key, job)?;
}
Ok(()) as Result<(), Error>
});
Ok(())
} }
// Appends `job` to the named queue and wakes one waiter for that queue.
//
// The key is the queue name's bytes followed by a generated id in big-endian
// form. The right/new column renames the parameter from `queue` to
// `queue_name` (presumably to stop it clashing with the `queue` binding used
// inside `b!` — confirm against the macro definition) and adds tracing.
//
// NOTE(review): the tracing field is called `worker_id` but is filled from the
// `job` bytes; the name looks copied from the other methods.
async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error> { #[tracing::instrument(skip(self, job), fields(worker_id = %String::from_utf8_lossy(&job)))]
async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), Error> {
let id = self.db.generate_id()?; let id = self.db.generate_id()?;
let mut key = queue.as_bytes().to_vec(); let mut key = queue_name.as_bytes().to_vec();
key.extend(id.to_be_bytes()); key.extend(id.to_be_bytes());
b!(self.queue, queue.insert(key, job)); b!(self.queue, queue.insert(key, job));
// Fast path: a notifier already exists for this queue, so only the read lock
// is taken. `unwrap` panics if the lock is poisoned.
if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue) { if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue_name) {
notifier.notify_one(); notifier.notify_one();
return Ok(()); return Ok(());
} }
@ -176,13 +216,14 @@ impl QueueRepo for SledRepo {
// Slow path: take the write lock, create the notifier if it is still missing,
// then notify through it.
self.queue_notifier self.queue_notifier
.write() .write()
.unwrap() .unwrap()
.entry(queue) .entry(queue_name)
.or_insert_with(|| Arc::new(Notify::new())) .or_insert_with(|| Arc::new(Notify::new()))
.notify_one(); .notify_one();
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, worker_id), fields(worker_id = %String::from_utf8_lossy(&worker_id)))]
async fn pop( async fn pop(
&self, &self,
queue_name: &'static str, queue_name: &'static str,
@ -199,7 +240,11 @@ impl QueueRepo for SledRepo {
.scan_prefix(queue_name.as_bytes()) .scan_prefix(queue_name.as_bytes())
.find_map(Result::ok) .find_map(Result::ok)
{ {
in_progress_queue.insert(&worker_id, &job)?; let mut in_progress_value = queue_name.as_bytes().to_vec();
in_progress_value.push(0);
in_progress_value.extend_from_slice(&job);
in_progress_queue.insert(&worker_id, in_progress_value)?;
if queue.remove(key)?.is_some() { if queue.remove(key)?.is_some() {
return Ok(Some(job)); return Ok(Some(job));