Better handle concurrent proxies

This commit is contained in:
asonix 2024-06-09 14:44:18 -05:00
parent 80af2b67b0
commit 9af7e01b01
4 changed files with 73 additions and 21 deletions

View file

@@ -74,7 +74,10 @@ use self::{
middleware::{Deadline, Internal, Log, Metrics, Payload},
migrate_store::migrate_store,
queue::queue_generate,
repo::{sled::SledRepo, Alias, ArcRepo, DeleteToken, Hash, Repo, UploadId, UploadResult},
repo::{
sled::SledRepo, Alias, ArcRepo, DeleteToken, Hash, ProxyAlreadyExists, Repo, UploadId,
UploadResult,
},
serde_str::Serde,
state::State,
store::{file_store::FileStore, object_store::ObjectStore, Store},
@@ -1286,11 +1289,28 @@ async fn proxy_alias_from_query<S: Store + 'static>(
} else if !state.config.server.read_only {
let stream = download_stream(proxy.as_str(), state).await?;
let (alias, _, _) = ingest_inline(stream, state, &Default::default()).await?;
// some time has passed, see if we've proxied elsewhere
if let Some(alias) = state.repo.related(proxy.clone()).await? {
alias
} else {
let (alias, token, _) =
ingest_inline(stream, state, &Default::default()).await?;
state.repo.relate_url(proxy, alias.clone()).await?;
// last check, do we succeed or fail to relate the proxy alias
if let Err(ProxyAlreadyExists) =
state.repo.relate_url(proxy.clone(), alias.clone()).await?
{
queue::cleanup_alias(&state.repo, alias, token).await?;
alias
state
.repo
.related(proxy)
.await?
.ok_or(UploadError::MissingAlias)?
} else {
alias
}
}
} else {
return Err(UploadError::ReadOnly.into());
};

View file

@@ -46,6 +46,8 @@ pub(crate) struct HashAlreadyExists;
pub(crate) struct AliasAlreadyExists;
#[derive(Debug)]
pub(crate) struct VariantAlreadyExists;
#[derive(Debug)]
pub(crate) struct ProxyAlreadyExists;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct UploadId {
@@ -151,7 +153,11 @@ impl<T> BaseRepo for Arc<T> where T: BaseRepo {}
#[async_trait::async_trait(?Send)]
pub(crate) trait ProxyRepo: BaseRepo {
async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError>;
async fn relate_url(
&self,
url: Url,
alias: Alias,
) -> Result<Result<(), ProxyAlreadyExists>, RepoError>;
async fn related(&self, url: Url) -> Result<Option<Alias>, RepoError>;
@@ -163,7 +169,11 @@ impl<T> ProxyRepo for Arc<T>
where
T: ProxyRepo,
{
async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> {
async fn relate_url(
&self,
url: Url,
alias: Alias,
) -> Result<Result<(), ProxyAlreadyExists>, RepoError> {
T::relate_url(self, url, alias).await
}

View file

@@ -47,8 +47,8 @@ use super::{
notification_map::{NotificationEntry, NotificationMap},
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo,
FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo,
ProxyAlreadyExists, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo,
UploadId, UploadRepo, UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo,
};
#[derive(Clone)]
@@ -1884,21 +1884,31 @@ impl StoreMigrationRepo for PostgresRepo {
#[async_trait::async_trait(?Send)]
impl ProxyRepo for PostgresRepo {
#[tracing::instrument(level = "debug", skip(self))]
async fn relate_url(&self, input_url: Url, input_alias: Alias) -> Result<(), RepoError> {
async fn relate_url(
&self,
input_url: Url,
input_alias: Alias,
) -> Result<Result<(), ProxyAlreadyExists>, RepoError> {
use schema::proxies::dsl::*;
let mut conn = self.get_connection().await?;
diesel::insert_into(proxies)
let res = diesel::insert_into(proxies)
.values((url.eq(input_url.as_str()), alias.eq(&input_alias)))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_PROXY_RELATE_URL)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
.map_err(|_| PostgresError::DbTimeout)?;
Ok(())
match res {
Ok(_) => Ok(Ok(())),
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(Err(ProxyAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
}
}
#[tracing::instrument(level = "debug", skip(self))]

View file

@@ -26,8 +26,8 @@ use super::{
notification_map::{NotificationEntry, NotificationMap},
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details,
DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo,
ProxyAlreadyExists, ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo,
UploadId, UploadRepo, UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo,
};
macro_rules! b {
@@ -218,20 +218,32 @@ impl FullRepo for SledRepo {
#[async_trait::async_trait(?Send)]
impl ProxyRepo for SledRepo {
async fn relate_url(&self, url: Url, alias: Alias) -> Result<(), RepoError> {
async fn relate_url(
&self,
url: Url,
alias: Alias,
) -> Result<Result<(), ProxyAlreadyExists>, RepoError> {
let proxy = self.proxy.clone();
let inverse_proxy = self.inverse_proxy.clone();
crate::sync::spawn_blocking("sled-io", move || {
proxy.insert(url.as_str().as_bytes(), alias.to_bytes())?;
inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?;
let res = crate::sync::spawn_blocking("sled-io", move || {
match proxy.compare_and_swap(
url.as_str().as_bytes(),
Option::<sled::IVec>::None,
Some(alias.to_bytes()),
)? {
Ok(_) => {
inverse_proxy.insert(alias.to_bytes(), url.as_str().as_bytes())?;
Ok(()) as Result<(), SledError>
Ok(Ok(())) as Result<Result<(), ProxyAlreadyExists>, SledError>
}
Err(_) => Ok(Err(ProxyAlreadyExists)),
}
})
.await
.map_err(|_| RepoError::Canceled)??;
Ok(())
Ok(res)
}
async fn related(&self, url: Url) -> Result<Option<Alias>, RepoError> {