2022-03-29 17:51:16 +00:00
|
|
|
use crate::{
|
2023-07-08 22:35:57 +00:00
|
|
|
details::MaybeHumanDate,
|
2022-03-29 17:51:16 +00:00
|
|
|
repo::{
|
2023-08-14 19:25:19 +00:00
|
|
|
hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken,
|
2023-08-16 21:09:40 +00:00
|
|
|
Details, DetailsRepo, FullRepo, HashAlreadyExists, HashRepo, Identifier, JobId, ProxyRepo,
|
|
|
|
QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult,
|
|
|
|
VariantAccessRepo, VariantAlreadyExists,
|
2022-03-29 17:51:16 +00:00
|
|
|
},
|
2022-04-02 22:41:00 +00:00
|
|
|
serde_str::Serde,
|
2023-06-20 20:59:08 +00:00
|
|
|
store::StoreError,
|
2023-08-16 00:19:03 +00:00
|
|
|
stream::{from_iterator, LocalBoxStream},
|
2022-03-24 22:09:15 +00:00
|
|
|
};
|
2023-07-26 01:08:18 +00:00
|
|
|
use sled::{transaction::TransactionError, Db, IVec, Transactional, Tree};
|
2022-04-01 16:51:46 +00:00
|
|
|
use std::{
|
2022-04-06 17:13:46 +00:00
|
|
|
collections::HashMap,
|
2023-07-08 22:35:57 +00:00
|
|
|
path::PathBuf,
|
2023-01-29 17:36:09 +00:00
|
|
|
sync::{
|
|
|
|
atomic::{AtomicU64, Ordering},
|
|
|
|
Arc, RwLock,
|
|
|
|
},
|
2023-07-23 02:11:28 +00:00
|
|
|
time::Instant,
|
2022-04-01 16:51:46 +00:00
|
|
|
};
|
2023-08-16 00:19:03 +00:00
|
|
|
use tokio::sync::Notify;
|
2023-07-23 15:23:37 +00:00
|
|
|
use url::Url;
|
2023-08-15 02:17:57 +00:00
|
|
|
use uuid::Uuid;
|
2022-03-29 17:51:16 +00:00
|
|
|
|
2022-03-24 22:09:15 +00:00
|
|
|
// Runs a blocking sled operation on the blocking thread pool while keeping
// the current tracing span active inside the closure.
//
// `$self.$ident` names a `Tree` field on the repo; it is cloned (Tree handles
// are cheap clones) so the closure can be `'static`. The first
// `map_err(SledError::from)` converts the spawn_blocking join error
// (presumably via a `From<JoinError>` impl defined elsewhere in this file —
// not visible here), the second converts the sled operation's own error.
// The macro expands to the operation's success value, early-returning on
// either failure via `?`.
macro_rules! b {
    ($self:ident.$ident:ident, $expr:expr) => {{
        let $ident = $self.$ident.clone();

        let span = tracing::Span::current();

        actix_rt::task::spawn_blocking(move || span.in_scope(|| $expr))
            .await
            .map_err(SledError::from)
            .map_err(RepoError::from)?
            .map_err(SledError::from)
            .map_err(RepoError::from)?
    }};
}
|
|
|
|
|
|
|
|
/// Errors produced by the sled-backed repo implementation.
#[derive(Debug, thiserror::Error)]
pub(crate) enum SledError {
    /// Any raw sled database error.
    #[error("Error in database")]
    Sled(#[from] sled::Error),

    /// Details blobs are stored as JSON; this covers (de)serialization failures.
    #[error("Invalid details json")]
    Details(#[from] serde_json::Error),

    /// A stored variant-access key could not be parsed back into (hash, variant).
    #[error("Error parsing variant key")]
    VariantKey(#[from] VariantKeyError),

    /// A blocking task panicked while running a sled operation.
    #[error("Operation panicked")]
    Panic,

    /// A compare_and_swap lost the race against a concurrent writer.
    #[error("Another process updated this value before us")]
    Conflict,
}
|
|
|
|
|
2022-03-26 21:49:23 +00:00
|
|
|
/// Sled-backed implementation of the repo traits.
///
/// Each field is a sled `Tree` (an isolated keyspace within the single `Db`).
/// `Tree`, `Db`, and the `Arc` fields are all cheap handle clones, which is
/// what makes `derive(Clone)` inexpensive here.
#[derive(Clone)]
pub(crate) struct SledRepo {
    // Counter bumped on every health check to verify writes succeed.
    healthz_count: Arc<AtomicU64>,
    healthz: Tree,
    settings: Tree,
    // identifier -> serialized Details (JSON).
    identifier_details: Tree,
    hashes: Tree,
    hash_aliases: Tree,
    hash_identifiers: Tree,
    hash_variant_identifiers: Tree,
    hash_motion_identifiers: Tree,
    aliases: Tree,
    alias_hashes: Tree,
    alias_delete_tokens: Tree,
    // Job payloads, keyed by queue-name ++ job-id (see `job_key`).
    queue: Tree,
    // Per-job claim/heartbeat state, keyed the same as `queue`.
    job_state: Tree,
    // Access-time tracking comes in forward/inverse pairs: the forward tree
    // maps key -> timestamped value; the inverse tree maps timestamped value
    // -> key so entries can be range-scanned in access-time order.
    alias_access: Tree,
    inverse_alias_access: Tree,
    variant_access: Tree,
    inverse_variant_access: Tree,
    // Proxied remote URL <-> alias mappings.
    proxy: Tree,
    inverse_proxy: Tree,
    // Per-queue wakers so `pop` can sleep until a `push` notifies it.
    queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
    uploads: Tree,
    migration_identifiers: Tree,
    // Retained so `export` can open the snapshot database with the same config.
    cache_capacity: u64,
    export_path: PathBuf,
    db: Db,
}
|
|
|
|
|
|
|
|
impl SledRepo {
    /// Opens (or creates) the sled database under `path` and every tree the
    /// repo uses.
    ///
    /// `cache_capacity` is sled's page-cache size in bytes; `export_path` is
    /// the directory where `export` writes point-in-time database copies.
    #[tracing::instrument]
    pub(crate) fn build(
        path: PathBuf,
        cache_capacity: u64,
        export_path: PathBuf,
    ) -> color_eyre::Result<Self> {
        let db = Self::open(path, cache_capacity)?;

        Ok(SledRepo {
            healthz_count: Arc::new(AtomicU64::new(0)),
            healthz: db.open_tree("pict-rs-healthz-tree")?,
            settings: db.open_tree("pict-rs-settings-tree")?,
            identifier_details: db.open_tree("pict-rs-identifier-details-tree")?,
            hashes: db.open_tree("pict-rs-hashes-tree")?,
            hash_aliases: db.open_tree("pict-rs-hash-aliases-tree")?,
            hash_identifiers: db.open_tree("pict-rs-hash-identifiers-tree")?,
            hash_variant_identifiers: db.open_tree("pict-rs-hash-variant-identifiers-tree")?,
            hash_motion_identifiers: db.open_tree("pict-rs-hash-motion-identifiers-tree")?,
            aliases: db.open_tree("pict-rs-aliases-tree")?,
            alias_hashes: db.open_tree("pict-rs-alias-hashes-tree")?,
            alias_delete_tokens: db.open_tree("pict-rs-alias-delete-tokens-tree")?,
            queue: db.open_tree("pict-rs-queue-tree")?,
            job_state: db.open_tree("pict-rs-job-state-tree")?,
            alias_access: db.open_tree("pict-rs-alias-access-tree")?,
            inverse_alias_access: db.open_tree("pict-rs-inverse-alias-access-tree")?,
            variant_access: db.open_tree("pict-rs-variant-access-tree")?,
            inverse_variant_access: db.open_tree("pict-rs-inverse-variant-access-tree")?,
            proxy: db.open_tree("pict-rs-proxy-tree")?,
            inverse_proxy: db.open_tree("pict-rs-inverse-proxy-tree")?,
            queue_notifier: Arc::new(RwLock::new(HashMap::new())),
            uploads: db.open_tree("pict-rs-uploads-tree")?,
            migration_identifiers: db.open_tree("pict-rs-migration-identifiers-tree")?,
            cache_capacity,
            export_path,
            db,
        })
    }

    /// Opens a sled `Db` at `path`/v0.5.0 with the configured cache size.
    /// The version path segment keeps incompatible on-disk layouts separate.
    fn open(mut path: PathBuf, cache_capacity: u64) -> Result<Db, SledError> {
        path.push("v0.5.0");

        let db = ::sled::Config::new()
            .cache_capacity(cache_capacity)
            .path(path)
            .open()?;

        Ok(db)
    }

    /// Writes a full copy of the database into a timestamp-named directory
    /// under `export_path`.
    ///
    /// sled's export/import walk the whole dataset, so the copy runs on a
    /// blocking thread.
    #[tracing::instrument(level = "warn", skip_all)]
    pub(crate) async fn export(&self) -> Result<(), RepoError> {
        let path = self
            .export_path
            .join(MaybeHumanDate::HumanDate(time::OffsetDateTime::now_utc()).to_string());

        let export_db = Self::open(path, self.cache_capacity)?;

        let this = self.db.clone();

        actix_rt::task::spawn_blocking(move || {
            let export = this.export();
            export_db.import(export);
        })
        .await
        // NOTE(review): converts the spawn_blocking JoinError into SledError;
        // the From impl is presumably defined elsewhere in this file.
        .map_err(SledError::from)?;

        Ok(())
    }
}
|
|
|
|
|
2023-08-15 01:00:00 +00:00
|
|
|
impl BaseRepo for SledRepo {}
|
2022-03-29 17:51:16 +00:00
|
|
|
|
2023-01-29 17:36:09 +00:00
|
|
|
#[async_trait::async_trait(?Send)]
impl FullRepo for SledRepo {
    /// Verifies the database is writable and readable: bumps a counter,
    /// inserts it, flushes to disk, then reads it back.
    async fn health_check(&self) -> Result<(), RepoError> {
        let next = self.healthz_count.fetch_add(1, Ordering::Relaxed);
        b!(self.healthz, {
            healthz.insert("healthz", &next.to_be_bytes()[..])
        });
        // Force the write to hit disk so the check covers durability too.
        self.healthz.flush_async().await.map_err(SledError::from)?;
        b!(self.healthz, healthz.get("healthz"));
        Ok(())
    }
}
|
2022-04-02 21:44:03 +00:00
|
|
|
|
2023-07-23 15:23:37 +00:00
|
|
|
/// Maps proxied remote URLs to the aliases that cache them, and back.
#[async_trait::async_trait(?Send)]
impl ProxyRepo for SledRepo {
    /// Stores the url -> alias mapping and its inverse so either side can be
    /// looked up (or cleaned up) later. The two inserts are not transactional;
    /// a crash between them could leave only the forward entry.
    async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> {
        let proxy = self.proxy.clone();
        let inverse_proxy = self.inverse_proxy.clone();

        actix_web::web::block(move || {
            proxy.insert(url.as_str().as_bytes(), alias.to_bytes())?;
            inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?;

            Ok(()) as Result<(), SledError>
        })
        .await
        .map_err(|_| RepoError::Canceled)??;

        Ok(())
    }

    /// Looks up the alias previously related to `url`, if any.
    async fn related(&self, url: Url) -> Result<Option<Alias>, RepoError> {
        let opt = b!(self.proxy, proxy.get(url.as_str().as_bytes()));

        Ok(opt.and_then(|ivec| Alias::from_slice(&ivec)))
    }

    /// Removes both directions of the mapping for `alias`, using the inverse
    /// tree to discover which url (if any) points at it.
    async fn remove_relation(&self, alias: Alias) -> Result<(), RepoError> {
        let proxy = self.proxy.clone();
        let inverse_proxy = self.inverse_proxy.clone();

        actix_web::web::block(move || {
            if let Some(url) = inverse_proxy.remove(alias.to_bytes())? {
                proxy.remove(url)?;
            }

            Ok(()) as Result<(), SledError>
        })
        .await
        .map_err(|_| RepoError::Canceled)??;

        Ok(())
    }
}
|
|
|
|
|
2023-07-19 02:56:13 +00:00
|
|
|
#[async_trait::async_trait(?Send)]
|
|
|
|
impl AliasAccessRepo for SledRepo {
|
2023-07-23 00:41:50 +00:00
|
|
|
#[tracing::instrument(level = "debug", skip(self))]
|
2023-08-16 21:09:40 +00:00
|
|
|
async fn set_accessed_alias(
|
|
|
|
&self,
|
|
|
|
alias: Alias,
|
|
|
|
accessed: time::OffsetDateTime,
|
|
|
|
) -> Result<(), RepoError> {
|
|
|
|
let mut value_bytes = accessed.unix_timestamp_nanos().to_be_bytes().to_vec();
|
2023-08-16 02:18:25 +00:00
|
|
|
value_bytes.extend_from_slice(&alias.to_bytes());
|
|
|
|
let value_bytes = IVec::from(value_bytes);
|
2023-07-19 02:56:13 +00:00
|
|
|
|
|
|
|
let alias_access = self.alias_access.clone();
|
|
|
|
let inverse_alias_access = self.inverse_alias_access.clone();
|
|
|
|
|
2023-08-16 02:18:25 +00:00
|
|
|
let res = actix_rt::task::spawn_blocking(move || {
|
|
|
|
(&alias_access, &inverse_alias_access).transaction(
|
|
|
|
|(alias_access, inverse_alias_access)| {
|
|
|
|
if let Some(old) = alias_access.insert(alias.to_bytes(), &value_bytes)? {
|
|
|
|
inverse_alias_access.remove(old)?;
|
|
|
|
}
|
|
|
|
inverse_alias_access.insert(&value_bytes, alias.to_bytes())?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
},
|
|
|
|
)
|
2023-07-19 02:56:13 +00:00
|
|
|
})
|
|
|
|
.await
|
2023-08-16 02:18:25 +00:00
|
|
|
.map_err(|_| RepoError::Canceled)?;
|
|
|
|
|
|
|
|
if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
|
|
|
|
return Err(RepoError::from(SledError::from(e)));
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
2023-07-19 02:56:13 +00:00
|
|
|
}
|
|
|
|
|
2023-08-16 21:09:40 +00:00
|
|
|
async fn alias_accessed_at(
|
|
|
|
&self,
|
|
|
|
alias: Alias,
|
|
|
|
) -> Result<Option<time::OffsetDateTime>, RepoError> {
|
|
|
|
let alias = alias.to_bytes();
|
|
|
|
|
|
|
|
let Some(timestamp) = b!(self.variant_access, variant_access.get(alias)) else {
|
|
|
|
return Ok(None);
|
|
|
|
};
|
|
|
|
|
|
|
|
let timestamp = timestamp[0..16].try_into().expect("valid timestamp bytes");
|
|
|
|
|
|
|
|
let timestamp =
|
|
|
|
time::OffsetDateTime::from_unix_timestamp_nanos(i128::from_be_bytes(timestamp))
|
|
|
|
.expect("valid timestamp");
|
|
|
|
|
|
|
|
Ok(Some(timestamp))
|
|
|
|
}
|
|
|
|
|
2023-07-23 00:41:50 +00:00
|
|
|
#[tracing::instrument(level = "debug", skip(self))]
|
2023-07-19 02:56:13 +00:00
|
|
|
async fn older_aliases(
|
|
|
|
&self,
|
|
|
|
timestamp: time::OffsetDateTime,
|
2023-08-16 00:19:03 +00:00
|
|
|
) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError> {
|
2023-08-16 02:18:25 +00:00
|
|
|
let time_bytes = timestamp.unix_timestamp_nanos().to_be_bytes().to_vec();
|
2023-07-19 02:56:13 +00:00
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
let iterator = self
|
|
|
|
.inverse_alias_access
|
2023-08-16 02:18:25 +00:00
|
|
|
.range(..=time_bytes)
|
2023-08-16 00:19:03 +00:00
|
|
|
.filter_map(|res| {
|
|
|
|
res.map_err(SledError::from)
|
|
|
|
.map_err(RepoError::from)
|
|
|
|
.map(|(_, value)| Alias::from_slice(&value))
|
|
|
|
.transpose()
|
|
|
|
});
|
|
|
|
|
|
|
|
Ok(Box::pin(from_iterator(iterator, 8)))
|
2023-07-19 02:56:13 +00:00
|
|
|
}
|
|
|
|
|
2023-07-23 00:41:50 +00:00
|
|
|
#[tracing::instrument(level = "debug", skip(self))]
|
2023-08-16 21:09:40 +00:00
|
|
|
async fn remove_alias_access(&self, alias: Alias) -> Result<(), RepoError> {
|
2023-07-19 02:56:13 +00:00
|
|
|
let alias_access = self.alias_access.clone();
|
|
|
|
let inverse_alias_access = self.inverse_alias_access.clone();
|
|
|
|
|
2023-08-16 02:18:25 +00:00
|
|
|
let res = actix_rt::task::spawn_blocking(move || {
|
|
|
|
(&alias_access, &inverse_alias_access).transaction(
|
|
|
|
|(alias_access, inverse_alias_access)| {
|
|
|
|
if let Some(old) = alias_access.remove(alias.to_bytes())? {
|
|
|
|
inverse_alias_access.remove(old)?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
},
|
|
|
|
)
|
2023-07-19 02:56:13 +00:00
|
|
|
})
|
|
|
|
.await
|
2023-08-16 02:18:25 +00:00
|
|
|
.map_err(|_| RepoError::Canceled)?;
|
|
|
|
|
|
|
|
if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
|
|
|
|
return Err(RepoError::from(SledError::from(e)));
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
2023-07-19 02:56:13 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Tracks the most recent access time of each (hash, variant) pair, mirroring
/// the AliasAccessRepo scheme: a forward tree keyed by the composite variant
/// key (built by `variant_access_key`, defined elsewhere in this file) and a
/// time-ordered inverse tree for range scans.
#[async_trait::async_trait(?Send)]
impl VariantAccessRepo for SledRepo {
    /// Records `accessed` as the latest access time for (hash, variant),
    /// replacing and un-indexing any previous entry transactionally.
    #[tracing::instrument(level = "debug", skip(self))]
    async fn set_accessed_variant(
        &self,
        hash: Hash,
        variant: String,
        accessed: time::OffsetDateTime,
    ) -> Result<(), RepoError> {
        let hash = hash.to_bytes();
        let key = IVec::from(variant_access_key(&hash, &variant));

        // Value = (16-byte be i128 nanos) ++ key; appending the key keeps
        // inverse-tree keys unique when two variants share a timestamp.
        let mut value_bytes = accessed.unix_timestamp_nanos().to_be_bytes().to_vec();
        value_bytes.extend_from_slice(&key);
        let value_bytes = IVec::from(value_bytes);

        let variant_access = self.variant_access.clone();
        let inverse_variant_access = self.inverse_variant_access.clone();

        let res = actix_rt::task::spawn_blocking(move || {
            (&variant_access, &inverse_variant_access).transaction(
                |(variant_access, inverse_variant_access)| {
                    if let Some(old) = variant_access.insert(&key, &value_bytes)? {
                        inverse_variant_access.remove(old)?;
                    }
                    inverse_variant_access.insert(&value_bytes, &key)?;

                    Ok(())
                },
            )
        })
        .await
        .map_err(|_| RepoError::Canceled)?;

        if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
            return Err(RepoError::from(SledError::from(e)));
        }

        Ok(())
    }

    /// Returns the last recorded access time for (hash, variant), or `None`.
    #[tracing::instrument(level = "debug", skip(self))]
    async fn variant_accessed_at(
        &self,
        hash: Hash,
        variant: String,
    ) -> Result<Option<time::OffsetDateTime>, RepoError> {
        let hash = hash.to_bytes();
        let key = variant_access_key(&hash, &variant);

        let Some(timestamp) = b!(self.variant_access, variant_access.get(key)) else {
            return Ok(None);
        };

        // The first 16 bytes of the stored value are the be i128 nano timestamp.
        let timestamp = timestamp[0..16].try_into().expect("valid timestamp bytes");

        let timestamp =
            time::OffsetDateTime::from_unix_timestamp_nanos(i128::from_be_bytes(timestamp))
                .expect("valid timestamp");

        Ok(Some(timestamp))
    }

    /// Streams (hash, variant) pairs last accessed at or before `timestamp`,
    /// oldest first, by range-scanning the inverse tree.
    #[tracing::instrument(level = "debug", skip(self))]
    async fn older_variants(
        &self,
        timestamp: time::OffsetDateTime,
    ) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError> {
        let time_bytes = timestamp.unix_timestamp_nanos().to_be_bytes().to_vec();

        let iterator = self.inverse_variant_access.range(..=time_bytes).map(|res| {
            let (_, bytes) = res.map_err(SledError::from)?;

            // `parse_variant_access_key` (defined elsewhere in this file)
            // decodes the stored composite key back into (Hash, variant).
            parse_variant_access_key(bytes)
                .map_err(SledError::from)
                .map_err(RepoError::from)
        });

        Ok(Box::pin(from_iterator(iterator, 8)))
    }

    /// Removes the access record for (hash, variant) from both trees.
    #[tracing::instrument(level = "debug", skip(self))]
    async fn remove_variant_access(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
        let hash = hash.to_bytes();
        let key = IVec::from(variant_access_key(&hash, &variant));

        let variant_access = self.variant_access.clone();
        let inverse_variant_access = self.inverse_variant_access.clone();

        let res = actix_rt::task::spawn_blocking(move || {
            (&variant_access, &inverse_variant_access).transaction(
                |(variant_access, inverse_variant_access)| {
                    if let Some(old) = variant_access.remove(&key)? {
                        inverse_variant_access.remove(old)?;
                    }

                    Ok(())
                },
            )
        })
        .await
        .map_err(|_| RepoError::Canceled)?;

        if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
            return Err(RepoError::from(SledError::from(e)));
        }

        Ok(())
    }
}
|
|
|
|
|
2022-04-02 22:41:00 +00:00
|
|
|
/// Serializable mirror of `UploadResult`, used to persist upload outcomes as
/// JSON in the uploads tree (alias and token are wrapped in `Serde` to make
/// them serializable).
#[derive(serde::Deserialize, serde::Serialize)]
enum InnerUploadResult {
    Success {
        alias: Serde<Alias>,
        token: Serde<DeleteToken>,
    },
    Failure {
        message: String,
    },
}
|
|
|
|
|
|
|
|
impl From<UploadResult> for InnerUploadResult {
|
|
|
|
fn from(u: UploadResult) -> Self {
|
|
|
|
match u {
|
|
|
|
UploadResult::Success { alias, token } => InnerUploadResult::Success {
|
|
|
|
alias: Serde::new(alias),
|
|
|
|
token: Serde::new(token),
|
|
|
|
},
|
|
|
|
UploadResult::Failure { message } => InnerUploadResult::Failure { message },
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<InnerUploadResult> for UploadResult {
|
|
|
|
fn from(i: InnerUploadResult) -> Self {
|
|
|
|
match i {
|
|
|
|
InnerUploadResult::Success { alias, token } => UploadResult::Success {
|
|
|
|
alias: Serde::into_inner(alias),
|
|
|
|
token: Serde::into_inner(token),
|
|
|
|
},
|
|
|
|
InnerUploadResult::Failure { message } => UploadResult::Failure { message },
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-23 02:11:28 +00:00
|
|
|
/// RAII guard that records a queue-push metric on drop; `armed` stays true
/// unless `disarm` is called, marking the push as not completed.
struct PushMetricsGuard {
    queue: &'static str,
    armed: bool,
}
|
|
|
|
|
|
|
|
/// RAII guard that records queue-pop metrics (count and duration since
/// `start`) on drop; `armed` stays true unless `disarm` is called.
struct PopMetricsGuard {
    queue: &'static str,
    start: Instant,
    armed: bool,
}
|
|
|
|
|
|
|
|
impl PushMetricsGuard {
|
|
|
|
fn guard(queue: &'static str) -> Self {
|
|
|
|
Self { queue, armed: true }
|
|
|
|
}
|
|
|
|
|
|
|
|
fn disarm(mut self) {
|
|
|
|
self.armed = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl PopMetricsGuard {
|
|
|
|
fn guard(queue: &'static str) -> Self {
|
|
|
|
Self {
|
|
|
|
queue,
|
|
|
|
start: Instant::now(),
|
|
|
|
armed: true,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn disarm(mut self) {
|
|
|
|
self.armed = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Drop for PushMetricsGuard {
    /// Emits the push counter exactly once per push attempt; the "completed"
    /// label is "true" only when `disarm` ran before the drop.
    fn drop(&mut self) {
        metrics::increment_counter!("pict-rs.queue.push", "completed" => (!self.armed).to_string(), "queue" => self.queue);
    }
}
|
|
|
|
|
|
|
|
impl Drop for PopMetricsGuard {
    /// Emits the pop duration histogram and counter once per pop attempt;
    /// the "completed" label is "true" only when `disarm` ran before the drop.
    fn drop(&mut self) {
        metrics::histogram!("pict-rs.queue.pop.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string(), "queue" => self.queue);
        metrics::increment_counter!("pict-rs.queue.pop", "completed" => (!self.armed).to_string(), "queue" => self.queue);
    }
}
|
|
|
|
|
2022-04-02 21:44:03 +00:00
|
|
|
#[async_trait::async_trait(?Send)]
impl UploadRepo for SledRepo {
    /// Registers a pending upload. The sentinel value `b"1"` marks
    /// "in progress"; `complete` later overwrites it with a JSON result.
    #[tracing::instrument(level = "trace", skip(self))]
    async fn create_upload(&self, upload_id: UploadId) -> Result<(), RepoError> {
        b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1"));
        Ok(())
    }

    /// Waits for the upload identified by `upload_id` to finish and returns
    /// its result.
    ///
    /// The watch subscription is created BEFORE the initial read so a
    /// completion landing between the read and the wait is not missed. A
    /// missing key (or a Remove event) means another caller already claimed
    /// the result.
    #[tracing::instrument(skip(self))]
    async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
        let mut subscriber = self.uploads.watch_prefix(upload_id.as_bytes());

        let bytes = upload_id.as_bytes().to_vec();
        let opt = b!(self.uploads, uploads.get(bytes));

        if let Some(bytes) = opt {
            // Anything other than the in-progress sentinel is a serialized result.
            if bytes != b"1" {
                let result: InnerUploadResult =
                    serde_json::from_slice(&bytes).map_err(SledError::from)?;
                return Ok(result.into());
            }
        } else {
            return Err(RepoError::AlreadyClaimed);
        }

        while let Some(event) = (&mut subscriber).await {
            match event {
                sled::Event::Remove { .. } => {
                    return Err(RepoError::AlreadyClaimed);
                }
                sled::Event::Insert { value, .. } => {
                    if value != b"1" {
                        let result: InnerUploadResult =
                            serde_json::from_slice(&value).map_err(SledError::from)?;
                        return Ok(result.into());
                    }
                }
            }
        }

        // The subscription stream ended without a result (tree dropped / shutdown).
        Err(RepoError::Canceled)
    }

    /// Claims (consumes) a completed upload result by deleting its key, which
    /// makes concurrent `wait`ers observe `AlreadyClaimed`.
    #[tracing::instrument(level = "trace", skip(self))]
    async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> {
        b!(self.uploads, uploads.remove(upload_id.as_bytes()));
        Ok(())
    }

    /// Stores the upload's final result as JSON, waking any `wait`ers via
    /// sled's watch mechanism.
    #[tracing::instrument(level = "trace", skip(self, result))]
    async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> {
        let result: InnerUploadResult = result.into();
        let result = serde_json::to_vec(&result).map_err(SledError::from)?;

        b!(self.uploads, uploads.insert(upload_id.as_bytes(), result));

        Ok(())
    }
}
|
|
|
|
|
2023-08-13 19:12:38 +00:00
|
|
|
/// Claim state stored per job in the job_state tree.
enum JobState {
    // Queued, not yet claimed by any worker (serialized as b"pend").
    Pending,
    // Claimed: 8-byte big-endian unix timestamp followed by the 16-byte
    // worker uuid (see `JobState::running`).
    Running([u8; 24]),
}
|
2022-03-29 17:51:16 +00:00
|
|
|
|
2023-08-13 19:12:38 +00:00
|
|
|
impl JobState {
|
|
|
|
const fn pending() -> Self {
|
|
|
|
Self::Pending
|
|
|
|
}
|
2022-04-02 23:53:03 +00:00
|
|
|
|
2023-08-15 02:17:57 +00:00
|
|
|
fn running(worker_id: Uuid) -> Self {
|
|
|
|
let first_eight = time::OffsetDateTime::now_utc()
|
|
|
|
.unix_timestamp()
|
|
|
|
.to_be_bytes();
|
|
|
|
|
|
|
|
let next_sixteen = worker_id.into_bytes();
|
|
|
|
|
|
|
|
let mut bytes = [0u8; 24];
|
|
|
|
|
|
|
|
bytes[0..8]
|
|
|
|
.iter_mut()
|
|
|
|
.zip(&first_eight)
|
|
|
|
.for_each(|(dest, src)| *dest = *src);
|
|
|
|
|
|
|
|
bytes[8..24]
|
|
|
|
.iter_mut()
|
|
|
|
.zip(&next_sixteen)
|
|
|
|
.for_each(|(dest, src)| *dest = *src);
|
|
|
|
|
|
|
|
Self::Running(bytes)
|
2023-08-13 19:12:38 +00:00
|
|
|
}
|
2022-04-02 23:53:03 +00:00
|
|
|
|
2023-08-13 19:12:38 +00:00
|
|
|
fn as_bytes(&self) -> &[u8] {
|
|
|
|
match self {
|
2023-08-14 00:47:20 +00:00
|
|
|
Self::Pending => b"pend",
|
2023-08-13 19:12:38 +00:00
|
|
|
Self::Running(ref bytes) => bytes,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-04-02 23:53:03 +00:00
|
|
|
|
2023-08-14 00:47:20 +00:00
|
|
|
fn job_key(queue: &'static str, job_id: JobId) -> Arc<[u8]> {
|
|
|
|
let mut key = queue.as_bytes().to_vec();
|
|
|
|
key.extend(job_id.as_bytes());
|
|
|
|
|
|
|
|
Arc::from(key)
|
|
|
|
}
|
|
|
|
|
2023-08-13 19:12:38 +00:00
|
|
|
#[async_trait::async_trait(?Send)]
impl QueueRepo for SledRepo {
    /// Enqueues `job` on `queue_name`: transactionally writes the payload to
    /// the queue tree and a Pending marker to the job_state tree, then wakes
    /// one waiting `pop` via the per-queue notifier.
    #[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))]
    async fn push(&self, queue_name: &'static str, job: Arc<[u8]>) -> Result<JobId, RepoError> {
        let metrics_guard = PushMetricsGuard::guard(queue_name);

        let id = JobId::gen();
        let key = job_key(queue_name, id);

        let queue = self.queue.clone();
        let job_state = self.job_state.clone();

        let res = actix_rt::task::spawn_blocking(move || {
            (&queue, &job_state).transaction(|(queue, job_state)| {
                let state = JobState::pending();

                queue.insert(&key[..], &job[..])?;
                job_state.insert(&key[..], state.as_bytes())?;

                Ok(())
            })
        })
        .await
        .map_err(|_| RepoError::Canceled)?;

        if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
            return Err(RepoError::from(SledError::from(e)));
        }

        // Fast path: a notifier already exists for this queue.
        if let Some(notifier) = self.queue_notifier.read().unwrap().get(&queue_name) {
            notifier.notify_one();
            metrics_guard.disarm();
            return Ok(id);
        }

        // Slow path: create the notifier under the write lock, then notify.
        self.queue_notifier
            .write()
            .unwrap()
            .entry(queue_name)
            .or_insert_with(|| Arc::new(Notify::new()))
            .notify_one();

        metrics_guard.disarm();

        Ok(id)
    }

    /// Claims the next available job on `queue_name` for `worker_id`,
    /// blocking (via the queue notifier) until one becomes available.
    ///
    /// A job is claimable when its state is Pending, or Running with a
    /// heartbeat timestamp older than 30 seconds (a stale claim from a dead
    /// worker). Claiming uses compare_and_swap so concurrent workers cannot
    /// double-claim.
    #[tracing::instrument(skip(self, worker_id), fields(job_id))]
    async fn pop(
        &self,
        queue_name: &'static str,
        worker_id: Uuid,
    ) -> Result<(JobId, Arc<[u8]>), RepoError> {
        let metrics_guard = PopMetricsGuard::guard(queue_name);

        let now = time::OffsetDateTime::now_utc();

        loop {
            let queue = self.queue.clone();
            let job_state = self.job_state.clone();

            let span = tracing::Span::current();
            let opt = actix_rt::task::spawn_blocking(move || {
                let _guard = span.enter();
                // Job IDs are generated with Uuid version 7 - defining their first bits as a
                // timestamp. Scanning a prefix should give us jobs in the order they were queued.
                for res in job_state.scan_prefix(queue_name) {
                    let (key, value) = res?;

                    // Running values are 24 bytes (timestamp ++ worker uuid);
                    // the 4-byte Pending sentinel skips this branch.
                    if value.len() > 8 {
                        let unix_timestamp =
                            i64::from_be_bytes(value[0..8].try_into().expect("Verified length"));

                        let timestamp = time::OffsetDateTime::from_unix_timestamp(unix_timestamp)
                            .expect("Valid timestamp");

                        // heartbeats should update every 5 seconds, so 30 seconds without an
                        // update is 6 missed beats
                        if timestamp.saturating_add(time::Duration::seconds(30)) > now {
                            // job hasn't expired
                            continue;
                        }
                    }

                    let state = JobState::running(worker_id);

                    match job_state.compare_and_swap(&key, Some(value), Some(state.as_bytes()))? {
                        Ok(()) => {
                            // acquired job
                        }
                        Err(_) => {
                            tracing::debug!("Contested");
                            // someone else acquired job
                            continue;
                        }
                    }

                    // Composite key is queue_name ++ 16-byte job id.
                    let id_bytes = &key[queue_name.len()..];

                    let id_bytes: [u8; 16] = id_bytes.try_into().expect("Key length");

                    let job_id = JobId::from_bytes(id_bytes);

                    tracing::Span::current().record("job_id", &format!("{job_id:?}"));

                    let opt = queue
                        .get(&key)?
                        .map(|job_bytes| (job_id, Arc::from(job_bytes.to_vec())));

                    return Ok(opt) as Result<Option<(JobId, Arc<[u8]>)>, SledError>;
                }

                Ok(None)
            })
            .await
            .map_err(|_| RepoError::Canceled)??;

            if let Some(tup) = opt {
                metrics_guard.disarm();
                return Ok(tup);
            }

            // Nothing claimable: fetch (or lazily create) this queue's
            // notifier, then sleep until a push wakes us and rescan.
            let opt = self
                .queue_notifier
                .read()
                .unwrap()
                .get(&queue_name)
                .map(Arc::clone);

            let notify = if let Some(notify) = opt {
                notify
            } else {
                let mut guard = self.queue_notifier.write().unwrap();
                let entry = guard
                    .entry(queue_name)
                    .or_insert_with(|| Arc::new(Notify::new()));
                Arc::clone(entry)
            };

            notify.notified().await
        }
    }

    /// Refreshes the claim timestamp for a running job so `pop` does not
    /// treat it as stale. Uses compare_and_swap against the previously read
    /// state; a lost race surfaces as `SledError::Conflict`. A missing key
    /// (job already completed) is treated as success.
    #[tracing::instrument(skip(self, worker_id))]
    async fn heartbeat(
        &self,
        queue_name: &'static str,
        worker_id: Uuid,
        job_id: JobId,
    ) -> Result<(), RepoError> {
        let key = job_key(queue_name, job_id);

        let job_state = self.job_state.clone();

        actix_rt::task::spawn_blocking(move || {
            if let Some(state) = job_state.get(&key)? {
                let new_state = JobState::running(worker_id);

                match job_state.compare_and_swap(&key, Some(state), Some(new_state.as_bytes()))? {
                    Ok(()) => Ok(()),
                    Err(_) => Err(SledError::Conflict),
                }
            } else {
                Ok(())
            }
        })
        .await
        .map_err(|_| RepoError::Canceled)??;

        Ok(())
    }

    /// Transactionally removes a finished job's payload and state entries.
    #[tracing::instrument(skip(self, _worker_id))]
    async fn complete_job(
        &self,
        queue_name: &'static str,
        _worker_id: Uuid,
        job_id: JobId,
    ) -> Result<(), RepoError> {
        let key = job_key(queue_name, job_id);

        let queue = self.queue.clone();
        let job_state = self.job_state.clone();

        let res = actix_rt::task::spawn_blocking(move || {
            (&queue, &job_state).transaction(|(queue, job_state)| {
                queue.remove(&key[..])?;
                job_state.remove(&key[..])?;
                Ok(())
            })
        })
        .await
        .map_err(|_| RepoError::Canceled)?;

        if let Err(TransactionError::Abort(e) | TransactionError::Storage(e)) = res {
            return Err(RepoError::from(SledError::from(e)));
        }

        Ok(())
    }
}
|
|
|
|
|
|
|
|
#[async_trait::async_trait(?Send)]
|
|
|
|
impl SettingsRepo for SledRepo {
|
2022-10-02 02:17:18 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(value))]
|
2023-08-15 01:00:00 +00:00
|
|
|
async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError> {
|
|
|
|
b!(self.settings, settings.insert(key, &value[..]));
|
2022-03-24 22:09:15 +00:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-10-02 02:17:18 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self))]
|
2023-08-15 01:00:00 +00:00
|
|
|
async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
|
2022-03-24 22:09:15 +00:00
|
|
|
let opt = b!(self.settings, settings.get(key));
|
|
|
|
|
2023-08-15 01:00:00 +00:00
|
|
|
Ok(opt.map(|ivec| Arc::from(ivec.to_vec())))
|
2022-03-24 22:09:15 +00:00
|
|
|
}
|
|
|
|
|
2022-10-02 02:17:18 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self))]
|
2023-06-20 20:59:08 +00:00
|
|
|
async fn remove(&self, key: &'static str) -> Result<(), RepoError> {
|
2022-03-24 22:09:15 +00:00
|
|
|
b!(self.settings, settings.remove(key));
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-22 22:57:52 +00:00
|
|
|
/// Build the composite key used by the variant-access tree.
///
/// Layout: 8-byte big-endian hash length, then the hash bytes, then the raw
/// utf8 bytes of `variant`. The length prefix is what makes the key
/// unambiguously parseable again by `parse_variant_access_key`.
fn variant_access_key(hash: &[u8], variant: &str) -> Vec<u8> {
    let variant = variant.as_bytes();

    let hash_len: u64 = u64::try_from(hash.len()).expect("Length is reasonable");

    // One exact-size allocation: prefix + hash + variant.
    let mut out = Vec::with_capacity(8 + hash.len() + variant.len());

    let hash_length_bytes: [u8; 8] = hash_len.to_be_bytes();
    out.extend(hash_length_bytes);
    out.extend(hash);
    out.extend(variant);
    out
}
|
|
|
|
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
|
|
pub(crate) enum VariantKeyError {
|
|
|
|
#[error("Bytes too short to be VariantAccessKey")]
|
|
|
|
TooShort,
|
|
|
|
|
|
|
|
#[error("Prefix Length is longer than backing bytes")]
|
|
|
|
InvalidLength,
|
|
|
|
|
|
|
|
#[error("Invalid utf8 in Variant")]
|
|
|
|
Utf8,
|
2023-08-14 19:25:19 +00:00
|
|
|
|
|
|
|
#[error("Hash format is invalid")]
|
|
|
|
InvalidHash,
|
2023-07-22 22:57:52 +00:00
|
|
|
}
|
|
|
|
|
2023-08-14 19:25:19 +00:00
|
|
|
fn parse_variant_access_key(bytes: IVec) -> Result<(Hash, String), VariantKeyError> {
|
2023-07-22 22:57:52 +00:00
|
|
|
if bytes.len() < 8 {
|
|
|
|
return Err(VariantKeyError::TooShort);
|
|
|
|
}
|
|
|
|
|
|
|
|
let hash_len = u64::from_be_bytes(bytes[..8].try_into().expect("Verified length"));
|
|
|
|
let hash_len: usize = usize::try_from(hash_len).expect("Length is reasonable");
|
|
|
|
|
|
|
|
if (hash_len + 8) > bytes.len() {
|
|
|
|
return Err(VariantKeyError::InvalidLength);
|
|
|
|
}
|
|
|
|
|
|
|
|
let hash = bytes.subslice(8, hash_len);
|
|
|
|
|
2023-08-14 19:25:19 +00:00
|
|
|
let hash = Hash::from_ivec(hash).ok_or(VariantKeyError::InvalidHash)?;
|
|
|
|
|
2023-07-22 22:57:52 +00:00
|
|
|
let variant_len = bytes.len().saturating_sub(8).saturating_sub(hash_len);
|
|
|
|
|
|
|
|
if variant_len == 0 {
|
|
|
|
return Ok((hash, String::new()));
|
|
|
|
}
|
|
|
|
|
|
|
|
let variant_start = 8 + hash_len;
|
|
|
|
|
|
|
|
let variant = std::str::from_utf8(&bytes[variant_start..])
|
|
|
|
.map_err(|_| VariantKeyError::Utf8)?
|
|
|
|
.to_string();
|
|
|
|
|
|
|
|
Ok((hash, variant))
|
|
|
|
}
|
|
|
|
|
2022-03-26 21:49:23 +00:00
|
|
|
/// Build the `hash '/' variant` composite key for the hash-variant tree.
///
/// Note: unlike `variant_access_key` this layout is only unambiguous because
/// lookups always know the hash prefix (see `variant_from_key`).
fn variant_key(hash: &[u8], variant: &str) -> Vec<u8> {
    let mut bytes = hash.to_vec();
    bytes.push(b'/');
    bytes.extend_from_slice(variant.as_bytes());
    bytes
}
|
|
|
|
|
2022-03-26 21:49:23 +00:00
|
|
|
/// Recover the variant name from a `variant_key`-shaped key, given the hash
/// it was built with.
///
/// Returns `None` when the key is shorter than `hash` plus the `/` separator
/// or when the variant bytes are not valid utf8.
fn variant_from_key(hash: &[u8], key: &[u8]) -> Option<String> {
    // Skip the hash prefix and the single b'/' separator.
    let prefix_len = hash.len() + 1;
    let variant_bytes = key.get(prefix_len..)?.to_vec();
    String::from_utf8(variant_bytes).ok()
}
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2022-03-26 21:49:23 +00:00
|
|
|
#[async_trait::async_trait(?Send)]
|
2023-08-16 21:09:40 +00:00
|
|
|
impl DetailsRepo for SledRepo {
|
2022-10-02 02:17:18 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn relate_details(
|
2022-03-26 21:49:23 +00:00
|
|
|
&self,
|
2023-08-16 00:19:03 +00:00
|
|
|
identifier: &dyn Identifier,
|
2022-03-26 21:49:23 +00:00
|
|
|
details: &Details,
|
2023-06-20 20:59:08 +00:00
|
|
|
) -> Result<(), StoreError> {
|
2022-03-27 01:45:12 +00:00
|
|
|
let key = identifier.to_bytes()?;
|
2023-06-20 20:59:08 +00:00
|
|
|
let details = serde_json::to_vec(&details)
|
|
|
|
.map_err(SledError::from)
|
|
|
|
.map_err(RepoError::from)?;
|
2022-03-24 22:09:15 +00:00
|
|
|
|
|
|
|
b!(
|
|
|
|
self.identifier_details,
|
|
|
|
identifier_details.insert(key, details)
|
|
|
|
);
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-10-02 02:17:18 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn details(&self, identifier: &dyn Identifier) -> Result<Option<Details>, StoreError> {
|
2022-03-27 01:45:12 +00:00
|
|
|
let key = identifier.to_bytes()?;
|
2022-03-24 22:09:15 +00:00
|
|
|
|
|
|
|
let opt = b!(self.identifier_details, identifier_details.get(key));
|
|
|
|
|
2023-02-06 02:50:59 +00:00
|
|
|
opt.map(|ivec| serde_json::from_slice(&ivec))
|
|
|
|
.transpose()
|
2023-06-20 20:59:08 +00:00
|
|
|
.map_err(SledError::from)
|
|
|
|
.map_err(RepoError::from)
|
|
|
|
.map_err(StoreError::from)
|
2022-03-24 22:09:15 +00:00
|
|
|
}
|
|
|
|
|
2022-10-02 02:17:18 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
|
2023-08-16 21:09:40 +00:00
|
|
|
async fn cleanup_details(&self, identifier: &dyn Identifier) -> Result<(), StoreError> {
|
2022-03-27 01:45:12 +00:00
|
|
|
let key = identifier.to_bytes()?;
|
2022-03-24 22:09:15 +00:00
|
|
|
|
|
|
|
b!(self.identifier_details, identifier_details.remove(key));
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-17 03:07:42 +00:00
|
|
|
#[async_trait::async_trait(?Send)]
|
2023-08-16 16:47:36 +00:00
|
|
|
impl StoreMigrationRepo for SledRepo {
|
2023-07-17 03:07:42 +00:00
|
|
|
async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
|
|
|
|
Ok(!self.migration_identifiers.is_empty())
|
|
|
|
}
|
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn mark_migrated(
|
2023-07-17 03:07:42 +00:00
|
|
|
&self,
|
2023-08-16 00:19:03 +00:00
|
|
|
old_identifier: &dyn Identifier,
|
|
|
|
new_identifier: &dyn Identifier,
|
2023-07-17 03:07:42 +00:00
|
|
|
) -> Result<(), StoreError> {
|
|
|
|
let key = new_identifier.to_bytes()?;
|
|
|
|
let value = old_identifier.to_bytes()?;
|
|
|
|
|
|
|
|
b!(
|
|
|
|
self.migration_identifiers,
|
|
|
|
migration_identifiers.insert(key, value)
|
|
|
|
);
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn is_migrated(&self, identifier: &dyn Identifier) -> Result<bool, StoreError> {
|
2023-07-17 03:07:42 +00:00
|
|
|
let key = identifier.to_bytes()?;
|
|
|
|
|
|
|
|
Ok(b!(self.migration_identifiers, migration_identifiers.get(key)).is_some())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn clear(&self) -> Result<(), RepoError> {
|
|
|
|
b!(self.migration_identifiers, migration_identifiers.clear());
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-03-26 21:49:23 +00:00
|
|
|
#[async_trait::async_trait(?Send)]
|
|
|
|
impl HashRepo for SledRepo {
|
2023-06-29 16:39:47 +00:00
|
|
|
async fn size(&self) -> Result<u64, RepoError> {
|
|
|
|
Ok(b!(
|
|
|
|
self.hashes,
|
|
|
|
Ok(u64::try_from(hashes.len()).expect("Length is reasonable"))
|
|
|
|
as Result<u64, SledError>
|
|
|
|
))
|
|
|
|
}
|
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn hashes(&self) -> LocalBoxStream<'static, Result<Hash, RepoError>> {
|
2023-08-14 03:06:42 +00:00
|
|
|
let iter = self.hashes.iter().keys().filter_map(|res| {
|
|
|
|
res.map_err(SledError::from)
|
|
|
|
.map_err(RepoError::from)
|
|
|
|
.map(Hash::from_ivec)
|
|
|
|
.transpose()
|
|
|
|
});
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2022-03-29 21:18:00 +00:00
|
|
|
Box::pin(from_iterator(iter, 8))
|
2022-03-24 22:09:15 +00:00
|
|
|
}
|
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self))]
|
2023-08-16 21:09:40 +00:00
|
|
|
async fn create_hash(
|
2023-07-26 01:08:18 +00:00
|
|
|
&self,
|
2023-08-14 03:06:42 +00:00
|
|
|
hash: Hash,
|
2023-08-16 00:19:03 +00:00
|
|
|
identifier: &dyn Identifier,
|
2023-07-26 01:08:18 +00:00
|
|
|
) -> Result<Result<(), HashAlreadyExists>, StoreError> {
|
|
|
|
let identifier: sled::IVec = identifier.to_bytes()?.into();
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
let hashes = self.hashes.clone();
|
|
|
|
let hash_identifiers = self.hash_identifiers.clone();
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
let hash = hash.to_ivec();
|
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
let res = actix_web::web::block(move || {
|
|
|
|
(&hashes, &hash_identifiers).transaction(|(hashes, hash_identifiers)| {
|
2023-08-14 03:06:42 +00:00
|
|
|
if hashes.get(hash.clone())?.is_some() {
|
2023-07-26 01:08:18 +00:00
|
|
|
return Ok(Err(HashAlreadyExists));
|
|
|
|
}
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
hashes.insert(hash.clone(), hash.clone())?;
|
|
|
|
hash_identifiers.insert(hash.clone(), &identifier)?;
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
Ok(Ok(()))
|
|
|
|
})
|
|
|
|
})
|
|
|
|
.await
|
|
|
|
.map_err(|_| RepoError::Canceled)?;
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
match res {
|
|
|
|
Ok(res) => Ok(res),
|
|
|
|
Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => {
|
|
|
|
Err(StoreError::from(RepoError::from(SledError::from(e))))
|
|
|
|
}
|
|
|
|
}
|
2022-03-24 22:09:15 +00:00
|
|
|
}
|
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn update_identifier(
|
2022-03-26 21:49:23 +00:00
|
|
|
&self,
|
2023-08-14 03:06:42 +00:00
|
|
|
hash: Hash,
|
2023-08-16 00:19:03 +00:00
|
|
|
identifier: &dyn Identifier,
|
2023-06-20 20:59:08 +00:00
|
|
|
) -> Result<(), StoreError> {
|
2023-07-26 01:08:18 +00:00
|
|
|
let identifier = identifier.to_bytes()?;
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
let hash = hash.to_ivec();
|
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
b!(
|
|
|
|
self.hash_identifiers,
|
|
|
|
hash_identifiers.insert(hash, identifier)
|
|
|
|
);
|
2022-03-24 22:09:15 +00:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self))]
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn identifier(&self, hash: Hash) -> Result<Option<Arc<[u8]>>, RepoError> {
|
2023-08-14 03:06:42 +00:00
|
|
|
let hash = hash.to_ivec();
|
|
|
|
|
2023-07-07 18:17:26 +00:00
|
|
|
let Some(ivec) = b!(self.hash_identifiers, hash_identifiers.get(hash)) else {
|
|
|
|
return Ok(None);
|
|
|
|
};
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
Ok(Some(Arc::from(ivec.to_vec())))
|
2022-03-24 22:09:15 +00:00
|
|
|
}
|
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn relate_variant_identifier(
|
2022-03-25 23:47:50 +00:00
|
|
|
&self,
|
2023-08-14 03:06:42 +00:00
|
|
|
hash: Hash,
|
2022-03-25 23:47:50 +00:00
|
|
|
variant: String,
|
2023-08-16 00:19:03 +00:00
|
|
|
identifier: &dyn Identifier,
|
2023-08-16 20:12:16 +00:00
|
|
|
) -> Result<Result<(), VariantAlreadyExists>, StoreError> {
|
2023-08-14 03:06:42 +00:00
|
|
|
let hash = hash.to_bytes();
|
|
|
|
|
2022-03-26 21:49:23 +00:00
|
|
|
let key = variant_key(&hash, &variant);
|
2022-03-27 01:45:12 +00:00
|
|
|
let value = identifier.to_bytes()?;
|
2022-03-25 23:47:50 +00:00
|
|
|
|
2023-08-16 20:12:16 +00:00
|
|
|
let hash_variant_identifiers = self.hash_variant_identifiers.clone();
|
2022-03-25 23:47:50 +00:00
|
|
|
|
2023-08-16 20:12:16 +00:00
|
|
|
actix_rt::task::spawn_blocking(move || {
|
|
|
|
hash_variant_identifiers
|
|
|
|
.compare_and_swap(key, Option::<&[u8]>::None, Some(value))
|
|
|
|
.map(|res| res.map_err(|_| VariantAlreadyExists))
|
|
|
|
})
|
|
|
|
.await
|
|
|
|
.map_err(|_| RepoError::Canceled)?
|
|
|
|
.map_err(SledError::from)
|
|
|
|
.map_err(RepoError::from)
|
|
|
|
.map_err(StoreError::from)
|
2022-03-25 23:47:50 +00:00
|
|
|
}
|
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self))]
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn variant_identifier(
|
2022-03-25 23:47:50 +00:00
|
|
|
&self,
|
2023-08-14 03:06:42 +00:00
|
|
|
hash: Hash,
|
2022-03-25 23:47:50 +00:00
|
|
|
variant: String,
|
2023-08-16 00:19:03 +00:00
|
|
|
) -> Result<Option<Arc<[u8]>>, RepoError> {
|
2023-08-14 03:06:42 +00:00
|
|
|
let hash = hash.to_bytes();
|
|
|
|
|
2022-03-26 21:49:23 +00:00
|
|
|
let key = variant_key(&hash, &variant);
|
2022-03-25 23:47:50 +00:00
|
|
|
|
|
|
|
let opt = b!(
|
|
|
|
self.hash_variant_identifiers,
|
|
|
|
hash_variant_identifiers.get(key)
|
|
|
|
);
|
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
Ok(opt.map(|ivec| Arc::from(ivec.to_vec())))
|
2022-03-25 23:47:50 +00:00
|
|
|
}
|
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
#[tracing::instrument(level = "debug", skip(self))]
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<[u8]>)>, RepoError> {
|
2023-08-14 03:06:42 +00:00
|
|
|
let hash = hash.to_ivec();
|
|
|
|
|
2022-03-26 21:49:23 +00:00
|
|
|
let vec = b!(
|
|
|
|
self.hash_variant_identifiers,
|
|
|
|
Ok(hash_variant_identifiers
|
2023-08-14 03:06:42 +00:00
|
|
|
.scan_prefix(hash.clone())
|
2022-03-26 21:49:23 +00:00
|
|
|
.filter_map(|res| res.ok())
|
|
|
|
.filter_map(|(key, ivec)| {
|
2023-08-16 00:19:03 +00:00
|
|
|
let identifier = Arc::from(ivec.to_vec());
|
2023-01-05 00:58:05 +00:00
|
|
|
|
|
|
|
let variant = variant_from_key(&hash, &key);
|
|
|
|
if variant.is_none() {
|
|
|
|
tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key));
|
|
|
|
}
|
2022-03-26 21:49:23 +00:00
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
Some((variant?, identifier))
|
2022-03-26 21:49:23 +00:00
|
|
|
})
|
2023-06-20 20:59:08 +00:00
|
|
|
.collect::<Vec<_>>()) as Result<Vec<_>, SledError>
|
2022-03-26 21:49:23 +00:00
|
|
|
);
|
|
|
|
|
|
|
|
Ok(vec)
|
|
|
|
}
|
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self))]
|
|
|
|
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
|
|
|
|
let hash = hash.to_bytes();
|
|
|
|
|
2022-04-11 21:56:39 +00:00
|
|
|
let key = variant_key(&hash, &variant);
|
|
|
|
|
|
|
|
b!(
|
|
|
|
self.hash_variant_identifiers,
|
|
|
|
hash_variant_identifiers.remove(key)
|
|
|
|
);
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn relate_motion_identifier(
|
2022-03-26 21:49:23 +00:00
|
|
|
&self,
|
2023-08-14 03:06:42 +00:00
|
|
|
hash: Hash,
|
2023-08-16 00:19:03 +00:00
|
|
|
identifier: &dyn Identifier,
|
2023-06-20 20:59:08 +00:00
|
|
|
) -> Result<(), StoreError> {
|
2023-08-14 03:06:42 +00:00
|
|
|
let hash = hash.to_ivec();
|
2022-03-27 01:45:12 +00:00
|
|
|
let bytes = identifier.to_bytes()?;
|
2022-03-25 23:47:50 +00:00
|
|
|
|
|
|
|
b!(
|
|
|
|
self.hash_motion_identifiers,
|
|
|
|
hash_motion_identifiers.insert(hash, bytes)
|
|
|
|
);
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self))]
|
2023-08-16 00:19:03 +00:00
|
|
|
async fn motion_identifier(&self, hash: Hash) -> Result<Option<Arc<[u8]>>, RepoError> {
|
2023-08-14 03:06:42 +00:00
|
|
|
let hash = hash.to_ivec();
|
|
|
|
|
2022-03-25 23:47:50 +00:00
|
|
|
let opt = b!(
|
|
|
|
self.hash_motion_identifiers,
|
|
|
|
hash_motion_identifiers.get(hash)
|
|
|
|
);
|
|
|
|
|
2023-08-16 00:19:03 +00:00
|
|
|
Ok(opt.map(|ivec| Arc::from(ivec.to_vec())))
|
2022-03-25 23:47:50 +00:00
|
|
|
}
|
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
#[tracing::instrument(skip(self))]
|
2023-08-16 21:09:40 +00:00
|
|
|
async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> {
|
2023-08-14 03:06:42 +00:00
|
|
|
let hash = hash.to_ivec();
|
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
let hashes = self.hashes.clone();
|
|
|
|
let hash_identifiers = self.hash_identifiers.clone();
|
|
|
|
let hash_motion_identifiers = self.hash_motion_identifiers.clone();
|
|
|
|
let hash_variant_identifiers = self.hash_variant_identifiers.clone();
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2022-03-25 23:47:50 +00:00
|
|
|
let hash2 = hash.clone();
|
|
|
|
let variant_keys = b!(self.hash_variant_identifiers, {
|
|
|
|
let v = hash_variant_identifiers
|
2023-07-26 01:08:18 +00:00
|
|
|
.scan_prefix(hash2)
|
2022-03-25 23:47:50 +00:00
|
|
|
.keys()
|
|
|
|
.filter_map(Result::ok)
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
|
2023-06-20 20:59:08 +00:00
|
|
|
Ok(v) as Result<Vec<_>, SledError>
|
2022-03-25 23:47:50 +00:00
|
|
|
});
|
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
let res = actix_web::web::block(move || {
|
|
|
|
(
|
|
|
|
&hashes,
|
|
|
|
&hash_identifiers,
|
|
|
|
&hash_motion_identifiers,
|
|
|
|
&hash_variant_identifiers,
|
|
|
|
)
|
|
|
|
.transaction(
|
|
|
|
|(
|
|
|
|
hashes,
|
|
|
|
hash_identifiers,
|
|
|
|
hash_motion_identifiers,
|
|
|
|
hash_variant_identifiers,
|
|
|
|
)| {
|
|
|
|
hashes.remove(&hash)?;
|
|
|
|
hash_identifiers.remove(&hash)?;
|
|
|
|
hash_motion_identifiers.remove(&hash)?;
|
|
|
|
|
|
|
|
for key in &variant_keys {
|
|
|
|
hash_variant_identifiers.remove(key)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
},
|
|
|
|
)
|
|
|
|
})
|
|
|
|
.await
|
|
|
|
.map_err(|_| RepoError::Canceled)?;
|
|
|
|
|
|
|
|
match res {
|
|
|
|
Ok(()) => Ok(()),
|
|
|
|
Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => {
|
|
|
|
Err(SledError::from(e).into())
|
|
|
|
}
|
|
|
|
}
|
2022-03-24 22:09:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-26 01:30:22 +00:00
|
|
|
/// Build the composite `hash || alias` key used by the hash-aliases tree,
/// so all aliases of a hash share the hash as a scan prefix.
fn hash_alias_key(hash: &IVec, alias: &IVec) -> Vec<u8> {
    let mut v = hash.to_vec();
    v.extend_from_slice(alias);
    v
}
|
|
|
|
|
2022-03-26 21:49:23 +00:00
|
|
|
#[async_trait::async_trait(?Send)]
|
2022-03-24 22:09:15 +00:00
|
|
|
impl AliasRepo for SledRepo {
|
2022-10-02 02:17:18 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self))]
|
2023-08-16 21:09:40 +00:00
|
|
|
async fn create_alias(
|
2022-03-24 22:09:15 +00:00
|
|
|
&self,
|
2022-03-26 21:49:23 +00:00
|
|
|
alias: &Alias,
|
|
|
|
delete_token: &DeleteToken,
|
2023-08-14 03:06:42 +00:00
|
|
|
hash: Hash,
|
2023-07-26 01:08:18 +00:00
|
|
|
) -> Result<Result<(), AliasAlreadyExists>, RepoError> {
|
2023-08-14 03:06:42 +00:00
|
|
|
let hash = hash.to_ivec();
|
2023-07-26 01:08:18 +00:00
|
|
|
let alias: sled::IVec = alias.to_bytes().into();
|
|
|
|
let delete_token: sled::IVec = delete_token.to_bytes().into();
|
|
|
|
|
|
|
|
let aliases = self.aliases.clone();
|
|
|
|
let alias_hashes = self.alias_hashes.clone();
|
|
|
|
let hash_aliases = self.hash_aliases.clone();
|
|
|
|
let alias_delete_tokens = self.alias_delete_tokens.clone();
|
|
|
|
|
|
|
|
let res = actix_web::web::block(move || {
|
|
|
|
(&aliases, &alias_hashes, &hash_aliases, &alias_delete_tokens).transaction(
|
|
|
|
|(aliases, alias_hashes, hash_aliases, alias_delete_tokens)| {
|
|
|
|
if aliases.get(&alias)?.is_some() {
|
|
|
|
return Ok(Err(AliasAlreadyExists));
|
|
|
|
}
|
2023-07-07 18:17:26 +00:00
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
aliases.insert(&alias, &alias)?;
|
|
|
|
alias_hashes.insert(&alias, &hash)?;
|
2023-07-26 01:30:22 +00:00
|
|
|
|
|
|
|
hash_aliases.insert(hash_alias_key(&hash, &alias), &alias)?;
|
2023-07-26 01:08:18 +00:00
|
|
|
alias_delete_tokens.insert(&alias, &delete_token)?;
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
Ok(Ok(()))
|
|
|
|
},
|
|
|
|
)
|
|
|
|
})
|
|
|
|
.await
|
|
|
|
.map_err(|_| RepoError::Canceled)?;
|
2023-07-07 18:17:26 +00:00
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
match res {
|
|
|
|
Ok(res) => Ok(res),
|
|
|
|
Err(TransactionError::Abort(e) | TransactionError::Storage(e)) => {
|
|
|
|
Err(SledError::from(e).into())
|
2023-07-07 18:17:26 +00:00
|
|
|
}
|
2023-07-26 01:08:18 +00:00
|
|
|
}
|
2022-03-24 22:09:15 +00:00
|
|
|
}
|
|
|
|
|
2022-10-02 02:17:18 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self))]
|
2023-07-07 18:17:26 +00:00
|
|
|
async fn delete_token(&self, alias: &Alias) -> Result<Option<DeleteToken>, RepoError> {
|
2022-03-24 22:09:15 +00:00
|
|
|
let key = alias.to_bytes();
|
|
|
|
|
2023-07-07 18:17:26 +00:00
|
|
|
let Some(ivec) = b!(self.alias_delete_tokens, alias_delete_tokens.get(key)) else {
|
|
|
|
return Ok(None);
|
|
|
|
};
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2023-07-07 18:17:26 +00:00
|
|
|
let Some(token) = DeleteToken::from_slice(&ivec) else {
|
|
|
|
return Ok(None);
|
|
|
|
};
|
|
|
|
|
|
|
|
Ok(Some(token))
|
2022-03-24 22:09:15 +00:00
|
|
|
}
|
|
|
|
|
2022-10-02 02:17:18 +00:00
|
|
|
#[tracing::instrument(level = "trace", skip(self))]
|
2023-08-14 03:06:42 +00:00
|
|
|
async fn hash(&self, alias: &Alias) -> Result<Option<Hash>, RepoError> {
|
2022-03-24 22:09:15 +00:00
|
|
|
let key = alias.to_bytes();
|
|
|
|
|
|
|
|
let opt = b!(self.alias_hashes, alias_hashes.get(key));
|
|
|
|
|
2023-08-14 03:06:42 +00:00
|
|
|
Ok(opt.and_then(Hash::from_ivec))
|
2022-03-24 22:09:15 +00:00
|
|
|
}
|
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
#[tracing::instrument(skip_all)]
|
2023-08-14 03:06:42 +00:00
|
|
|
async fn for_hash(&self, hash: Hash) -> Result<Vec<Alias>, RepoError> {
|
|
|
|
let hash = hash.to_ivec();
|
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
let v = b!(self.hash_aliases, {
|
|
|
|
Ok(hash_aliases
|
|
|
|
.scan_prefix(hash)
|
|
|
|
.values()
|
|
|
|
.filter_map(Result::ok)
|
|
|
|
.filter_map(|ivec| Alias::from_slice(&ivec))
|
|
|
|
.collect::<Vec<_>>()) as Result<_, sled::Error>
|
|
|
|
});
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
Ok(v)
|
|
|
|
}
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
#[tracing::instrument(skip(self))]
|
2023-08-16 21:09:40 +00:00
|
|
|
async fn cleanup_alias(&self, alias: &Alias) -> Result<(), RepoError> {
|
2023-07-26 01:30:22 +00:00
|
|
|
let alias: IVec = alias.to_bytes().into();
|
2023-07-26 01:08:18 +00:00
|
|
|
|
|
|
|
let aliases = self.aliases.clone();
|
|
|
|
let alias_hashes = self.alias_hashes.clone();
|
|
|
|
let hash_aliases = self.hash_aliases.clone();
|
|
|
|
let alias_delete_tokens = self.alias_delete_tokens.clone();
|
|
|
|
|
|
|
|
let res = actix_web::web::block(move || {
|
|
|
|
(&aliases, &alias_hashes, &hash_aliases, &alias_delete_tokens).transaction(
|
|
|
|
|(aliases, alias_hashes, hash_aliases, alias_delete_tokens)| {
|
|
|
|
aliases.remove(&alias)?;
|
|
|
|
if let Some(hash) = alias_hashes.remove(&alias)? {
|
2023-07-26 01:30:22 +00:00
|
|
|
hash_aliases.remove(hash_alias_key(&hash, &alias))?;
|
2023-07-26 01:08:18 +00:00
|
|
|
}
|
|
|
|
alias_delete_tokens.remove(&alias)?;
|
|
|
|
Ok(())
|
|
|
|
},
|
|
|
|
)
|
|
|
|
})
|
|
|
|
.await
|
|
|
|
.map_err(|_| RepoError::Canceled)?;
|
2022-03-24 22:09:15 +00:00
|
|
|
|
2023-07-26 01:08:18 +00:00
|
|
|
match res {
|
|
|
|
Ok(()) => Ok(()),
|
|
|
|
Err(TransactionError::Abort(e)) | Err(TransactionError::Storage(e)) => {
|
|
|
|
Err(SledError::from(e).into())
|
|
|
|
}
|
|
|
|
}
|
2022-03-24 22:09:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl std::fmt::Debug for SledRepo {
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
f.debug_struct("SledRepo").finish()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-03-27 01:45:12 +00:00
|
|
|
impl From<actix_rt::task::JoinError> for SledError {
|
2022-03-24 22:09:15 +00:00
|
|
|
fn from(_: actix_rt::task::JoinError) -> Self {
|
2022-03-27 01:45:12 +00:00
|
|
|
SledError::Panic
|
2022-03-24 22:09:15 +00:00
|
|
|
}
|
|
|
|
}
|
2023-07-22 22:57:52 +00:00
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    /// `variant_access_key` followed by `parse_variant_access_key` must
    /// round-trip both the hash and the variant string.
    #[test]
    fn round_trip() {
        let hash = crate::repo::Hash::test_value();
        let variant = String::from("some string value");

        let key = super::variant_access_key(&hash.to_bytes(), &variant);

        let (out_hash, out_variant) =
            super::parse_variant_access_key(sled::IVec::from(key)).expect("Parsed bytes");

        assert_eq!(out_hash, hash);
        assert_eq!(out_variant, variant);
    }
}
|