Begin implementing queue, put cleanups in it

This commit is contained in:
Aode (lion) 2022-03-29 12:51:16 -05:00
parent e25a4781a8
commit 602d1ea935
8 changed files with 245 additions and 101 deletions

View file

@ -44,6 +44,7 @@ impl Args {
Command::Run(Run {
address,
api_key,
worker_id,
media_skip_validate_imports,
media_max_width,
media_max_height,
@ -54,7 +55,11 @@ impl Args {
media_format,
store,
}) => {
let server = Server { address, api_key };
let server = Server {
address,
api_key,
worker_id,
};
let media = Media {
skip_validate_imports: media_skip_validate_imports,
max_width: media_max_width,
@ -240,6 +245,8 @@ struct Server {
#[serde(skip_serializing_if = "Option::is_none")]
address: Option<SocketAddr>,
#[serde(skip_serializing_if = "Option::is_none")]
worker_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
api_key: Option<String>,
}
@ -372,6 +379,9 @@ struct Run {
#[clap(long)]
api_key: Option<String>,
#[clap(long)]
worker_id: Option<String>,
/// Whether to validate media on the "import" endpoint
#[clap(long)]
media_skip_validate_imports: Option<bool>,

View file

@ -19,6 +19,7 @@ pub(crate) struct Defaults {
#[serde(rename_all = "snake_case")]
struct ServerDefaults {
address: SocketAddr,
worker_id: String,
}
#[derive(Clone, Debug, Default, serde::Serialize)]
@ -100,6 +101,7 @@ impl Default for ServerDefaults {
fn default() -> Self {
ServerDefaults {
address: "0.0.0.0:8080".parse().expect("Valid address string"),
worker_id: String::from("pict-rs-1"),
}
}
}

View file

@ -33,6 +33,8 @@ pub(crate) enum Repo {
pub(crate) struct Server {
pub(crate) address: SocketAddr,
pub(crate) worker_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) api_key: Option<String>,
}

View file

@ -39,6 +39,7 @@ mod middleware;
mod migrate;
mod process;
mod processor;
mod queue;
mod range;
mod repo;
mod serde_str;
@ -246,9 +247,8 @@ async fn download<S: Store>(
/// Delete aliases and files
#[instrument(name = "Deleting file", skip(manager))]
async fn delete<S: Store>(
async fn delete(
manager: web::Data<UploadManager>,
store: web::Data<S>,
path_entries: web::Path<(String, String)>,
) -> Result<HttpResponse, Error> {
let (token, alias) = path_entries.into_inner();
@ -256,7 +256,7 @@ async fn delete<S: Store>(
let token = DeleteToken::from_existing(&token);
let alias = Alias::from_existing(&alias);
manager.delete((**store).clone(), alias, token).await?;
manager.delete(alias, token).await?;
Ok(HttpResponse::NoContent().finish())
}
@ -308,7 +308,6 @@ async fn process_details<S: Store>(
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
manager: web::Data<UploadManager>,
store: web::Data<S>,
filters: web::Data<BTreeSet<String>>,
) -> Result<HttpResponse, Error> {
let (_, alias, thumbnail_path, _) = prepare_process(query, ext.as_str(), &filters)?;
@ -581,17 +580,16 @@ struct AliasQuery {
}
#[instrument(name = "Purging file", skip(upload_manager))]
async fn purge<S: Store>(
async fn purge(
query: web::Query<AliasQuery>,
upload_manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error> {
let alias = Alias::from_existing(&query.alias);
let aliases = upload_manager.aliases_by_alias(&alias).await?;
for alias in aliases.iter() {
upload_manager
.delete_without_token((**store).clone(), alias.to_owned())
.delete_without_token(alias.to_owned())
.await?;
}
@ -602,10 +600,9 @@ async fn purge<S: Store>(
}
#[instrument(name = "Fetching aliases", skip(upload_manager))]
async fn aliases<S: Store>(
async fn aliases(
query: web::Query<AliasQuery>,
upload_manager: web::Data<UploadManager>,
store: web::Data<S>,
) -> Result<HttpResponse, Error> {
let alias = Alias::from_existing(&query.alias);
let aliases = upload_manager.aliases_by_alias(&alias).await?;
@ -639,6 +636,14 @@ async fn launch<S: Store + Clone + 'static>(
manager: UploadManager,
store: S,
) -> color_eyre::Result<()> {
let repo = manager.repo().clone();
actix_rt::spawn(queue::process_jobs(
repo,
store.clone(),
CONFIG.server.worker_id.as_bytes().to_vec(),
));
// Create a new Multipart Form validator
//
// This form is expecting a single array field, 'images' with at most 10 files in it
@ -730,8 +735,8 @@ async fn launch<S: Store + Clone + 'static>(
.service(web::resource("/download").route(web::get().to(download::<S>)))
.service(
web::resource("/delete/{delete_token}/{filename}")
.route(web::delete().to(delete::<S>))
.route(web::get().to(delete::<S>)),
.route(web::delete().to(delete))
.route(web::get().to(delete)),
)
.service(web::resource("/original/{filename}").route(web::get().to(serve::<S>)))
.service(web::resource("/process.{ext}").route(web::get().to(process::<S>)))
@ -757,8 +762,8 @@ async fn launch<S: Store + Clone + 'static>(
.wrap(import_form.clone())
.route(web::post().to(upload::<S>)),
)
.service(web::resource("/purge").route(web::post().to(purge::<S>)))
.service(web::resource("/aliases").route(web::get().to(aliases::<S>))),
.service(web::resource("/purge").route(web::post().to(purge)))
.service(web::resource("/aliases").route(web::get().to(aliases))),
)
})
.bind(CONFIG.server.address)?

113
src/queue.rs Normal file
View file

@ -0,0 +1,113 @@
use crate::{
error::Error,
repo::{AliasRepo, HashRepo, IdentifierRepo, QueueRepo, Repo},
store::Store,
};
use tracing::{debug, error, Span};
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Job {
Cleanup { hash: Vec<u8> },
}
pub(crate) async fn queue_cleanup<R: QueueRepo>(repo: &R, hash: R::Bytes) -> Result<(), Error> {
let job = serde_json::to_vec(&Job::Cleanup {
hash: hash.as_ref().to_vec(),
})?;
repo.push(job.into()).await?;
Ok(())
}
pub(crate) async fn process_jobs<S: Store>(repo: Repo, store: S, worker_id: Vec<u8>) {
loop {
let res = match repo {
Repo::Sled(ref repo) => do_process_jobs(repo, &store, worker_id.clone()).await,
};
if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", e);
tracing::warn!("{:?}", e);
continue;
}
break;
}
}
async fn do_process_jobs<R, S>(repo: &R, store: &S, worker_id: Vec<u8>) -> Result<(), Error>
where
R: QueueRepo + HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
{
loop {
let bytes = repo.pop(worker_id.clone()).await?;
match serde_json::from_slice(bytes.as_ref()) {
Ok(job) => match job {
Job::Cleanup { hash } => cleanup(repo, store, hash).await?,
},
Err(e) => {
tracing::warn!("Invalid job: {}", e);
}
}
}
}
#[tracing::instrument(skip(repo, store))]
async fn cleanup<R, S>(repo: &R, store: &S, hash: Vec<u8>) -> Result<(), Error>
where
R: HashRepo + IdentifierRepo + AliasRepo,
R::Bytes: Clone,
S: Store,
{
let hash: R::Bytes = hash.into();
let aliases = repo.aliases(hash.clone()).await?;
if !aliases.is_empty() {
return Ok(());
}
let variant_idents = repo
.variants::<S::Identifier>(hash.clone())
.await?
.into_iter()
.map(|(_, v)| v)
.collect::<Vec<_>>();
let main_ident = repo.identifier(hash.clone()).await?;
let motion_ident = repo.motion_identifier(hash.clone()).await?;
HashRepo::cleanup(repo, hash).await?;
let cleanup_span = tracing::info_span!(parent: None, "Cleaning files");
cleanup_span.follows_from(Span::current());
let mut errors = Vec::new();
for identifier in variant_idents
.iter()
.chain(&[main_ident])
.chain(motion_ident.iter())
{
debug!("Deleting {:?}", identifier);
if let Err(e) = store.remove(identifier).await {
errors.push(e);
}
if let Err(e) = IdentifierRepo::cleanup(repo, identifier).await {
errors.push(e);
}
}
if !errors.is_empty() {
let span = tracing::error_span!("Error deleting files");
span.in_scope(|| {
for error in errors {
error!("{}", error);
}
});
}
Ok(())
}

View file

@ -30,17 +30,28 @@ pub(crate) struct DeleteToken {
pub(crate) struct AlreadyExists;
#[async_trait::async_trait(?Send)]
pub(crate) trait SettingsRepo {
pub(crate) trait BaseRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait QueueRepo: BaseRepo {
async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error>;
async fn push(&self, job: Self::Bytes) -> Result<(), Error>;
async fn pop(&self, worker_id: Vec<u8>) -> Result<Self::Bytes, Error>;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait SettingsRepo: BaseRepo {
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Error>;
async fn get(&self, key: &'static [u8]) -> Result<Option<Self::Bytes>, Error>;
async fn remove(&self, key: &'static [u8]) -> Result<(), Error>;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait IdentifierRepo {
pub(crate) trait IdentifierRepo: BaseRepo {
async fn relate_details<I: Identifier>(
&self,
identifier: &I,
@ -52,8 +63,7 @@ pub(crate) trait IdentifierRepo {
}
#[async_trait::async_trait(?Send)]
pub(crate) trait HashRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
pub(crate) trait HashRepo: BaseRepo {
type Stream: Stream<Item = Result<Self::Bytes, Error>>;
async fn hashes(&self) -> Self::Stream;
@ -101,9 +111,7 @@ pub(crate) trait HashRepo {
}
#[async_trait::async_trait(?Send)]
pub(crate) trait AliasRepo {
type Bytes: AsRef<[u8]> + From<Vec<u8>>;
pub(crate) trait AliasRepo: BaseRepo {
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Error>;
async fn relate_delete_token(

View file

@ -1,9 +1,15 @@
use super::{
Alias, AliasRepo, AlreadyExists, DeleteToken, Details, HashRepo, Identifier, IdentifierRepo,
SettingsRepo,
use crate::{
error::Error,
repo::{
Alias, AliasRepo, AlreadyExists, DeleteToken, Details, HashRepo, Identifier,
IdentifierRepo, QueueRepo, SettingsRepo,
},
};
use crate::error::Error;
use sled::{Db, IVec, Tree};
use std::sync::Arc;
use tokio::sync::Notify;
use super::BaseRepo;
macro_rules! b {
($self:ident.$ident:ident, $expr:expr) => {{
@ -42,7 +48,10 @@ pub(crate) struct SledRepo {
aliases: Tree,
alias_hashes: Tree,
alias_delete_tokens: Tree,
_db: Db,
queue: Tree,
in_progress_queue: Tree,
queue_notifier: Arc<Notify>,
db: Db,
}
impl SledRepo {
@ -58,15 +67,67 @@ impl SledRepo {
aliases: db.open_tree("pict-rs-aliases-tree")?,
alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?,
alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?,
_db: db,
queue: db.open_tree("pict-rs-queue-tree")?,
in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
queue_notifier: Arc::new(Notify::new()),
db,
})
}
}
impl BaseRepo for SledRepo {
type Bytes = IVec;
}
#[async_trait::async_trait(?Send)]
impl QueueRepo for SledRepo {
async fn in_progress(&self, worker_id: Vec<u8>) -> Result<Option<Self::Bytes>, Error> {
let opt = b!(self.in_progress_queue, in_progress_queue.get(worker_id));
Ok(opt)
}
async fn push(&self, job: Self::Bytes) -> Result<(), Error> {
let id = self.db.generate_id()?;
b!(self.queue, queue.insert(id.to_be_bytes(), job));
self.queue_notifier.notify_one();
Ok(())
}
async fn pop(&self, worker_id: Vec<u8>) -> Result<Self::Bytes, Error> {
let notify = Arc::clone(&self.queue_notifier);
loop {
let in_progress_queue = self.in_progress_queue.clone();
let worker_id = worker_id.clone();
let job = b!(self.queue, {
in_progress_queue.remove(&worker_id)?;
while let Some((key, job)) = queue.iter().find_map(Result::ok) {
in_progress_queue.insert(&worker_id, &job)?;
if queue.remove(key)?.is_some() {
return Ok(Some(job));
}
in_progress_queue.remove(&worker_id)?;
}
Ok(None) as Result<_, SledError>
});
if let Some(job) = job {
return Ok(job);
}
notify.notified().await
}
}
}
#[async_trait::async_trait(?Send)]
impl SettingsRepo for SledRepo {
type Bytes = IVec;
#[tracing::instrument(skip(value))]
async fn set(&self, key: &'static [u8], value: Self::Bytes) -> Result<(), Error> {
b!(self.settings, settings.insert(key, value));
@ -212,7 +273,6 @@ fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec<u8> {
#[async_trait::async_trait(?Send)]
impl HashRepo for SledRepo {
type Bytes = IVec;
type Stream = HashStream;
async fn hashes(&self) -> Self::Stream {
@ -429,8 +489,6 @@ impl HashRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl AliasRepo for SledRepo {
type Bytes = sled::IVec;
#[tracing::instrument]
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Error> {
let bytes = alias.to_bytes();

View file

@ -5,15 +5,15 @@ use crate::{
ffmpeg::{InputFormat, ThumbnailFormat},
magick::details_hint,
repo::{
sled::SledRepo, Alias, AliasRepo, DeleteToken, HashRepo, IdentifierRepo, Repo, SettingsRepo,
sled::SledRepo, Alias, AliasRepo, BaseRepo, DeleteToken, HashRepo, IdentifierRepo, Repo,
SettingsRepo,
},
store::{Identifier, Store},
};
use futures_util::StreamExt;
use sha2::Digest;
use std::sync::Arc;
use tracing::{debug, error, instrument, Span};
use tracing_futures::Instrument;
use tracing::instrument;
mod hasher;
mod session;
@ -34,6 +34,10 @@ pub(crate) struct UploadManagerInner {
}
impl UploadManager {
pub(crate) fn repo(&self) -> &Repo {
&self.inner.repo
}
/// Create a new UploadManager
pub(crate) async fn new(repo: Repo, format: Option<ImageFormat>) -> Result<Self, Error> {
let manager = UploadManager {
@ -229,26 +233,17 @@ impl UploadManager {
}
/// Delete an alias without a delete token
pub(crate) async fn delete_without_token<S: Store + 'static>(
&self,
store: S,
alias: Alias,
) -> Result<(), Error> {
pub(crate) async fn delete_without_token(&self, alias: Alias) -> Result<(), Error> {
let token = match self.inner.repo {
Repo::Sled(ref sled_repo) => sled_repo.delete_token(&alias).await?,
};
self.delete(store, alias, token).await
self.delete(alias, token).await
}
/// Delete the alias, and the file & variants if no more aliases exist
#[instrument(skip(self, alias, token))]
pub(crate) async fn delete<S: Store + 'static>(
&self,
store: S,
alias: Alias,
token: DeleteToken,
) -> Result<(), Error> {
pub(crate) async fn delete(&self, alias: Alias, token: DeleteToken) -> Result<(), Error> {
let hash = match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let saved_delete_token = sled_repo.delete_token(&alias).await?;
@ -262,17 +257,13 @@ impl UploadManager {
}
};
self.check_delete_files(store, hash).await
self.check_delete_files(hash).await
}
async fn check_delete_files<S: Store + 'static>(
&self,
store: S,
hash: Vec<u8>,
) -> Result<(), Error> {
async fn check_delete_files(&self, hash: Vec<u8>) -> Result<(), Error> {
match self.inner.repo {
Repo::Sled(ref sled_repo) => {
let hash: <SledRepo as HashRepo>::Bytes = hash.into();
let hash: <SledRepo as BaseRepo>::Bytes = hash.into();
let aliases = sled_repo.aliases(hash.clone()).await?;
@ -280,52 +271,7 @@ impl UploadManager {
return Ok(());
}
let variant_idents = sled_repo
.variants::<S::Identifier>(hash.clone())
.await?
.into_iter()
.map(|(_, v)| v)
.collect::<Vec<_>>();
let main_ident = sled_repo.identifier(hash.clone()).await?;
let motion_ident = sled_repo.motion_identifier(hash.clone()).await?;
let repo = sled_repo.clone();
HashRepo::cleanup(sled_repo, hash).await?;
let cleanup_span = tracing::info_span!(parent: None, "Cleaning files");
cleanup_span.follows_from(Span::current());
actix_rt::spawn(
async move {
let mut errors = Vec::new();
for identifier in variant_idents
.iter()
.chain(&[main_ident])
.chain(motion_ident.iter())
{
debug!("Deleting {:?}", identifier);
if let Err(e) = store.remove(identifier).await {
errors.push(e);
}
if let Err(e) = IdentifierRepo::cleanup(&repo, identifier).await {
errors.push(e);
}
}
if !errors.is_empty() {
let span = tracing::error_span!("Error deleting files");
span.in_scope(|| {
for error in errors {
error!("{}", error);
}
});
}
}
.instrument(cleanup_span),
);
crate::queue::queue_cleanup(sled_repo, hash).await?;
}
}