Remove cache functionality

This commit is contained in:
asonix 2023-02-25 11:34:48 -06:00
parent 123488f617
commit 281ac43dff
9 changed files with 16 additions and 344 deletions

View file

@ -91,8 +91,6 @@ Options:
Which media filters should be enabled on the `process` endpoint
--media-format <MEDIA_FORMAT>
Enforce uploaded media is transcoded to the provided format [possible values: jpeg, webp, png]
--media-cache-duration <MEDIA_CACHE_DURATION>
How long, in hours, to keep media ingested through the "cached" endpoint
-h, --help
Print help information (use `--help` for more detail)
```
@ -230,15 +228,11 @@ pict-rs offers the following endpoints:
"msg": "ok"
}
```
- `GET /image/download?url={url}&backgrounded=(true|false)&ephemeral=(true|false)` Download an image
- `GET /image/download?url={url}&backgrounded=(true|false)` Download an image
from a remote server, returning the same JSON payload as the `POST /image` endpoint by default.
if `backgrounded` is set to `true`, then the ingest processing will be queued for later and the
response json will be the same as the `POST /image/backgrounded` endpoint.
if `ephemeral` is set to true, the downloaded image will be marked as a "cached" image, and
automatically removed from pict-rs N hours after its last access. The duration is configurable
with the `--media-cache-duration` run flag, or the `[media] cache_duration` toml option.
- `GET /image/backgrounded/claim?upload_id={uuid}` Wait for a backgrounded upload to complete, claiming it's result
Possible results:
- 200 Ok (validation and ingest complete):

View file

