Cache on Alias rather than Hash

Make sled repo blocking tasks share parent span
This commit is contained in:
Aode (Lion) 2022-04-06 12:13:46 -05:00
parent 865f4ee09c
commit 05e2cf5e08
3 changed files with 113 additions and 107 deletions

View file

@ -86,19 +86,13 @@ pub(crate) trait FullRepo:
}
}
#[tracing::instrument]
async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> {
let hash = self.hash(alias).await?;
CachedRepo::create(self, hash).await
}
#[tracing::instrument]
async fn check_cached(&self, alias: &Alias) -> Result<(), Error> {
let hash = self.hash(alias).await?;
let hashes = CachedRepo::update(self, hash).await?;
let aliases = CachedRepo::update(self, alias).await?;
for hash in hashes {
crate::queue::cleanup_hash(self, hash).await?;
for alias in aliases {
let token = self.delete_token(&alias).await?;
crate::queue::cleanup_alias(self, alias, token).await?;
}
Ok(())
@ -111,9 +105,9 @@ pub(crate) trait BaseRepo {
#[async_trait::async_trait(?Send)]
pub(crate) trait CachedRepo: BaseRepo {
async fn create(&self, hash: Self::Bytes) -> Result<(), Error>;
async fn mark_cached(&self, alias: &Alias) -> Result<(), Error>;
async fn update(&self, hash: Self::Bytes) -> Result<Vec<Self::Bytes>, Error>;
async fn update(&self, alias: &Alias) -> Result<Vec<Alias>, Error>;
}
#[async_trait::async_trait(?Send)]

View file

@ -11,21 +11,25 @@ use crate::{
use futures_util::Stream;
use sled::{CompareAndSwapError, Db, IVec, Tree};
use std::{
collections::{HashMap, HashSet},
collections::HashMap,
pin::Pin,
sync::{Arc, RwLock},
};
use tokio::sync::Notify;
mod bucket;
mod datetime;
use bucket::Bucket;
use datetime::DateTime;
macro_rules! b {
($self:ident.$ident:ident, $expr:expr) => {{
let $ident = $self.$ident.clone();
actix_rt::task::spawn_blocking(move || $expr)
let span = tracing::Span::current();
actix_rt::task::spawn_blocking(move || span.in_scope(|| $expr))
.await
.map_err(SledError::from)??
}};
@ -132,129 +136,99 @@ impl From<InnerUploadResult> for UploadResult {
}
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct Bucket {
// each Vec<u8> represents a unique image hash
inner: HashSet<Vec<u8>>,
fn insert_cache_inverse(
cache_inverse: &Tree,
now_bytes: &[u8],
alias_bytes: &[u8],
) -> Result<(), Error> {
let mut old = cache_inverse.get(now_bytes)?;
loop {
// unsure of whether to bail on deserialize error or fail with empty bucket
let mut bucket = old
.as_ref()
.and_then(|old| serde_cbor::from_slice::<Bucket>(old).ok())
.unwrap_or_else(Bucket::empty);
bucket.insert(alias_bytes.to_vec());
tracing::info!("Inserting new {:?}", bucket);
let bucket_bytes = serde_cbor::to_vec(&bucket)?;
let new = Some(bucket_bytes);
let res = cache_inverse.compare_and_swap(now_bytes, old, new)?;
if let Err(CompareAndSwapError { current, .. }) = res {
old = current;
} else {
break;
}
}
Ok(())
}
#[async_trait::async_trait(?Send)]
impl CachedRepo for SledRepo {
#[tracing::instrument(skip(hash))]
async fn create(&self, hash: Self::Bytes) -> Result<(), Error> {
#[tracing::instrument]
async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> {
let now = DateTime::now();
let bytes = serde_json::to_vec(&now)?;
let now_bytes = serde_json::to_vec(&now)?;
let alias_bytes = alias.to_bytes();
let cache_inverse = self.cache_inverse.clone();
b!(self.cache, {
cache.insert(hash.clone(), bytes.clone())?;
cache.insert(&alias_bytes, now_bytes.clone())?;
let mut old = cache_inverse.get(bytes.clone())?;
loop {
let mut bucket = if let Some(old) = old.as_ref() {
// unsure of whether to bail on deserialize error or fail with empty bucket
if let Ok(bucket) = serde_cbor::from_slice::<Bucket>(old) {
bucket
} else {
Bucket {
inner: HashSet::new(),
}
}
} else {
Bucket {
inner: HashSet::new(),
}
};
bucket.inner.insert(hash.as_ref().to_vec());
tracing::info!("Inserting new {:?}", bucket);
let bucket_bytes = serde_cbor::to_vec(&bucket)?;
let new = Some(bucket_bytes);
let res = cache_inverse.compare_and_swap(bytes.clone(), old, new)?;
if let Err(CompareAndSwapError { current, .. }) = res {
old = current;
} else {
break;
}
}
Ok(()) as Result<(), Error>
insert_cache_inverse(&cache_inverse, &now_bytes, &alias_bytes)
});
Ok(())
}
#[tracing::instrument(skip(hash))]
async fn update(&self, hash: Self::Bytes) -> Result<Vec<Self::Bytes>, Error> {
#[tracing::instrument]
async fn update(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
let now = DateTime::now();
let now_bytes = serde_json::to_vec(&now)?;
let to_clean = now.min_cache_date();
let to_clean_bytes = serde_json::to_vec(&to_clean)?;
let alias_bytes = alias.to_bytes();
let cache_inverse = self.cache_inverse.clone();
let hashes = b!(self.cache, {
let aliases = b!(self.cache, {
let previous_datetime_opt = cache
.fetch_and_update(hash.clone(), |previous_datetime_opt| {
.fetch_and_update(&alias_bytes, |previous_datetime_opt| {
previous_datetime_opt.map(|_| now_bytes.clone())
})?;
if let Some(previous_datetime_bytes) = previous_datetime_opt {
// Insert cached media into new date bucket
let mut old = cache_inverse.get(now_bytes.clone())?;
loop {
let mut bucket = if let Some(bucket_bytes) = old.as_ref() {
if let Ok(bucket) = serde_cbor::from_slice::<Bucket>(bucket_bytes) {
bucket
} else {
Bucket {
inner: HashSet::new(),
}
}
} else {
Bucket {
inner: HashSet::new(),
}
};
bucket.inner.insert(hash.as_ref().to_vec());
tracing::info!("Inserting new {:?}", bucket);
let bucket_bytes = serde_cbor::to_vec(&bucket)?;
let new = Some(bucket_bytes);
if let Err(CompareAndSwapError { current, .. }) =
cache_inverse.compare_and_swap(now_bytes.clone(), old, new)?
{
old = current;
} else {
break;
}
}
insert_cache_inverse(&cache_inverse, &now_bytes, &alias_bytes)?;
// Remove cached media from old date bucket
let mut old = cache_inverse.get(previous_datetime_bytes.clone())?;
let mut old = cache_inverse.get(&previous_datetime_bytes)?;
loop {
let new = if let Some(bucket_bytes) = old.as_ref() {
if let Ok(mut bucket) = serde_cbor::from_slice::<Bucket>(bucket_bytes) {
bucket.inner.remove(hash.as_ref());
if bucket.inner.is_empty() {
let new = old
.as_ref()
.and_then(|bucket_bytes| {
let mut bucket = serde_cbor::from_slice::<Bucket>(bucket_bytes).ok()?;
bucket.remove(&alias_bytes);
if bucket.is_empty() {
tracing::info!("Removed old {:?}", bucket);
None
} else {
tracing::info!("Inserting old {:?}", bucket);
let bucket_bytes = serde_cbor::to_vec(&bucket)?;
Some(bucket_bytes)
Some(serde_cbor::to_vec(&bucket))
}
} else {
None
}
} else {
None
};
})
.transpose()?;
if let Err(CompareAndSwapError { current, .. }) =
cache_inverse.compare_and_swap(previous_datetime_bytes.clone(), old, new)?
cache_inverse.compare_and_swap(&previous_datetime_bytes, old, new)?
{
old = current;
} else {
@ -263,7 +237,7 @@ impl CachedRepo for SledRepo {
}
}
let mut hashes: Vec<Self::Bytes> = Vec::new();
let mut aliases: Vec<Alias> = Vec::new();
for (date_bytes, bucket_bytes) in
cache_inverse.range(..to_clean_bytes).filter_map(Result::ok)
@ -275,10 +249,12 @@ impl CachedRepo for SledRepo {
}
if let Ok(bucket) = serde_cbor::from_slice::<Bucket>(&bucket_bytes) {
tracing::info!("Read for deletion: {:?}", bucket);
for hash in bucket.inner {
for alias_bytes in bucket {
// Best effort cleanup
let _ = cache.remove(&hash);
hashes.push(hash.into());
let _ = cache.remove(&alias_bytes);
if let Some(alias) = Alias::from_slice(&alias_bytes) {
aliases.push(alias);
}
}
} else {
tracing::warn!("Invalid bucket");
@ -296,10 +272,10 @@ impl CachedRepo for SledRepo {
}
}
Ok(hashes) as Result<_, Error>
Ok(aliases) as Result<_, Error>
});
Ok(hashes)
Ok(aliases)
}
}

36
src/repo/sled/bucket.rs Normal file
View file

@ -0,0 +1,36 @@
use std::collections::HashSet;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub(super) struct Bucket {
// each Vec<u8> represents a unique image hash
inner: HashSet<Vec<u8>>,
}
impl Bucket {
pub(super) fn empty() -> Self {
Self {
inner: HashSet::new(),
}
}
pub(super) fn insert(&mut self, alias_bytes: Vec<u8>) {
self.inner.insert(alias_bytes);
}
pub(super) fn remove(&mut self, alias_bytes: &[u8]) {
self.inner.remove(alias_bytes);
}
pub(super) fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
impl IntoIterator for Bucket {
type Item = <HashSet<Vec<u8>> as IntoIterator>::Item;
type IntoIter = <HashSet<Vec<u8>> as IntoIterator>::IntoIter;
fn into_iter(self) -> Self::IntoIter {
self.inner.into_iter()
}
}