endpoint: refactor actor_cache into separate file

Astro 2023-10-28 01:29:18 +02:00
parent cfdd55facf
commit 8c431650e2
4 changed files with 92 additions and 87 deletions

src/actor_cache.rs (new file)

@@ -0,0 +1,88 @@
use std::{
    collections::HashMap,
    sync::Arc,
    time::Instant,
};
use futures::Future;
use lru::LruCache;
use tokio::sync::{Mutex, oneshot};
use crate::activitypub::Actor;
use crate::error::Error;

/// LRU cache for fetched actors that coalesces concurrent lookups:
/// while a fetch for a key is in flight, further callers for the same
/// key queue up and all receive the same result.
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct ActorCache {
    /// Finished lookups, failures included, so errors are cached too.
    cache: Arc<Mutex<LruCache<String, Result<Arc<Actor>, Error>>>>,
    /// Senders waiting on an in-flight fetch, keyed by actor id.
    queues: Arc<Mutex<HashMap<String, Vec<oneshot::Sender<Result<Arc<Actor>, Error>>>>>>,
}

impl Default for ActorCache {
    fn default() -> Self {
        ActorCache {
            cache: Arc::new(Mutex::new(
                LruCache::new(std::num::NonZeroUsize::new(64).unwrap())
            )),
            queues: Arc::new(Mutex::new(HashMap::new())),
        }
    }
}

impl ActorCache {
    pub async fn get<F, R>(&self, k: &str, f: F) -> Result<Arc<Actor>, Error>
    where
        F: (FnOnce() -> R) + Send + 'static,
        R: Future<Output = Result<Actor, Error>> + Send,
    {
        let begin = Instant::now();

        // Fast path: the result is already cached.
        let mut lru = self.cache.lock().await;
        if let Some(v) = lru.get(k) {
            return v.clone();
        }
        drop(lru);

        // Join the wait queue for this key; the caller that creates the
        // queue becomes responsible for running the fetch.
        let (tx, rx) = oneshot::channel();
        let mut new = false;
        let mut queues = self.queues.lock().await;
        let queue = queues.entry(k.to_string())
            .or_insert_with(|| {
                new = true;
                Vec::with_capacity(1)
            });
        queue.push(tx);
        drop(queues);

        if new {
            let k = k.to_string();
            let cache = self.cache.clone();
            let queues = self.queues.clone();
            tokio::spawn(async move {
                let result = f().await
                    .map(Arc::new);

                // Store the result, then notify everyone who queued up.
                let mut lru = cache.lock().await;
                lru.put(k.clone(), result.clone());
                drop(lru);

                let mut queues = queues.lock().await;
                let queue = queues.remove(&k)
                    .expect("queues.remove");
                let queue_len = queue.len();
                let mut notified = 0usize;
                for tx in queue.into_iter() {
                    if let Ok(()) = tx.send(result.clone()) {
                        notified += 1;
                    }
                }

                let end = Instant::now();
                tracing::info!("Notified {notified}/{queue_len} endpoint verifications for actor {k} in {:?}", end - begin);
            });
        }

        rx.await.unwrap()
    }
}
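
The closure argument keeps the coalescing at the call site: for a given key, only the first concurrent caller's closure actually runs; everyone else waits on their oneshot receiver and gets a clone of the same Result. A minimal call-site sketch (hypothetical: fetch_actor stands in for the crate's actual fetch routine, e.g. authorized_fetch, and must return Result<Actor, Error>):

// Hypothetical call site inside an async fn returning Result<_, Error>;
// `cache` is an ActorCache, `fetch_actor` is an assumed async fn.
let key = "https://example.com/users/alice".to_string();
let actor: Arc<Actor> = cache.get(&key, {
    let key = key.clone();
    move || async move { fetch_actor(&key).await }
}).await?;

Note that failed lookups are cached as well: the LruCache stores the whole Result, so repeated requests for an unreachable actor are answered from the cache until the entry is evicted.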

src/endpoint.rs

@@ -1,8 +1,4 @@
-use std::{
-    collections::HashMap,
-    sync::Arc,
-    time::Instant,
-};
+use std::sync::Arc;
 use axum::{
     async_trait,
@@ -10,93 +6,13 @@ use axum::{
     extract::{FromRef, FromRequest},
     http::{header::CONTENT_TYPE, Request, StatusCode}, BoxError,
 };
-use futures::Future;
 use http_digest_headers::DigestHeader;
 use sigh::{Signature, PublicKey, Key, PrivateKey};
-use lru::LruCache;
-use tokio::sync::{Mutex, oneshot};
 use crate::fetch::authorized_fetch;
 use crate::activitypub::Actor;
 use crate::error::Error;
+use crate::actor_cache::ActorCache;
-
-#[allow(clippy::type_complexity)]
-#[derive(Clone)]
-pub struct ActorCache {
-    cache: Arc<Mutex<LruCache<String, Result<Arc<Actor>, Error>>>>,
-    queues: Arc<Mutex<HashMap<String, Vec<oneshot::Sender<Result<Arc<Actor>, Error>>>>>>,
-}
-
-impl Default for ActorCache {
-    fn default() -> Self {
-        ActorCache {
-            cache: Arc::new(Mutex::new(
-                LruCache::new(std::num::NonZeroUsize::new(64).unwrap())
-            )),
-            queues: Arc::new(Mutex::new(HashMap::new())),
-        }
-    }
-}
-
-impl ActorCache {
-    pub async fn get<F, R>(&self, k: &str, f: F) -> Result<Arc<Actor>, Error>
-    where
-        F: (FnOnce() -> R) + Send + 'static,
-        R: Future<Output = Result<Actor, Error>> + Send,
-    {
-        let begin = Instant::now();
-
-        let mut lru = self.cache.lock().await;
-        if let Some(v) = lru.get(k) {
-            return v.clone();
-        }
-        drop(lru);
-
-        let (tx, rx) = oneshot::channel();
-        let mut new = false;
-        let mut queues = self.queues.lock().await;
-        let queue = queues.entry(k.to_string())
-            .or_insert_with(|| {
-                new = true;
-                Vec::with_capacity(1)
-            });
-        queue.push(tx);
-        drop(queues);
-
-        if new {
-            let k = k.to_string();
-            let cache = self.cache.clone();
-            let queues = self.queues.clone();
-            tokio::spawn(async move {
-                let result = f().await
-                    .map(Arc::new);
-
-                let mut lru = cache.lock().await;
-                lru.put(k.clone(), result.clone());
-                drop(lru);
-
-                let mut queues = queues.lock().await;
-                let queue = queues.remove(&k)
-                    .expect("queues.remove");
-                let queue_len = queue.len();
-                let mut notified = 0usize;
-                for tx in queue.into_iter() {
-                    if let Ok(()) = tx.send(result.clone()) {
-                        notified += 1;
-                    }
-                }
-
-                let end = Instant::now();
-                tracing::info!("Notified {notified}/{queue_len} endpoint verifications for actor {k} in {:?}", end - begin);
-            });
-        }
-
-        rx.await.unwrap()
-    }
-}
 
 const SIGNATURE_HEADERS_REQUIRED: &[&str] = &[

src/main.rs

@@ -25,6 +25,7 @@ mod send;
 mod stream;
 mod relay;
 mod activitypub;
+mod actor_cache;
 mod endpoint;
 
 use state::State;

src/state.rs

@@ -3,7 +3,7 @@ use axum::{
 };
 use sigh::{PrivateKey, PublicKey};
 use std::sync::Arc;
-use crate::{config::Config, db::Database, endpoint::ActorCache};
+use crate::{config::Config, db::Database, actor_cache::ActorCache};
 
 #[derive(Clone)]
 pub struct State {