Clippy lints, dashmap for breakers

This commit is contained in:
Aode (lion) 2021-11-23 16:19:59 -06:00
parent e1c61d5b5f
commit 1dba31e3a0
18 changed files with 74 additions and 101 deletions

12
Cargo.lock generated
View file

@ -678,6 +678,16 @@ dependencies = [
"subtle", "subtle",
] ]
[[package]]
name = "dashmap"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
dependencies = [
"cfg-if",
"num_cpus",
]
[[package]] [[package]]
name = "der" name = "der"
version = "0.4.4" version = "0.4.4"
@ -1949,13 +1959,13 @@ dependencies = [
"actix-webfinger", "actix-webfinger",
"ammonia", "ammonia",
"anyhow", "anyhow",
"async-mutex",
"async-rwlock", "async-rwlock",
"awc", "awc",
"background-jobs", "background-jobs",
"base64", "base64",
"chrono", "chrono",
"config", "config",
"dashmap",
"dotenv", "dotenv",
"futures-util", "futures-util",
"http-signature-normalization-actix", "http-signature-normalization-actix",

View file

@ -20,12 +20,12 @@ actix-webfinger = "0.4.0-beta.3"
activitystreams = "0.7.0-alpha.10" activitystreams = "0.7.0-alpha.10"
activitystreams-ext = "0.1.0-alpha.2" activitystreams-ext = "0.1.0-alpha.2"
ammonia = "3.1.0" ammonia = "3.1.0"
async-mutex = "1.0.1"
async-rwlock = "1.3.0" async-rwlock = "1.3.0"
awc = { version = "3.0.0-beta.6", default-features = false, features = ["rustls"] } awc = { version = "3.0.0-beta.6", default-features = false, features = ["rustls"] }
base64 = "0.13" base64 = "0.13"
chrono = "0.4.19" chrono = "0.4.19"
config = "0.11.0" config = "0.11.0"
dashmap = "4.0.2"
dotenv = "0.15.0" dotenv = "0.15.0"
futures-util = "0.3.17" futures-util = "0.3.17"
lru = "0.7.0" lru = "0.7.0"

View file

@ -17,10 +17,7 @@ pub enum MaybeCached<T> {
impl<T> MaybeCached<T> { impl<T> MaybeCached<T> {
pub(crate) fn is_cached(&self) -> bool { pub(crate) fn is_cached(&self) -> bool {
match self { matches!(self, MaybeCached::Cached(_))
MaybeCached::Cached(_) => true,
_ => false,
}
} }
pub(crate) fn into_inner(self) -> T { pub(crate) fn into_inner(self) -> T {
@ -74,16 +71,16 @@ impl ActorCache {
let input_domain = id.domain().ok_or(ErrorKind::MissingDomain)?; let input_domain = id.domain().ok_or(ErrorKind::MissingDomain)?;
let accepted_actor_id = accepted_actor let accepted_actor_id = accepted_actor
.id(&input_domain)? .id(input_domain)?
.ok_or(ErrorKind::MissingId)?; .ok_or(ErrorKind::MissingId)?;
let inbox = get_inbox(&accepted_actor)?.clone(); let inbox = get_inbox(&accepted_actor)?.clone();
let actor = Actor { let actor = Actor {
id: accepted_actor_id.clone().into(), id: accepted_actor_id.clone(),
public_key: accepted_actor.ext_one.public_key.public_key_pem, public_key: accepted_actor.ext_one.public_key.public_key_pem,
public_key_id: accepted_actor.ext_one.public_key.id, public_key_id: accepted_actor.ext_one.public_key.id,
inbox: inbox.into(), inbox,
saved_at: SystemTime::now(), saved_at: SystemTime::now(),
}; };

View file

@ -46,9 +46,9 @@ impl NodeCache {
.await? .await?
.into_iter() .into_iter()
.map(move |actor_id| { .map(move |actor_id| {
let info = infos.get(&actor_id).map(|info| info.clone()); let info = infos.get(&actor_id).cloned();
let instance = instances.get(&actor_id).map(|instance| instance.clone()); let instance = instances.get(&actor_id).cloned();
let contact = contacts.get(&actor_id).map(|contact| contact.clone()); let contact = contacts.get(&actor_id).cloned();
Node::new(actor_id) Node::new(actor_id)
.info(info) .info(info)

View file

@ -107,10 +107,7 @@ impl std::fmt::Debug for Contact {
impl Inner { impl Inner {
fn connected_by_domain(&self, domains: &[String]) -> impl DoubleEndedIterator<Item = Url> { fn connected_by_domain(&self, domains: &[String]) -> impl DoubleEndedIterator<Item = Url> {
let reversed: Vec<_> = domains let reversed: Vec<_> = domains.iter().map(|s| domain_key(s.as_str())).collect();
.into_iter()
.map(|s| domain_key(s.as_str()))
.collect();
self.connected_actor_ids self.connected_actor_ids
.iter() .iter()
@ -147,7 +144,7 @@ impl Inner {
.filter_map(url_from_ivec) .filter_map(url_from_ivec)
} }
fn connected_actors<'a>(&'a self) -> impl DoubleEndedIterator<Item = Actor> + 'a { fn connected_actors(&self) -> impl DoubleEndedIterator<Item = Actor> + '_ {
self.connected_actor_ids self.connected_actor_ids
.iter() .iter()
.values() .values()
@ -159,7 +156,7 @@ impl Inner {
}) })
} }
fn connected_info<'a>(&'a self) -> impl DoubleEndedIterator<Item = (Url, Info)> + 'a { fn connected_info(&self) -> impl DoubleEndedIterator<Item = (Url, Info)> + '_ {
self.connected_actor_ids self.connected_actor_ids
.iter() .iter()
.values() .values()
@ -173,7 +170,7 @@ impl Inner {
}) })
} }
fn connected_instance<'a>(&'a self) -> impl DoubleEndedIterator<Item = (Url, Instance)> + 'a { fn connected_instance(&self) -> impl DoubleEndedIterator<Item = (Url, Instance)> + '_ {
self.connected_actor_ids self.connected_actor_ids
.iter() .iter()
.values() .values()
@ -187,7 +184,7 @@ impl Inner {
}) })
} }
fn connected_contact<'a>(&'a self) -> impl DoubleEndedIterator<Item = (Url, Contact)> + 'a { fn connected_contact(&self) -> impl DoubleEndedIterator<Item = (Url, Contact)> + '_ {
self.connected_actor_ids self.connected_actor_ids
.iter() .iter()
.values() .values()

View file

@ -17,7 +17,7 @@ pub(crate) struct Error {
impl std::fmt::Display for Error { impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}\n", self.kind)?; writeln!(f, "{}", self.kind)?;
std::fmt::Display::fmt(&self.context, f) std::fmt::Display::fmt(&self.context, f)
} }
} }
@ -171,7 +171,7 @@ impl ResponseError for Error {
serde_json::to_string(&serde_json::json!({ serde_json::to_string(&serde_json::json!({
"error": self.kind.to_string(), "error": self.kind.to_string(),
})) }))
.unwrap_or("{}".to_string()), .unwrap_or_else(|_| "{}".to_string()),
) )
} }
} }