@ -48,7 +48,6 @@ pub(crate) async fn ingest<R, S>(
stream: impl Stream<Item = Result<Bytes, Error>> + Unpin + 'static,
declared_alias: Option<Alias>,
should_validate: bool,
is_cached: bool,
) -> Result<Session<R, S>, Error>
where
R: FullRepo + 'static,
@ -98,9 +97,9 @@ where
save_upload(repo, store, &hash, &identifier).await?;
if let Some(alias) = declared_alias {
session.add_existing_alias(&hash, alias, is_cached).await?
session.add_existing_alias(&hash, alias).await?
} else {
session.create_alias(&hash, input_type, is_cached).await?;
session.create_alias(&hash, input_type).await?;
}
Ok(session)
@ -168,7 +167,6 @@ where
&mut self,
hash: &[u8],
alias: Alias,
is_cached: bool,
) -> Result<(), Error> {
AliasRepo::create(&self.repo, &alias)
.await?
@ -179,10 +177,6 @@ where
self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
if is_cached {
self.repo.mark_cached(&alias).await?;
}
Ok(())
}
@ -191,7 +185,6 @@ where
&mut self,
hash: &[u8],
input_type: ValidInputType,
is_cached: bool,
) -> Result<(), Error> {
loop {
let alias = Alias::generate(input_type.as_ext().to_string());
@ -202,10 +195,6 @@ where
self.repo.relate_hash(&alias, hash.to_vec().into()).await?;
self.repo.relate_alias(hash.to_vec().into(), &alias).await?;
if is_cached {
self.repo.mark_cached(&alias).await?;
}
return Ok(());
}

View file

@ -156,10 +156,8 @@ impl<R: FullRepo, S: Store + 'static> FormData for Upload<R, S> {
let stream = stream.map_err(Error::from);
Box::pin(
async move {
ingest::ingest(&**repo, &**store, stream, None, true, false).await
}
.instrument(span),
async move { ingest::ingest(&**repo, &**store, stream, None, true).await }
.instrument(span),
)
})),
)
@ -211,7 +209,6 @@ impl<R: FullRepo, S: Store + 'static> FormData for Import<R, S> {
stream,
Some(Alias::from_existing(&filename)),
!CONFIG.media.skip_validate_imports,
false,
)
.await
}
@ -367,7 +364,7 @@ async fn upload_backgrounded<R: FullRepo, S: Store>(
.expect("Identifier exists")
.to_bytes()?;
queue::queue_ingest(&repo, identifier, upload_id, None, true, false).await?;
queue::queue_ingest(&repo, identifier, upload_id, None, true).await?;
files.push(serde_json::json!({
"upload_id": upload_id.to_string(),
@ -433,9 +430,6 @@ struct UrlQuery {
#[serde(default)]
backgrounded: bool,
#[serde(default)]
ephemeral: bool,
}
/// download an image from a URL
@ -457,9 +451,9 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
.limit((CONFIG.media.max_file_size * MEGABYTES) as u64);
if query.backgrounded {
do_download_backgrounded(stream, repo, store, query.ephemeral).await
do_download_backgrounded(stream, repo, store).await
} else {
do_download_inline(stream, repo, store, query.ephemeral).await
do_download_inline(stream, repo, store).await
}
}
@ -468,9 +462,8 @@ async fn do_download_inline<R: FullRepo + 'static, S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
repo: web::Data<R>,
store: web::Data<S>,
is_cached: bool,
) -> Result<HttpResponse, Error> {
let mut session = ingest::ingest(&repo, &store, stream, None, true, is_cached).await?;
let mut session = ingest::ingest(&repo, &store, stream, None, true).await?;
let alias = session.alias().expect("alias should exist").to_owned();
let delete_token = session.delete_token().await?;
@ -494,7 +487,6 @@ async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin + 'static,
repo: web::Data<R>,
store: web::Data<S>,
is_cached: bool,
) -> Result<HttpResponse, Error> {
let backgrounded = Backgrounded::proxy((**repo).clone(), (**store).clone(), stream).await?;
@ -504,7 +496,7 @@ async fn do_download_backgrounded<R: FullRepo + 'static, S: Store + 'static>(
.expect("Identifier exists")
.to_bytes()?;
queue::queue_ingest(&repo, identifier, upload_id, None, true, is_cached).await?;
queue::queue_ingest(&repo, identifier, upload_id, None, true).await?;
backgrounded.disarm();
@ -605,8 +597,6 @@ async fn process<R: FullRepo, S: Store + 'static>(
) -> Result<HttpResponse, Error> {
let (format, alias, thumbnail_path, thumbnail_args) = prepare_process(query, ext.as_str())?;
repo.check_cached(&alias).await?;
let path_string = thumbnail_path.to_string_lossy().to_string();
let hash = repo.hash(&alias).await?;
@ -694,8 +684,6 @@ async fn process_head<R: FullRepo, S: Store + 'static>(
) -> Result<HttpResponse, Error> {
let (format, alias, thumbnail_path, _) = prepare_process(query, ext.as_str())?;
repo.check_cached(&alias).await?;
let path_string = thumbnail_path.to_string_lossy().to_string();
let hash = repo.hash(&alias).await?;
let identifier_opt = repo
@ -776,8 +764,6 @@ async fn serve<R: FullRepo, S: Store + 'static>(
) -> Result<HttpResponse, Error> {
let alias = alias.into_inner();
repo.check_cached(&alias).await?;
let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
let details = ensure_details(&repo, &store, &alias).await?;
@ -794,8 +780,6 @@ async fn serve_head<R: FullRepo, S: Store + 'static>(
) -> Result<HttpResponse, Error> {
let alias = alias.into_inner();
repo.check_cached(&alias).await?;
let identifier = repo.identifier_from_alias::<S::Identifier>(&alias).await?;
let details = ensure_details(&repo, &store, &alias).await?;

View file

@ -68,7 +68,6 @@ enum Process {
upload_id: Serde<UploadId>,
declared_alias: Option<Serde<Alias>>,
should_validate: bool,
is_cached: bool,
},
Generate {
target_format: ImageFormat,
@ -130,14 +129,12 @@ pub(crate) async fn queue_ingest<R: QueueRepo>(
upload_id: UploadId,
declared_alias: Option<Alias>,
should_validate: bool,
is_cached: bool,
) -> Result<(), Error> {
let job = serde_json::to_vec(&Process::Ingest {
identifier: Base64Bytes(identifier),
declared_alias: declared_alias.map(Serde::new),
upload_id: Serde::new(upload_id),
should_validate,
is_cached,
})?;
repo.push(PROCESS_QUEUE, job.into()).await?;
Ok(())

View file

@ -27,7 +27,6 @@ where
upload_id,
declared_alias,
should_validate,
is_cached,
} => {
process_ingest(
repo,
@ -36,7 +35,6 @@ where
Serde::into_inner(upload_id),
declared_alias.map(Serde::into_inner),
should_validate,
is_cached,
)
.await?
}
@ -74,7 +72,6 @@ async fn process_ingest<R, S>(
upload_id: UploadId,
declared_alias: Option<Alias>,
should_validate: bool,
is_cached: bool,
) -> Result<(), Error>
where
R: FullRepo + 'static,
@ -88,15 +85,8 @@ where
.await?
.map_err(Error::from);
let session = crate::ingest::ingest(
repo,
store,
stream,
declared_alias,
should_validate,
is_cached,
)
.await?;
let session =
crate::ingest::ingest(repo, store, stream, declared_alias, should_validate).await?;
let token = session.delete_token().await?;

View file

@ -49,8 +49,7 @@ pub(crate) enum UploadResult {
#[async_trait::async_trait(?Send)]
pub(crate) trait FullRepo:
CachedRepo
+ UploadRepo
UploadRepo
+ SettingsRepo
+ IdentifierRepo
+ AliasRepo
@ -92,18 +91,6 @@ pub(crate) trait FullRepo:
None => Ok(None),
}
}
#[tracing::instrument(skip(self))]
async fn check_cached(&self, alias: &Alias) -> Result<(), Error> {
let aliases = CachedRepo::update(self, alias).await?;
for alias in aliases {
let token = self.delete_token(&alias).await?;
crate::queue::cleanup_alias(self, alias, token).await?;
}
Ok(())
}
}
#[async_trait::async_trait(?Send)]
@ -127,27 +114,6 @@ where
type Bytes = T::Bytes;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait CachedRepo: BaseRepo {
async fn mark_cached(&self, alias: &Alias) -> Result<(), Error>;
async fn update(&self, alias: &Alias) -> Result<Vec<Alias>, Error>;
}
#[async_trait::async_trait(?Send)]
impl<T> CachedRepo for actix_web::web::Data<T>
where
T: CachedRepo,
{
async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> {
T::mark_cached(self, alias).await
}
async fn update(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
T::update(self, alias).await
}
}
#[async_trait::async_trait(?Send)]
pub(crate) trait UploadRepo: BaseRepo {
async fn create(&self, upload_id: UploadId) -> Result<(), Error>;

View file

@ -1,15 +1,14 @@
use crate::{
error::{Error, UploadError},
repo::{
Alias, AliasRepo, AlreadyExists, BaseRepo, CachedRepo, DeleteToken, Details, FullRepo,
HashRepo, Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo,
UploadResult,
Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo,
Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
},
serde_str::Serde,
stream::from_iterator,
};
use futures_util::Stream;
use sled::{CompareAndSwapError, Db, IVec, Tree};
use sled::{Db, IVec, Tree};
use std::{
collections::HashMap,
pin::Pin,
@ -20,12 +19,6 @@ use std::{
};
use tokio::sync::Notify;
mod bucket;
mod datetime;
use bucket::Bucket;
use datetime::DateTime;
macro_rules! b {
($self:ident.$ident:ident, $expr:expr) => {{
let $ident = $self.$ident.clone();
@ -71,8 +64,6 @@ pub(crate) struct SledRepo {
in_progress_queue: Tree,
queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
uploads: Tree,
cache: Tree,
cache_inverse: Tree,
db: Db,
}
@ -95,8 +86,6 @@ impl SledRepo {
in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
queue_notifier: Arc::new(RwLock::new(HashMap::new())),
uploads: db.open_tree("pict-rs-uploads-tree")?,
cache: db.open_tree("pict-rs-cache-tree")?,
cache_inverse: db.open_tree("pict-rs-cache-inverse-tree")?,
db,
})
}
@ -154,149 +143,6 @@ impl From<InnerUploadResult> for UploadResult {
}
}
fn insert_cache_inverse(
cache_inverse: &Tree,
now_bytes: &[u8],
alias_bytes: &[u8],
) -> Result<(), Error> {
let mut old = cache_inverse.get(now_bytes)?;
loop {
// unsure of whether to bail on deserialize error or fail with empty bucket
let mut bucket = old
.as_ref()
.and_then(|old| serde_cbor::from_slice::<Bucket>(old).ok())
.unwrap_or_else(Bucket::empty);
bucket.insert(alias_bytes.to_vec());
tracing::trace!("Inserting new {:?}", bucket);
let bucket_bytes = serde_cbor::to_vec(&bucket)?;
let new = Some(bucket_bytes);
let res = cache_inverse.compare_and_swap(now_bytes, old, new)?;
if let Err(CompareAndSwapError { current, .. }) = res {
old = current;
} else {
break;
}
}
Ok(())
}
#[async_trait::async_trait(?Send)]
impl CachedRepo for SledRepo {
#[tracing::instrument(skip(self))]
async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> {
let now = DateTime::now();
let now_bytes = serde_json::to_vec(&now)?;
let alias_bytes = alias.to_bytes();
let cache_inverse = self.cache_inverse.clone();
b!(self.cache, {
cache.insert(&alias_bytes, now_bytes.clone())?;
insert_cache_inverse(&cache_inverse, &now_bytes, &alias_bytes)
});
Ok(())
}
#[tracing::instrument(skip(self))]
async fn update(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
let now = DateTime::now();
let now_bytes = serde_json::to_vec(&now)?;
let to_clean = now.min_cache_date();
let to_clean_bytes = serde_json::to_vec(&to_clean)?;
let alias_bytes = alias.to_bytes();
let cache_inverse = self.cache_inverse.clone();
let aliases = b!(self.cache, {
let previous_datetime_opt = cache
.fetch_and_update(&alias_bytes, |previous_datetime_opt| {
previous_datetime_opt.map(|_| now_bytes.clone())
})?;
if let Some(previous_datetime_bytes) = previous_datetime_opt {
// Insert cached media into new date bucket
insert_cache_inverse(&cache_inverse, &now_bytes, &alias_bytes)?;
// Remove cached media from old date bucket
let mut old = cache_inverse.get(&previous_datetime_bytes)?;
loop {
let new = old
.as_ref()
.and_then(|bucket_bytes| {
let mut bucket = serde_cbor::from_slice::<Bucket>(bucket_bytes).ok()?;
bucket.remove(&alias_bytes);
if bucket.is_empty() {
tracing::trace!("Removed old {:?}", bucket);
None
} else {
tracing::trace!("Inserting old {:?}", bucket);
Some(serde_cbor::to_vec(&bucket))
}
})
.transpose()?;
if let Err(CompareAndSwapError { current, .. }) =
cache_inverse.compare_and_swap(&previous_datetime_bytes, old, new)?
{
old = current;
} else {
break;
}
}
}
let mut aliases: Vec<Alias> = Vec::new();
for (date_bytes, bucket_bytes) in
cache_inverse.range(..to_clean_bytes).filter_map(Result::ok)
{
if let Ok(datetime) = serde_json::from_slice::<DateTime>(&date_bytes) {
tracing::trace!("Checking {}", datetime);
} else {
tracing::warn!("Invalid date bytes");
}
if let Ok(bucket) = serde_cbor::from_slice::<Bucket>(&bucket_bytes) {
tracing::trace!("Read for deletion: {:?}", bucket);
for alias_bytes in bucket {
// Best effort cleanup
let _ = cache.remove(&alias_bytes);
if let Some(alias) = Alias::from_slice(&alias_bytes) {
aliases.push(alias);
}
}
} else {
tracing::warn!("Invalid bucket");
}
cache_inverse.remove(date_bytes)?;
}
#[cfg(debug)]
for date_bytes in cache_inverse.range(to_clean_bytes..).filter_map(Result::ok) {
if let Ok(datetime) = serde_json::from_slice::<DateTime>(&date_bytes) {
tracing::trace!("Not cleaning for {}", datetime);
} else {
tracing::warn!("Invalid date bytes");
}
}
Ok(aliases) as Result<_, Error>
});
Ok(aliases)
}
}
#[async_trait::async_trait(?Send)]
impl UploadRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self))]

View file

@ -1,36 +0,0 @@
use std::collections::HashSet;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub(super) struct Bucket {
// each Vec<u8> represents a unique image hash
inner: HashSet<Vec<u8>>,
}
impl Bucket {
pub(super) fn empty() -> Self {
Self {
inner: HashSet::new(),
}
}
pub(super) fn insert(&mut self, alias_bytes: Vec<u8>) {
self.inner.insert(alias_bytes);
}
pub(super) fn remove(&mut self, alias_bytes: &[u8]) {
self.inner.remove(alias_bytes);
}
pub(super) fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
impl IntoIterator for Bucket {
type Item = <HashSet<Vec<u8>> as IntoIterator>::Item;
type IntoIter = <HashSet<Vec<u8>> as IntoIterator>::IntoIter;
fn into_iter(self) -> Self::IntoIter {
self.inner.into_iter()
}
}

View file

@ -1,58 +0,0 @@
use std::ops::Deref;
use time::{Duration, OffsetDateTime};
use crate::CONFIG;
const SECONDS: i64 = 1;
const MINUTES: i64 = 60 * SECONDS;
const HOURS: i64 = 60 * MINUTES;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Deserialize, serde::Serialize)]
pub(super) struct DateTime {
#[serde(with = "time::serde::rfc3339")]
inner_date: OffsetDateTime,
}
impl DateTime {
pub(super) fn now() -> Self {
DateTime {
inner_date: OffsetDateTime::now_utc(),
}
}
pub(super) fn min_cache_date(&self) -> Self {
let cache_duration = Duration::new(CONFIG.media.cache_duration * HOURS, 0);
Self {
inner_date: self
.checked_sub(cache_duration)
.expect("Should never be older than Jan 7, 1970"),
}
}
}
impl std::fmt::Display for DateTime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.inner_date.fmt(f)
}
}
impl AsRef<OffsetDateTime> for DateTime {
fn as_ref(&self) -> &OffsetDateTime {
&self.inner_date
}
}
impl Deref for DateTime {
type Target = OffsetDateTime;
fn deref(&self) -> &Self::Target {
&self.inner_date
}
}
impl From<OffsetDateTime> for DateTime {
fn from(inner_date: OffsetDateTime) -> Self {
Self { inner_date }
}
}