Mirror of https://git.asonix.dog/asonix/pict-rs.git, synced 2024-11-24 10:31:07 +00:00

Commit: 8a892ba622 ("it might work")
Parent: a2933dbebc
7 changed files with 84 additions and 18 deletions
dev.toml | 4 ++++

@@ -62,6 +62,10 @@ crf_max = 12
 type = 'postgres'
 url = 'postgres://postgres:1234@localhost:5432/postgres'

+# [repo]
+# type = 'sled'
+# path = 'data/sled-repo-local'
+
 [store]
 type = 'filesystem'
 path = 'data/files-local'
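The commented-out [repo] block keeps a sled configuration next to the active postgres one, so switching the dev setup back is a matter of flipping the comments. A [repo] table whose type key selects the backend usually maps onto an internally tagged serde enum in Rust; the sketch below only illustrates that pattern, and its variant and field names are assumptions rather than pict-rs's real config types.

    use serde::Deserialize;

    // Sketch: a `type`-tagged [repo] table deserialized into an enum.
    // Variant and field names are assumed for illustration.
    #[derive(Debug, Deserialize)]
    #[serde(tag = "type", rename_all = "lowercase")]
    enum RepoConfig {
        Postgres { url: String },
        Sled { path: std::path::PathBuf },
    }

    fn main() {
        let table = "type = 'sled'\npath = 'data/sled-repo-local'\n";
        let repo: RepoConfig = toml::from_str(table).expect("valid repo table");
        println!("{repo:?}");
    }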
@@ -68,7 +68,7 @@ where
     })
 }

-#[tracing::instrument(skip_all)]
+#[tracing::instrument(skip(repo, store, media))]
 async fn process_ingest<S>(
     repo: &ArcRepo,
     store: &S,

@@ -126,7 +126,7 @@ where
 }

 #[allow(clippy::too_many_arguments)]
-#[tracing::instrument(skip_all)]
+#[tracing::instrument(skip(repo, store, process_map, process_path, process_args, config))]
 async fn generate<S: Store + 'static>(
     repo: &ArcRepo,
     store: &S,
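Both hunks above trade skip_all for an explicit skip list, which means any argument not named in skip(...) is recorded as a field on the generated span. A small standalone sketch of the difference; the function and argument names here are made up:

    use tracing::info;

    // skip_all: the span carries no argument fields at all.
    #[tracing::instrument(skip_all)]
    fn process_all(job_id: u64, payload: Vec<u8>) {
        info!("processing");
    }

    // An explicit skip list: everything not skipped (here `job_id`)
    // is recorded on the span via its Debug/Value implementation.
    #[tracing::instrument(skip(payload))]
    fn process_some(job_id: u64, payload: Vec<u8>) {
        info!("processing");
    }

    fn main() {
        tracing_subscriber::fmt().init();
        process_all(1, vec![0; 16]);
        process_some(2, vec![0; 16]);
    }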
@@ -524,7 +524,7 @@ where
     }
 }

-#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 pub(crate) struct OrderedHash {
     timestamp: time::OffsetDateTime,
     hash: Hash,
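Debug joins the derives on OrderedHash because instrumented functions record their arguments with the Debug implementation unless those arguments are skipped; the Postgres repo methods below take Option<OrderedHash> bounds and now only skip self. A standalone illustration with a stand-in type:

    // Stand-in for the real OrderedHash; the Debug derive is the point.
    #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
    struct OrderedHash {
        timestamp: u64,
        hash: [u8; 4],
    }

    // Without Debug on OrderedHash this attribute would not compile,
    // because `bound` is recorded as a span field via Debug.
    #[tracing::instrument]
    fn page_after(bound: Option<OrderedHash>) {
        tracing::debug!("fetching a page");
    }

    fn main() {
        tracing_subscriber::fmt().init();
        page_after(Some(OrderedHash { timestamp: 0, hash: [0; 4] }));
    }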
@@ -231,6 +231,7 @@ impl BaseRepo for PostgresRepo {}

 #[async_trait::async_trait(?Send)]
 impl HashRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn size(&self) -> Result<u64, RepoError> {
         use schema::hashes::dsl::*;

@@ -245,6 +246,7 @@ impl HashRepo for PostgresRepo {
         Ok(count.try_into().expect("non-negative count"))
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn bound(&self, input_hash: Hash) -> Result<Option<OrderedHash>, RepoError> {
         use schema::hashes::dsl::*;

@@ -265,6 +267,7 @@ impl HashRepo for PostgresRepo {
         }))
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn hash_page_by_date(
         &self,
         date: time::OffsetDateTime,

@@ -292,6 +295,7 @@ impl HashRepo for PostgresRepo {
         self.hashes_ordered(ordered_hash, limit).await
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn hashes_ordered(
         &self,
         bound: Option<OrderedHash>,

@@ -355,6 +359,7 @@ impl HashRepo for PostgresRepo {
         })
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn create_hash_with_timestamp(
         &self,
         input_hash: Hash,

@@ -386,6 +391,7 @@ impl HashRepo for PostgresRepo {
         }
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn update_identifier(
         &self,
         input_hash: Hash,

@@ -405,6 +411,7 @@ impl HashRepo for PostgresRepo {
         Ok(())
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
         use schema::hashes::dsl::*;

@@ -421,6 +428,7 @@ impl HashRepo for PostgresRepo {
         Ok(opt.map(Arc::from))
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn relate_variant_identifier(
         &self,
         input_hash: Hash,

@@ -450,6 +458,7 @@ impl HashRepo for PostgresRepo {
         }
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn variant_identifier(
         &self,
         input_hash: Hash,

@@ -472,6 +481,7 @@ impl HashRepo for PostgresRepo {
         Ok(opt)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn variants(&self, input_hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
         use schema::variants::dsl::*;

@@ -490,6 +500,7 @@ impl HashRepo for PostgresRepo {
         Ok(vec)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn remove_variant(
         &self,
         input_hash: Hash,

@@ -509,6 +520,7 @@ impl HashRepo for PostgresRepo {
         Ok(())
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn relate_motion_identifier(
         &self,
         input_hash: Hash,

@@ -528,6 +540,7 @@ impl HashRepo for PostgresRepo {
         Ok(())
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn motion_identifier(&self, input_hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
         use schema::hashes::dsl::*;
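Every attribute added in this block sets level = "DEBUG", so the new spans stay quiet unless the subscriber is told to show debug output for the crate. One way to opt in with tracing_subscriber (the directive string is illustrative, and the env-filter feature must be enabled):

    use tracing_subscriber::EnvFilter;

    fn main() {
        // info everywhere, but debug-level spans and events for pict_rs.
        let filter = EnvFilter::new("info,pict_rs=debug");

        tracing_subscriber::fmt()
            .with_env_filter(filter)
            .init();

        tracing::debug!("this event is filtered out (wrong target)");
    }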
@@ -546,18 +559,19 @@ impl HashRepo for PostgresRepo {
         Ok(opt)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn cleanup_hash(&self, input_hash: Hash) -> Result<(), RepoError> {
         let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

         conn.transaction(|conn| {
             Box::pin(async move {
-                diesel::delete(schema::hashes::dsl::hashes)
-                    .filter(schema::hashes::dsl::hash.eq(&input_hash))
+                diesel::delete(schema::variants::dsl::variants)
+                    .filter(schema::variants::dsl::hash.eq(&input_hash))
                     .execute(conn)
                     .await?;

-                diesel::delete(schema::variants::dsl::variants)
-                    .filter(schema::variants::dsl::hash.eq(&input_hash))
+                diesel::delete(schema::hashes::dsl::hashes)
+                    .filter(schema::hashes::dsl::hash.eq(&input_hash))
                     .execute(conn)
                     .await
             })
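Swapping the two deletes so the variants rows go before the hashes row matters if variants.hash references hashes.hash through a foreign key; deleting the parent first would then be rejected. That constraint is an assumption here, since the schema is not part of this diff. A standalone diesel-async sketch of the same child-first ordering, with the transaction wrapper left out for brevity:

    use diesel::{ExpressionMethods, QueryDsl, QueryResult};
    use diesel_async::{AsyncPgConnection, RunQueryDsl};

    // Table shapes mirror the names in the diff; the foreign key from
    // variants.hash to hashes.hash is assumed, not shown in this commit.
    diesel::table! {
        hashes (hash) {
            hash -> Binary,
        }
    }

    diesel::table! {
        variants (id) {
            id -> Integer,
            hash -> Binary,
        }
    }

    async fn cleanup(conn: &mut AsyncPgConnection, target: Vec<u8>) -> QueryResult<()> {
        // 1. delete the rows that reference the hash...
        diesel::delete(variants::table.filter(variants::hash.eq(target.clone())))
            .execute(conn)
            .await?;

        // 2. ...then the hash row itself.
        diesel::delete(hashes::table.filter(hashes::hash.eq(target)))
            .execute(conn)
            .await?;

        Ok(())
    }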
@@ -571,6 +585,7 @@ impl HashRepo for PostgresRepo {

 #[async_trait::async_trait(?Send)]
 impl AliasRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn create_alias(
         &self,
         input_alias: &Alias,

@@ -600,6 +615,7 @@ impl AliasRepo for PostgresRepo {
         }
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn delete_token(&self, input_alias: &Alias) -> Result<Option<DeleteToken>, RepoError> {
         use schema::aliases::dsl::*;

@@ -616,6 +632,7 @@ impl AliasRepo for PostgresRepo {
         Ok(opt)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn hash(&self, input_alias: &Alias) -> Result<Option<Hash>, RepoError> {
         use schema::aliases::dsl::*;

@@ -632,6 +649,7 @@ impl AliasRepo for PostgresRepo {
         Ok(opt)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn aliases_for_hash(&self, input_hash: Hash) -> Result<Vec<Alias>, RepoError> {
         use schema::aliases::dsl::*;

@@ -647,6 +665,7 @@ impl AliasRepo for PostgresRepo {
         Ok(vec)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn cleanup_alias(&self, input_alias: &Alias) -> Result<(), RepoError> {
         use schema::aliases::dsl::*;
@@ -664,6 +683,7 @@ impl AliasRepo for PostgresRepo {

 #[async_trait::async_trait(?Send)]
 impl SettingsRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self, input_value))]
     async fn set(&self, input_key: &'static str, input_value: Arc<[u8]>) -> Result<(), RepoError> {
         use schema::settings::dsl::*;

@@ -672,7 +692,10 @@ impl SettingsRepo for PostgresRepo {
         let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;

         diesel::insert_into(settings)
-            .values((key.eq(input_key), value.eq(input_value)))
+            .values((key.eq(input_key), value.eq(&input_value)))
+            .on_conflict(key)
+            .do_update()
+            .set(value.eq(&input_value))
             .execute(&mut conn)
             .await
             .map_err(PostgresError::Diesel)?;

@@ -680,6 +703,7 @@ impl SettingsRepo for PostgresRepo {
         Ok(())
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn get(&self, input_key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
         use schema::settings::dsl::*;

@@ -700,6 +724,7 @@ impl SettingsRepo for PostgresRepo {
         Ok(opt)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn remove(&self, input_key: &'static str) -> Result<(), RepoError> {
         use schema::settings::dsl::*;
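The on_conflict(key).do_update().set(...) chain turns the settings insert into a Postgres upsert, so writing the same key twice overwrites the stored value instead of tripping the unique constraint. A standalone sketch of the same query shape, using diesel's debug_query to show roughly the SQL it produces (table and column names mirror the diff, the rest is illustrative):

    use diesel::{pg::Pg, ExpressionMethods};

    diesel::table! {
        settings (key) {
            key -> Text,
            value -> Binary,
        }
    }

    fn main() {
        use settings::dsl::*;

        let input_key = "last-cleanup";
        let input_value: &[u8] = b"2024-01-01";

        // Insert, and on a key collision update the existing row instead.
        let query = diesel::insert_into(settings)
            .values((key.eq(input_key), value.eq(input_value)))
            .on_conflict(key)
            .do_update()
            .set(value.eq(input_value));

        // Prints something close to:
        // INSERT INTO "settings" ("key", "value") VALUES ($1, $2)
        //   ON CONFLICT ("key") DO UPDATE SET "value" = $3
        println!("{}", diesel::debug_query::<Pg, _>(&query));
    }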
@@ -717,6 +742,7 @@ impl SettingsRepo for PostgresRepo {

 #[async_trait::async_trait(?Send)]
 impl DetailsRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self, input_details))]
     async fn relate_details(
         &self,
         input_identifier: &Arc<str>,

@@ -738,6 +764,7 @@ impl DetailsRepo for PostgresRepo {
         Ok(())
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn details(&self, input_identifier: &Arc<str>) -> Result<Option<Details>, RepoError> {
         use schema::details::dsl::*;

@@ -758,6 +785,7 @@ impl DetailsRepo for PostgresRepo {
         Ok(opt)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn cleanup_details(&self, input_identifier: &Arc<str>) -> Result<(), RepoError> {
         use schema::details::dsl::*;
@@ -775,6 +803,7 @@ impl DetailsRepo for PostgresRepo {

 #[async_trait::async_trait(?Send)]
 impl QueueRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self, job_json))]
     async fn push(
         &self,
         queue_name: &'static str,

@@ -794,6 +823,7 @@ impl QueueRepo for PostgresRepo {
         Ok(JobId(job_id))
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn pop(
         &self,
         queue_name: &'static str,

@@ -869,6 +899,7 @@ impl QueueRepo for PostgresRepo {
         }
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn heartbeat(
         &self,
         queue_name: &'static str,

@@ -895,6 +926,7 @@ impl QueueRepo for PostgresRepo {
         Ok(())
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn complete_job(
         &self,
         queue_name: &'static str,
@@ -921,6 +953,7 @@ impl QueueRepo for PostgresRepo {

 #[async_trait::async_trait(?Send)]
 impl StoreMigrationRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn is_continuing_migration(&self) -> Result<bool, RepoError> {
         use schema::store_migrations::dsl::*;

@@ -935,6 +968,7 @@ impl StoreMigrationRepo for PostgresRepo {
         Ok(count > 0)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn mark_migrated(
         &self,
         input_old_identifier: &Arc<str>,

@@ -958,6 +992,7 @@ impl StoreMigrationRepo for PostgresRepo {
         Ok(())
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn is_migrated(&self, input_old_identifier: &Arc<str>) -> Result<bool, RepoError> {
         use schema::store_migrations::dsl::*;

@@ -973,6 +1008,7 @@ impl StoreMigrationRepo for PostgresRepo {
         Ok(b)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn clear(&self) -> Result<(), RepoError> {
         use schema::store_migrations::dsl::*;
@@ -989,6 +1025,7 @@ impl StoreMigrationRepo for PostgresRepo {

 #[async_trait::async_trait(?Send)]
 impl ProxyRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn relate_url(&self, input_url: Url, input_alias: Alias) -> Result<(), RepoError> {
         use schema::proxies::dsl::*;

@@ -1003,6 +1040,7 @@ impl ProxyRepo for PostgresRepo {
         Ok(())
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn related(&self, input_url: Url) -> Result<Option<Alias>, RepoError> {
         use schema::proxies::dsl::*;

@@ -1019,6 +1057,7 @@ impl ProxyRepo for PostgresRepo {
         Ok(opt)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn remove_relation(&self, input_alias: Alias) -> Result<(), RepoError> {
         use schema::proxies::dsl::*;
@@ -1036,6 +1075,7 @@ impl ProxyRepo for PostgresRepo {

 #[async_trait::async_trait(?Send)]
 impl AliasAccessRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn set_accessed_alias(
         &self,
         input_alias: Alias,

@@ -1057,6 +1097,7 @@ impl AliasAccessRepo for PostgresRepo {
         Ok(())
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn alias_accessed_at(
         &self,
         input_alias: Alias,

@@ -1077,6 +1118,7 @@ impl AliasAccessRepo for PostgresRepo {
         Ok(opt)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn older_aliases(
         &self,
         timestamp: time::OffsetDateTime,
@@ -1115,6 +1157,7 @@ impl AliasAccessRepo for PostgresRepo {

 #[async_trait::async_trait(?Send)]
 impl VariantAccessRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn set_accessed_variant(
         &self,
         input_hash: Hash,

@@ -1137,6 +1180,7 @@ impl VariantAccessRepo for PostgresRepo {
         Ok(())
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn variant_accessed_at(
         &self,
         input_hash: Hash,

@@ -1158,6 +1202,7 @@ impl VariantAccessRepo for PostgresRepo {
         Ok(opt)
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn older_variants(
         &self,
         timestamp: time::OffsetDateTime,
@@ -1232,6 +1277,7 @@ impl From<InnerUploadResult> for UploadResult {

 #[async_trait::async_trait(?Send)]
 impl UploadRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn create_upload(&self) -> Result<UploadId, RepoError> {
         use schema::uploads::dsl::*;

@@ -1247,6 +1293,7 @@ impl UploadRepo for PostgresRepo {
         Ok(UploadId { id: uuid })
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
         use schema::uploads::dsl::*;

@@ -1293,6 +1340,7 @@ impl UploadRepo for PostgresRepo {
         }
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> {
         use schema::uploads::dsl::*;

@@ -1307,6 +1355,7 @@ impl UploadRepo for PostgresRepo {
         Ok(())
     }

+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn complete_upload(
         &self,
         upload_id: UploadId,
@@ -1333,6 +1382,7 @@ impl UploadRepo for PostgresRepo {

 #[async_trait::async_trait(?Send)]
 impl FullRepo for PostgresRepo {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn health_check(&self) -> Result<(), RepoError> {
         let next = self.inner.health_count.fetch_add(1, Ordering::Relaxed);
@@ -5,7 +5,7 @@ use barrel::{types, Migration};
 pub(crate) fn migration() -> String {
     let mut m = Migration::new();

-    m.inject_custom("CREATE EXTENSION pgcrypto;");
+    m.inject_custom("CREATE EXTENSION IF NOT EXISTS pgcrypto;");

     m.make::<Pg>().to_string()
 }
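Adding IF NOT EXISTS makes the extension migration idempotent: it no longer errors when pgcrypto is already installed, for example on a database that was provisioned by hand. The usual reason a schema wants pgcrypto is gen_random_uuid(); whether pict-rs relies on it exactly this way is an assumption, so the second statement below is purely illustrative:

    use barrel::{backend::Pg, Migration};

    fn migration() -> String {
        let mut m = Migration::new();

        // Safe to run against databases that already have the extension.
        m.inject_custom("CREATE EXTENSION IF NOT EXISTS pgcrypto;");

        // Illustrative only: a UUID primary key defaulting to gen_random_uuid(),
        // the kind of column that needs pgcrypto on older Postgres versions.
        m.inject_custom(
            "CREATE TABLE IF NOT EXISTS example_uploads ( \
                id UUID PRIMARY KEY DEFAULT gen_random_uuid() \
            );",
        );

        m.make::<Pg>().to_string()
    }

    fn main() {
        println!("{}", migration());
    }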
@@ -12,7 +12,10 @@ pub(crate) fn migration() -> String {
         t.add_column("queue", types::text().size(50).nullable(false));
         t.add_column("job", types::custom("jsonb").nullable(false));
         t.add_column("worker", types::uuid().nullable(true));
-        t.add_column("status", types::custom("job_status").nullable(false));
+        t.add_column(
+            "status",
+            types::custom("job_status").nullable(false).default("new"),
+        );
         t.add_column(
             "queue_time",
             types::datetime()
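The default("new") on the job_status column means a row inserted without an explicit status starts in the queue's initial state rather than violating NOT NULL; the column definition should come out roughly as "status" job_status NOT NULL DEFAULT 'new'. A minimal barrel sketch of the same builder chain (the table name and the other column are made up):

    use barrel::{backend::Pg, types, Migration};

    fn migration() -> String {
        let mut m = Migration::new();

        m.create_table("example_queue", |t| {
            t.add_column("job", types::custom("jsonb").nullable(false));
            // Roughly: "status" job_status NOT NULL DEFAULT 'new'
            t.add_column(
                "status",
                types::custom("job_status").nullable(false).default("new"),
            );
        });

        m.make::<Pg>().to_string()
    }

    fn main() {
        println!("{}", migration());
    }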
@@ -27,6 +27,9 @@ pub(crate) enum FileError {
     #[error("Failed to generate path")]
     PathGenerator(#[from] storage_path_generator::PathError),

+    #[error("Couldn't strip root dir")]
+    PrefixError,
+
     #[error("Couldn't convert Path to String")]
     StringError,

@@ -40,7 +43,7 @@ impl FileError {
             Self::Io(_) => ErrorCode::FILE_IO_ERROR,
             Self::PathGenerator(_) => ErrorCode::PARSE_PATH_ERROR,
             Self::FileExists => ErrorCode::FILE_EXISTS,
-            Self::StringError => ErrorCode::FORMAT_FILE_ID_ERROR,
+            Self::StringError | Self::PrefixError => ErrorCode::FORMAT_FILE_ID_ERROR,
         }
     }
 }
@@ -54,6 +57,7 @@ pub(crate) struct FileStore {

 #[async_trait::async_trait(?Send)]
 impl Store for FileStore {
+    #[tracing::instrument(level = "DEBUG", skip(self))]
     async fn health_check(&self) -> Result<(), StoreError> {
         tokio::fs::metadata(&self.root_dir)
             .await

@@ -62,7 +66,7 @@ impl Store for FileStore {
         Ok(())
     }

-    #[tracing::instrument(skip(reader))]
+    #[tracing::instrument(skip(self, reader))]
     async fn save_async_read<Reader>(
         &self,
         mut reader: Reader,

@@ -93,7 +97,7 @@ impl Store for FileStore {
         .await
     }

-    #[tracing::instrument(skip(bytes))]
+    #[tracing::instrument(skip(self, bytes))]
     async fn save_bytes(
         &self,
         bytes: Bytes,

@@ -113,7 +117,7 @@ impl Store for FileStore {
         None
     }

-    #[tracing::instrument]
+    #[tracing::instrument(skip(self))]
     async fn to_stream(
         &self,
         identifier: &Arc<str>,

@@ -137,7 +141,7 @@ impl Store for FileStore {
         Ok(Box::pin(stream))
     }

-    #[tracing::instrument(skip(writer))]
+    #[tracing::instrument(skip(self, writer))]
     async fn read_into<Writer>(
         &self,
         identifier: &Arc<str>,

@@ -153,7 +157,7 @@ impl Store for FileStore {
         Ok(())
     }

-    #[tracing::instrument]
+    #[tracing::instrument(skip(self))]
     async fn len(&self, identifier: &Arc<str>) -> Result<u64, StoreError> {
         let path = self.path_from_file_id(identifier);

@@ -165,7 +169,7 @@ impl Store for FileStore {
         Ok(len)
     }

-    #[tracing::instrument]
+    #[tracing::instrument(skip(self))]
     async fn remove(&self, identifier: &Arc<str>) -> Result<(), StoreError> {
         let path = self.path_from_file_id(identifier);
@@ -190,7 +194,11 @@ impl FileStore {
     }

     fn file_id_from_path(&self, path: PathBuf) -> Result<Arc<str>, FileError> {
-        path.to_str().ok_or(FileError::StringError).map(Into::into)
+        path.strip_prefix(&self.root_dir)
+            .map_err(|_| FileError::PrefixError)?
+            .to_str()
+            .ok_or(FileError::StringError)
+            .map(Into::into)
     }

     fn path_from_file_id(&self, file_id: &Arc<str>) -> PathBuf {
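With strip_prefix, file_id_from_path produces identifiers relative to the store's root_dir instead of absolute paths, and a path that is not under root_dir surfaces as the new PrefixError. Path::strip_prefix is std; a minimal sketch of the behaviour with made-up paths:

    use std::path::Path;

    fn main() {
        let root_dir = Path::new("data/files-local");
        let full = Path::new("data/files-local/ab/cd/ef.png");

        // Strips the leading root_dir, leaving the relative file id portion.
        let relative = full.strip_prefix(root_dir).expect("path lives under root_dir");
        assert_eq!(relative, Path::new("ab/cd/ef.png"));

        // Paths outside root_dir fail; the diff maps that to FileError::PrefixError.
        assert!(Path::new("elsewhere/x.png").strip_prefix(root_dir).is_err());
    }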
@@ -219,6 +227,7 @@ impl FileStore {
         Ok(target_path.join(filename))
     }

+    #[tracing::instrument(level = "DEBUG", skip(self, path), fields(path = ?path.as_ref()))]
     async fn safe_remove_file<P: AsRef<Path>>(&self, path: P) -> Result<(), FileError> {
         tokio::fs::remove_file(&path).await?;
         self.try_remove_parents(path.as_ref()).await;