Improve concurrency

This commit is contained in:
asonix 2022-11-02 13:55:45 -05:00
parent 32f5a0670f
commit a2bd41a00f
8 changed files with 38 additions and 34 deletions

View file

@ -1,8 +1,8 @@
use activitystreams::{ use activitystreams::{
activity::ActorAndObject, activity::ActorAndObject,
actor::{Actor, ApActor}, actor::{Actor, ApActor},
unparsed::UnparsedMutExt,
iri_string::types::IriString, iri_string::types::IriString,
unparsed::UnparsedMutExt,
}; };
use activitystreams_ext::{Ext1, UnparsedExtension}; use activitystreams_ext::{Ext1, UnparsedExtension};

View file

@ -56,8 +56,12 @@ impl ActorCache {
#[tracing::instrument(name = "Add Connection", skip(self))] #[tracing::instrument(name = "Add Connection", skip(self))]
pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> { pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> {
self.db.add_connection(actor.id.clone()).await?; let add_connection = self.db.add_connection(actor.id.clone());
self.db.save_actor(actor).await let save_actor = self.db.save_actor(actor);
tokio::try_join!(add_connection, save_actor)?;
Ok(())
} }
#[tracing::instrument(name = "Remove Connection", skip(self))] #[tracing::instrument(name = "Remove Connection", skip(self))]

View file

@ -36,9 +36,11 @@ impl NodeCache {
#[tracing::instrument(name = "Get nodes", skip(self))] #[tracing::instrument(name = "Get nodes", skip(self))]
pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> { pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> {
let infos = self.db.connected_info().await?; let infos = self.db.connected_info();
let instances = self.db.connected_instance().await?; let instances = self.db.connected_instance();
let contacts = self.db.connected_contact().await?; let contacts = self.db.connected_contact();
let (infos, instances, contacts) = tokio::try_join!(infos, instances, contacts)?;
let vec = self let vec = self
.db .db

View file

@ -7,12 +7,11 @@ use crate::{
}; };
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_web::web; use actix_web::web;
use async_rwlock::RwLock;
use lru::LruCache; use lru::LruCache;
use rand::thread_rng; use rand::thread_rng;
use rsa::{RsaPrivateKey, RsaPublicKey}; use rsa::{RsaPrivateKey, RsaPublicKey};
use std::sync::Arc; use std::sync::Arc;
use tracing::info; use tokio::sync::RwLock;
#[derive(Clone)] #[derive(Clone)]
pub struct State { pub struct State {
@ -89,10 +88,10 @@ impl State {
#[tracing::instrument(name = "Building state", skip_all)] #[tracing::instrument(name = "Building state", skip_all)]
pub(crate) async fn build(db: Db) -> Result<Self, Error> { pub(crate) async fn build(db: Db) -> Result<Self, Error> {
let private_key = if let Ok(Some(key)) = db.private_key().await { let private_key = if let Ok(Some(key)) = db.private_key().await {
info!("Using existing key"); tracing::info!("Using existing key");
key key
} else { } else {
info!("Generating new keys"); tracing::info!("Generating new keys");
let key = web::block(move || { let key = web::block(move || {
let mut rng = thread_rng(); let mut rng = thread_rng();
RsaPrivateKey::new(&mut rng, 4096) RsaPrivateKey::new(&mut rng, 4096)

View file

@ -12,7 +12,6 @@ use std::{
future::{ready, Ready}, future::{ready, Ready},
task::{Context, Poll}, task::{Context, Poll},
}; };
use tracing::info;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) struct DebugPayload(pub bool); pub(crate) struct DebugPayload(pub bool);
@ -63,7 +62,7 @@ where
}) })
.map_ok(|bytes| { .map_ok(|bytes| {
let bytes = bytes.freeze(); let bytes = bytes.freeze();
info!("{}", String::from_utf8_lossy(&bytes)); tracing::info!("{}", String::from_utf8_lossy(&bytes));
bytes bytes
}), }),
)), )),

View file

@ -15,7 +15,6 @@ use activitystreams::{
}; };
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified}; use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};
use tracing::error;
#[tracing::instrument(name = "Inbox", skip(actors, client, jobs, config, state))] #[tracing::instrument(name = "Inbox", skip(actors, client, jobs, config, state))]
pub(crate) async fn route( pub(crate) async fn route(
@ -37,8 +36,10 @@ pub(crate) async fn route(
.await? .await?
.into_inner(); .into_inner();
let is_allowed = state.db.is_allowed(actor.id.clone()).await?; let is_allowed = state.db.is_allowed(actor.id.clone());
let is_connected = state.db.is_connected(actor.id.clone()).await?; let is_connected = state.db.is_connected(actor.id.clone());
let (is_allowed, is_connected) = tokio::try_join!(is_allowed, is_connected)?;
if !is_allowed { if !is_allowed {
return Err(ErrorKind::NotAllowed(actor.id.to_string()).into()); return Err(ErrorKind::NotAllowed(actor.id.to_string()).into());
@ -53,7 +54,7 @@ pub(crate) async fn route(
} else if config.validate_signatures() { } else if config.validate_signatures() {
if let Some((verified, _)) = verified { if let Some((verified, _)) = verified {
if actor.public_key_id.as_str() != verified.key_id() { if actor.public_key_id.as_str() != verified.key_id() {
error!("Bad actor, more info: {:?}", input); tracing::error!("Bad actor, more info: {:?}", input);
return Err(ErrorKind::BadActor( return Err(ErrorKind::BadActor(
actor.public_key_id.to_string(), actor.public_key_id.to_string(),
verified.key_id().to_owned(), verified.key_id().to_owned(),

View file

@ -6,7 +6,6 @@ use crate::{
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use rand::{seq::SliceRandom, thread_rng}; use rand::{seq::SliceRandom, thread_rng};
use std::io::BufWriter; use std::io::BufWriter;
use tracing::error;
#[tracing::instrument(name = "Index", skip(config, state))] #[tracing::instrument(name = "Index", skip(config, state))]
pub(crate) async fn route( pub(crate) async fn route(
@ -19,7 +18,7 @@ pub(crate) async fn route(
crate::templates::index(&mut buf, &nodes, &config)?; crate::templates::index(&mut buf, &nodes, &config)?;
let buf = buf.into_inner().map_err(|e| { let buf = buf.into_inner().map_err(|e| {
error!("Error rendering template, {}", e.error()); tracing::error!("Error rendering template, {}", e.error());
ErrorKind::FlushBuffer ErrorKind::FlushBuffer
})?; })?;

View file

@ -29,6 +29,21 @@ pub(crate) async fn route(
config: web::Data<Config>, config: web::Data<Config>,
state: web::Data<State>, state: web::Data<State>,
) -> web::Json<NodeInfo> { ) -> web::Json<NodeInfo> {
let (inboxes, blocks) = tokio::join!(state.db.inboxes(), async {
if config.publish_blocks() {
Some(state.db.blocks().await.unwrap_or_default())
} else {
None
}
});
let peers = inboxes
.unwrap_or_default()
.iter()
.filter_map(|listener| listener.authority_str())
.map(|s| s.to_owned())
.collect();
web::Json(NodeInfo { web::Json(NodeInfo {
version: NodeInfoVersion, version: NodeInfoVersion,
software: Software { software: Software {
@ -50,22 +65,7 @@ pub(crate) async fn route(
local_posts: 0, local_posts: 0,
local_comments: 0, local_comments: 0,
}, },
metadata: Metadata { metadata: Metadata { peers, blocks },
peers: state
.db
.inboxes()
.await
.unwrap_or_default()
.iter()
.filter_map(|listener| listener.authority_str())
.map(|s| s.to_owned())
.collect(),
blocks: if config.publish_blocks() {
Some(state.db.blocks().await.unwrap_or_default())
} else {
None
},
},
}) })
} }