View file

@ -46,7 +46,7 @@ impl QueryContact {
.await?; .await?;
let (username, display_name, url, avatar) = let (username, display_name, url, avatar) =
to_contact(contact).ok_or_else(|| ErrorKind::Extract("contact"))?; to_contact(contact).ok_or(ErrorKind::Extract("contact"))?;
state state
.node_cache .node_cache

View file

@ -51,18 +51,18 @@ impl QueryInstance {
.await?; .await?;
let description = if instance.description.is_empty() { let description = if instance.description.is_empty() {
instance.short_description.unwrap_or(String::new()) instance.short_description.unwrap_or_default()
} else { } else {
instance.description instance.description
}; };
if let Some(mut contact) = instance.contact { if let Some(mut contact) = instance.contact {
let uuid = if let Some(uuid) = state.media.get_uuid(contact.avatar.clone()).await? { let uuid = if let Some(uuid) = state.media.get_uuid(contact.avatar.clone()).await? {
contact.avatar = state.config.generate_url(UrlKind::Media(uuid)).into(); contact.avatar = state.config.generate_url(UrlKind::Media(uuid));
uuid uuid
} else { } else {
let uuid = state.media.store_url(contact.avatar.clone()).await?; let uuid = state.media.store_url(contact.avatar.clone()).await?;
contact.avatar = state.config.generate_url(UrlKind::Media(uuid)).into(); contact.avatar = state.config.generate_url(UrlKind::Media(uuid));
uuid uuid
}; };

View file

@ -15,7 +15,6 @@ pub(crate) use self::{
use crate::{ use crate::{
config::Config, config::Config,
data::{ActorCache, MediaCache, NodeCache, State}, data::{ActorCache, MediaCache, NodeCache, State},
db::Db,
error::{Error, ErrorKind}, error::{Error, ErrorKind},
jobs::process_listeners::Listeners, jobs::process_listeners::Listeners,
requests::Requests, requests::Requests,
@ -24,7 +23,6 @@ use background_jobs::{memory_storage::Storage, Job, QueueHandle, WorkerConfig};
use std::time::Duration; use std::time::Duration;
pub(crate) fn create_workers( pub(crate) fn create_workers(
db: Db,
state: State, state: State,
actors: ActorCache, actors: ActorCache,
media: MediaCache, media: MediaCache,
@ -32,7 +30,6 @@ pub(crate) fn create_workers(
) -> JobServer { ) -> JobServer {
let queue_handle = WorkerConfig::new(Storage::new(), move |queue_handle| { let queue_handle = WorkerConfig::new(Storage::new(), move |queue_handle| {
JobState::new( JobState::new(
db.clone(),
state.clone(), state.clone(),
actors.clone(), actors.clone(),
JobServer::new(queue_handle), JobServer::new(queue_handle),
@ -62,7 +59,6 @@ pub(crate) fn create_workers(
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) struct JobState { pub(crate) struct JobState {
db: Db,
requests: Requests, requests: Requests,
state: State, state: State,
actors: ActorCache, actors: ActorCache,
@ -87,7 +83,6 @@ impl std::fmt::Debug for JobServer {
impl JobState { impl JobState {
fn new( fn new(
db: Db,
state: State, state: State,
actors: ActorCache, actors: ActorCache,
job_server: JobServer, job_server: JobServer,
@ -97,7 +92,6 @@ impl JobState {
JobState { JobState {
requests: state.requests(&config), requests: state.requests(&config),
node_cache: state.node_cache(), node_cache: state.node_cache(),
db,
actors, actors,
config, config,
media, media,

View file

@ -66,7 +66,8 @@ impl QueryNodeinfo {
if let Some(contact_id) = accounts.get(0) { if let Some(contact_id) = accounts.get(0) {
state state
.job_server .job_server
.queue(QueryContact::new(self.actor_id, contact_id.clone())).await?; .queue(QueryContact::new(self.actor_id, contact_id.clone()))
.await?;
} }
} }
@ -129,10 +130,7 @@ enum MaybeSupported<T> {
impl<T> MaybeSupported<T> { impl<T> MaybeSupported<T> {
fn is_supported(&self) -> bool { fn is_supported(&self) -> bool {
match self { matches!(self, MaybeSupported::Supported(_))
MaybeSupported::Supported(_) => true,
_ => false,
}
} }
} }

View file

@ -97,13 +97,7 @@ async fn main() -> Result<(), anyhow::Error> {
let state = State::build(db.clone()).await?; let state = State::build(db.clone()).await?;
let actors = ActorCache::new(db.clone()); let actors = ActorCache::new(db.clone());
let job_server = create_workers( let job_server = create_workers(state.clone(), actors.clone(), media.clone(), config.clone());
db.clone(),
state.clone(),
actors.clone(),
media.clone(),
config.clone(),
);
let bind_address = config.bind_address(); let bind_address = config.bind_address();
HttpServer::new(move || { HttpServer::new(move || {

View file

@ -65,7 +65,7 @@ impl MyVerify {
.fetch::<PublicKeyResponse>(public_key_id.as_str()) .fetch::<PublicKeyResponse>(public_key_id.as_str())
.await? .await?
.actor_id() .actor_id()
.ok_or_else(|| ErrorKind::MissingId)? .ok_or(ErrorKind::MissingId)?
}; };
// Previously we verified the sig from an actor's local cache // Previously we verified the sig from an actor's local cache
@ -90,14 +90,14 @@ enum PublicKeyResponse {
#[allow(dead_code)] #[allow(dead_code)]
public_key_pem: String, public_key_pem: String,
}, },
Actor(AcceptedActors), Actor(Box<AcceptedActors>),
} }
impl PublicKeyResponse { impl PublicKeyResponse {
fn actor_id(&self) -> Option<Url> { fn actor_id(&self) -> Option<Url> {
match self { match self {
PublicKeyResponse::PublicKey { owner, .. } => Some(owner.clone()), PublicKeyResponse::PublicKey { owner, .. } => Some(owner.clone()),
PublicKeyResponse::Actor(actor) => actor.id_unchecked().map(|url| url.clone()), PublicKeyResponse::Actor(actor) => actor.id_unchecked().cloned(),
} }
} }
} }

View file

@ -1,16 +1,14 @@
use crate::error::{Error, ErrorKind}; use crate::error::{Error, ErrorKind};
use activitystreams::url::Url; use activitystreams::url::Url;
use actix_web::{http::header::Date, web::Bytes}; use actix_web::{http::header::Date, web::Bytes};
use async_mutex::Mutex;
use async_rwlock::RwLock;
use awc::Client; use awc::Client;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use dashmap::DashMap;
use http_signature_normalization_actix::prelude::*; use http_signature_normalization_actix::prelude::*;
use rsa::{hash::Hash, padding::PaddingScheme, RsaPrivateKey}; use rsa::{hash::Hash, padding::PaddingScheme, RsaPrivateKey};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::{ use std::{
cell::RefCell, cell::RefCell,
collections::HashMap,
rc::Rc, rc::Rc,
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
@ -23,7 +21,7 @@ use tracing_awc::Propagate;
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct Breakers { pub(crate) struct Breakers {
inner: Arc<RwLock<HashMap<String, Arc<Mutex<Breaker>>>>>, inner: Arc<DashMap<String, Breaker>>,
} }
impl std::fmt::Debug for Breakers { impl std::fmt::Debug for Breakers {
@ -33,10 +31,10 @@ impl std::fmt::Debug for Breakers {
} }
impl Breakers { impl Breakers {
async fn should_try(&self, url: &Url) -> bool { fn should_try(&self, url: &Url) -> bool {
if let Some(domain) = url.domain() { if let Some(domain) = url.domain() {
if let Some(breaker) = self.inner.read().await.get(domain) { if let Some(breaker) = self.inner.get(domain) {
breaker.lock().await.should_try() breaker.should_try()
} else { } else {
true true
} }
@ -45,15 +43,11 @@ impl Breakers {
} }
} }
async fn fail(&self, url: &Url) { fn fail(&self, url: &Url) {
if let Some(domain) = url.domain() { if let Some(domain) = url.domain() {
let should_write = { let should_write = {
let read = self.inner.read().await; if let Some(mut breaker) = self.inner.get_mut(domain) {
breaker.fail();
if let Some(breaker) = read.get(domain) {
let owned_breaker = Arc::clone(&breaker);
drop(breaker);
owned_breaker.lock().await.fail();
false false
} else { } else {
true true
@ -61,24 +55,17 @@ impl Breakers {
}; };
if should_write { if should_write {
let mut hm = self.inner.write().await; let mut breaker = self.inner.entry(domain.to_owned()).or_default();
let breaker = hm breaker.fail();
.entry(domain.to_owned())
.or_insert(Arc::new(Mutex::new(Breaker::default())));
breaker.lock().await.fail();
} }
} }
} }
async fn succeed(&self, url: &Url) { fn succeed(&self, url: &Url) {
if let Some(domain) = url.domain() { if let Some(domain) = url.domain() {
let should_write = { let should_write = {
let read = self.inner.read().await; if let Some(mut breaker) = self.inner.get_mut(domain) {
breaker.succeed();
if let Some(breaker) = read.get(domain) {
let owned_breaker = Arc::clone(&breaker);
drop(breaker);
owned_breaker.lock().await.succeed();
false false
} else { } else {
true true
@ -86,11 +73,8 @@ impl Breakers {
}; };
if should_write { if should_write {
let mut hm = self.inner.write().await; let mut breaker = self.inner.entry(domain.to_owned()).or_default();
let breaker = hm breaker.succeed();
.entry(domain.to_owned())
.or_insert(Arc::new(Mutex::new(Breaker::default())));
breaker.lock().await.succeed();
} }
} }
} }
@ -99,7 +83,7 @@ impl Breakers {
impl Default for Breakers { impl Default for Breakers {
fn default() -> Self { fn default() -> Self {
Breakers { Breakers {
inner: Arc::new(RwLock::new(HashMap::new())), inner: Arc::new(DashMap::new()),
} }
} }
} }
@ -233,7 +217,7 @@ impl Requests {
{ {
let parsed_url = url.parse::<Url>()?; let parsed_url = url.parse::<Url>()?;
if !self.breakers.should_try(&parsed_url).await { if !self.breakers.should_try(&parsed_url) {
return Err(ErrorKind::Breaker.into()); return Err(ErrorKind::Breaker.into());
} }
@ -256,7 +240,7 @@ impl Requests {
if res.is_err() { if res.is_err() {
self.count_err(); self.count_err();
self.breakers.fail(&parsed_url).await; self.breakers.fail(&parsed_url);
} }
let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?; let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?;
@ -272,12 +256,12 @@ impl Requests {
} }
} }
self.breakers.fail(&parsed_url).await; self.breakers.fail(&parsed_url);
return Err(ErrorKind::Status(url.to_string(), res.status()).into()); return Err(ErrorKind::Status(url.to_string(), res.status()).into());
} }
self.breakers.succeed(&parsed_url).await; self.breakers.succeed(&parsed_url);
let body = res let body = res
.body() .body()
@ -291,7 +275,7 @@ impl Requests {
pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> { pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> {
let parsed_url = url.parse::<Url>()?; let parsed_url = url.parse::<Url>()?;
if !self.breakers.should_try(&parsed_url).await { if !self.breakers.should_try(&parsed_url) {
return Err(ErrorKind::Breaker.into()); return Err(ErrorKind::Breaker.into());
} }
@ -314,7 +298,7 @@ impl Requests {
.await; .await;
if res.is_err() { if res.is_err() {
self.breakers.fail(&parsed_url).await; self.breakers.fail(&parsed_url);
self.count_err(); self.count_err();
} }
@ -341,12 +325,12 @@ impl Requests {
} }
} }
self.breakers.fail(&parsed_url).await; self.breakers.fail(&parsed_url);
return Err(ErrorKind::Status(url.to_string(), res.status()).into()); return Err(ErrorKind::Status(url.to_string(), res.status()).into());
} }
self.breakers.succeed(&parsed_url).await; self.breakers.succeed(&parsed_url);
let bytes = match res.body().limit(1024 * 1024 * 4).await { let bytes = match res.body().limit(1024 * 1024 * 4).await {
Err(e) => { Err(e) => {
@ -366,7 +350,7 @@ impl Requests {
where where
T: serde::ser::Serialize + std::fmt::Debug, T: serde::ser::Serialize + std::fmt::Debug,
{ {
if !self.breakers.should_try(&inbox).await { if !self.breakers.should_try(&inbox) {
return Err(ErrorKind::Breaker.into()); return Err(ErrorKind::Breaker.into());
} }
@ -393,7 +377,7 @@ impl Requests {
if res.is_err() { if res.is_err() {
self.count_err(); self.count_err();
self.breakers.fail(&inbox).await; self.breakers.fail(&inbox);
} }
let mut res = res.map_err(|e| ErrorKind::SendRequest(inbox.to_string(), e.to_string()))?; let mut res = res.map_err(|e| ErrorKind::SendRequest(inbox.to_string(), e.to_string()))?;
@ -409,11 +393,11 @@ impl Requests {
} }
} }
self.breakers.fail(&inbox).await; self.breakers.fail(&inbox);
return Err(ErrorKind::Status(inbox.to_string(), res.status()).into()); return Err(ErrorKind::Status(inbox.to_string(), res.status()).into());
} }
self.breakers.succeed(&inbox).await; self.breakers.succeed(&inbox);
Ok(()) Ok(())
} }

View file

@ -24,8 +24,8 @@ pub(crate) async fn route(
ApActor::new(config.generate_url(UrlKind::Inbox), Application::new()), ApActor::new(config.generate_url(UrlKind::Inbox), Application::new()),
PublicKey { PublicKey {
public_key: PublicKeyInner { public_key: PublicKeyInner {
id: config.generate_url(UrlKind::MainKey).into(), id: config.generate_url(UrlKind::MainKey),
owner: config.generate_url(UrlKind::Actor).into(), owner: config.generate_url(UrlKind::Actor),
public_key_pem: state.public_key.to_public_key_pem()?, public_key_pem: state.public_key.to_public_key_pem()?,
}, },
}, },

View file

@ -54,13 +54,13 @@ pub(crate) async fn route(
.db .db
.inboxes() .inboxes()
.await .await
.unwrap_or(vec![]) .unwrap_or_default()
.iter() .iter()
.filter_map(|listener| listener.domain()) .filter_map(|listener| listener.domain())
.map(|s| s.to_owned()) .map(|s| s.to_owned())
.collect(), .collect(),
blocks: if config.publish_blocks() { blocks: if config.publish_blocks() {
Some(state.db.blocks().await.unwrap_or(vec![])) Some(state.db.blocks().await.unwrap_or_default())
} else { } else {
None None
}, },

View file

@ -4,6 +4,7 @@ use actix_web::{
web, HttpResponse, web, HttpResponse,
}; };
#[allow(clippy::async_yields_async)]
#[tracing::instrument(name = "Statistics")] #[tracing::instrument(name = "Statistics")]
pub(crate) async fn route(filename: web::Path<String>) -> HttpResponse { pub(crate) async fn route(filename: web::Path<String>) -> HttpResponse {
if let Some(data) = StaticFile::get(&filename.into_inner()) { if let Some(data) = StaticFile::get(&filename.into_inner()) {

View file

@ -11,10 +11,8 @@
</div> </div>
<div class="right"> <div class="right">
<p class="display-name"><a href="@contact.url">@contact.display_name</a></p> <p class="display-name"><a href="@contact.url">@contact.display_name</a></p>
@if let Some(domain) = base.domain() { <p class="username">
<p class="username">@@@contact.username@@@domain</p> @@@contact.username@if let Some(domain) = base.domain() {@@@domain}
} else { </p>
<p class="username">@@@contact.username</p>
}
</div> </div>
</div> </div>

View file

@ -36,7 +36,7 @@
} else { } else {
@if let Some(inf) = node.info.as_ref() { @if let Some(inf) = node.info.as_ref() {
<li> <li>
@:info(&inf, &node.base) @:info(inf, &node.base)
</li> </li>
} }
} }