Pass clippy

This commit is contained in:
asonix 2023-09-03 16:59:41 -05:00
parent 443d327edf
commit 94cb2a9ef3
6 changed files with 470 additions and 25 deletions

View file

@ -64,7 +64,7 @@ pub(crate) async fn cleanup_alias(
alias: Alias,
token: DeleteToken,
) -> Result<(), Error> {
let job = serde_json::to_value(&Cleanup::Alias {
let job = serde_json::to_value(Cleanup::Alias {
alias: Serde::new(alias),
token: Serde::new(token),
})
@ -74,7 +74,7 @@ pub(crate) async fn cleanup_alias(
}
pub(crate) async fn cleanup_hash(repo: &Arc<dyn FullRepo>, hash: Hash) -> Result<(), Error> {
let job = serde_json::to_value(&Cleanup::Hash { hash }).map_err(UploadError::PushJob)?;
let job = serde_json::to_value(Cleanup::Hash { hash }).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
Ok(())
}
@ -83,7 +83,7 @@ pub(crate) async fn cleanup_identifier(
repo: &Arc<dyn FullRepo>,
identifier: &Arc<str>,
) -> Result<(), Error> {
let job = serde_json::to_value(&Cleanup::Identifier {
let job = serde_json::to_value(Cleanup::Identifier {
identifier: identifier.to_string(),
})
.map_err(UploadError::PushJob)?;
@ -97,25 +97,25 @@ async fn cleanup_variants(
variant: Option<String>,
) -> Result<(), Error> {
let job =
serde_json::to_value(&Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?;
serde_json::to_value(Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
Ok(())
}
pub(crate) async fn cleanup_outdated_proxies(repo: &Arc<dyn FullRepo>) -> Result<(), Error> {
let job = serde_json::to_value(&Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?;
let job = serde_json::to_value(Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
Ok(())
}
pub(crate) async fn cleanup_outdated_variants(repo: &Arc<dyn FullRepo>) -> Result<(), Error> {
let job = serde_json::to_value(&Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?;
let job = serde_json::to_value(Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
Ok(())
}
pub(crate) async fn cleanup_all_variants(repo: &Arc<dyn FullRepo>) -> Result<(), Error> {
let job = serde_json::to_value(&Cleanup::AllVariants).map_err(UploadError::PushJob)?;
let job = serde_json::to_value(Cleanup::AllVariants).map_err(UploadError::PushJob)?;
repo.push(CLEANUP_QUEUE, job).await?;
Ok(())
}
@ -126,7 +126,7 @@ pub(crate) async fn queue_ingest(
upload_id: UploadId,
declared_alias: Option<Alias>,
) -> Result<(), Error> {
let job = serde_json::to_value(&Process::Ingest {
let job = serde_json::to_value(Process::Ingest {
identifier: identifier.to_string(),
declared_alias: declared_alias.map(Serde::new),
upload_id: Serde::new(upload_id),
@ -143,7 +143,7 @@ pub(crate) async fn queue_generate(
process_path: PathBuf,
process_args: Vec<String>,
) -> Result<(), Error> {
let job = serde_json::to_value(&Process::Generate {
let job = serde_json::to_value(Process::Generate {
target_format,
source: Serde::new(source),
process_path,

View file

@ -562,10 +562,13 @@ impl HashPage {
}
}
type LocalBoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + 'a>>;
type PageFuture = LocalBoxFuture<'static, Result<HashPage, RepoError>>;
pub(crate) struct HashStream {
repo: Option<ArcRepo>,
page_future:
Option<std::pin::Pin<Box<dyn std::future::Future<Output = Result<HashPage, RepoError>>>>>,
page_future: Option<PageFuture>,
page: Option<HashPage>,
}

View file

@ -33,7 +33,7 @@ where
fn from_sql(bytes: <B as Backend>::RawValue<'_>) -> diesel::deserialize::Result<Self> {
let s = String::from_sql(bytes)?;
Self::from_base64(s).ok_or_else(|| format!("Invalid base64 hash").into())
Self::from_base64(s).ok_or_else(|| "Invalid base64 hash".to_string().into())
}
}

View file

@ -2,9 +2,15 @@ mod embedded;
mod job_status;
mod schema;
use std::sync::Arc;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use dashmap::{DashMap, DashSet};
use dashmap::DashMap;
use diesel::prelude::*;
use diesel_async::{
pooled_connection::{
@ -18,26 +24,33 @@ use tokio_postgres::{AsyncMessage, Notification};
use url::Url;
use uuid::Uuid;
use crate::{details::Details, error_code::ErrorCode};
use crate::{
details::Details,
error_code::{ErrorCode, OwnedErrorCode},
serde_str::Serde,
stream::LocalBoxStream,
};
use self::job_status::JobStatus;
use super::{
Alias, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo, Hash,
HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, QueueRepo, RepoError, SettingsRepo,
StoreMigrationRepo, UploadId, VariantAlreadyExists,
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo,
FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, ProxyRepo,
QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult,
VariantAccessRepo, VariantAlreadyExists,
};
#[derive(Clone)]
pub(crate) struct PostgresRepo {
inner: Arc<Inner>,
#[allow(dead_code)]
notifications: Arc<actix_rt::task::JoinHandle<()>>,
}
struct Inner {
health_count: AtomicU64,
pool: Pool<AsyncPgConnection>,
queue_notifications: DashMap<String, Arc<Notify>>,
completed_uploads: DashSet<UploadId>,
upload_notifier: Notify,
}
@ -54,6 +67,9 @@ async fn delegate_notifications(receiver: flume::Receiver<Notification>, inner:
.or_insert_with(|| Arc::new(Notify::new()))
.notify_waiters();
}
"upload_completion_channel" => {
inner.upload_notifier.notify_waiters();
}
channel => {
tracing::info!(
"Unhandled postgres notification: {channel}: {}",
@ -61,7 +77,6 @@ async fn delegate_notifications(receiver: flume::Receiver<Notification>, inner:
);
}
}
todo!()
}
tracing::warn!("Notification delegator shutting down");
@ -95,6 +110,12 @@ pub(crate) enum PostgresError {
#[error("Error deserializing details")]
DeserializeDetails(#[source] serde_json::Error),
#[error("Error serializing upload result")]
SerializeUploadResult(#[source] serde_json::Error),
#[error("Error deserializing upload result")]
DeserializeUploadResult(#[source] serde_json::Error),
}
impl PostgresError {
@ -134,9 +155,9 @@ impl PostgresRepo {
.map_err(ConnectPostgresError::BuildPool)?;
let inner = Arc::new(Inner {
health_count: AtomicU64::new(0),
pool,
queue_notifications: DashMap::new(),
completed_uploads: DashSet::new(),
upload_notifier: Notify::new(),
});
@ -829,8 +850,7 @@ impl QueueRepo for PostgresRepo {
return Ok((JobId(job_id), job_json));
}
let _ = actix_rt::time::timeout(std::time::Duration::from_secs(5), notifier.notified())
.await;
let _ = actix_rt::time::timeout(Duration::from_secs(5), notifier.notified()).await;
diesel::sql_query("UNLISTEN queue_status_channel;")
.execute(&mut conn)
@ -959,6 +979,422 @@ impl StoreMigrationRepo for PostgresRepo {
}
}
#[async_trait::async_trait(?Send)]
impl ProxyRepo for PostgresRepo {
async fn relate_url(&self, input_url: Url, input_alias: Alias) -> Result<(), RepoError> {
use schema::proxies::dsl::*;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
diesel::insert_into(proxies)
.values((url.eq(input_url.as_str()), alias.eq(&input_alias)))
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
Ok(())
}
async fn related(&self, input_url: Url) -> Result<Option<Alias>, RepoError> {
use schema::proxies::dsl::*;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
let opt = proxies
.select(alias)
.filter(url.eq(input_url.as_str()))
.get_result(&mut conn)
.await
.optional()
.map_err(PostgresError::Diesel)?;
Ok(opt)
}
async fn remove_relation(&self, input_alias: Alias) -> Result<(), RepoError> {
use schema::proxies::dsl::*;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
diesel::delete(proxies)
.filter(alias.eq(&input_alias))
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
Ok(())
}
}
#[async_trait::async_trait(?Send)]
impl AliasAccessRepo for PostgresRepo {
async fn set_accessed_alias(
&self,
input_alias: Alias,
timestamp: time::OffsetDateTime,
) -> Result<(), RepoError> {
use schema::proxies::dsl::*;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
let timestamp = to_primitive(timestamp);
diesel::update(proxies)
.filter(alias.eq(&input_alias))
.set(accessed.eq(timestamp))
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
Ok(())
}
async fn alias_accessed_at(
&self,
input_alias: Alias,
) -> Result<Option<time::OffsetDateTime>, RepoError> {
use schema::proxies::dsl::*;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
let opt = proxies
.select(accessed)
.filter(alias.eq(&input_alias))
.get_result::<time::PrimitiveDateTime>(&mut conn)
.await
.optional()
.map_err(PostgresError::Diesel)?
.map(time::PrimitiveDateTime::assume_utc);
Ok(opt)
}
async fn older_aliases(
&self,
timestamp: time::OffsetDateTime,
) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError> {
Ok(Box::pin(PageStream {
inner: self.inner.clone(),
future: None,
current: Vec::new(),
older_than: to_primitive(timestamp),
next: Box::new(|inner, older_than| {
Box::pin(async move {
use schema::proxies::dsl::*;
let mut conn = inner.pool.get().await.map_err(PostgresError::Pool)?;
let vec = proxies
.select((accessed, alias))
.filter(accessed.lt(older_than))
.order(accessed.desc())
.limit(100)
.get_results(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
Ok(vec)
})
}),
}))
}
async fn remove_alias_access(&self, _: Alias) -> Result<(), RepoError> {
// Noop - handled by ProxyRepo::remove_relation
Ok(())
}
}
#[async_trait::async_trait(?Send)]
impl VariantAccessRepo for PostgresRepo {
async fn set_accessed_variant(
&self,
input_hash: Hash,
input_variant: String,
input_accessed: time::OffsetDateTime,
) -> Result<(), RepoError> {
use schema::variants::dsl::*;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
let timestamp = to_primitive(input_accessed);
diesel::update(variants)
.filter(hash.eq(&input_hash).and(variant.eq(&input_variant)))
.set(accessed.eq(timestamp))
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
Ok(())
}
async fn variant_accessed_at(
&self,
input_hash: Hash,
input_variant: String,
) -> Result<Option<time::OffsetDateTime>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
let opt = variants
.select(accessed)
.filter(hash.eq(&input_hash).and(variant.eq(&input_variant)))
.get_result(&mut conn)
.await
.optional()
.map_err(PostgresError::Diesel)?
.map(time::PrimitiveDateTime::assume_utc);
Ok(opt)
}
async fn older_variants(
&self,
timestamp: time::OffsetDateTime,
) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError> {
Ok(Box::pin(PageStream {
inner: self.inner.clone(),
future: None,
current: Vec::new(),
older_than: to_primitive(timestamp),
next: Box::new(|inner, older_than| {
Box::pin(async move {
use schema::variants::dsl::*;
let mut conn = inner.pool.get().await.map_err(PostgresError::Pool)?;
let vec = variants
.select((accessed, (hash, variant)))
.filter(accessed.lt(older_than))
.order(accessed.desc())
.limit(100)
.get_results(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
Ok(vec)
})
}),
}))
}
async fn remove_variant_access(&self, _: Hash, _: String) -> Result<(), RepoError> {
// Noop - handled by HashRepo::remove_variant
Ok(())
}
}
#[derive(serde::Deserialize, serde::Serialize)]
enum InnerUploadResult {
Success {
alias: Serde<Alias>,
token: Serde<DeleteToken>,
},
Failure {
message: String,
code: OwnedErrorCode,
},
}
impl From<UploadResult> for InnerUploadResult {
fn from(u: UploadResult) -> Self {
match u {
UploadResult::Success { alias, token } => InnerUploadResult::Success {
alias: Serde::new(alias),
token: Serde::new(token),
},
UploadResult::Failure { message, code } => InnerUploadResult::Failure { message, code },
}
}
}
impl From<InnerUploadResult> for UploadResult {
fn from(i: InnerUploadResult) -> Self {
match i {
InnerUploadResult::Success { alias, token } => UploadResult::Success {
alias: Serde::into_inner(alias),
token: Serde::into_inner(token),
},
InnerUploadResult::Failure { message, code } => UploadResult::Failure { message, code },
}
}
}
#[async_trait::async_trait(?Send)]
impl UploadRepo for PostgresRepo {
async fn create_upload(&self) -> Result<UploadId, RepoError> {
use schema::uploads::dsl::*;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
let uuid = diesel::insert_into(uploads)
.default_values()
.returning(id)
.get_result(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
Ok(UploadId { id: uuid })
}
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
use schema::uploads::dsl::*;
loop {
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
diesel::sql_query("LISTEN upload_completion_channel;")
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
let opt = uploads
.select(result)
.filter(id.eq(upload_id.id))
.get_result(&mut conn)
.await
.optional()
.map_err(PostgresError::Diesel)?
.flatten();
if let Some(upload_result) = opt {
diesel::sql_query("UNLISTEN upload_completion_channel;")
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
let upload_result: InnerUploadResult = serde_json::from_value(upload_result)
.map_err(PostgresError::DeserializeUploadResult)?;
return Ok(upload_result.into());
}
let _ = actix_rt::time::timeout(
Duration::from_secs(2),
self.inner.upload_notifier.notified(),
)
.await;
diesel::sql_query("UNLISTEN upload_completion_channel;")
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
drop(conn);
}
}
async fn claim(&self, upload_id: UploadId) -> Result<(), RepoError> {
use schema::uploads::dsl::*;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
diesel::delete(uploads)
.filter(id.eq(upload_id.id))
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
Ok(())
}
async fn complete_upload(
&self,
upload_id: UploadId,
upload_result: UploadResult,
) -> Result<(), RepoError> {
use schema::uploads::dsl::*;
let mut conn = self.inner.pool.get().await.map_err(PostgresError::Pool)?;
let upload_result: InnerUploadResult = upload_result.into();
let upload_result =
serde_json::to_value(&upload_result).map_err(PostgresError::SerializeUploadResult)?;
diesel::update(uploads)
.filter(id.eq(upload_id.id))
.set(result.eq(upload_result))
.execute(&mut conn)
.await
.map_err(PostgresError::Diesel)?;
Ok(())
}
}
#[async_trait::async_trait(?Send)]
impl FullRepo for PostgresRepo {
async fn health_check(&self) -> Result<(), RepoError> {
let next = self.inner.health_count.fetch_add(1, Ordering::Relaxed);
self.set("health-value", Arc::from(next.to_be_bytes()))
.await?;
Ok(())
}
}
type LocalBoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + 'a>>;
type NextFuture<I> = LocalBoxFuture<'static, Result<Vec<(time::PrimitiveDateTime, I)>, RepoError>>;
struct PageStream<I> {
inner: Arc<Inner>,
future: Option<NextFuture<I>>,
current: Vec<I>,
older_than: time::PrimitiveDateTime,
next: Box<dyn Fn(Arc<Inner>, time::PrimitiveDateTime) -> NextFuture<I>>,
}
impl<I> futures_core::Stream for PageStream<I>
where
I: Unpin,
{
type Item = Result<I, RepoError>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
// Pop because we reversed the list
if let Some(alias) = this.current.pop() {
return std::task::Poll::Ready(Some(Ok(alias)));
}
if let Some(future) = this.future.as_mut() {
let res = std::task::ready!(future.as_mut().poll(cx));
this.future.take();
match res {
Ok(page) if page.is_empty() => {
return std::task::Poll::Ready(None);
}
Ok(page) => {
let (mut timestamps, mut aliases): (Vec<_>, Vec<_>) =
page.into_iter().unzip();
// reverse because we use .pop() to get next
aliases.reverse();
this.current = aliases;
this.older_than = timestamps.pop().expect("Verified nonempty");
}
Err(e) => return std::task::Poll::Ready(Some(Err(e))),
}
} else {
let inner = this.inner.clone();
let older_than = this.older_than;
this.future = Some((this.next)(inner, older_than));
}
}
}
}
impl std::fmt::Debug for PostgresRepo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PostgresRepo")

View file

@ -7,7 +7,13 @@ pub(crate) fn migration() -> String {
m.create_table("uploads", |t| {
t.inject_custom(r#""id" UUID PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL UNIQUE"#);
t.add_column("result", types::custom("jsonb"));
t.add_column("result", types::custom("jsonb").nullable(true));
t.add_column(
"created_at",
types::datetime()
.nullable(false)
.default(AutogenFunction::CurrentTimestamp),
);
});
m.inject_custom(

View file

@ -82,7 +82,7 @@ diesel::table! {
diesel::table! {
uploads (id) {
id -> Uuid,
result -> Jsonb,
result -> Nullable<Jsonb>,
}
}