diff --git a/Cargo.lock b/Cargo.lock index ee05854..28bebda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -678,6 +678,16 @@ dependencies = [ "subtle", ] +[[package]] +name = "dashmap" +version = "4.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c" +dependencies = [ + "cfg-if", + "num_cpus", +] + [[package]] name = "der" version = "0.4.4" @@ -1949,13 +1959,13 @@ dependencies = [ "actix-webfinger", "ammonia", "anyhow", - "async-mutex", "async-rwlock", "awc", "background-jobs", "base64", "chrono", "config", + "dashmap", "dotenv", "futures-util", "http-signature-normalization-actix", diff --git a/Cargo.toml b/Cargo.toml index 082fe5d..cfa7e25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,12 +20,12 @@ actix-webfinger = "0.4.0-beta.3" activitystreams = "0.7.0-alpha.10" activitystreams-ext = "0.1.0-alpha.2" ammonia = "3.1.0" -async-mutex = "1.0.1" async-rwlock = "1.3.0" awc = { version = "3.0.0-beta.6", default-features = false, features = ["rustls"] } base64 = "0.13" chrono = "0.4.19" config = "0.11.0" +dashmap = "4.0.2" dotenv = "0.15.0" futures-util = "0.3.17" lru = "0.7.0" diff --git a/src/data/actor.rs b/src/data/actor.rs index a1b31f0..c9000fb 100644 --- a/src/data/actor.rs +++ b/src/data/actor.rs @@ -17,10 +17,7 @@ pub enum MaybeCached { impl MaybeCached { pub(crate) fn is_cached(&self) -> bool { - match self { - MaybeCached::Cached(_) => true, - _ => false, - } + matches!(self, MaybeCached::Cached(_)) } pub(crate) fn into_inner(self) -> T { @@ -74,16 +71,16 @@ impl ActorCache { let input_domain = id.domain().ok_or(ErrorKind::MissingDomain)?; let accepted_actor_id = accepted_actor - .id(&input_domain)? + .id(input_domain)? .ok_or(ErrorKind::MissingId)?; let inbox = get_inbox(&accepted_actor)?.clone(); let actor = Actor { - id: accepted_actor_id.clone().into(), + id: accepted_actor_id.clone(), public_key: accepted_actor.ext_one.public_key.public_key_pem, public_key_id: accepted_actor.ext_one.public_key.id, - inbox: inbox.into(), + inbox, saved_at: SystemTime::now(), }; diff --git a/src/data/node.rs b/src/data/node.rs index fb3fed8..6808468 100644 --- a/src/data/node.rs +++ b/src/data/node.rs @@ -46,9 +46,9 @@ impl NodeCache { .await? .into_iter() .map(move |actor_id| { - let info = infos.get(&actor_id).map(|info| info.clone()); - let instance = instances.get(&actor_id).map(|instance| instance.clone()); - let contact = contacts.get(&actor_id).map(|contact| contact.clone()); + let info = infos.get(&actor_id).cloned(); + let instance = instances.get(&actor_id).cloned(); + let contact = contacts.get(&actor_id).cloned(); Node::new(actor_id) .info(info) diff --git a/src/db.rs b/src/db.rs index a625e93..2da98b1 100644 --- a/src/db.rs +++ b/src/db.rs @@ -107,10 +107,7 @@ impl std::fmt::Debug for Contact { impl Inner { fn connected_by_domain(&self, domains: &[String]) -> impl DoubleEndedIterator { - let reversed: Vec<_> = domains - .into_iter() - .map(|s| domain_key(s.as_str())) - .collect(); + let reversed: Vec<_> = domains.iter().map(|s| domain_key(s.as_str())).collect(); self.connected_actor_ids .iter() @@ -147,7 +144,7 @@ impl Inner { .filter_map(url_from_ivec) } - fn connected_actors<'a>(&'a self) -> impl DoubleEndedIterator + 'a { + fn connected_actors(&self) -> impl DoubleEndedIterator + '_ { self.connected_actor_ids .iter() .values() @@ -159,7 +156,7 @@ impl Inner { }) } - fn connected_info<'a>(&'a self) -> impl DoubleEndedIterator + 'a { + fn connected_info(&self) -> impl DoubleEndedIterator + '_ { self.connected_actor_ids .iter() .values() @@ -173,7 +170,7 @@ impl Inner { }) } - fn connected_instance<'a>(&'a self) -> impl DoubleEndedIterator + 'a { + fn connected_instance(&self) -> impl DoubleEndedIterator + '_ { self.connected_actor_ids .iter() .values() @@ -187,7 +184,7 @@ impl Inner { }) } - fn connected_contact<'a>(&'a self) -> impl DoubleEndedIterator + 'a { + fn connected_contact(&self) -> impl DoubleEndedIterator + '_ { self.connected_actor_ids .iter() .values() diff --git a/src/error.rs b/src/error.rs index 0d8c98a..6c3ee0c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -17,7 +17,7 @@ pub(crate) struct Error { impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}\n", self.kind)?; + writeln!(f, "{}", self.kind)?; std::fmt::Display::fmt(&self.context, f) } } @@ -171,7 +171,7 @@ impl ResponseError for Error { serde_json::to_string(&serde_json::json!({ "error": self.kind.to_string(), })) - .unwrap_or("{}".to_string()), + .unwrap_or_else(|_| "{}".to_string()), ) } } diff --git a/src/jobs/contact.rs b/src/jobs/contact.rs index edb89f7..62714f3 100644 --- a/src/jobs/contact.rs +++ b/src/jobs/contact.rs @@ -46,7 +46,7 @@ impl QueryContact { .await?; let (username, display_name, url, avatar) = - to_contact(contact).ok_or_else(|| ErrorKind::Extract("contact"))?; + to_contact(contact).ok_or(ErrorKind::Extract("contact"))?; state .node_cache diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index cf0030c..5721423 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -51,18 +51,18 @@ impl QueryInstance { .await?; let description = if instance.description.is_empty() { - instance.short_description.unwrap_or(String::new()) + instance.short_description.unwrap_or_default() } else { instance.description }; if let Some(mut contact) = instance.contact { let uuid = if let Some(uuid) = state.media.get_uuid(contact.avatar.clone()).await? { - contact.avatar = state.config.generate_url(UrlKind::Media(uuid)).into(); + contact.avatar = state.config.generate_url(UrlKind::Media(uuid)); uuid } else { let uuid = state.media.store_url(contact.avatar.clone()).await?; - contact.avatar = state.config.generate_url(UrlKind::Media(uuid)).into(); + contact.avatar = state.config.generate_url(UrlKind::Media(uuid)); uuid }; diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index b46a85e..1b22710 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -15,7 +15,6 @@ pub(crate) use self::{ use crate::{ config::Config, data::{ActorCache, MediaCache, NodeCache, State}, - db::Db, error::{Error, ErrorKind}, jobs::process_listeners::Listeners, requests::Requests, @@ -24,7 +23,6 @@ use background_jobs::{memory_storage::Storage, Job, QueueHandle, WorkerConfig}; use std::time::Duration; pub(crate) fn create_workers( - db: Db, state: State, actors: ActorCache, media: MediaCache, @@ -32,7 +30,6 @@ pub(crate) fn create_workers( ) -> JobServer { let queue_handle = WorkerConfig::new(Storage::new(), move |queue_handle| { JobState::new( - db.clone(), state.clone(), actors.clone(), JobServer::new(queue_handle), @@ -62,7 +59,6 @@ pub(crate) fn create_workers( #[derive(Clone, Debug)] pub(crate) struct JobState { - db: Db, requests: Requests, state: State, actors: ActorCache, @@ -87,7 +83,6 @@ impl std::fmt::Debug for JobServer { impl JobState { fn new( - db: Db, state: State, actors: ActorCache, job_server: JobServer, @@ -97,7 +92,6 @@ impl JobState { JobState { requests: state.requests(&config), node_cache: state.node_cache(), - db, actors, config, media, diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs index ea9d78b..7ffd732 100644 --- a/src/jobs/nodeinfo.rs +++ b/src/jobs/nodeinfo.rs @@ -66,7 +66,8 @@ impl QueryNodeinfo { if let Some(contact_id) = accounts.get(0) { state .job_server - .queue(QueryContact::new(self.actor_id, contact_id.clone())).await?; + .queue(QueryContact::new(self.actor_id, contact_id.clone())) + .await?; } } @@ -129,10 +130,7 @@ enum MaybeSupported { impl MaybeSupported { fn is_supported(&self) -> bool { - match self { - MaybeSupported::Supported(_) => true, - _ => false, - } + matches!(self, MaybeSupported::Supported(_)) } } diff --git a/src/main.rs b/src/main.rs index 3b36445..d5970cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -97,13 +97,7 @@ async fn main() -> Result<(), anyhow::Error> { let state = State::build(db.clone()).await?; let actors = ActorCache::new(db.clone()); - let job_server = create_workers( - db.clone(), - state.clone(), - actors.clone(), - media.clone(), - config.clone(), - ); + let job_server = create_workers(state.clone(), actors.clone(), media.clone(), config.clone()); let bind_address = config.bind_address(); HttpServer::new(move || { diff --git a/src/middleware/verifier.rs b/src/middleware/verifier.rs index ccc388b..6ac45f8 100644 --- a/src/middleware/verifier.rs +++ b/src/middleware/verifier.rs @@ -65,7 +65,7 @@ impl MyVerify { .fetch::(public_key_id.as_str()) .await? .actor_id() - .ok_or_else(|| ErrorKind::MissingId)? + .ok_or(ErrorKind::MissingId)? }; // Previously we verified the sig from an actor's local cache @@ -90,14 +90,14 @@ enum PublicKeyResponse { #[allow(dead_code)] public_key_pem: String, }, - Actor(AcceptedActors), + Actor(Box), } impl PublicKeyResponse { fn actor_id(&self) -> Option { match self { PublicKeyResponse::PublicKey { owner, .. } => Some(owner.clone()), - PublicKeyResponse::Actor(actor) => actor.id_unchecked().map(|url| url.clone()), + PublicKeyResponse::Actor(actor) => actor.id_unchecked().cloned(), } } } diff --git a/src/requests.rs b/src/requests.rs index 8362ee3..1498183 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -1,16 +1,14 @@ use crate::error::{Error, ErrorKind}; use activitystreams::url::Url; use actix_web::{http::header::Date, web::Bytes}; -use async_mutex::Mutex; -use async_rwlock::RwLock; use awc::Client; use chrono::{DateTime, Utc}; +use dashmap::DashMap; use http_signature_normalization_actix::prelude::*; use rsa::{hash::Hash, padding::PaddingScheme, RsaPrivateKey}; use sha2::{Digest, Sha256}; use std::{ cell::RefCell, - collections::HashMap, rc::Rc, sync::{ atomic::{AtomicUsize, Ordering}, @@ -23,7 +21,7 @@ use tracing_awc::Propagate; #[derive(Clone)] pub(crate) struct Breakers { - inner: Arc>>>>, + inner: Arc>, } impl std::fmt::Debug for Breakers { @@ -33,10 +31,10 @@ impl std::fmt::Debug for Breakers { } impl Breakers { - async fn should_try(&self, url: &Url) -> bool { + fn should_try(&self, url: &Url) -> bool { if let Some(domain) = url.domain() { - if let Some(breaker) = self.inner.read().await.get(domain) { - breaker.lock().await.should_try() + if let Some(breaker) = self.inner.get(domain) { + breaker.should_try() } else { true } @@ -45,15 +43,11 @@ impl Breakers { } } - async fn fail(&self, url: &Url) { + fn fail(&self, url: &Url) { if let Some(domain) = url.domain() { let should_write = { - let read = self.inner.read().await; - - if let Some(breaker) = read.get(domain) { - let owned_breaker = Arc::clone(&breaker); - drop(breaker); - owned_breaker.lock().await.fail(); + if let Some(mut breaker) = self.inner.get_mut(domain) { + breaker.fail(); false } else { true @@ -61,24 +55,17 @@ impl Breakers { }; if should_write { - let mut hm = self.inner.write().await; - let breaker = hm - .entry(domain.to_owned()) - .or_insert(Arc::new(Mutex::new(Breaker::default()))); - breaker.lock().await.fail(); + let mut breaker = self.inner.entry(domain.to_owned()).or_default(); + breaker.fail(); } } } - async fn succeed(&self, url: &Url) { + fn succeed(&self, url: &Url) { if let Some(domain) = url.domain() { let should_write = { - let read = self.inner.read().await; - - if let Some(breaker) = read.get(domain) { - let owned_breaker = Arc::clone(&breaker); - drop(breaker); - owned_breaker.lock().await.succeed(); + if let Some(mut breaker) = self.inner.get_mut(domain) { + breaker.succeed(); false } else { true @@ -86,11 +73,8 @@ impl Breakers { }; if should_write { - let mut hm = self.inner.write().await; - let breaker = hm - .entry(domain.to_owned()) - .or_insert(Arc::new(Mutex::new(Breaker::default()))); - breaker.lock().await.succeed(); + let mut breaker = self.inner.entry(domain.to_owned()).or_default(); + breaker.succeed(); } } } @@ -99,7 +83,7 @@ impl Breakers { impl Default for Breakers { fn default() -> Self { Breakers { - inner: Arc::new(RwLock::new(HashMap::new())), + inner: Arc::new(DashMap::new()), } } } @@ -233,7 +217,7 @@ impl Requests { { let parsed_url = url.parse::()?; - if !self.breakers.should_try(&parsed_url).await { + if !self.breakers.should_try(&parsed_url) { return Err(ErrorKind::Breaker.into()); } @@ -256,7 +240,7 @@ impl Requests { if res.is_err() { self.count_err(); - self.breakers.fail(&parsed_url).await; + self.breakers.fail(&parsed_url); } let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?; @@ -272,12 +256,12 @@ impl Requests { } } - self.breakers.fail(&parsed_url).await; + self.breakers.fail(&parsed_url); return Err(ErrorKind::Status(url.to_string(), res.status()).into()); } - self.breakers.succeed(&parsed_url).await; + self.breakers.succeed(&parsed_url); let body = res .body() @@ -291,7 +275,7 @@ impl Requests { pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> { let parsed_url = url.parse::()?; - if !self.breakers.should_try(&parsed_url).await { + if !self.breakers.should_try(&parsed_url) { return Err(ErrorKind::Breaker.into()); } @@ -314,7 +298,7 @@ impl Requests { .await; if res.is_err() { - self.breakers.fail(&parsed_url).await; + self.breakers.fail(&parsed_url); self.count_err(); } @@ -341,12 +325,12 @@ impl Requests { } } - self.breakers.fail(&parsed_url).await; + self.breakers.fail(&parsed_url); return Err(ErrorKind::Status(url.to_string(), res.status()).into()); } - self.breakers.succeed(&parsed_url).await; + self.breakers.succeed(&parsed_url); let bytes = match res.body().limit(1024 * 1024 * 4).await { Err(e) => { @@ -366,7 +350,7 @@ impl Requests { where T: serde::ser::Serialize + std::fmt::Debug, { - if !self.breakers.should_try(&inbox).await { + if !self.breakers.should_try(&inbox) { return Err(ErrorKind::Breaker.into()); } @@ -393,7 +377,7 @@ impl Requests { if res.is_err() { self.count_err(); - self.breakers.fail(&inbox).await; + self.breakers.fail(&inbox); } let mut res = res.map_err(|e| ErrorKind::SendRequest(inbox.to_string(), e.to_string()))?; @@ -409,11 +393,11 @@ impl Requests { } } - self.breakers.fail(&inbox).await; + self.breakers.fail(&inbox); return Err(ErrorKind::Status(inbox.to_string(), res.status()).into()); } - self.breakers.succeed(&inbox).await; + self.breakers.succeed(&inbox); Ok(()) } diff --git a/src/routes/actor.rs b/src/routes/actor.rs index 3f895b7..b97a608 100644 --- a/src/routes/actor.rs +++ b/src/routes/actor.rs @@ -24,8 +24,8 @@ pub(crate) async fn route( ApActor::new(config.generate_url(UrlKind::Inbox), Application::new()), PublicKey { public_key: PublicKeyInner { - id: config.generate_url(UrlKind::MainKey).into(), - owner: config.generate_url(UrlKind::Actor).into(), + id: config.generate_url(UrlKind::MainKey), + owner: config.generate_url(UrlKind::Actor), public_key_pem: state.public_key.to_public_key_pem()?, }, }, diff --git a/src/routes/nodeinfo.rs b/src/routes/nodeinfo.rs index 9a0bcd7..ec15e08 100644 --- a/src/routes/nodeinfo.rs +++ b/src/routes/nodeinfo.rs @@ -54,13 +54,13 @@ pub(crate) async fn route( .db .inboxes() .await - .unwrap_or(vec![]) + .unwrap_or_default() .iter() .filter_map(|listener| listener.domain()) .map(|s| s.to_owned()) .collect(), blocks: if config.publish_blocks() { - Some(state.db.blocks().await.unwrap_or(vec![])) + Some(state.db.blocks().await.unwrap_or_default()) } else { None }, diff --git a/src/routes/statics.rs b/src/routes/statics.rs index d79bb12..31e37c1 100644 --- a/src/routes/statics.rs +++ b/src/routes/statics.rs @@ -4,6 +4,7 @@ use actix_web::{ web, HttpResponse, }; +#[allow(clippy::async_yields_async)] #[tracing::instrument(name = "Statistics")] pub(crate) async fn route(filename: web::Path) -> HttpResponse { if let Some(data) = StaticFile::get(&filename.into_inner()) { diff --git a/templates/admin.rs.html b/templates/admin.rs.html index 53d5bce..21b8c42 100644 --- a/templates/admin.rs.html +++ b/templates/admin.rs.html @@ -11,10 +11,8 @@

@contact.display_name

- @if let Some(domain) = base.domain() { -

@@@contact.username@@@domain

- } else { -

@@@contact.username

- } +

+ @@@contact.username@if let Some(domain) = base.domain() {@@@domain} +

diff --git a/templates/index.rs.html b/templates/index.rs.html index 20aa855..8e16464 100644 --- a/templates/index.rs.html +++ b/templates/index.rs.html @@ -36,7 +36,7 @@ } else { @if let Some(inf) = node.info.as_ref() {
  • - @:info(&inf, &node.base) + @:info(inf, &node.base)
  • } }