Create unique errors for Repo and Store, separate from UploadError

Add .is_not_found() for Store errors, wire up Object and File storage to properly set NotFound
Allow skipping files that are not found during store migration
This commit is contained in:
asonix 2023-06-20 15:59:08 -05:00
parent 1e6705684e
commit 554d852e68
13 changed files with 510 additions and 316 deletions

View file

@ -148,13 +148,33 @@ impl Args {
},
}
}
Command::MigrateStore(migrate_store) => {
Command::MigrateStore(MigrateStore {
skip_missing_files,
store,
}) => {
let server = Server::default();
let media = Media::default();
match migrate_store {
MigrateStore::Filesystem(MigrateFilesystem { from, to }) => match to {
MigrateStoreInner::Filesystem(MigrateFilesystemInner { to, repo }) => {
match store {
MigrateStoreFrom::Filesystem(MigrateFilesystem { from, to }) => match to {
MigrateStoreTo::Filesystem(MigrateFilesystemInner { to, repo }) => Output {
config_format: ConfigFormat {
server,
old_db,
tracing,
media,
store: None,
repo,
},
operation: Operation::MigrateStore {
skip_missing_files,
from: from.into(),
to: to.into(),
},
config_file,
save_to,
},
MigrateStoreTo::ObjectStorage(MigrateObjectStorageInner { to, repo }) => {
Output {
config_format: ConfigFormat {
server,
@ -165,6 +185,7 @@ impl Args {
repo,
},
operation: Operation::MigrateStore {
skip_missing_files,
from: from.into(),
to: to.into(),
},
@ -172,29 +193,32 @@ impl Args {
save_to,
}
}
MigrateStoreInner::ObjectStorage(MigrateObjectStorageInner {
to,
repo,
}) => Output {
config_format: ConfigFormat {
server,
old_db,
tracing,
media,
store: None,
repo,
},
operation: Operation::MigrateStore {
from: from.into(),
to: to.into(),
},
config_file,
save_to,
},
},
MigrateStore::ObjectStorage(MigrateObjectStorage { from, to }) => match to {
MigrateStoreInner::Filesystem(MigrateFilesystemInner { to, repo }) => {
Output {
MigrateStoreFrom::ObjectStorage(MigrateObjectStorage { from, to }) => {
match to {
MigrateStoreTo::Filesystem(MigrateFilesystemInner { to, repo }) => {
Output {
config_format: ConfigFormat {
server,
old_db,
tracing,
media,
store: None,
repo,
},
operation: Operation::MigrateStore {
skip_missing_files,
from: from.into(),
to: to.into(),
},
config_file,
save_to,
}
}
MigrateStoreTo::ObjectStorage(MigrateObjectStorageInner {
to,
repo,
}) => Output {
config_format: ConfigFormat {
server,
old_db,
@ -204,33 +228,15 @@ impl Args {
repo,
},
operation: Operation::MigrateStore {
skip_missing_files,
from: from.into(),
to: to.into(),
},
config_file,
save_to,
}
},
}
MigrateStoreInner::ObjectStorage(MigrateObjectStorageInner {
to,
repo,
}) => Output {
config_format: ConfigFormat {
server,
old_db,
tracing,
media,
store: None,
repo,
},
operation: Operation::MigrateStore {
from: from.into(),
to: to.into(),
},
config_file,
save_to,
},
},
}
}
}
}
@ -249,6 +255,7 @@ pub(super) struct Output {
pub(crate) enum Operation {
Run,
MigrateStore {
skip_missing_files: bool,
from: crate::config::primitives::Store,
to: crate::config::primitives::Store,
},
@ -418,7 +425,6 @@ enum Command {
Run(Run),
/// Migrates from one provided media store to another
#[command(flatten)]
MigrateStore(MigrateStore),
}
@ -527,9 +533,20 @@ enum RunStore {
ObjectStorage(RunObjectStorage),
}
#[derive(Debug, Parser)]
struct MigrateStore {
/// Normally, pict-rs will keep retrying when errors occur during migration. This flag tells
/// pict-rs to ignore errors that are caused by files not existing.
#[arg(long)]
skip_missing_files: bool,
#[command(subcommand)]
store: MigrateStoreFrom,
}
/// Configure the pict-rs storage migration
#[derive(Debug, Subcommand)]
enum MigrateStore {
enum MigrateStoreFrom {
/// Migrate from the provided filesystem storage
Filesystem(MigrateFilesystem),
@ -539,7 +556,7 @@ enum MigrateStore {
/// Configure the destination storage for pict-rs storage migration
#[derive(Debug, Subcommand)]
enum MigrateStoreInner {
enum MigrateStoreTo {
/// Migrate to the provided filesystem storage
Filesystem(MigrateFilesystemInner),
@ -554,7 +571,7 @@ struct MigrateFilesystem {
from: Filesystem,
#[command(subcommand)]
to: MigrateStoreInner,
to: MigrateStoreTo,
}
/// Migrate pict-rs' storage to the provided filesystem storage
@ -574,7 +591,7 @@ struct MigrateObjectStorage {
from: crate::config::primitives::ObjectStorage,
#[command(subcommand)]
to: MigrateStoreInner,
to: MigrateStoreTo,
}
/// Migrate pict-rs' storage to the provided object storage

View file

@ -46,7 +46,7 @@ pub(crate) enum UploadError {
Upload(#[from] actix_form_data::Error),
#[error("Error in DB")]
Sled(#[from] crate::repo::sled::SledError),
Repo(#[from] crate::repo::RepoError),
#[error("Error in old sled DB")]
OldSled(#[from] ::sled::Error),
@ -63,11 +63,8 @@ pub(crate) enum UploadError {
#[error("Error stripping prefix")]
StripPrefix(#[from] std::path::StripPrefixError),
#[error("Error storing file")]
FileStore(#[from] crate::store::file_store::FileError),
#[error("Error storing object")]
ObjectStore(#[from] crate::store::object_store::ObjectError),
#[error("Error in store")]
Store(#[source] crate::store::StoreError),
#[error("Provided process path is invalid")]
ParsePath,
@ -81,9 +78,6 @@ pub(crate) enum UploadError {
#[error("No files present in upload")]
NoFiles,
#[error("Upload was already claimed")]
AlreadyClaimed,
#[error("Requested a file that doesn't exist")]
MissingAlias,
@ -151,6 +145,15 @@ impl From<tokio::sync::AcquireError> for UploadError {
}
}
impl From<crate::store::StoreError> for UploadError {
fn from(value: crate::store::StoreError) -> Self {
match value {
crate::store::StoreError::Repo(repo_error) => Self::Repo(repo_error),
e => Self::Store(e),
}
}
}
impl ResponseError for Error {
fn status_code(&self) -> StatusCode {
match self.kind() {
@ -160,11 +163,16 @@ impl ResponseError for Error {
| UploadError::NoFiles
| UploadError::Upload(_)
| UploadError::UnsupportedFormat
| UploadError::AlreadyClaimed
| UploadError::Store(crate::store::StoreError::Repo(
crate::repo::RepoError::AlreadyClaimed,
))
| UploadError::Repo(crate::repo::RepoError::AlreadyClaimed)
| UploadError::SilentVideoDisabled,
) => StatusCode::BAD_REQUEST,
Some(
UploadError::Sled(crate::repo::sled::SledError::Missing)
UploadError::Repo(crate::repo::RepoError::SledError(
crate::repo::sled::SledError::Missing,
))
| UploadError::MissingAlias,
) => StatusCode::NOT_FOUND,
Some(UploadError::InvalidToken) => StatusCode::FORBIDDEN,

View file

@ -3,7 +3,7 @@ use crate::{
error::{Error, UploadError},
magick::{Details, ValidInputType},
process::Process,
store::Store,
store::{Store, StoreError},
};
use actix_web::web::Bytes;
use once_cell::sync::OnceCell;
@ -413,7 +413,9 @@ where
{
let input_file = crate::tmp_file::tmp_file(None);
let input_file_str = input_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&input_file).await?;
crate::store::file_store::safe_create_parent(&input_file)
.await
.map_err(StoreError::from)?;
let tmp_one = crate::file::File::create(&input_file).await?;
let tmp_one = (f)(tmp_one).await?;
@ -523,11 +525,15 @@ pub(crate) async fn transcode_bytes(
) -> Result<impl AsyncRead + Unpin, Error> {
let input_file = crate::tmp_file::tmp_file(Some(transcode_options.input_file_extension()));
let input_file_str = input_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&input_file).await?;
crate::store::file_store::safe_create_parent(&input_file)
.await
.map_err(StoreError::from)?;
let output_file = crate::tmp_file::tmp_file(Some(transcode_options.output_file_extension()));
let output_file_str = output_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&output_file).await?;
crate::store::file_store::safe_create_parent(&output_file)
.await
.map_err(StoreError::from)?;
let mut tmp_one = crate::file::File::create(&input_file).await?;
tmp_one.write_from_bytes(input).await?;
@ -557,7 +563,10 @@ pub(crate) async fn transcode_bytes(
tokio::fs::remove_file(input_file).await?;
let tmp_two = crate::file::File::open(&output_file).await?;
let stream = tmp_two.read_to_stream(None, None).await?;
let stream = tmp_two
.read_to_stream(None, None)
.await
.map_err(StoreError::from)?;
let reader = tokio_util::io::StreamReader::new(stream);
let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, output_file);
@ -573,11 +582,15 @@ pub(crate) async fn thumbnail<S: Store>(
) -> Result<impl AsyncRead + Unpin, Error> {
let input_file = crate::tmp_file::tmp_file(Some(input_format.to_file_extension()));
let input_file_str = input_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&input_file).await?;
crate::store::file_store::safe_create_parent(&input_file)
.await
.map_err(StoreError::from)?;
let output_file = crate::tmp_file::tmp_file(Some(format.to_file_extension()));
let output_file_str = output_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&output_file).await?;
crate::store::file_store::safe_create_parent(&output_file)
.await
.map_err(StoreError::from)?;
let mut tmp_one = crate::file::File::create(&input_file).await?;
tmp_one
@ -607,7 +620,10 @@ pub(crate) async fn thumbnail<S: Store>(
tokio::fs::remove_file(input_file).await?;
let tmp_two = crate::file::File::open(&output_file).await?;
let stream = tmp_two.read_to_stream(None, None).await?;
let stream = tmp_two
.read_to_stream(None, None)
.await
.map_err(StoreError::from)?;
let reader = tokio_util::io::StreamReader::new(stream);
let clean_reader = crate::tmp_file::cleanup_tmpfile(reader, output_file);

View file

@ -1108,7 +1108,12 @@ async fn launch<R: FullRepo + 'static, SC: StoreConfig + 'static>(
Ok(())
}
async fn migrate_inner<S1>(repo: &Repo, from: S1, to: config::Store) -> color_eyre::Result<()>
async fn migrate_inner<S1>(
repo: &Repo,
from: S1,
to: config::Store,
skip_missing_files: bool,
) -> color_eyre::Result<()>
where
S1: Store,
{
@ -1117,7 +1122,7 @@ where
let to = FileStore::build(path.clone(), repo.clone()).await?.build();
match repo {
Repo::Sled(repo) => migrate_store(repo, from, to).await?,
Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?,
}
}
config::Store::ObjectStorage(config::ObjectStorage {
@ -1147,7 +1152,7 @@ where
.build();
match repo {
Repo::Sled(repo) => migrate_store(repo, from, to).await?,
Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?,
}
}
}
@ -1219,11 +1224,15 @@ pub async fn run() -> color_eyre::Result<()> {
match (*OPERATION).clone() {
Operation::Run => (),
Operation::MigrateStore { from, to } => {
Operation::MigrateStore {
skip_missing_files,
from,
to,
} => {
match from {
config::Store::Filesystem(config::Filesystem { path }) => {
let from = FileStore::build(path.clone(), repo.clone()).await?.build();
migrate_inner(&repo, from, to).await?;
migrate_inner(&repo, from, to, skip_missing_files).await?;
}
config::Store::ObjectStorage(config::ObjectStorage {
endpoint,
@ -1251,7 +1260,7 @@ pub async fn run() -> color_eyre::Result<()> {
.await?
.build();
migrate_inner(&repo, from, to).await?;
migrate_inner(&repo, from, to, skip_missing_files).await?;
}
}
@ -1304,15 +1313,22 @@ const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";
const STORE_MIGRATION_MOTION: &str = "store-migration-motion";
const STORE_MIGRATION_VARIANT: &str = "store-migration-variant";
async fn migrate_store<R, S1, S2>(repo: &R, from: S1, to: S2) -> Result<(), Error>
async fn migrate_store<R, S1, S2>(
repo: &R,
from: S1,
to: S2,
skip_missing_files: bool,
) -> Result<(), Error>
where
S1: Store + Clone,
S2: Store + Clone,
R: IdentifierRepo + HashRepo + SettingsRepo,
{
tracing::info!("Migrating store");
let mut failure_count = 0;
while let Err(e) = do_migrate_store(repo, from.clone(), to.clone()).await {
while let Err(e) = do_migrate_store(repo, from.clone(), to.clone(), skip_missing_files).await {
tracing::error!("Failed with {}", e.to_string());
failure_count += 1;
@ -1326,7 +1342,12 @@ where
Ok(())
}
async fn do_migrate_store<R, S1, S2>(repo: &R, from: S1, to: S2) -> Result<(), Error>
async fn do_migrate_store<R, S1, S2>(
repo: &R,
from: S1,
to: S2,
skip_missing_files: bool,
) -> Result<(), Error>
where
S1: Store,
S2: Store,
@ -1352,12 +1373,22 @@ where
.await?
{
if repo.get(STORE_MIGRATION_MOTION).await?.is_none() {
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_motion_identifier(hash.as_ref().to_vec().into(), &new_identifier)
.await?;
repo.set(STORE_MIGRATION_MOTION, b"1".to_vec().into())
.await?;
match migrate_file(&from, &to, &identifier, skip_missing_files).await {
Ok(new_identifier) => {
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_motion_identifier(
hash.as_ref().to_vec().into(),
&new_identifier,
)
.await?;
repo.set(STORE_MIGRATION_MOTION, b"1".to_vec().into())
.await?;
}
Err(e) if e.is_not_found() && skip_missing_files => {
tracing::warn!("Skipping motion file for hash {}", hex::encode(&hash));
}
Err(e) => return Err(e.into()),
}
}
}
@ -1371,22 +1402,45 @@ where
continue;
}
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone())
.await?;
repo.relate_variant_identifier(hash.as_ref().to_vec().into(), variant, &new_identifier)
.await?;
match migrate_file(&from, &to, &identifier, skip_missing_files).await {
Ok(new_identifier) => {
migrate_details(repo, identifier, &new_identifier).await?;
repo.remove_variant(hash.as_ref().to_vec().into(), variant.clone())
.await?;
repo.relate_variant_identifier(
hash.as_ref().to_vec().into(),
variant,
&new_identifier,
)
.await?;
repo.set(STORE_MIGRATION_VARIANT, new_identifier.to_bytes()?.into())
.await?;
repo.set(STORE_MIGRATION_VARIANT, new_identifier.to_bytes()?.into())
.await?;
}
Err(e) if e.is_not_found() && skip_missing_files => {
tracing::warn!(
"Skipping variant {} for hash {}",
variant,
hex::encode(&hash)
);
}
Err(e) => return Err(e.into()),
}
}
let identifier = repo.identifier(hash.as_ref().to_vec().into()).await?;
let new_identifier = migrate_file(&from, &to, &identifier).await?;
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
.await?;
match migrate_file(&from, &to, &identifier, skip_missing_files).await {
Ok(new_identifier) => {
migrate_details(repo, identifier, &new_identifier).await?;
repo.relate_identifier(hash.as_ref().to_vec().into(), &new_identifier)
.await?;
}
Err(e) if e.is_not_found() && skip_missing_files => {
tracing::warn!("Skipping original file for hash {}", hex::encode(&hash));
}
Err(e) => return Err(e.into()),
}
repo.set(STORE_MIGRATION_PROGRESS, hash.as_ref().to_vec().into())
.await?;
@ -1404,7 +1458,8 @@ async fn migrate_file<S1, S2>(
from: &S1,
to: &S2,
identifier: &S1::Identifier,
) -> Result<S2::Identifier, Error>
skip_missing_files: bool,
) -> Result<S2::Identifier, crate::store::StoreError>
where
S1: Store,
S2: Store,
@ -1414,6 +1469,7 @@ where
loop {
match do_migrate_file(from, to, identifier).await {
Ok(identifier) => return Ok(identifier),
Err(e) if e.is_not_found() && skip_missing_files => return Err(e),
Err(e) => {
failure_count += 1;
@ -1432,7 +1488,7 @@ async fn do_migrate_file<S1, S2>(
from: &S1,
to: &S2,
identifier: &S1::Identifier,
) -> Result<S2::Identifier, Error>
) -> Result<S2::Identifier, crate::store::StoreError>
where
S1: Store,
S2: Store,

View file

@ -3,7 +3,7 @@ use crate::{
error::{Error, UploadError},
process::Process,
repo::Alias,
store::Store,
store::{Store, StoreError},
};
use actix_web::web::Bytes;
use tokio::{
@ -140,7 +140,9 @@ pub(crate) async fn details_bytes(
if let Some(hint) = hint.and_then(|hint| hint.video_hint()) {
let input_file = crate::tmp_file::tmp_file(Some(hint));
let input_file_str = input_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&input_file).await?;
crate::store::file_store::safe_create_parent(&input_file)
.await
.map_err(StoreError::from)?;
let mut tmp_one = crate::file::File::create(&input_file).await?;
tmp_one.write_from_bytes(input).await?;
@ -178,7 +180,9 @@ pub(crate) async fn details_store<S: Store + 'static>(
if let Some(hint) = hint.and_then(|hint| hint.video_hint()) {
let input_file = crate::tmp_file::tmp_file(Some(hint));
let input_file_str = input_file.to_str().ok_or(UploadError::Path)?;
crate::store::file_store::safe_create_parent(&input_file).await?;
crate::store::file_store::safe_create_parent(&input_file)
.await
.map_err(StoreError::from)?;
let mut tmp_one = crate::file::File::create(&input_file).await?;
tmp_one

View file

@ -34,7 +34,8 @@ pub(crate) async fn chop_store<S: Store>(
let end = end + 1;
return store
.to_stream(identifier, Some(start), Some(end.saturating_sub(start)))
.await;
.await
.map_err(Error::from);
}
Err(UploadError::Range.into())

View file

@ -1,8 +1,7 @@
use crate::{
config,
details::Details,
error::Error,
store::{file_store::FileId, Identifier},
store::{file_store::FileId, Identifier, StoreError},
};
use base64::{prelude::BASE64_STANDARD, Engine};
use futures_util::Stream;
@ -47,6 +46,18 @@ pub(crate) enum UploadResult {
Failure { message: String },
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum RepoError {
#[error("Error in sled")]
SledError(#[from] crate::repo::sled::SledError),
#[error("Upload was already claimed")]
AlreadyClaimed,
#[error("Panic in blocking operation")]
Canceled,
}
#[async_trait::async_trait(?Send)]
pub(crate) trait FullRepo:
UploadRepo
@ -60,19 +71,19 @@ pub(crate) trait FullRepo:
+ Clone
+ Debug
{
async fn health_check(&self) -> Result<(), Error>;
async fn health_check(&self) -> Result<(), RepoError>;
#[tracing::instrument(skip(self))]
async fn identifier_from_alias<I: Identifier + 'static>(
&self,
alias: &Alias,
) -> Result<I, Error> {
) -> Result<I, StoreError> {
let hash = self.hash(alias).await?;
self.identifier(hash).await
}
#[tracing::instrument(skip(self))]
async fn aliases_from_alias(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
async fn aliases_from_alias(&self, alias: &Alias) -> Result<Vec<Alias>, RepoError> {
let hash = self.hash(alias).await?;
self.aliases(hash).await
}
@ -81,7 +92,7 @@ pub(crate) trait FullRepo:
async fn still_identifier_from_alias<I: Identifier + 'static>(
&self,
alias: &Alias,
) -> Result<Option<I>, Error> {
) -> Result<Option<I>, StoreError> {
let hash = self.hash(alias).await?;
let identifier = self.identifier::<I>(hash.clone()).await?;
@ -98,7 +109,7 @@ impl<T> FullRepo for actix_web::web::Data<T>
where
T: FullRepo,
{
async fn health_check(&self) -> Result<(), Error> {
async fn health_check(&self) -> Result<(), RepoError> {
T::health_check(self).await
}
}
@ -116,13 +127,13 @@ where
#[async_trait::async_trait(?Send)]
pub(crate) trait UploadRepo: BaseRepo {
async fn create(&self, upload_id: UploadId) -> Result<(), Error>;
async fn create(&self, upload_id: UploadId) -> Result<(), RepoError>;
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error>;
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError>;
async fn claim(&self, upload_id: UploadId) -> Result<(), Error>;
async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError>;
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error>;
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
@ -130,30 +141,30 @@ impl<T> UploadRepo for actix_web::web::Data<T>
where
T: UploadRepo,
{
async fn create(&self, upload_id: UploadId) -> Result<(), Error> {
async fn create(&self, upload_id: UploadId) -> Result<(), RepoError> {
T::create(self, upload_id).await
}
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error> {
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
T::wait(self, upload_id).await
}
async fn claim(&self, upload_id: UploadId) -> Result<(), Error> {
async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> {
T::claim(self, upload_id).await
}
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error> {
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> {
T::complete(self, upload_id, result).await
}
}
#[async_trait::async_trait(?Send)]
pub(crate) trait QueueRepo: BaseRepo {
async fn requeue_in_progress(&self, worker_prefix: Vec<u8>) -> Result<(), Error>;
async fn requeue_in_progress(&self, worker_prefix: Vec<u8>) -> Result<(), RepoError>;
async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error>;
async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), RepoError>;
async fn pop(&self, queue: &'static str, worker_id: Vec<u8>) -> Result<Self::Bytes, Error>;
async fn pop(&self, queue: &'static str, worker_id: Vec<u8>) -> Result<Self::Bytes, RepoError>;
}
#[async_trait::async_trait(?Send)]
@ -161,24 +172,24 @@ impl<T> QueueRepo for actix_web::web::Data<T>
where
T: QueueRepo,
{
async fn requeue_in_progress(&self, worker_prefix: Vec<u8>) -> Result<(), Error> {
async fn requeue_in_progress(&self, worker_prefix: Vec<u8>) -> Result<(), RepoError> {
T::requeue_in_progress(self, worker_prefix).await
}
async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), Error> {
async fn push(&self, queue: &'static str, job: Self::Bytes) -> Result<(), RepoError> {
T::push(self, queue, job).await
}
async fn pop(&self, queue: &'static str, worker_id: Vec<u8>) -> Result<Self::Bytes, Error> {
async fn pop(&self, queue: &'static str, worker_id: Vec<u8>) -> Result<Self::Bytes, RepoError> {
T::pop(self, queue, worker_id).await
}
}
#[async_trait::async_trait(?Send)]
pub(crate) trait SettingsRepo: BaseRepo {
async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), Error>;
async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, Error>;
async fn remove(&self, key: &'static str) -> Result<(), Error>;
async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError>;
async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, RepoError>;
async fn remove(&self, key: &'static str) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
@ -186,15 +197,15 @@ impl<T> SettingsRepo for actix_web::web::Data<T>
where
T: SettingsRepo,
{
async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), Error> {
async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> {
T::set(self, key, value).await
}
async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, Error> {
async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, RepoError> {
T::get(self, key).await
}
async fn remove(&self, key: &'static str) -> Result<(), Error> {
async fn remove(&self, key: &'static str) -> Result<(), RepoError> {
T::remove(self, key).await
}
}
@ -205,10 +216,10 @@ pub(crate) trait IdentifierRepo: BaseRepo {
&self,
identifier: &I,
details: &Details,
) -> Result<(), Error>;
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, Error>;
) -> Result<(), StoreError>;
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, StoreError>;
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), Error>;
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), StoreError>;
}
#[async_trait::async_trait(?Send)]
@ -220,66 +231,67 @@ where
&self,
identifier: &I,
details: &Details,
) -> Result<(), Error> {
) -> Result<(), StoreError> {
T::relate_details(self, identifier, details).await
}
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, Error> {
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, StoreError> {
T::details(self, identifier).await
}
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), Error> {
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), StoreError> {
T::cleanup(self, identifier).await
}
}
#[async_trait::async_trait(?Send)]
pub(crate) trait HashRepo: BaseRepo {
type Stream: Stream<Item = Result<Self::Bytes, Error>>;
type Stream: Stream<Item = Result<Self::Bytes, RepoError>>;
async fn hashes(&self) -> Self::Stream;
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Error>;
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, RepoError>;
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error>;
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error>;
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Error>;
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError>;
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError>;
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError>;
async fn relate_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Error>;
async fn identifier<I: Identifier + 'static>(&self, hash: Self::Bytes) -> Result<I, Error>;
) -> Result<(), StoreError>;
async fn identifier<I: Identifier + 'static>(&self, hash: Self::Bytes)
-> Result<I, StoreError>;
async fn relate_variant_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
variant: String,
identifier: &I,
) -> Result<(), Error>;
) -> Result<(), StoreError>;
async fn variant_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
variant: String,
) -> Result<Option<I>, Error>;
) -> Result<Option<I>, StoreError>;
async fn variants<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Vec<(String, I)>, Error>;
async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error>;
) -> Result<Vec<(String, I)>, StoreError>;
async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError>;
async fn relate_motion_identifier<I: Identifier>(
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Error>;
) -> Result<(), StoreError>;
async fn motion_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Option<I>, Error>;
) -> Result<Option<I>, StoreError>;
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Error>;
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
@ -293,19 +305,19 @@ where
T::hashes(self).await
}
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Error> {
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, RepoError> {
T::create(self, hash).await
}
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> {
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> {
T::relate_alias(self, hash, alias).await
}
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> {
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> {
T::remove_alias(self, hash, alias).await
}
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Error> {
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError> {
T::aliases(self, hash).await
}
@ -313,11 +325,14 @@ where
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Error> {
) -> Result<(), StoreError> {
T::relate_identifier(self, hash, identifier).await
}
async fn identifier<I: Identifier + 'static>(&self, hash: Self::Bytes) -> Result<I, Error> {
async fn identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<I, StoreError> {
T::identifier(self, hash).await
}
@ -326,7 +341,7 @@ where
hash: Self::Bytes,
variant: String,
identifier: &I,
) -> Result<(), Error> {
) -> Result<(), StoreError> {
T::relate_variant_identifier(self, hash, variant, identifier).await
}
@ -334,18 +349,18 @@ where
&self,
hash: Self::Bytes,
variant: String,
) -> Result<Option<I>, Error> {
) -> Result<Option<I>, StoreError> {
T::variant_identifier(self, hash, variant).await
}
async fn variants<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Vec<(String, I)>, Error> {
) -> Result<Vec<(String, I)>, StoreError> {
T::variants(self, hash).await
}
async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error> {
async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> {
T::remove_variant(self, hash, variant).await
}
@ -353,37 +368,37 @@ where
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Error> {
) -> Result<(), StoreError> {
T::relate_motion_identifier(self, hash, identifier).await
}
async fn motion_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Option<I>, Error> {
) -> Result<Option<I>, StoreError> {
T::motion_identifier(self, hash).await
}
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Error> {
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> {
T::cleanup(self, hash).await
}
}
#[async_trait::async_trait(?Send)]
pub(crate) trait AliasRepo: BaseRepo {
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Error>;
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, RepoError>;
async fn relate_delete_token(
&self,
alias: &Alias,
delete_token: &DeleteToken,
) -> Result<Result<(), AlreadyExists>, Error>;
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, Error>;
) -> Result<Result<(), AlreadyExists>, RepoError>;
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, RepoError>;
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error>;
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, Error>;
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError>;
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, RepoError>;
async fn cleanup(&self, alias: &Alias) -> Result<(), Error>;
async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
@ -391,7 +406,7 @@ impl<T> AliasRepo for actix_web::web::Data<T>
where
T: AliasRepo,
{
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Error> {
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, RepoError> {
T::create(self, alias).await
}
@ -399,23 +414,23 @@ where
&self,
alias: &Alias,
delete_token: &DeleteToken,
) -> Result<Result<(), AlreadyExists>, Error> {
) -> Result<Result<(), AlreadyExists>, RepoError> {
T::relate_delete_token(self, alias, delete_token).await
}
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, Error> {
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, RepoError> {
T::delete_token(self, alias).await
}
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error> {
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError> {
T::relate_hash(self, alias, hash).await
}
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, Error> {
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, RepoError> {
T::hash(self, alias).await
}
async fn cleanup(&self, alias: &Alias) -> Result<(), Error> {
async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> {
T::cleanup(self, alias).await
}
}
@ -836,14 +851,14 @@ impl std::fmt::Display for Alias {
}
impl Identifier for Vec<u8> {
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Error>
fn from_bytes(bytes: Vec<u8>) -> Result<Self, StoreError>
where
Self: Sized,
{
Ok(bytes)
}
fn to_bytes(&self) -> Result<Vec<u8>, Error> {
fn to_bytes(&self) -> Result<Vec<u8>, StoreError> {
Ok(self.clone())
}

View file

@ -1,10 +1,10 @@
use crate::{
error::{Error, UploadError},
repo::{
Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo,
Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
},
serde_str::Serde,
store::StoreError,
stream::from_iterator,
};
use futures_util::Stream;
@ -19,6 +19,8 @@ use std::{
};
use tokio::sync::Notify;
use super::RepoError;
macro_rules! b {
($self:ident.$ident:ident, $expr:expr) => {{
let $ident = $self.$ident.clone();
@ -27,7 +29,10 @@ macro_rules! b {
actix_rt::task::spawn_blocking(move || span.in_scope(|| $expr))
.await
.map_err(SledError::from)??
.map_err(SledError::from)
.map_err(RepoError::from)?
.map_err(SledError::from)
.map_err(RepoError::from)?
}};
}
@ -97,12 +102,12 @@ impl BaseRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl FullRepo for SledRepo {
async fn health_check(&self) -> Result<(), Error> {
async fn health_check(&self) -> Result<(), RepoError> {
let next = self.healthz_count.fetch_add(1, Ordering::Relaxed);
b!(self.healthz, {
healthz.insert("healthz", &next.to_be_bytes()[..])
});
self.healthz.flush_async().await?;
self.healthz.flush_async().await.map_err(SledError::from)?;
b!(self.healthz, healthz.get("healthz"));
Ok(())
}
@ -146,13 +151,13 @@ impl From<InnerUploadResult> for UploadResult {
#[async_trait::async_trait(?Send)]
impl UploadRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self))]
async fn create(&self, upload_id: UploadId) -> Result<(), Error> {
async fn create(&self, upload_id: UploadId) -> Result<(), RepoError> {
b!(self.uploads, uploads.insert(upload_id.as_bytes(), b"1"));
Ok(())
}
#[tracing::instrument(skip(self))]
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error> {
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
let mut subscriber = self.uploads.watch_prefix(upload_id.as_bytes());
let bytes = upload_id.as_bytes().to_vec();
@ -160,40 +165,42 @@ impl UploadRepo for SledRepo {
if let Some(bytes) = opt {
if bytes != b"1" {
let result: InnerUploadResult = serde_json::from_slice(&bytes)?;
let result: InnerUploadResult =
serde_json::from_slice(&bytes).map_err(SledError::from)?;
return Ok(result.into());
}
} else {
return Err(UploadError::AlreadyClaimed.into());
return Err(RepoError::AlreadyClaimed);
}
while let Some(event) = (&mut subscriber).await {
match event {
sled::Event::Remove { .. } => {
return Err(UploadError::AlreadyClaimed.into());
return Err(RepoError::AlreadyClaimed);
}
sled::Event::Insert { value, .. } => {
if value != b"1" {
let result: InnerUploadResult = serde_json::from_slice(&value)?;
let result: InnerUploadResult =
serde_json::from_slice(&value).map_err(SledError::from)?;
return Ok(result.into());
}
}
}
}
Err(UploadError::Canceled.into())
Err(RepoError::Canceled)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn claim(&self, upload_id: UploadId) -> Result<(), Error> {
async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> {
b!(self.uploads, uploads.remove(upload_id.as_bytes()));
Ok(())
}
#[tracing::instrument(level = "trace", skip(self, result))]
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error> {
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), RepoError> {
let result: InnerUploadResult = result.into();
let result = serde_json::to_vec(&result)?;
let result = serde_json::to_vec(&result).map_err(SledError::from)?;
b!(self.uploads, uploads.insert(upload_id.as_bytes(), result));
@ -204,7 +211,7 @@ impl UploadRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl QueueRepo for SledRepo {
#[tracing::instrument(skip_all, fields(worker_id = %String::from_utf8_lossy(&worker_prefix)))]
async fn requeue_in_progress(&self, worker_prefix: Vec<u8>) -> Result<(), Error> {
async fn requeue_in_progress(&self, worker_prefix: Vec<u8>) -> Result<(), RepoError> {
let vec: Vec<(String, IVec)> = b!(self.in_progress_queue, {
let vec = in_progress_queue
.scan_prefix(worker_prefix)
@ -229,7 +236,7 @@ impl QueueRepo for SledRepo {
})
.collect::<Vec<(String, IVec)>>();
Ok(vec) as Result<_, Error>
Ok(vec) as Result<_, SledError>
});
let db = self.db.clone();
@ -242,15 +249,15 @@ impl QueueRepo for SledRepo {
queue.insert(key, job)?;
}
Ok(()) as Result<(), Error>
Ok(()) as Result<(), SledError>
});
Ok(())
}
#[tracing::instrument(skip(self, job), fields(job = %String::from_utf8_lossy(&job)))]
async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), Error> {
let id = self.db.generate_id()?;
async fn push(&self, queue_name: &'static str, job: Self::Bytes) -> Result<(), RepoError> {
let id = self.db.generate_id().map_err(SledError::from)?;
let mut key = queue_name.as_bytes().to_vec();
key.extend(id.to_be_bytes());
@ -276,7 +283,7 @@ impl QueueRepo for SledRepo {
&self,
queue_name: &'static str,
worker_id: Vec<u8>,
) -> Result<Self::Bytes, Error> {
) -> Result<Self::Bytes, RepoError> {
loop {
let in_progress_queue = self.in_progress_queue.clone();
@ -333,21 +340,21 @@ impl QueueRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl SettingsRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(value))]
async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), Error> {
async fn set(&self, key: &'static str, value: Self::Bytes) -> Result<(), RepoError> {
b!(self.settings, settings.insert(key, value));
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, Error> {
async fn get(&self, key: &'static str) -> Result<Option<Self::Bytes>, RepoError> {
let opt = b!(self.settings, settings.get(key));
Ok(opt)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn remove(&self, key: &'static str) -> Result<(), Error> {
async fn remove(&self, key: &'static str) -> Result<(), RepoError> {
b!(self.settings, settings.remove(key));
Ok(())
@ -374,9 +381,11 @@ impl IdentifierRepo for SledRepo {
&self,
identifier: &I,
details: &Details,
) -> Result<(), Error> {
) -> Result<(), StoreError> {
let key = identifier.to_bytes()?;
let details = serde_json::to_vec(&details)?;
let details = serde_json::to_vec(&details)
.map_err(SledError::from)
.map_err(RepoError::from)?;
b!(
self.identifier_details,
@ -387,18 +396,20 @@ impl IdentifierRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, Error> {
async fn details<I: Identifier>(&self, identifier: &I) -> Result<Option<Details>, StoreError> {
let key = identifier.to_bytes()?;
let opt = b!(self.identifier_details, identifier_details.get(key));
opt.map(|ivec| serde_json::from_slice(&ivec))
.transpose()
.map_err(Error::from)
.map_err(SledError::from)
.map_err(RepoError::from)
.map_err(StoreError::from)
}
#[tracing::instrument(level = "trace", skip(self, identifier), fields(identifier = identifier.string_repr()))]
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), Error> {
async fn cleanup<I: Identifier>(&self, identifier: &I) -> Result<(), StoreError> {
let key = identifier.to_bytes()?;
b!(self.identifier_details, identifier_details.remove(key));
@ -407,7 +418,7 @@ impl IdentifierRepo for SledRepo {
}
}
type StreamItem = Result<IVec, Error>;
type StreamItem = Result<IVec, RepoError>;
type LocalBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + 'a>>;
fn hash_alias_key(hash: &IVec, alias: &Alias) -> Vec<u8> {
@ -425,13 +436,13 @@ impl HashRepo for SledRepo {
.hashes
.iter()
.keys()
.map(|res| res.map_err(Error::from));
.map(|res| res.map_err(SledError::from).map_err(RepoError::from));
Box::pin(from_iterator(iter, 8))
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, Error> {
async fn create(&self, hash: Self::Bytes) -> Result<Result<(), AlreadyExists>, RepoError> {
let res = b!(self.hashes, {
let hash2 = hash.clone();
hashes.compare_and_swap(hash, None as Option<Self::Bytes>, Some(hash2))
@ -441,7 +452,7 @@ impl HashRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> {
async fn relate_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> {
let key = hash_alias_key(&hash, alias);
let value = alias.to_bytes();
@ -451,7 +462,7 @@ impl HashRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), Error> {
async fn remove_alias(&self, hash: Self::Bytes, alias: &Alias) -> Result<(), RepoError> {
let key = hash_alias_key(&hash, alias);
b!(self.hash_aliases, hash_aliases.remove(key));
@ -460,7 +471,7 @@ impl HashRepo for SledRepo {
}
#[tracing::instrument(skip_all)]
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, Error> {
async fn aliases(&self, hash: Self::Bytes) -> Result<Vec<Alias>, RepoError> {
let v = b!(self.hash_aliases, {
Ok(hash_aliases
.scan_prefix(hash)
@ -478,7 +489,7 @@ impl HashRepo for SledRepo {
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Error> {
) -> Result<(), StoreError> {
let bytes = identifier.to_bytes()?;
b!(self.hash_identifiers, hash_identifiers.insert(hash, bytes));
@ -487,11 +498,15 @@ impl HashRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn identifier<I: Identifier + 'static>(&self, hash: Self::Bytes) -> Result<I, Error> {
async fn identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<I, StoreError> {
let opt = b!(self.hash_identifiers, hash_identifiers.get(hash));
opt.ok_or(SledError::Missing)
.map_err(Error::from)
.map_err(RepoError::from)
.map_err(StoreError::from)
.and_then(|ivec| I::from_bytes(ivec.to_vec()))
}
@ -501,7 +516,7 @@ impl HashRepo for SledRepo {
hash: Self::Bytes,
variant: String,
identifier: &I,
) -> Result<(), Error> {
) -> Result<(), StoreError> {
let key = variant_key(&hash, &variant);
let value = identifier.to_bytes()?;
@ -518,7 +533,7 @@ impl HashRepo for SledRepo {
&self,
hash: Self::Bytes,
variant: String,
) -> Result<Option<I>, Error> {
) -> Result<Option<I>, StoreError> {
let key = variant_key(&hash, &variant);
let opt = b!(
@ -526,16 +541,14 @@ impl HashRepo for SledRepo {
hash_variant_identifiers.get(key)
);
opt.map(|ivec| I::from_bytes(ivec.to_vec()))
.transpose()
.map_err(Error::from)
opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose()
}
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn variants<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Vec<(String, I)>, Error> {
) -> Result<Vec<(String, I)>, StoreError> {
let vec = b!(
self.hash_variant_identifiers,
Ok(hash_variant_identifiers
@ -557,14 +570,14 @@ impl HashRepo for SledRepo {
Some((variant?, identifier?))
})
.collect::<Vec<_>>()) as Result<Vec<_>, sled::Error>
.collect::<Vec<_>>()) as Result<Vec<_>, SledError>
);
Ok(vec)
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), Error> {
async fn remove_variant(&self, hash: Self::Bytes, variant: String) -> Result<(), RepoError> {
let key = variant_key(&hash, &variant);
b!(
@ -580,7 +593,7 @@ impl HashRepo for SledRepo {
&self,
hash: Self::Bytes,
identifier: &I,
) -> Result<(), Error> {
) -> Result<(), StoreError> {
let bytes = identifier.to_bytes()?;
b!(
@ -595,19 +608,17 @@ impl HashRepo for SledRepo {
async fn motion_identifier<I: Identifier + 'static>(
&self,
hash: Self::Bytes,
) -> Result<Option<I>, Error> {
) -> Result<Option<I>, StoreError> {
let opt = b!(
self.hash_motion_identifiers,
hash_motion_identifiers.get(hash)
);
opt.map(|ivec| I::from_bytes(ivec.to_vec()))
.transpose()
.map_err(Error::from)
opt.map(|ivec| I::from_bytes(ivec.to_vec())).transpose()
}
#[tracing::instrument(skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), Error> {
async fn cleanup(&self, hash: Self::Bytes) -> Result<(), RepoError> {
let hash2 = hash.clone();
b!(self.hashes, hashes.remove(hash2));
@ -628,7 +639,7 @@ impl HashRepo for SledRepo {
let _ = hash_aliases.remove(key);
}
Ok(()) as Result<(), sled::Error>
Ok(()) as Result<(), SledError>
});
let variant_keys = b!(self.hash_variant_identifiers, {
@ -638,13 +649,13 @@ impl HashRepo for SledRepo {
.filter_map(Result::ok)
.collect::<Vec<_>>();
Ok(v) as Result<Vec<_>, sled::Error>
Ok(v) as Result<Vec<_>, SledError>
});
b!(self.hash_variant_identifiers, {
for key in variant_keys {
let _ = hash_variant_identifiers.remove(key);
}
Ok(()) as Result<(), sled::Error>
Ok(()) as Result<(), SledError>
});
Ok(())
@ -654,7 +665,7 @@ impl HashRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl AliasRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self))]
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, Error> {
async fn create(&self, alias: &Alias) -> Result<Result<(), AlreadyExists>, RepoError> {
let bytes = alias.to_bytes();
let bytes2 = bytes.clone();
@ -671,7 +682,7 @@ impl AliasRepo for SledRepo {
&self,
alias: &Alias,
delete_token: &DeleteToken,
) -> Result<Result<(), AlreadyExists>, Error> {
) -> Result<Result<(), AlreadyExists>, RepoError> {
let key = alias.to_bytes();
let token = delete_token.to_bytes();
@ -684,18 +695,18 @@ impl AliasRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self))]
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, Error> {
async fn delete_token(&self, alias: &Alias) -> Result<DeleteToken, RepoError> {
let key = alias.to_bytes();
let opt = b!(self.alias_delete_tokens, alias_delete_tokens.get(key));
opt.and_then(|ivec| DeleteToken::from_slice(&ivec))
.ok_or(SledError::Missing)
.map_err(Error::from)
.map_err(RepoError::from)
}
#[tracing::instrument(level = "trace", skip(self, hash), fields(hash = hex::encode(&hash)))]
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), Error> {
async fn relate_hash(&self, alias: &Alias, hash: Self::Bytes) -> Result<(), RepoError> {
let key = alias.to_bytes();
b!(self.alias_hashes, alias_hashes.insert(key, hash));
@ -704,16 +715,16 @@ impl AliasRepo for SledRepo {
}
#[tracing::instrument(level = "trace", skip(self))]
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, Error> {
async fn hash(&self, alias: &Alias) -> Result<Self::Bytes, RepoError> {
let key = alias.to_bytes();
let opt = b!(self.alias_hashes, alias_hashes.get(key));
opt.ok_or(SledError::Missing).map_err(Error::from)
opt.ok_or(SledError::Missing).map_err(RepoError::from)
}
#[tracing::instrument(skip(self))]
async fn cleanup(&self, alias: &Alias) -> Result<(), Error> {
async fn cleanup(&self, alias: &Alias) -> Result<(), RepoError> {
let key = alias.to_bytes();
let key2 = key.clone();

View file

@ -1,4 +1,3 @@
use crate::error::Error;
use actix_web::web::Bytes;
use futures_util::stream::Stream;
use std::fmt::Debug;
@ -7,10 +6,56 @@ use tokio::io::{AsyncRead, AsyncWrite};
pub(crate) mod file_store;
pub(crate) mod object_store;
pub(crate) trait Identifier: Send + Sync + Clone + Debug {
fn to_bytes(&self) -> Result<Vec<u8>, Error>;
#[derive(Debug, thiserror::Error)]
pub(crate) enum StoreError {
#[error("Error in file store")]
FileStore(#[source] crate::store::file_store::FileError),
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Error>
#[error("Error in object store")]
ObjectStore(#[source] crate::store::object_store::ObjectError),
#[error("Error in DB")]
Repo(#[from] crate::repo::RepoError),
#[error("Requested file is not found")]
NotFound,
}
impl StoreError {
pub(crate) const fn is_not_found(&self) -> bool {
matches!(self, Self::NotFound)
}
}
impl From<crate::store::file_store::FileError> for StoreError {
fn from(value: crate::store::file_store::FileError) -> Self {
match value {
crate::store::file_store::FileError::Io(e)
if e.kind() == std::io::ErrorKind::NotFound =>
{
Self::NotFound
}
e => Self::FileStore(e),
}
}
}
impl From<crate::store::object_store::ObjectError> for StoreError {
fn from(value: crate::store::object_store::ObjectError) -> Self {
match value {
crate::store::object_store::ObjectError::Status(
actix_web::http::StatusCode::NOT_FOUND,
_,
) => Self::NotFound,
e => Self::ObjectStore(e),
}
}
}
pub(crate) trait Identifier: Send + Sync + Clone + Debug {
fn to_bytes(&self) -> Result<Vec<u8>, StoreError>;
fn from_bytes(bytes: Vec<u8>) -> Result<Self, StoreError>
where
Self: Sized;
@ -28,22 +73,22 @@ pub(crate) trait Store: Clone + Debug {
type Identifier: Identifier + 'static;
type Stream: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
where
Reader: AsyncRead + Unpin + 'static;
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, StoreError>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error>;
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError>;
async fn to_stream(
&self,
identifier: &Self::Identifier,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<Self::Stream, Error>;
) -> Result<Self::Stream, StoreError>;
async fn read_into<Writer>(
&self,
@ -53,9 +98,9 @@ pub(crate) trait Store: Clone + Debug {
where
Writer: AsyncWrite + Unpin;
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error>;
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, StoreError>;
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error>;
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError>;
}
#[async_trait::async_trait(?Send)]
@ -66,21 +111,21 @@ where
type Identifier = T::Identifier;
type Stream = T::Stream;
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
where
Reader: AsyncRead + Unpin + 'static,
{
T::save_async_read(self, reader).await
}
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, StoreError>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{
T::save_stream(self, stream).await
}
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError> {
T::save_bytes(self, bytes).await
}
@ -89,7 +134,7 @@ where
identifier: &Self::Identifier,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<Self::Stream, Error> {
) -> Result<Self::Stream, StoreError> {
T::to_stream(self, identifier, from_start, len).await
}
@ -104,11 +149,11 @@ where
T::read_into(self, identifier, writer).await
}
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, StoreError> {
T::len(self, identifier).await
}
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> {
T::remove(self, identifier).await
}
}
@ -121,21 +166,21 @@ where
type Identifier = T::Identifier;
type Stream = T::Stream;
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
where
Reader: AsyncRead + Unpin + 'static,
{
T::save_async_read(self, reader).await
}
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, StoreError>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{
T::save_stream(self, stream).await
}
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError> {
T::save_bytes(self, bytes).await
}
@ -144,7 +189,7 @@ where
identifier: &Self::Identifier,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<Self::Stream, Error> {
) -> Result<Self::Stream, StoreError> {
T::to_stream(self, identifier, from_start, len).await
}
@ -159,11 +204,11 @@ where
T::read_into(self, identifier, writer).await
}
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, StoreError> {
T::len(self, identifier).await
}
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> {
T::remove(self, identifier).await
}
}

View file

@ -1,5 +1,4 @@
use crate::{
error::Error,
file::File,
repo::{Repo, SettingsRepo},
store::{Store, StoreConfig},
@ -18,6 +17,8 @@ use tracing::Instrument;
mod file_id;
pub(crate) use file_id::FileId;
use super::StoreError;
// - Settings Tree
// - last-path -> last generated path
@ -62,7 +63,10 @@ impl Store for FileStore {
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
#[tracing::instrument(skip(reader))]
async fn save_async_read<Reader>(&self, mut reader: Reader) -> Result<Self::Identifier, Error>
async fn save_async_read<Reader>(
&self,
mut reader: Reader,
) -> Result<Self::Identifier, StoreError>
where
Reader: AsyncRead + Unpin + 'static,
{
@ -76,7 +80,7 @@ impl Store for FileStore {
Ok(self.file_id_from_path(path)?)
}
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, StoreError>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{
@ -84,7 +88,7 @@ impl Store for FileStore {
}
#[tracing::instrument(skip(bytes))]
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError> {
let path = self.next_file().await?;
if let Err(e) = self.safe_save_bytes(&path, bytes).await {
@ -101,14 +105,15 @@ impl Store for FileStore {
identifier: &Self::Identifier,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<Self::Stream, Error> {
) -> Result<Self::Stream, StoreError> {
let path = self.path_from_file_id(identifier);
let file_span = tracing::trace_span!(parent: None, "File Stream");
let file = file_span
.in_scope(|| File::open(path))
.instrument(file_span.clone())
.await?;
.await
.map_err(FileError::from)?;
let stream = file_span
.in_scope(|| file.read_to_stream(from_start, len))
@ -135,16 +140,19 @@ impl Store for FileStore {
}
#[tracing::instrument]
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, StoreError> {
let path = self.path_from_file_id(identifier);
let len = tokio::fs::metadata(path).await?.len();
let len = tokio::fs::metadata(path)
.await
.map_err(FileError::from)?
.len();
Ok(len)
}
#[tracing::instrument]
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> {
let path = self.path_from_file_id(identifier);
self.safe_remove_file(path).await?;
@ -154,7 +162,7 @@ impl Store for FileStore {
}
impl FileStore {
pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> Result<Self, Error> {
pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> Result<Self, StoreError> {
let path_gen = init_generator(&repo).await?;
Ok(FileStore {
@ -164,7 +172,7 @@ impl FileStore {
})
}
async fn next_directory(&self) -> Result<PathBuf, Error> {
async fn next_directory(&self) -> Result<PathBuf, StoreError> {
let path = self.path_gen.next();
match self.repo {
@ -183,7 +191,7 @@ impl FileStore {
Ok(target_path)
}
async fn next_file(&self) -> Result<PathBuf, Error> {
async fn next_file(&self) -> Result<PathBuf, StoreError> {
let target_path = self.next_directory().await?;
let filename = uuid::Uuid::new_v4().to_string();
@ -271,12 +279,13 @@ pub(crate) async fn safe_create_parent<P: AsRef<Path>>(path: P) -> Result<(), Fi
Ok(())
}
async fn init_generator(repo: &Repo) -> Result<Generator, Error> {
async fn init_generator(repo: &Repo) -> Result<Generator, StoreError> {
match repo {
Repo::Sled(sled_repo) => {
if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? {
Ok(Generator::from_existing(
storage_path_generator::Path::from_be_bytes(ivec.to_vec())?,
storage_path_generator::Path::from_be_bytes(ivec.to_vec())
.map_err(FileError::from)?,
))
} else {
Ok(Generator::new())

View file

@ -1,9 +1,6 @@
use crate::{
error::Error,
store::{
file_store::{FileError, FileStore},
Identifier,
},
use crate::store::{
file_store::{FileError, FileStore},
Identifier, StoreError,
};
use std::path::PathBuf;
@ -11,7 +8,7 @@ use std::path::PathBuf;
pub(crate) struct FileId(PathBuf);
impl Identifier for FileId {
fn to_bytes(&self) -> Result<Vec<u8>, Error> {
fn to_bytes(&self) -> Result<Vec<u8>, StoreError> {
let vec = self
.0
.to_str()
@ -22,7 +19,7 @@ impl Identifier for FileId {
Ok(vec)
}
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Error>
fn from_bytes(bytes: Vec<u8>) -> Result<Self, StoreError>
where
Self: Sized,
{

View file

@ -1,6 +1,5 @@
use crate::{
bytes_stream::BytesStream,
error::Error,
repo::{Repo, SettingsRepo},
store::{Store, StoreConfig},
};
@ -27,6 +26,8 @@ use url::Url;
mod object_id;
pub(crate) use object_id::ObjectId;
use super::StoreError;
const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes, min is 5 (5_242_880);
// - Settings Tree
@ -42,6 +43,9 @@ pub(crate) enum ObjectError {
#[error("Failed to generate request")]
S3(#[from] BucketError),
#[error("IO Error")]
IO(#[from] std::io::Error),
#[error("Error making request")]
SendRequest(String),
@ -62,6 +66,9 @@ pub(crate) enum ObjectError {
#[error("Invalid status: {0}\n{1}")]
Status(StatusCode, String),
#[error("Unable to upload image")]
Upload(awc::error::PayloadError),
}
impl From<SendRequestError> for ObjectError {
@ -131,7 +138,7 @@ fn payload_to_io_error(e: PayloadError) -> std::io::Error {
}
#[tracing::instrument(skip(stream))]
async fn read_chunk<S>(stream: &mut S) -> std::io::Result<BytesStream>
async fn read_chunk<S>(stream: &mut S) -> Result<BytesStream, ObjectError>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{
@ -148,9 +155,9 @@ where
Ok(buf)
}
async fn status_error(mut response: ClientResponse) -> Error {
async fn status_error(mut response: ClientResponse) -> StoreError {
let body = match response.body().await {
Err(e) => return e.into(),
Err(e) => return ObjectError::Upload(e).into(),
Ok(body) => body,
};
@ -164,7 +171,7 @@ impl Store for ObjectStore {
type Identifier = ObjectId;
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, Error>
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
where
Reader: AsyncRead + Unpin + 'static,
{
@ -172,7 +179,7 @@ impl Store for ObjectStore {
}
#[tracing::instrument(skip_all)]
async fn save_stream<S>(&self, mut stream: S) -> Result<Self::Identifier, Error>
async fn save_stream<S>(&self, mut stream: S) -> Result<Self::Identifier, StoreError>
where
S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
{
@ -181,7 +188,10 @@ impl Store for ObjectStore {
if first_chunk.len() < CHUNK_SIZE {
drop(stream);
let (req, object_id) = self.put_object_request().await?;
let response = req.send_body(first_chunk).await?;
let response = req
.send_body(first_chunk)
.await
.map_err(ObjectError::from)?;
if !response.status().is_success() {
return Err(status_error(response).await);
@ -199,7 +209,7 @@ impl Store for ObjectStore {
return Err(status_error(response).await);
}
let body = response.body().await?;
let body = response.body().await.map_err(ObjectError::Upload)?;
let body: InitiateMultipartUploadResponse =
quick_xml::de::from_reader(&*body).map_err(ObjectError::from)?;
let upload_id = &body.upload_id;
@ -236,7 +246,8 @@ impl Store for ObjectStore {
)
.await?
.send_body(buf)
.await?;
.await
.map_err(ObjectError::from)?;
if !response.status().is_success() {
return Err(status_error(response).await);
@ -253,7 +264,7 @@ impl Store for ObjectStore {
// early-drop response to close its tracing spans
drop(response);
Ok(etag) as Result<String, Error>
Ok(etag) as Result<String, StoreError>
}
.instrument(tracing::Span::current()),
);
@ -276,20 +287,22 @@ impl Store for ObjectStore {
upload_id,
etags.iter().map(|s| s.as_ref()),
)
.await?;
.await
.map_err(ObjectError::from)?;
if !response.status().is_success() {
return Err(status_error(response).await);
}
Ok(()) as Result<(), Error>
Ok(()) as Result<(), StoreError>
}
.await;
if let Err(e) = res {
self.create_abort_multipart_request(&object_id, upload_id)
.send()
.await?;
.await
.map_err(ObjectError::from)?;
return Err(e);
}
@ -297,7 +310,7 @@ impl Store for ObjectStore {
}
#[tracing::instrument(skip_all)]
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, StoreError> {
let (req, object_id) = self.put_object_request().await?;
let response = req.send_body(bytes).await.map_err(ObjectError::from)?;
@ -315,7 +328,7 @@ impl Store for ObjectStore {
identifier: &Self::Identifier,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<Self::Stream, Error> {
) -> Result<Self::Stream, StoreError> {
let response = self
.get_object_request(identifier, from_start, len)
.send()
@ -361,7 +374,7 @@ impl Store for ObjectStore {
}
#[tracing::instrument(skip(self))]
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, StoreError> {
let response = self
.head_object_request(identifier)
.send()
@ -385,8 +398,12 @@ impl Store for ObjectStore {
}
#[tracing::instrument(skip(self))]
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
let response = self.delete_object_request(identifier).send().await?;
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), StoreError> {
let response = self
.delete_object_request(identifier)
.send()
.await
.map_err(ObjectError::from)?;
if !response.status().is_success() {
return Err(status_error(response).await);
@ -407,7 +424,7 @@ impl ObjectStore {
secret_key: String,
session_token: Option<String>,
repo: Repo,
) -> Result<ObjectStoreConfig, Error> {
) -> Result<ObjectStoreConfig, StoreError> {
let path_gen = init_generator(&repo).await?;
Ok(ObjectStoreConfig {
@ -423,7 +440,7 @@ impl ObjectStore {
})
}
async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), Error> {
async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), StoreError> {
let path = self.next_file().await?;
let mut action = self.bucket.put_object(Some(&self.credentials), &path);
@ -435,7 +452,7 @@ impl ObjectStore {
Ok((self.build_request(action), ObjectId::from_string(path)))
}
async fn create_multipart_request(&self) -> Result<(ClientRequest, ObjectId), Error> {
async fn create_multipart_request(&self) -> Result<(ClientRequest, ObjectId), StoreError> {
let path = self.next_file().await?;
let mut action = self
@ -455,7 +472,7 @@ impl ObjectStore {
object_id: &ObjectId,
part_number: u16,
upload_id: &str,
) -> Result<ClientRequest, Error> {
) -> Result<ClientRequest, ObjectError> {
use md5::Digest;
let mut action = self.bucket.upload_part(
@ -593,7 +610,7 @@ impl ObjectStore {
self.build_request(action)
}
async fn next_directory(&self) -> Result<Path, Error> {
async fn next_directory(&self) -> Result<Path, StoreError> {
let path = self.path_gen.next();
match self.repo {
@ -607,7 +624,7 @@ impl ObjectStore {
Ok(path)
}
async fn next_file(&self) -> Result<String, Error> {
async fn next_file(&self) -> Result<String, StoreError> {
let path = self.next_directory().await?.to_strings().join("/");
let filename = uuid::Uuid::new_v4().to_string();
@ -615,12 +632,13 @@ impl ObjectStore {
}
}
async fn init_generator(repo: &Repo) -> Result<Generator, Error> {
async fn init_generator(repo: &Repo) -> Result<Generator, StoreError> {
match repo {
Repo::Sled(sled_repo) => {
if let Some(ivec) = sled_repo.get(GENERATOR_KEY).await? {
Ok(Generator::from_existing(
storage_path_generator::Path::from_be_bytes(ivec.to_vec())?,
storage_path_generator::Path::from_be_bytes(ivec.to_vec())
.map_err(ObjectError::from)?,
))
} else {
Ok(Generator::new())

View file

@ -1,17 +1,14 @@
use crate::{
error::Error,
store::{object_store::ObjectError, Identifier},
};
use crate::store::{object_store::ObjectError, Identifier, StoreError};
#[derive(Debug, Clone)]
pub(crate) struct ObjectId(String);
impl Identifier for ObjectId {
fn to_bytes(&self) -> Result<Vec<u8>, Error> {
fn to_bytes(&self) -> Result<Vec<u8>, StoreError> {
Ok(self.0.as_bytes().to_vec())
}
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Error> {
fn from_bytes(bytes: Vec<u8>) -> Result<Self, StoreError> {
Ok(ObjectId(
String::from_utf8(bytes).map_err(ObjectError::from)?,
))