Support multiple queues for job processor

This commit is contained in:
Aode (lion) 2022-04-01 11:51:46 -05:00
parent 09281d9ae8
commit c0d8e0e8e3
7 changed files with 152 additions and 100 deletions

View file

@ -1,9 +1,12 @@
use crate::{ use crate::{
error::Error, error::Error,
repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo}, repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo},
store::{Identifier, Store}, store::Store,
}; };
use tracing::{debug, error};
mod cleanup;
const CLEANUP_QUEUE: &str = "cleanup";
#[derive(Debug, serde::Deserialize, serde::Serialize)] #[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Job { enum Job {
@ -15,7 +18,7 @@ pub(crate) async fn queue_cleanup<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Res
let job = serde_json::to_vec(&Job::CleanupHash { let job = serde_json::to_vec(&Job::CleanupHash {
hash: hash.as_ref().to_vec(), hash: hash.as_ref().to_vec(),
})?; })?;
repo.push(job.into()).await?; repo.push(CLEANUP_QUEUE, job.into()).await?;
Ok(()) Ok(())
} }
@ -50,7 +53,9 @@ where
S: Store, S: Store,
{ {
loop { loop {
let bytes = repo.pop(worker_id.as_bytes().to_vec()).await?; let bytes = repo
.pop(CLEANUP_QUEUE, worker_id.as_bytes().to_vec())
.await?;
run_job(repo, store, bytes.as_ref()).await?; run_job(repo, store, bytes.as_ref()).await?;
} }
@ -64,9 +69,9 @@ where
{ {
match serde_json::from_slice(job) { match serde_json::from_slice(job) {
Ok(job) => match job { Ok(job) => match job {
Job::CleanupHash { hash } => cleanup_hash::<R, S>(repo, hash).await?, Job::CleanupHash { hash } => cleanup::hash::<R, S>(repo, hash).await?,
Job::CleanupIdentifier { identifier } => { Job::CleanupIdentifier { identifier } => {
cleanup_identifier(repo, store, identifier).await? cleanup::identifier(repo, store, identifier).await?
} }
}, },
Err(e) => { Err(e) => {
@ -76,71 +81,3 @@ where
Ok(()) Ok(())
} }
#[tracing::instrument(skip(repo, store))]
async fn cleanup_identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo,
R::Bytes: Clone,
S: Store,
{
let identifier = S::Identifier::from_bytes(identifier)?;
let mut errors = Vec::new();
debug!("Deleting {:?}", identifier);
if let Err(e) = store.remove(&identifier).await {
errors.push(e);
}
if let Err(e) = IdentifierRepo::cleanup(repo, &identifier).await {
errors.push(e);
}
if !errors.is_empty() {
let span = tracing::error_span!("Error deleting files");
span.in_scope(|| {
for error in errors {
error!("{}", error);
}
});
}
Ok(())
}
/// Queues cleanup of every file attached to `hash`, then removes the hash.
///
/// Nothing happens while the hash still has aliases. Otherwise the variant
/// identifiers, the main identifier, and whatever `motion_identifier` yields
/// are each serialized into a `Job::CleanupIdentifier` and pushed onto the
/// queue. Identifiers whose `to_bytes` fails are skipped. Finally
/// `HashRepo::cleanup` is called for the hash.
///
/// `S` is only used to name the identifier type (`S::Identifier`).
#[tracing::instrument(skip(repo))]
async fn cleanup_hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
where
    R: QueueRepo + AliasRepo + HashRepo + IdentifierRepo,
    R::Bytes: Clone,
    S: Store,
{
    let hash: R::Bytes = hash.into();

    // A hash that is still aliased is left untouched.
    if !repo.aliases(hash.clone()).await?.is_empty() {
        return Ok(());
    }

    let mut pending: Vec<S::Identifier> = Vec::new();
    for (_, variant) in repo.variants::<S::Identifier>(hash.clone()).await? {
        pending.push(variant);
    }
    pending.push(repo.identifier(hash.clone()).await?);
    pending.extend(repo.motion_identifier(hash.clone()).await?);

    for ident in pending {
        // Identifiers that cannot be serialized get no job.
        let bytes = match ident.to_bytes() {
            Ok(bytes) => bytes,
            Err(_) => continue,
        };

        let job = serde_json::to_vec(&Job::CleanupIdentifier { identifier: bytes })?;
        repo.push(job.into()).await?;
    }

    HashRepo::cleanup(repo, hash).await?;

    Ok(())
}

74
src/queue/cleanup.rs Normal file
View file

@ -0,0 +1,74 @@
use crate::{
error::Error,
queue::{Job, CLEANUP_QUEUE},
repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo},
store::{Identifier, Store},
};
use tracing::error;
#[tracing::instrument(skip(repo, store))]
pub(super) async fn identifier<R, S>(repo: &R, store: &S, identifier: Vec<u8>) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo,
R::Bytes: Clone,
S: Store,
{
let identifier = S::Identifier::from_bytes(identifier)?;
let mut errors = Vec::new();
if let Err(e) = store.remove(&identifier).await {
errors.push(e);
}
if let Err(e) = IdentifierRepo::cleanup(repo, &identifier).await {
errors.push(e);
}
if !errors.is_empty() {
let span = tracing::error_span!("Error deleting files");
span.in_scope(|| {
for error in errors {
error!("{}", error);
}
});
}
Ok(())
}
#[tracing::instrument(skip(repo))]
pub(super) async fn hash<R, S>(repo: &R, hash: Vec<u8>) -> Result<(), Error>
where
R: QueueRepo + AliasRepo + HashRepo + IdentifierRepo,
R::Bytes: Clone,
S: Store,
{
let hash: R::Bytes = hash.into();
let aliases = repo.aliases(hash.clone()).await?;
if !aliases.is_empty() {
return Ok(());
}
let mut idents = repo
.variants::<S::Identifier>(hash.clone())
.await?
.into_iter()
.map(|(_, v)| v)
.collect::<Vec<_>>();
idents.push(repo.identifier(hash.clone()).await?);
idents.extend(repo.motion_identifier(hash.clone()).await?);
for identifier in idents {
if let Ok(identifier) = identifier.to_bytes() {
let job = serde_json::to_vec(&Job::CleanupIdentifier { identifier })?;
repo.push(CLEANUP_QUEUE, job.into()).await?;
}
}
HashRepo::cleanup(repo, hash).await?;
Ok(())
}

View file

@ -38,16 +38,16 @@ pub(crate) trait BaseRepo {
pub(crate) trait QueueRepo: BaseRepo { pub(crate) trait QueueRepo: BaseRepo {
async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error>; async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error>;
async fn push(&self, job: Self::Bytes) -> Result<(), Error>; async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error>;
async fn pop(&self, worker_id: Vec<u8>) -> Result<Self::Bytes, Error>; async fn pop(&self, queue: &'static str, worker_id: Vec<u8>) -> Result<Self::Bytes, Error>;
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait SettingsRepo: BaseRepo { pub(crate) trait SettingsRepo: BaseRepo {
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Error>; async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), Error>;
async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Error>; async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, Error>;
async fn remove(&self, key: &'static [u8]) -> Result<(), Error>; async fn remove(&self, key: &'static str) -> Result<(), Error>;
} }
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
@ -186,9 +186,9 @@ impl Repo {
} }
} }
const REPO_MIGRATION_O1: &[u8] = b"repo-migration-01"; const REPO_MIGRATION_O1: &str = "repo-migration-01";
const STORE_MIGRATION_PROGRESS: &[u8] = b"store-migration-progress"; const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
const GENERATOR_KEY: &[u8] = b"last-path"; const GENERATOR_KEY: &str = "last-path";
async fn migrate_hash<T>(repo: &T, old: &old::Old, hash: ::sled::IVec) -> color_eyre::Result<()> async fn migrate_hash<T>(repo: &T, old: &old::Old, hash: ::sled::IVec) -> color_eyre::Result<()>
where where
@ -233,12 +233,12 @@ where
let _ = repo.relate_details(&identifier.to_vec(), &details).await; let _ = repo.relate_details(&identifier.to_vec(), &details).await;
} }
if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS) { if let Ok(Some(value)) = old.setting(STORE_MIGRATION_PROGRESS.as_bytes()) {
repo.set(STORE_MIGRATION_PROGRESS, value.to_vec().into()) repo.set(STORE_MIGRATION_PROGRESS, value.to_vec().into())
.await?; .await?;
} }
if let Ok(Some(value)) = old.setting(GENERATOR_KEY) { if let Ok(Some(value)) = old.setting(GENERATOR_KEY.as_bytes()) {
repo.set(GENERATOR_KEY, value.to_vec().into()).await?; repo.set(GENERATOR_KEY, value.to_vec().into()).await?;
} }

View file

@ -8,7 +8,11 @@ use crate::{
}; };
use futures_util::Stream; use futures_util::Stream;
use sled::{Db, IVec, Tree}; use sled::{Db, IVec, Tree};
use std::{pin::Pin, sync::Arc}; use std::{
collections::HashMap,
pin::Pin,
sync::{Arc, RwLock},
};
use tokio::sync::Notify; use tokio::sync::Notify;
use super::BaseRepo; use super::BaseRepo;
@ -52,7 +56,7 @@ pub(crate) struct SledRepo {
alias_delete_tokens: Tree, alias_delete_tokens: Tree,
queue: Tree, queue: Tree,
in_progress_queue: Tree, in_progress_queue: Tree,
queue_notifier: Arc<Notify>, queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
db: Db, db: Db,
} }
@ -71,7 +75,7 @@ impl SledRepo {
alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?, alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?,
queue: db.open_tree("pict-rs-queue-tree")?, queue: db.open_tree("pict-rs-queue-tree")?,
in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?, in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
queue_notifier: Arc::new(Notify::new()), queue_notifier: Arc::new(RwLock::new(HashMap::new())),
db, db,
}) })
} }
@ -89,16 +93,33 @@ impl QueueRepo for SledRepo {
Ok(opt) Ok(opt)
} }
async fn push(&self, job: Self::Bytes) -> Result<(), Error> { async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error> {
let id = self.db.generate_id()?; let id = self.db.generate_id()?;
b!(self.queue, queue.insert(id.to_be_bytes(), job)); let mut key = queue.as_bytes().to_vec();
self.queue_notifier.notify_one(); key.extend(id.to_be_bytes());
b!(self.queue, queue.insert(key, job));
if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue) {
notifier.notify_one();
return Ok(());
}
self.queue_notifier
.write()
.unwrap()
.entry(queue)
.or_insert_with(|| Arc::new(Notify::new()))
.notify_one();
Ok(()) Ok(())
} }
async fn pop(&self, worker_id: Vec<u8>) -> Result<Self::Bytes, Error> { async fn pop(
let notify = Arc::clone(&self.queue_notifier); &self,
queue_name: &'static str,
worker_id: Vec<u8>,
) -> Result<Self::Bytes, Error> {
loop { loop {
let in_progress_queue = self.in_progress_queue.clone(); let in_progress_queue = self.in_progress_queue.clone();
@ -106,7 +127,10 @@ impl QueueRepo for SledRepo {
let job = b!(self.queue, { let job = b!(self.queue, {
in_progress_queue.remove(&worker_id)?; in_progress_queue.remove(&worker_id)?;
while let Some((key, job)) = queue.iter().find_map(Result::ok) { while let Some((key, job)) = queue
.scan_prefix(queue_name.as_bytes())
.find_map(Result::ok)
{
in_progress_queue.insert(&worker_id, &job)?; in_progress_queue.insert(&worker_id, &job)?;
if queue.remove(key)?.is_some() { if queue.remove(key)?.is_some() {
@ -123,6 +147,23 @@ impl QueueRepo for SledRepo {
return Ok(job); return Ok(job);
} }
let opt = self
.queue_notifier
.read()
.unwrap()
.get(&queue_name)
.map(Arc::clone);
let notify = if let Some(notify) = opt {
notify
} else {
let mut guard = self.queue_notifier.write().unwrap();
let entry = guard
.entry(queue_name)
.or_insert_with(|| Arc::new(Notify::new()));
Arc::clone(entry)
};
notify.notified().await notify.notified().await
} }
} }
@ -131,21 +172,21 @@ impl QueueRepo for SledRepo {
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl SettingsRepo for SledRepo { impl SettingsRepo for SledRepo {
#[tracing::instrument(skip(value))] #[tracing::instrument(skip(value))]
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Error> { async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), Error> {
b!(self.settings, settings.insert(key, value)); b!(self.settings, settings.insert(key, value));
Ok(()) Ok(())
} }
#[tracing::instrument] #[tracing::instrument]
async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Error> { async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, Error> {
let opt = b!(self.settings, settings.get(key)); let opt = b!(self.settings, settings.get(key));
Ok(opt) Ok(opt)
} }
#[tracing::instrument] #[tracing::instrument]
async fn remove(&self, key: &'static [u8]) -> Result<(), Error> { async fn remove(&self, key: &'static str) -> Result<(), Error> {
b!(self.settings, settings.remove(key)); b!(self.settings, settings.remove(key));
Ok(()) Ok(())

View file

@ -20,7 +20,7 @@ pub(crate) use file_id::FileId;
// - Settings Tree // - Settings Tree
// - last-path -> last generated path // - last-path -> last generated path
const GENERATOR_KEY: &[u8] = b"last-path"; const GENERATOR_KEY: &str = "last-path";
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum FileError { pub(crate) enum FileError {

View file

@ -19,7 +19,7 @@ pub(crate) use object_id::ObjectId;
// - Settings Tree // - Settings Tree
// - last-path -> last generated path // - last-path -> last generated path
const GENERATOR_KEY: &[u8] = b"last-path"; const GENERATOR_KEY: &str = "last-path";
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum ObjectError { pub(crate) enum ObjectError {

View file

@ -20,7 +20,7 @@ mod session;
pub(super) use session::UploadManagerSession; pub(super) use session::UploadManagerSession;
const STORE_MIGRATION_PROGRESS: &[u8] = b"store-migration-progress"; const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct UploadManager { pub(crate) struct UploadManager {