mirror of
https://git.asonix.dog/asonix/pict-rs.git
synced 2025-01-01 07:08:42 +00:00
Share notification map between sled, postgres
This commit is contained in:
parent
d9d5ac5388
commit
74885f2932
9 changed files with 245 additions and 251 deletions
|
@ -13,7 +13,6 @@ use crate::{
|
|||
|
||||
use std::{
|
||||
future::Future,
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
@ -52,7 +51,7 @@ impl Drop for MetricsGuard {
|
|||
pub(crate) async fn generate<S: Store + 'static>(
|
||||
state: &State<S>,
|
||||
format: InputProcessableFormat,
|
||||
thumbnail_path: PathBuf,
|
||||
variant: String,
|
||||
thumbnail_args: Vec<String>,
|
||||
original_details: &Details,
|
||||
hash: Hash,
|
||||
|
@ -66,12 +65,10 @@ pub(crate) async fn generate<S: Store + 'static>(
|
|||
|
||||
Ok((original_details.clone(), identifier))
|
||||
} else {
|
||||
let variant = thumbnail_path.to_string_lossy().to_string();
|
||||
|
||||
let mut attempts = 0;
|
||||
let tup = loop {
|
||||
if attempts > 4 {
|
||||
todo!("return error");
|
||||
if attempts > 2 {
|
||||
return Err(UploadError::ProcessTimeout.into());
|
||||
}
|
||||
|
||||
match state
|
||||
|
@ -95,35 +92,35 @@ pub(crate) async fn generate<S: Store + 'static>(
|
|||
.with_poll_timer("heartbeat-future")
|
||||
.await;
|
||||
|
||||
state
|
||||
.repo
|
||||
.notify_variant(hash.clone(), variant.clone())
|
||||
.await?;
|
||||
|
||||
match res {
|
||||
Ok(Ok(tuple)) => break tuple,
|
||||
Ok(Err(e)) | Err(e) => {
|
||||
state
|
||||
.repo
|
||||
.fail_variant(hash.clone(), variant.clone())
|
||||
.await?;
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
Ok(Err(e)) | Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
match state
|
||||
Err(mut entry) => {
|
||||
let notified = entry.notified_timeout(Duration::from_secs(20));
|
||||
|
||||
if let Some(identifier) = state
|
||||
.repo
|
||||
.await_variant(hash.clone(), variant.clone())
|
||||
.variant_identifier(hash.clone(), variant.clone())
|
||||
.await?
|
||||
{
|
||||
Some(identifier) => {
|
||||
let details =
|
||||
crate::ensure_details_identifier(state, &identifier).await?;
|
||||
|
||||
break (details, identifier);
|
||||
}
|
||||
None => {
|
||||
attempts += 1;
|
||||
continue;
|
||||
}
|
||||
drop(notified);
|
||||
let details = crate::ensure_details_identifier(state, &identifier).await?;
|
||||
break (details, identifier);
|
||||
}
|
||||
|
||||
match notified.await {
|
||||
Ok(()) => tracing::debug!("notified"),
|
||||
Err(_) => tracing::warn!("timeout"),
|
||||
}
|
||||
|
||||
attempts += 1;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
50
src/lib.rs
50
src/lib.rs
|
@ -56,7 +56,6 @@ use state::State;
|
|||
use std::{
|
||||
marker::PhantomData,
|
||||
path::Path,
|
||||
path::PathBuf,
|
||||
rc::Rc,
|
||||
sync::{Arc, OnceLock},
|
||||
time::{Duration, SystemTime},
|
||||
|
@ -774,7 +773,7 @@ fn prepare_process(
|
|||
config: &Configuration,
|
||||
operations: Vec<(String, String)>,
|
||||
ext: &str,
|
||||
) -> Result<(InputProcessableFormat, PathBuf, Vec<String>), Error> {
|
||||
) -> Result<(InputProcessableFormat, String, Vec<String>), Error> {
|
||||
let operations = operations
|
||||
.into_iter()
|
||||
.filter(|(k, _)| config.media.filters.contains(&k.to_lowercase()))
|
||||
|
@ -784,10 +783,9 @@ fn prepare_process(
|
|||
.parse::<InputProcessableFormat>()
|
||||
.map_err(|_| UploadError::UnsupportedProcessExtension)?;
|
||||
|
||||
let (thumbnail_path, thumbnail_args) =
|
||||
self::processor::build_chain(&operations, &format.to_string())?;
|
||||
let (variant, thumbnail_args) = self::processor::build_chain(&operations, &format.to_string())?;
|
||||
|
||||
Ok((format, thumbnail_path, thumbnail_args))
|
||||
Ok((format, variant, thumbnail_args))
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Fetching derived details", skip(state))]
|
||||
|
@ -798,7 +796,7 @@ async fn process_details<S: Store>(
|
|||
) -> Result<HttpResponse, Error> {
|
||||
let alias = alias_from_query(source.into(), &state).await?;
|
||||
|
||||
let (_, thumbnail_path, _) = prepare_process(&state.config, operations, ext.as_str())?;
|
||||
let (_, variant, _) = prepare_process(&state.config, operations, ext.as_str())?;
|
||||
|
||||
let hash = state
|
||||
.repo
|
||||
|
@ -806,18 +804,16 @@ async fn process_details<S: Store>(
|
|||
.await?
|
||||
.ok_or(UploadError::MissingAlias)?;
|
||||
|
||||
let thumbnail_string = thumbnail_path.to_string_lossy().to_string();
|
||||
|
||||
if !state.config.server.read_only {
|
||||
state
|
||||
.repo
|
||||
.accessed_variant(hash.clone(), thumbnail_string.clone())
|
||||
.accessed_variant(hash.clone(), variant.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
let identifier = state
|
||||
.repo
|
||||
.variant_identifier(hash, thumbnail_string)
|
||||
.variant_identifier(hash, variant)
|
||||
.await?
|
||||
.ok_or(UploadError::MissingAlias)?;
|
||||
|
||||
|
@ -856,11 +852,9 @@ async fn process<S: Store + 'static>(
|
|||
) -> Result<HttpResponse, Error> {
|
||||
let alias = proxy_alias_from_query(source.into(), &state).await?;
|
||||
|
||||
let (format, thumbnail_path, thumbnail_args) =
|
||||
let (format, variant, thumbnail_args) =
|
||||
prepare_process(&state.config, operations, ext.as_str())?;
|
||||
|
||||
let path_string = thumbnail_path.to_string_lossy().to_string();
|
||||
|
||||
let (hash, alias, not_found) = if let Some(hash) = state.repo.hash(&alias).await? {
|
||||
(hash, alias, false)
|
||||
} else {
|
||||
|
@ -874,13 +868,13 @@ async fn process<S: Store + 'static>(
|
|||
if !state.config.server.read_only {
|
||||
state
|
||||
.repo
|
||||
.accessed_variant(hash.clone(), path_string.clone())
|
||||
.accessed_variant(hash.clone(), variant.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
let identifier_opt = state
|
||||
.repo
|
||||
.variant_identifier(hash.clone(), path_string)
|
||||
.variant_identifier(hash.clone(), variant.clone())
|
||||
.await?;
|
||||
|
||||
let (details, identifier) = if let Some(identifier) = identifier_opt {
|
||||
|
@ -897,7 +891,7 @@ async fn process<S: Store + 'static>(
|
|||
generate::generate(
|
||||
&state,
|
||||
format,
|
||||
thumbnail_path,
|
||||
variant,
|
||||
thumbnail_args,
|
||||
&original_details,
|
||||
hash,
|
||||
|
@ -933,9 +927,8 @@ async fn process_head<S: Store + 'static>(
|
|||
}
|
||||
};
|
||||
|
||||
let (_, thumbnail_path, _) = prepare_process(&state.config, operations, ext.as_str())?;
|
||||
let (_, variant, _) = prepare_process(&state.config, operations, ext.as_str())?;
|
||||
|
||||
let path_string = thumbnail_path.to_string_lossy().to_string();
|
||||
let Some(hash) = state.repo.hash(&alias).await? else {
|
||||
// Invalid alias
|
||||
return Ok(HttpResponse::NotFound().finish());
|
||||
|
@ -944,14 +937,11 @@ async fn process_head<S: Store + 'static>(
|
|||
if !state.config.server.read_only {
|
||||
state
|
||||
.repo
|
||||
.accessed_variant(hash.clone(), path_string.clone())
|
||||
.accessed_variant(hash.clone(), variant.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
let identifier_opt = state
|
||||
.repo
|
||||
.variant_identifier(hash.clone(), path_string)
|
||||
.await?;
|
||||
let identifier_opt = state.repo.variant_identifier(hash.clone(), variant).await?;
|
||||
|
||||
if let Some(identifier) = identifier_opt {
|
||||
let details = ensure_details_identifier(&state, &identifier).await?;
|
||||
|
@ -987,10 +977,9 @@ async fn process_backgrounded<S: Store>(
|
|||
}
|
||||
};
|
||||
|
||||
let (target_format, process_path, process_args) =
|
||||
let (target_format, variant, process_args) =
|
||||
prepare_process(&state.config, operations, ext.as_str())?;
|
||||
|
||||
let path_string = process_path.to_string_lossy().to_string();
|
||||
let Some(hash) = state.repo.hash(&source).await? else {
|
||||
// Invalid alias
|
||||
return Ok(HttpResponse::BadRequest().finish());
|
||||
|
@ -998,7 +987,7 @@ async fn process_backgrounded<S: Store>(
|
|||
|
||||
let identifier_opt = state
|
||||
.repo
|
||||
.variant_identifier(hash.clone(), path_string)
|
||||
.variant_identifier(hash.clone(), variant.clone())
|
||||
.await?;
|
||||
|
||||
if identifier_opt.is_some() {
|
||||
|
@ -1009,14 +998,7 @@ async fn process_backgrounded<S: Store>(
|
|||
return Err(UploadError::ReadOnly.into());
|
||||
}
|
||||
|
||||
queue_generate(
|
||||
&state.repo,
|
||||
target_format,
|
||||
source,
|
||||
process_path,
|
||||
process_args,
|
||||
)
|
||||
.await?;
|
||||
queue_generate(&state.repo, target_format, source, variant, process_args).await?;
|
||||
|
||||
Ok(HttpResponse::Accepted().finish())
|
||||
}
|
||||
|
|
|
@ -91,7 +91,7 @@ impl ResizeKind {
|
|||
pub(crate) fn build_chain(
|
||||
args: &[(String, String)],
|
||||
ext: &str,
|
||||
) -> Result<(PathBuf, Vec<String>), Error> {
|
||||
) -> Result<(String, Vec<String>), Error> {
|
||||
fn parse<P: Processor>(key: &str, value: &str) -> Result<Option<P>, Error> {
|
||||
if key == P::NAME {
|
||||
return Ok(Some(P::parse(key, value).ok_or(UploadError::ParsePath)?));
|
||||
|
@ -122,7 +122,7 @@ pub(crate) fn build_chain(
|
|||
|
||||
path.push(ext);
|
||||
|
||||
Ok((path, args))
|
||||
Ok((path.to_string_lossy().to_string(), args))
|
||||
}
|
||||
|
||||
impl Processor for Identity {
|
||||
|
|
|
@ -11,7 +11,6 @@ use crate::{
|
|||
|
||||
use std::{
|
||||
ops::Deref,
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
@ -62,7 +61,7 @@ enum Process {
|
|||
Generate {
|
||||
target_format: InputProcessableFormat,
|
||||
source: Serde<Alias>,
|
||||
process_path: PathBuf,
|
||||
process_path: String,
|
||||
process_args: Vec<String>,
|
||||
},
|
||||
}
|
||||
|
@ -177,13 +176,13 @@ pub(crate) async fn queue_generate(
|
|||
repo: &ArcRepo,
|
||||
target_format: InputProcessableFormat,
|
||||
source: Alias,
|
||||
process_path: PathBuf,
|
||||
variant: String,
|
||||
process_args: Vec<String>,
|
||||
) -> Result<(), Error> {
|
||||
let job = serde_json::to_value(Process::Generate {
|
||||
target_format,
|
||||
source: Serde::new(source),
|
||||
process_path,
|
||||
process_path: variant,
|
||||
process_args,
|
||||
})
|
||||
.map_err(UploadError::PushJob)?;
|
||||
|
|
|
@ -13,7 +13,7 @@ use crate::{
|
|||
store::Store,
|
||||
UploadQuery,
|
||||
};
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::{JobContext, JobFuture, JobResult};
|
||||
|
||||
|
@ -172,12 +172,12 @@ where
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(state, process_path, process_args))]
|
||||
#[tracing::instrument(skip(state, variant, process_args))]
|
||||
async fn generate<S: Store + 'static>(
|
||||
state: &State<S>,
|
||||
target_format: InputProcessableFormat,
|
||||
source: Alias,
|
||||
process_path: PathBuf,
|
||||
variant: String,
|
||||
process_args: Vec<String>,
|
||||
) -> JobResult {
|
||||
let hash = state
|
||||
|
@ -188,10 +188,9 @@ async fn generate<S: Store + 'static>(
|
|||
.ok_or(UploadError::MissingAlias)
|
||||
.abort()?;
|
||||
|
||||
let path_string = process_path.to_string_lossy().to_string();
|
||||
let identifier_opt = state
|
||||
.repo
|
||||
.variant_identifier(hash.clone(), path_string)
|
||||
.variant_identifier(hash.clone(), variant.clone())
|
||||
.await
|
||||
.retry()?;
|
||||
|
||||
|
@ -205,7 +204,7 @@ async fn generate<S: Store + 'static>(
|
|||
crate::generate::generate(
|
||||
state,
|
||||
target_format,
|
||||
process_path,
|
||||
variant,
|
||||
process_args,
|
||||
&original_details,
|
||||
hash,
|
||||
|
|
27
src/repo.rs
27
src/repo.rs
|
@ -3,6 +3,7 @@ mod delete_token;
|
|||
mod hash;
|
||||
mod metrics;
|
||||
mod migrate;
|
||||
mod notification_map;
|
||||
|
||||
use crate::{
|
||||
config,
|
||||
|
@ -24,6 +25,8 @@ pub(crate) use delete_token::DeleteToken;
|
|||
pub(crate) use hash::Hash;
|
||||
pub(crate) use migrate::{migrate_04, migrate_repo};
|
||||
|
||||
use self::notification_map::NotificationEntry;
|
||||
|
||||
pub(crate) type ArcRepo = Arc<dyn FullRepo>;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
@ -744,17 +747,11 @@ pub(crate) trait VariantRepo: BaseRepo {
|
|||
&self,
|
||||
hash: Hash,
|
||||
variant: String,
|
||||
) -> Result<Result<(), VariantAlreadyExists>, RepoError>;
|
||||
) -> Result<Result<(), NotificationEntry>, RepoError>;
|
||||
|
||||
async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
|
||||
|
||||
async fn fail_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
|
||||
|
||||
async fn await_variant(
|
||||
&self,
|
||||
hash: Hash,
|
||||
variant: String,
|
||||
) -> Result<Option<Arc<str>>, RepoError>;
|
||||
async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
|
||||
|
||||
async fn relate_variant_identifier(
|
||||
&self,
|
||||
|
@ -783,7 +780,7 @@ where
|
|||
&self,
|
||||
hash: Hash,
|
||||
variant: String,
|
||||
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
|
||||
) -> Result<Result<(), NotificationEntry>, RepoError> {
|
||||
T::claim_variant_processing_rights(self, hash, variant).await
|
||||
}
|
||||
|
||||
|
@ -791,16 +788,8 @@ where
|
|||
T::variant_heartbeat(self, hash, variant).await
|
||||
}
|
||||
|
||||
async fn fail_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
|
||||
T::fail_variant(self, hash, variant).await
|
||||
}
|
||||
|
||||
async fn await_variant(
|
||||
&self,
|
||||
hash: Hash,
|
||||
variant: String,
|
||||
) -> Result<Option<Arc<str>>, RepoError> {
|
||||
T::await_variant(self, hash, variant).await
|
||||
async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
|
||||
T::notify_variant(self, hash, variant).await
|
||||
}
|
||||
|
||||
async fn relate_variant_identifier(
|
||||
|
|
94
src/repo/notification_map.rs
Normal file
94
src/repo/notification_map.rs
Normal file
|
@ -0,0 +1,94 @@
|
|||
use dashmap::DashMap;
|
||||
use std::{
|
||||
future::Future,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, Weak,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use crate::future::WithTimeout;
|
||||
|
||||
type Map = Arc<DashMap<Arc<str>, Weak<NotificationEntryInner>>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(super) struct NotificationMap {
|
||||
map: Map,
|
||||
}
|
||||
|
||||
pub(crate) struct NotificationEntry {
|
||||
inner: Arc<NotificationEntryInner>,
|
||||
}
|
||||
|
||||
struct NotificationEntryInner {
|
||||
key: Arc<str>,
|
||||
map: Map,
|
||||
notify: Notify,
|
||||
armed: AtomicBool,
|
||||
}
|
||||
|
||||
impl NotificationMap {
|
||||
pub(super) fn new() -> Self {
|
||||
Self {
|
||||
map: Arc::new(DashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn register_interest(&self, key: Arc<str>) -> NotificationEntry {
|
||||
let new_entry = Arc::new(NotificationEntryInner {
|
||||
key: key.clone(),
|
||||
map: self.map.clone(),
|
||||
notify: crate::sync::bare_notify(),
|
||||
armed: AtomicBool::new(false),
|
||||
});
|
||||
|
||||
let mut key_entry = self
|
||||
.map
|
||||
.entry(key)
|
||||
.or_insert_with(|| Arc::downgrade(&new_entry));
|
||||
|
||||
let upgraded_entry = key_entry.value().upgrade();
|
||||
|
||||
let inner = if let Some(entry) = upgraded_entry {
|
||||
entry
|
||||
} else {
|
||||
*key_entry.value_mut() = Arc::downgrade(&new_entry);
|
||||
new_entry
|
||||
};
|
||||
|
||||
inner.armed.store(true, Ordering::Release);
|
||||
|
||||
NotificationEntry { inner }
|
||||
}
|
||||
|
||||
pub(super) fn notify(&self, key: &str) {
|
||||
if let Some(notifier) = self.map.get(key).and_then(|v| v.upgrade()) {
|
||||
notifier.notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NotificationEntry {
|
||||
pub(crate) fn notified_timeout(
|
||||
&mut self,
|
||||
duration: Duration,
|
||||
) -> impl Future<Output = Result<(), tokio::time::error::Elapsed>> + '_ {
|
||||
self.inner.notify.notified().with_timeout(duration)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for NotificationMap {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for NotificationEntryInner {
|
||||
fn drop(&mut self) {
|
||||
if self.armed.load(Ordering::Acquire) {
|
||||
self.map.remove(&self.key);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,7 +21,6 @@ use diesel_async::{
|
|||
bb8::{Pool, PooledConnection, RunError},
|
||||
AsyncDieselConnectionManager, ManagerConfig, PoolError,
|
||||
},
|
||||
scoped_futures::ScopedFutureExt,
|
||||
AsyncConnection, AsyncPgConnection, RunQueryDsl,
|
||||
};
|
||||
use futures_core::Stream;
|
||||
|
@ -45,6 +44,7 @@ use self::job_status::JobStatus;
|
|||
|
||||
use super::{
|
||||
metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard},
|
||||
notification_map::{NotificationEntry, NotificationMap},
|
||||
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo,
|
||||
FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
|
||||
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
|
||||
|
@ -64,32 +64,7 @@ struct Inner {
|
|||
notifier_pool: Pool<AsyncPgConnection>,
|
||||
queue_notifications: DashMap<String, Arc<Notify>>,
|
||||
upload_notifications: DashMap<UploadId, Weak<Notify>>,
|
||||
keyed_notifications: DashMap<Arc<str>, Weak<NotificationEntry>>,
|
||||
}
|
||||
|
||||
struct NotificationEntry {
|
||||
key: Arc<str>,
|
||||
inner: Arc<Inner>,
|
||||
notify: Notify,
|
||||
}
|
||||
|
||||
impl Drop for NotificationEntry {
|
||||
fn drop(&mut self) {
|
||||
self.inner.keyed_notifications.remove(self.key.as_ref());
|
||||
}
|
||||
}
|
||||
|
||||
struct KeyListener {
|
||||
entry: Arc<NotificationEntry>,
|
||||
}
|
||||
|
||||
impl KeyListener {
|
||||
fn notified_timeout(
|
||||
&self,
|
||||
timeout: Duration,
|
||||
) -> impl Future<Output = Result<(), tokio::time::error::Elapsed>> + '_ {
|
||||
self.entry.notify.notified().with_timeout(timeout)
|
||||
}
|
||||
keyed_notifications: NotificationMap,
|
||||
}
|
||||
|
||||
struct UploadInterest {
|
||||
|
@ -363,7 +338,7 @@ impl PostgresRepo {
|
|||
notifier_pool,
|
||||
queue_notifications: DashMap::new(),
|
||||
upload_notifications: DashMap::new(),
|
||||
keyed_notifications: DashMap::new(),
|
||||
keyed_notifications: NotificationMap::new(),
|
||||
});
|
||||
|
||||
let handle = crate::sync::abort_on_drop(crate::sync::spawn_sendable(
|
||||
|
@ -451,29 +426,8 @@ impl PostgresRepo {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn listen_on_key(&self, key: Arc<str>) -> KeyListener {
|
||||
let new_entry = Arc::new(NotificationEntry {
|
||||
key: key.clone(),
|
||||
inner: Arc::clone(&self.inner),
|
||||
notify: crate::sync::bare_notify(),
|
||||
});
|
||||
|
||||
let mut entry = self
|
||||
.inner
|
||||
.keyed_notifications
|
||||
.entry(key)
|
||||
.or_insert_with(|| Arc::downgrade(&new_entry));
|
||||
|
||||
let upgraded = entry.value().upgrade();
|
||||
|
||||
let entry = if let Some(existing_entry) = upgraded {
|
||||
existing_entry
|
||||
} else {
|
||||
*entry.value_mut() = Arc::downgrade(&new_entry);
|
||||
new_entry
|
||||
};
|
||||
|
||||
KeyListener { entry }
|
||||
fn listen_on_key(&self, key: Arc<str>) -> NotificationEntry {
|
||||
self.inner.keyed_notifications.register_interest(key)
|
||||
}
|
||||
|
||||
async fn register_interest(&self) -> Result<(), PostgresError> {
|
||||
|
@ -658,14 +612,7 @@ impl<'a> UploadNotifierState<'a> {
|
|||
|
||||
impl<'a> KeyedNotifierState<'a> {
|
||||
fn handle(&self, key: &str) {
|
||||
if let Some(notification_entry) = self
|
||||
.inner
|
||||
.keyed_notifications
|
||||
.get(key)
|
||||
.and_then(|weak| weak.upgrade())
|
||||
{
|
||||
notification_entry.notify.notify_waiters();
|
||||
}
|
||||
self.inner.keyed_notifications.notify(key);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1150,20 +1097,23 @@ impl VariantRepo for PostgresRepo {
|
|||
&self,
|
||||
hash: Hash,
|
||||
variant: String,
|
||||
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
|
||||
) -> Result<Result<(), NotificationEntry>, RepoError> {
|
||||
let key = Arc::from(format!("{}{variant}", hash.to_base64()));
|
||||
let entry = self.listen_on_key(Arc::clone(&key));
|
||||
|
||||
self.register_interest().await?;
|
||||
|
||||
if self
|
||||
.variant_identifier(hash.clone(), variant.clone())
|
||||
.await?
|
||||
.is_some()
|
||||
{
|
||||
return Ok(Err(VariantAlreadyExists));
|
||||
return Ok(Err(entry));
|
||||
}
|
||||
|
||||
let key = format!("{}{variant}", hash.to_base64());
|
||||
|
||||
match self.insert_keyed_notifier(&key).await? {
|
||||
Ok(()) => Ok(Ok(())),
|
||||
Err(AlreadyInserted) => Ok(Err(VariantAlreadyExists)),
|
||||
Err(AlreadyInserted) => Ok(Err(entry)),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1177,40 +1127,12 @@ impl VariantRepo for PostgresRepo {
|
|||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self))]
|
||||
async fn fail_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
|
||||
async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
|
||||
let key = format!("{}{variant}", hash.to_base64());
|
||||
|
||||
self.clear_keyed_notifier(key).await.map_err(Into::into)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn await_variant(
|
||||
&self,
|
||||
hash: Hash,
|
||||
variant: String,
|
||||
) -> Result<Option<Arc<str>>, RepoError> {
|
||||
let key = Arc::from(format!("{}{variant}", hash.to_base64()));
|
||||
|
||||
let listener = self.listen_on_key(key);
|
||||
let notified = listener.notified_timeout(Duration::from_secs(5));
|
||||
|
||||
self.register_interest().await?;
|
||||
|
||||
if let Some(identifier) = self
|
||||
.variant_identifier(hash.clone(), variant.clone())
|
||||
.await?
|
||||
{
|
||||
return Ok(Some(identifier));
|
||||
}
|
||||
|
||||
match notified.await {
|
||||
Ok(()) => tracing::debug!("notified"),
|
||||
Err(_) => tracing::trace!("timeout"),
|
||||
}
|
||||
|
||||
self.variant_identifier(hash, variant).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn relate_variant_identifier(
|
||||
&self,
|
||||
|
@ -1218,60 +1140,30 @@ impl VariantRepo for PostgresRepo {
|
|||
input_variant: String,
|
||||
input_identifier: &Arc<str>,
|
||||
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
|
||||
use schema::variants::dsl::*;
|
||||
|
||||
let mut conn = self.get_connection().await?;
|
||||
|
||||
conn.transaction(|conn| {
|
||||
async move {
|
||||
let res = async {
|
||||
use schema::variants::dsl::*;
|
||||
let res = diesel::insert_into(variants)
|
||||
.values((
|
||||
hash.eq(&input_hash),
|
||||
variant.eq(&input_variant),
|
||||
identifier.eq(input_identifier.to_string()),
|
||||
))
|
||||
.execute(&mut conn)
|
||||
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER)
|
||||
.with_timeout(Duration::from_secs(5))
|
||||
.await
|
||||
.map_err(|_| PostgresError::DbTimeout)?;
|
||||
|
||||
diesel::insert_into(variants)
|
||||
.values((
|
||||
hash.eq(&input_hash),
|
||||
variant.eq(&input_variant),
|
||||
identifier.eq(input_identifier.to_string()),
|
||||
))
|
||||
.execute(conn)
|
||||
.with_metrics(
|
||||
crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER,
|
||||
)
|
||||
.with_timeout(Duration::from_secs(5))
|
||||
.await
|
||||
.map_err(|_| PostgresError::DbTimeout)
|
||||
}
|
||||
.await;
|
||||
|
||||
let notification_res = async {
|
||||
use schema::keyed_notifications::dsl::*;
|
||||
|
||||
let input_key = format!("{}{input_variant}", input_hash.to_base64());
|
||||
diesel::delete(keyed_notifications)
|
||||
.filter(key.eq(input_key))
|
||||
.execute(conn)
|
||||
.with_timeout(Duration::from_secs(5))
|
||||
.await
|
||||
.map_err(|_| PostgresError::DbTimeout)
|
||||
}
|
||||
.await;
|
||||
|
||||
match notification_res? {
|
||||
Ok(_) => {}
|
||||
Err(e) => tracing::warn!("Failed to clear notifier: {e}"),
|
||||
}
|
||||
|
||||
match res? {
|
||||
Ok(_) => Ok(Ok(())),
|
||||
Err(diesel::result::Error::DatabaseError(
|
||||
diesel::result::DatabaseErrorKind::UniqueViolation,
|
||||
_,
|
||||
)) => Ok(Err(VariantAlreadyExists)),
|
||||
Err(e) => Err(PostgresError::Diesel(e)),
|
||||
}
|
||||
}
|
||||
.scope_boxed()
|
||||
})
|
||||
.await
|
||||
.map_err(PostgresError::into)
|
||||
match res {
|
||||
Ok(_) => Ok(Ok(())),
|
||||
Err(diesel::result::Error::DatabaseError(
|
||||
diesel::result::DatabaseErrorKind::UniqueViolation,
|
||||
_,
|
||||
)) => Ok(Err(VariantAlreadyExists)),
|
||||
Err(e) => Err(PostgresError::Diesel(e).into()),
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
|
|
|
@ -5,6 +5,7 @@ use crate::{
|
|||
serde_str::Serde,
|
||||
stream::{from_iterator, LocalBoxStream},
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use sled::{transaction::TransactionError, Db, IVec, Transactional, Tree};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
|
@ -22,6 +23,7 @@ use uuid::Uuid;
|
|||
use super::{
|
||||
hash::Hash,
|
||||
metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard},
|
||||
notification_map::{NotificationEntry, NotificationMap},
|
||||
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details,
|
||||
DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
|
||||
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
|
||||
|
@ -113,6 +115,8 @@ pub(crate) struct SledRepo {
|
|||
migration_identifiers: Tree,
|
||||
cache_capacity: u64,
|
||||
export_path: PathBuf,
|
||||
variant_process_map: DashMap<(Hash, String), time::OffsetDateTime>,
|
||||
notifications: NotificationMap,
|
||||
db: Db,
|
||||
}
|
||||
|
||||
|
@ -156,6 +160,8 @@ impl SledRepo {
|
|||
migration_identifiers: db.open_tree("pict-rs-migration-identifiers-tree")?,
|
||||
cache_capacity,
|
||||
export_path,
|
||||
variant_process_map: DashMap::new(),
|
||||
notifications: NotificationMap::new(),
|
||||
db,
|
||||
})
|
||||
}
|
||||
|
@ -1453,27 +1459,61 @@ impl VariantRepo for SledRepo {
|
|||
&self,
|
||||
hash: Hash,
|
||||
variant: String,
|
||||
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
|
||||
todo!()
|
||||
) -> Result<Result<(), NotificationEntry>, RepoError> {
|
||||
let key = (hash.clone(), variant.clone());
|
||||
let now = time::OffsetDateTime::now_utc();
|
||||
let entry = self
|
||||
.notifications
|
||||
.register_interest(Arc::from(format!("{}{variant}", hash.to_base64())));
|
||||
|
||||
match self.variant_process_map.entry(key.clone()) {
|
||||
dashmap::mapref::entry::Entry::Occupied(mut occupied_entry) => {
|
||||
if occupied_entry
|
||||
.get()
|
||||
.saturating_add(time::Duration::minutes(2))
|
||||
> now
|
||||
{
|
||||
return Ok(Err(entry));
|
||||
}
|
||||
|
||||
occupied_entry.insert(now);
|
||||
}
|
||||
dashmap::mapref::entry::Entry::Vacant(vacant_entry) => {
|
||||
vacant_entry.insert(now);
|
||||
}
|
||||
}
|
||||
|
||||
if self.variant_identifier(hash, variant).await?.is_some() {
|
||||
self.variant_process_map.remove(&key);
|
||||
return Ok(Err(entry));
|
||||
}
|
||||
|
||||
Ok(Ok(()))
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self))]
|
||||
async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
|
||||
todo!()
|
||||
let key = (hash, variant);
|
||||
let now = time::OffsetDateTime::now_utc();
|
||||
|
||||
if let dashmap::mapref::entry::Entry::Occupied(mut occupied_entry) =
|
||||
self.variant_process_map.entry(key)
|
||||
{
|
||||
occupied_entry.insert(now);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self))]
|
||||
async fn fail_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
|
||||
todo!()
|
||||
}
|
||||
async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
|
||||
let key = (hash.clone(), variant.clone());
|
||||
self.variant_process_map.remove(&key);
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self))]
|
||||
async fn await_variant(
|
||||
&self,
|
||||
hash: Hash,
|
||||
variant: String,
|
||||
) -> Result<Option<Arc<str>>, RepoError> {
|
||||
todo!()
|
||||
let key = format!("{}{variant}", hash.to_base64());
|
||||
self.notifications.notify(&key);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self))]
|
||||
|
@ -1490,7 +1530,7 @@ impl VariantRepo for SledRepo {
|
|||
|
||||
let hash_variant_identifiers = self.hash_variant_identifiers.clone();
|
||||
|
||||
crate::sync::spawn_blocking("sled-io", move || {
|
||||
let out = crate::sync::spawn_blocking("sled-io", move || {
|
||||
hash_variant_identifiers
|
||||
.compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes()))
|
||||
.map(|res| res.map_err(|_| VariantAlreadyExists))
|
||||
|
@ -1498,7 +1538,9 @@ impl VariantRepo for SledRepo {
|
|||
.await
|
||||
.map_err(|_| RepoError::Canceled)?
|
||||
.map_err(SledError::from)
|
||||
.map_err(RepoError::from)
|
||||
.map_err(RepoError::from)?;
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self))]
|
||||
|
|
Loading…
Reference in a new issue