From 6d3407701047dc4850de99b2fb2d4e734801d834 Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 17 Mar 2020 23:35:20 -0500 Subject: [PATCH] Big Cleanup --- src/apub.rs | 7 ++ src/db_actor.rs | 61 +++++++++++- src/error.rs | 26 ++++- src/inbox.rs | 142 ++++++++++---------------- src/main.rs | 23 ++--- src/requests.rs | 258 ++++++++++++++++++++++++++---------------------- src/state.rs | 41 +++----- src/verifier.rs | 17 ++-- 8 files changed, 306 insertions(+), 269 deletions(-) diff --git a/src/apub.rs b/src/apub.rs index 7cff20a..195a6a8 100644 --- a/src/apub.rs +++ b/src/apub.rs @@ -108,6 +108,13 @@ impl ValidObjects { } } + pub fn kind(&self) -> Option<&str> { + match self { + ValidObjects::Id(_) => None, + ValidObjects::Object(AnyExistingObject { kind, .. }) => Some(kind), + } + } + pub fn is_kind(&self, query_kind: &str) -> bool { match self { ValidObjects::Id(_) => false, diff --git a/src/db_actor.rs b/src/db_actor.rs index f98a195..b5319ae 100644 --- a/src/db_actor.rs +++ b/src/db_actor.rs @@ -1,9 +1,19 @@ -use crate::label::ArbiterLabel; +use crate::{ + db::{add_listener, remove_listener}, + error::MyError, + label::ArbiterLabel, +}; +use activitystreams::primitives::XsdAnyUri; use actix::prelude::*; use bb8_postgres::{bb8, tokio_postgres, PostgresConnectionManager}; use log::{error, info}; use tokio::sync::oneshot::{channel, Receiver}; +#[derive(Clone)] +pub struct Db { + actor: Addr, +} + pub type Pool = bb8::Pool>; pub enum DbActorState { @@ -17,11 +27,52 @@ pub struct DbActor { pub struct DbQuery(pub F); -impl DbActor { - pub fn new(config: tokio_postgres::Config) -> Addr { - Supervisor::start(|_| DbActor { +impl Db { + pub fn new(config: tokio_postgres::Config) -> Db { + let actor = Supervisor::start(|_| DbActor { pool: DbActorState::new_empty(config), - }) + }); + + Db { actor } + } + + pub async fn execute_inline(&self, f: F) -> Result + where + T: Send + 'static, + F: FnOnce(Pool) -> Fut + Send + 'static, + Fut: Future, + { + Ok(self.actor.send(DbQuery(f)).await?.await?) + } + + pub fn remove_listener(&self, inbox: XsdAnyUri) { + self.actor.do_send(DbQuery(move |pool: Pool| { + let inbox = inbox.clone(); + + async move { + let conn = pool.get().await?; + + remove_listener(&conn, &inbox).await.map_err(|e| { + error!("Error removing listener, {}", e); + e + }) + } + })); + } + + pub fn add_listener(&self, inbox: XsdAnyUri) { + self.actor.do_send(DbQuery(move |pool: Pool| { + let inbox = inbox.clone(); + + async move { + let conn = pool.get().await?; + + add_listener(&conn, &inbox).await.map_err(|e| { + error!("Error adding listener, {}", e); + e + }) + } + })); } } diff --git a/src/error.rs b/src/error.rs index 9f8bb5a..3814608 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,8 +1,10 @@ use activitystreams::primitives::XsdAnyUriError; +use actix::MailboxError; use actix_web::{error::ResponseError, http::StatusCode, HttpResponse}; use log::error; use rsa_pem::KeyError; use std::{convert::Infallible, io::Error}; +use tokio::sync::oneshot::error::RecvError; #[derive(Debug, thiserror::Error)] pub enum MyError { @@ -27,6 +29,9 @@ pub enum MyError { #[error("Couldn't parse the signature header")] HeaderValidation(#[from] actix_web::http::header::InvalidHeaderValue), + #[error("Failed to get output of db operation")] + Oneshot(#[from] RecvError), + #[error("Couldn't decode base64")] Base64(#[from] base64::DecodeError), @@ -42,12 +47,18 @@ pub enum MyError { #[error("Actor ({0}) tried to submit another actor's ({1}) payload")] BadActor(String, String), + #[error("Wrong ActivityPub kind, {0}")] + Kind(String), + + #[error("The requested actor's mailbox is closed")] + MailboxClosed, + + #[error("The requested actor's mailbox has timed out")] + MailboxTimeout, + #[error("Invalid algorithm provided to verifier")] Algorithm, - #[error("Wrong ActivityPub kind")] - Kind, - #[error("Object has already been relayed")] Duplicate, @@ -87,3 +98,12 @@ impl From for MyError { MyError::Rsa(e) } } + +impl From for MyError { + fn from(m: MailboxError) -> MyError { + match m { + MailboxError::Closed => MyError::MailboxClosed, + MailboxError::Timeout => MyError::MailboxTimeout, + } + } +} diff --git a/src/inbox.rs b/src/inbox.rs index 97e14f8..98cbfae 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -1,9 +1,8 @@ use crate::{ apub::{AcceptedActors, AcceptedObjects, ValidTypes}, - db::{add_listener, remove_listener}, - db_actor::{DbActor, DbQuery, Pool}, + db_actor::Db, error::MyError, - requests::{deliver, deliver_many, fetch_actor}, + requests::Requests, response, state::{State, UrlKind}, }; @@ -12,27 +11,20 @@ use activitystreams::{ context, primitives::XsdAnyUri, }; -use actix::Addr; -use actix_web::{client::Client, web, HttpResponse}; +use actix_web::{web, HttpResponse}; use futures::join; use http_signature_normalization_actix::middleware::SignatureVerified; -use log::error; pub async fn inbox( - db_actor: web::Data>, + db: web::Data, state: web::Data, - client: web::Data, + client: web::Data, input: web::Json, verified: SignatureVerified, ) -> Result { let input = input.into_inner(); - let actor = fetch_actor( - state.clone().into_inner(), - client.clone().into_inner(), - &input.actor, - ) - .await?; + let actor = client.fetch_actor(&input.actor).await?; let (is_blocked, is_whitelisted) = join!(state.is_blocked(&actor.id), state.is_whitelisted(&actor.id),); @@ -46,6 +38,7 @@ pub async fn inbox( } if actor.public_key.id.as_str() != verified.key_id() { + log::error!("Bad actor, more info: {:?}", input); return Err(MyError::BadActor( actor.public_key.id.to_string(), verified.key_id().to_owned(), @@ -54,81 +47,68 @@ pub async fn inbox( match input.kind { ValidTypes::Announce | ValidTypes::Create => { - handle_relay(state, client, input, actor).await + handle_relay(&state, &client, input, actor).await } - ValidTypes::Follow => handle_follow(db_actor, state, client, input, actor).await, + ValidTypes::Follow => handle_follow(&db, &state, &client, input, actor).await, ValidTypes::Delete | ValidTypes::Update => { - handle_forward(state, client, input, actor).await + handle_forward(&state, &client, input, actor).await } - ValidTypes::Undo => handle_undo(db_actor, state, client, input, actor).await, + ValidTypes::Undo => handle_undo(&db, &state, &client, input, actor).await, } } async fn handle_undo( - db_actor: web::Data>, - state: web::Data, - client: web::Data, + db: &Db, + state: &State, + client: &Requests, input: AcceptedObjects, actor: AcceptedActors, ) -> Result { if !input.object.is_kind("Follow") { - return Err(MyError::Kind); + return Err(MyError::Kind( + input.object.kind().unwrap_or("unknown").to_owned(), + )); } let my_id: XsdAnyUri = state.generate_url(UrlKind::Actor).parse()?; if !input.object.child_object_is(&my_id) { + log::error!("Wrong actor, more info: {:?}", input); return Err(MyError::WrongActor(input.object.id().to_string())); } let inbox = actor.inbox().to_owned(); + db.remove_listener(inbox); - db_actor.do_send(DbQuery(move |pool: Pool| { - let inbox = inbox.clone(); + let undo = generate_undo_follow(state, &actor.id, &my_id)?; - async move { - let conn = pool.get().await?; - - remove_listener(&conn, &inbox).await.map_err(|e| { - error!("Error removing listener, {}", e); - e - }) - } - })); - - let actor_inbox = actor.inbox().clone(); - let undo = generate_undo_follow(&state, &actor.id, &my_id)?; + let client2 = client.clone(); + let inbox = actor.inbox().clone(); let undo2 = undo.clone(); actix::Arbiter::spawn(async move { - let _ = deliver( - &state.into_inner(), - &client.into_inner(), - actor_inbox, - &undo2, - ) - .await; + let _ = client2.deliver(inbox, &undo2).await; }); Ok(response(undo)) } async fn handle_forward( - state: web::Data, - client: web::Data, + state: &State, + client: &Requests, input: AcceptedObjects, actor: AcceptedActors, ) -> Result { let object_id = input.object.id(); - let inboxes = get_inboxes(&state, &actor, &object_id).await?; - deliver_many(&state, &client, inboxes, input.clone()); + let inboxes = get_inboxes(state, &actor, &object_id).await?; + client.deliver_many(inboxes, input.clone()); Ok(response(input)) } async fn handle_relay( - state: web::Data, - client: web::Data, + state: &State, + client: &Requests, input: AcceptedObjects, actor: AcceptedActors, ) -> Result { @@ -140,9 +120,9 @@ async fn handle_relay( let activity_id: XsdAnyUri = state.generate_url(UrlKind::Activity).parse()?; - let announce = generate_announce(&state, &activity_id, object_id)?; - let inboxes = get_inboxes(&state, &actor, &object_id).await?; - deliver_many(&state, &client, inboxes, announce.clone()); + let announce = generate_announce(state, &activity_id, object_id)?; + let inboxes = get_inboxes(state, &actor, &object_id).await?; + client.deliver_many(inboxes, announce.clone()); state.cache(object_id.to_owned(), activity_id).await; @@ -150,9 +130,9 @@ async fn handle_relay( } async fn handle_follow( - db_actor: web::Data>, - state: web::Data, - client: web::Data, + db: &Db, + state: &State, + client: &Requests, input: AcceptedObjects, actor: AcceptedActors, ) -> Result { @@ -165,46 +145,26 @@ async fn handle_follow( let is_listener = state.is_listener(&actor.id).await; if !is_listener { + let follow = generate_follow(state, &actor.id, &my_id)?; + let inbox = actor.inbox().to_owned(); - db_actor.do_send(DbQuery(move |pool: Pool| { - let inbox = inbox.clone(); + db.add_listener(inbox); - async move { - let conn = pool.get().await?; - - add_listener(&conn, &inbox).await.map_err(|e| { - error!("Error adding listener, {}", e); - e - }) - } - })); - - let actor_inbox = actor.inbox().clone(); - let follow = generate_follow(&state, &actor.id, &my_id)?; - let state2 = state.clone(); let client2 = client.clone(); + let inbox = actor.inbox().clone(); + let follow2 = follow.clone(); actix::Arbiter::spawn(async move { - let _ = deliver( - &state2.into_inner(), - &client2.into_inner(), - actor_inbox, - &follow, - ) - .await; + let _ = client2.deliver(inbox, &follow2).await; }); } - let actor_inbox = actor.inbox().clone(); - let accept = generate_accept_follow(&state, &actor.id, &input.id, &my_id)?; + let accept = generate_accept_follow(state, &actor.id, &input.id, &my_id)?; + + let client2 = client.clone(); + let inbox = actor.inbox().clone(); let accept2 = accept.clone(); actix::Arbiter::spawn(async move { - let _ = deliver( - &state.into_inner(), - &client.into_inner(), - actor_inbox, - &accept2, - ) - .await; + let _ = client2.deliver(inbox, &accept2).await; }); Ok(response(accept)) @@ -212,7 +172,7 @@ async fn handle_follow( // Generate a type that says "I want to stop following you" fn generate_undo_follow( - state: &web::Data, + state: &State, actor_id: &XsdAnyUri, my_id: &XsdAnyUri, ) -> Result { @@ -240,7 +200,7 @@ fn generate_undo_follow( // Generate a type that says "Look at this object" fn generate_announce( - state: &web::Data, + state: &State, activity_id: &XsdAnyUri, object_id: &XsdAnyUri, ) -> Result { @@ -262,7 +222,7 @@ fn generate_announce( // Generate a type that says "I want to follow you" fn generate_follow( - state: &web::Data, + state: &State, actor_id: &XsdAnyUri, my_id: &XsdAnyUri, ) -> Result { @@ -284,7 +244,7 @@ fn generate_follow( // Generate a type that says "I accept your follow request" fn generate_accept_follow( - state: &web::Data, + state: &State, actor_id: &XsdAnyUri, input_id: &XsdAnyUri, my_id: &XsdAnyUri, @@ -311,7 +271,7 @@ fn generate_accept_follow( } async fn get_inboxes( - state: &web::Data, + state: &State, actor: &AcceptedActors, object_id: &XsdAnyUri, ) -> Result, MyError> { diff --git a/src/main.rs b/src/main.rs index 69542d8..b508e85 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,5 @@ use activitystreams::{actor::apub::Application, context, endpoint::EndpointProperties}; -use actix_web::{ - client::Client, middleware::Logger, web, App, HttpResponse, HttpServer, Responder, -}; +use actix_web::{middleware::Logger, web, App, HttpResponse, HttpServer, Responder}; use bb8_postgres::tokio_postgres; use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature}; use rsa_pem::KeyExt; @@ -21,7 +19,7 @@ mod webfinger; use self::{ apub::PublicKey, - db_actor::DbActor, + db_actor::Db, error::MyError, label::ArbiterLabelFactory, state::{State, UrlKind}, @@ -86,34 +84,31 @@ async fn main() -> Result<(), anyhow::Error> { let arbiter_labeler = ArbiterLabelFactory::new(); - let db_actor = DbActor::new(pg_config.clone()); + let db = Db::new(pg_config.clone()); arbiter_labeler.clone().set_label(); - let state: State = db_actor - .send(db_actor::DbQuery(move |pool| { - State::hydrate(use_https, use_whitelist, hostname, pool) - })) - .await? + let state: State = db + .execute_inline(move |pool| State::hydrate(use_https, use_whitelist, hostname, pool)) .await??; let _ = notify::NotifyHandler::start_handler(state.clone(), pg_config.clone()); HttpServer::new(move || { - let actor = DbActor::new(pg_config.clone()); arbiter_labeler.clone().set_label(); - let client = Client::default(); + let state = state.clone(); + let actor = Db::new(pg_config.clone()); App::new() .wrap(Logger::default()) .data(actor) .data(state.clone()) - .data(client.clone()) + .data(state.requests()) .service(web::resource("/").route(web::get().to(index))) .service( web::resource("/inbox") .wrap(VerifyDigest::new(Sha256::new())) .wrap(VerifySignature::new( - MyVerify(state.clone(), client), + MyVerify(state.requests()), Default::default(), )) .route(web::post().to(inbox::inbox)), diff --git a/src/requests.rs b/src/requests.rs index f69fd84..bbbe2fe 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -1,131 +1,157 @@ -use crate::{ - apub::AcceptedActors, - error::MyError, - state::{State, UrlKind}, -}; +use crate::{apub::AcceptedActors, error::MyError, state::ActorCache}; use activitystreams::primitives::XsdAnyUri; -use actix_web::{client::Client, web}; +use actix::Arbiter; +use actix_web::client::Client; +use futures::stream::StreamExt; +use http_signature_normalization_actix::prelude::*; use log::error; +use rsa::{hash::Hashes, padding::PaddingScheme, RSAPrivateKey}; +use sha2::{Digest, Sha256}; -pub async fn fetch_actor( - state: std::sync::Arc, - client: std::sync::Arc, - actor_id: &XsdAnyUri, -) -> Result { - use http_signature_normalization_actix::prelude::*; +#[derive(Clone)] +pub struct Requests { + client: Client, + key_id: String, + private_key: RSAPrivateKey, + actor_cache: ActorCache, + config: Config, +} - if let Some(actor) = state.get_actor(actor_id).await { - return Ok(actor); - } - - let key_id = state.generate_url(UrlKind::MainKey); - - let mut res = client - .get(actor_id.as_str()) - .header("Accept", "application/activity+json") - .signature( - &Config::default().dont_use_created_field(), +impl Requests { + pub fn new(key_id: String, private_key: RSAPrivateKey, actor_cache: ActorCache) -> Self { + Requests { + client: Client::default(), key_id, - |signing_string| state.sign(signing_string), - )? - .send() - .await - .map_err(|e| { - error!("Couldn't send request to {} for actor, {}", actor_id, e); - MyError::SendRequest - })?; - - if !res.status().is_success() { - error!("Invalid status code for actor fetch, {}", res.status()); - if let Ok(bytes) = res.body().await { - if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { - error!("Response, {}", s); - } + private_key, + actor_cache, + config: Config::default().dont_use_created_field(), } - - return Err(MyError::Status); } - let actor: AcceptedActors = res.json().await.map_err(|e| { - error!("Coudn't fetch actor from {}, {}", actor_id, e); - MyError::ReceiveResponse - })?; - - state.cache_actor(actor_id.to_owned(), actor.clone()).await; - - Ok(actor) -} - -pub fn deliver_many( - state: &web::Data, - client: &web::Data, - inboxes: Vec, - item: T, -) where - T: serde::ser::Serialize + 'static, -{ - let client = client.clone().into_inner(); - let state = state.clone().into_inner(); - - actix::Arbiter::spawn(async move { - use futures::stream::StreamExt; - - let mut unordered = futures::stream::FuturesUnordered::new(); - - for inbox in inboxes { - unordered.push(deliver(&state, &client, inbox, &item)); + pub async fn fetch_actor(&self, actor_id: &XsdAnyUri) -> Result { + if let Some(actor) = self.get_actor(actor_id).await { + return Ok(actor); } - while let Some(_) = unordered.next().await {} - }); -} + let actor: AcceptedActors = self.fetch(actor_id.as_str()).await?; -pub async fn deliver( - state: &std::sync::Arc, - client: &std::sync::Arc, - inbox: XsdAnyUri, - item: &T, -) -> Result<(), MyError> -where - T: serde::ser::Serialize, -{ - use http_signature_normalization_actix::prelude::*; - use sha2::{Digest, Sha256}; + self.cache_actor(actor_id.to_owned(), actor.clone()).await; - let mut digest = Sha256::new(); - - let key_id = state.generate_url(UrlKind::MainKey); - - let item_string = serde_json::to_string(item)?; - - let mut res = client - .post(inbox.as_str()) - .header("Accept", "application/activity+json") - .header("Content-Type", "application/activity+json") - .header("User-Agent", "Aode Relay v0.1.0") - .signature_with_digest( - &Config::default().dont_use_created_field(), - &key_id, - &mut digest, - item_string, - |signing_string| state.sign(signing_string), - )? - .send() - .await - .map_err(|e| { - error!("Couldn't send deliver request to {}, {}", inbox, e); - MyError::SendRequest - })?; - - if !res.status().is_success() { - error!("Invalid response status from {}, {}", inbox, res.status()); - if let Ok(bytes) = res.body().await { - if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { - error!("Response, {}", s); - } - } - return Err(MyError::Status); + Ok(actor) } - Ok(()) + pub async fn fetch(&self, url: &str) -> Result + where + T: serde::de::DeserializeOwned, + { + let mut res = self + .client + .get(url) + .header("Accept", "application/activity+json") + .signature(&self.config, &self.key_id, |signing_string| { + self.sign(signing_string) + })? + .send() + .await + .map_err(|e| { + error!("Couldn't send request to {}, {}", url, e); + MyError::SendRequest + })?; + + if !res.status().is_success() { + error!("Invalid status code for fetch, {}", res.status()); + if let Ok(bytes) = res.body().await { + if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { + error!("Response, {}", s); + } + } + + return Err(MyError::Status); + } + + res.json().await.map_err(|e| { + error!("Coudn't fetch json from {}, {}", url, e); + MyError::ReceiveResponse + }) + } + + pub fn deliver_many(&self, inboxes: Vec, item: T) + where + T: serde::ser::Serialize + 'static, + { + let this = self.clone(); + + Arbiter::spawn(async move { + let mut unordered = futures::stream::FuturesUnordered::new(); + + for inbox in inboxes { + unordered.push(this.deliver(inbox, &item)); + } + + while let Some(_) = unordered.next().await {} + }); + } + + pub async fn deliver(&self, inbox: XsdAnyUri, item: &T) -> Result<(), MyError> + where + T: serde::ser::Serialize, + { + let mut digest = Sha256::new(); + + let item_string = serde_json::to_string(item)?; + + let mut res = self + .client + .post(inbox.as_str()) + .header("Accept", "application/activity+json") + .header("Content-Type", "application/activity+json") + .header("User-Agent", "Aode Relay v0.1.0") + .signature_with_digest( + &self.config, + &self.key_id, + &mut digest, + item_string, + |signing_string| self.sign(signing_string), + )? + .send() + .await + .map_err(|e| { + error!("Couldn't send deliver request to {}, {}", inbox, e); + MyError::SendRequest + })?; + + if !res.status().is_success() { + error!("Invalid response status from {}, {}", inbox, res.status()); + if let Ok(bytes) = res.body().await { + if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { + error!("Response, {}", s); + } + } + return Err(MyError::Status); + } + + Ok(()) + } + + fn sign(&self, signing_string: &str) -> Result { + let hashed = Sha256::digest(signing_string.as_bytes()); + let bytes = + self.private_key + .sign(PaddingScheme::PKCS1v15, Some(&Hashes::SHA2_256), &hashed)?; + Ok(base64::encode(bytes)) + } + + async fn get_actor(&self, actor_id: &XsdAnyUri) -> Option { + let cache = self.actor_cache.clone(); + + let read_guard = cache.read().await; + read_guard.get(actor_id).cloned() + } + + async fn cache_actor(&self, actor_id: XsdAnyUri, actor: AcceptedActors) { + let cache = self.actor_cache.clone(); + + let mut write_guard = cache.write().await; + write_guard.insert(actor_id, actor, std::time::Duration::from_secs(3600)); + } } diff --git a/src/state.rs b/src/state.rs index b02497b..019f999 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,3 +1,4 @@ +use crate::{apub::AcceptedActors, db_actor::Pool, requests::Requests}; use activitystreams::primitives::XsdAnyUri; use anyhow::Error; use bb8_postgres::tokio_postgres::Client; @@ -11,12 +12,12 @@ use tokio::sync::RwLock; use ttl_cache::TtlCache; use uuid::Uuid; -use crate::{apub::AcceptedActors, db_actor::Pool}; +pub type ActorCache = Arc>>; #[derive(Clone)] pub struct State { pub settings: Settings, - actor_cache: Arc>>, + actor_cache: ActorCache, actor_id_cache: Arc>>, blocks: Arc>>, whitelists: Arc>>, @@ -98,19 +99,17 @@ impl Settings { fn generate_resource(&self) -> String { format!("relay@{}", self.hostname) } - - fn sign(&self, signing_string: &str) -> Result { - use rsa::{hash::Hashes, padding::PaddingScheme}; - use sha2::{Digest, Sha256}; - let hashed = Sha256::digest(signing_string.as_bytes()); - let bytes = - self.private_key - .sign(PaddingScheme::PKCS1v15, Some(&Hashes::SHA2_256), &hashed)?; - Ok(base64::encode(bytes)) - } } impl State { + pub fn requests(&self) -> Requests { + Requests::new( + self.generate_url(UrlKind::MainKey), + self.settings.private_key.clone(), + self.actor_cache.clone(), + ) + } + pub fn generate_url(&self, kind: UrlKind) -> String { self.settings.generate_url(kind) } @@ -119,10 +118,6 @@ impl State { self.settings.generate_resource() } - pub fn sign(&self, signing_string: &str) -> Result { - self.settings.sign(signing_string) - } - pub async fn bust_whitelist(&self, whitelist: &str) { let hs = self.whitelists.clone(); @@ -196,20 +191,6 @@ impl State { read_guard.contains(actor_id) } - pub async fn get_actor(&self, actor_id: &XsdAnyUri) -> Option { - let cache = self.actor_cache.clone(); - - let read_guard = cache.read().await; - read_guard.get(actor_id).cloned() - } - - pub async fn cache_actor(&self, actor_id: XsdAnyUri, actor: AcceptedActors) { - let cache = self.actor_cache.clone(); - - let mut write_guard = cache.write().await; - write_guard.insert(actor_id, actor, std::time::Duration::from_secs(3600)); - } - pub async fn is_cached(&self, object_id: &XsdAnyUri) -> bool { let cache = self.actor_id_cache.clone(); diff --git a/src/verifier.rs b/src/verifier.rs index 4c1a5ae..dc31955 100644 --- a/src/verifier.rs +++ b/src/verifier.rs @@ -1,14 +1,13 @@ -use crate::{error::MyError, requests::fetch_actor, state::State}; -use actix_web::client::Client; +use crate::{error::MyError, requests::Requests}; use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm}; use log::{debug, error, warn}; use rsa::{hash::Hashes, padding::PaddingScheme, PublicKey, RSAPublicKey}; use rsa_pem::KeyExt; use sha2::{Digest, Sha256}; -use std::{future::Future, pin::Pin, sync::Arc}; +use std::{future::Future, pin::Pin}; #[derive(Clone)] -pub struct MyVerify(pub State, pub Client); +pub struct MyVerify(pub Requests); impl SignatureVerify for MyVerify { type Error = MyError; @@ -25,11 +24,10 @@ impl SignatureVerify for MyVerify { let signature = signature.to_owned(); let signing_string = signing_string.to_owned(); - let state = Arc::new(self.0.clone()); - let client = Arc::new(self.1.clone()); + let client = self.0.clone(); Box::pin(async move { - verify(state, client, algorithm, key_id, signature, signing_string) + verify(client, algorithm, key_id, signature, signing_string) .await .map_err(|e| { error!("Failed to verify, {}", e); @@ -40,15 +38,14 @@ impl SignatureVerify for MyVerify { } async fn verify( - state: Arc, - client: Arc, + client: Requests, algorithm: Option, key_id: String, signature: String, signing_string: String, ) -> Result { debug!("Fetching actor"); - let actor = fetch_actor(state, client, &key_id.parse()?).await?; + let actor = client.fetch_actor(&key_id.parse()?).await?; debug!("Parsing public key"); let public_key = RSAPublicKey::from_pem_pkcs8(&actor.public_key.public_key_pem)?;