Big Cleanup

This commit is contained in:
asonix 2020-03-17 23:35:20 -05:00
parent a4ec70d6ec
commit 6d34077010
8 changed files with 306 additions and 269 deletions

View file

@ -108,6 +108,13 @@ impl ValidObjects {
} }
} }
pub fn kind(&self) -> Option<&str> {
match self {
ValidObjects::Id(_) => None,
ValidObjects::Object(AnyExistingObject { kind, .. }) => Some(kind),
}
}
pub fn is_kind(&self, query_kind: &str) -> bool { pub fn is_kind(&self, query_kind: &str) -> bool {
match self { match self {
ValidObjects::Id(_) => false, ValidObjects::Id(_) => false,

View file

@ -1,9 +1,19 @@
use crate::label::ArbiterLabel; use crate::{
db::{add_listener, remove_listener},
error::MyError,
label::ArbiterLabel,
};
use activitystreams::primitives::XsdAnyUri;
use actix::prelude::*; use actix::prelude::*;
use bb8_postgres::{bb8, tokio_postgres, PostgresConnectionManager}; use bb8_postgres::{bb8, tokio_postgres, PostgresConnectionManager};
use log::{error, info}; use log::{error, info};
use tokio::sync::oneshot::{channel, Receiver}; use tokio::sync::oneshot::{channel, Receiver};
#[derive(Clone)]
pub struct Db {
actor: Addr<DbActor>,
}
pub type Pool = bb8::Pool<PostgresConnectionManager<tokio_postgres::tls::NoTls>>; pub type Pool = bb8::Pool<PostgresConnectionManager<tokio_postgres::tls::NoTls>>;
pub enum DbActorState { pub enum DbActorState {
@ -17,11 +27,52 @@ pub struct DbActor {
pub struct DbQuery<F>(pub F); pub struct DbQuery<F>(pub F);
impl DbActor { impl Db {
pub fn new(config: tokio_postgres::Config) -> Addr<Self> { pub fn new(config: tokio_postgres::Config) -> Db {
Supervisor::start(|_| DbActor { let actor = Supervisor::start(|_| DbActor {
pool: DbActorState::new_empty(config), pool: DbActorState::new_empty(config),
}) });
Db { actor }
}
pub async fn execute_inline<T, F, Fut>(&self, f: F) -> Result<T, MyError>
where
T: Send + 'static,
F: FnOnce(Pool) -> Fut + Send + 'static,
Fut: Future<Output = T>,
{
Ok(self.actor.send(DbQuery(f)).await?.await?)
}
pub fn remove_listener(&self, inbox: XsdAnyUri) {
self.actor.do_send(DbQuery(move |pool: Pool| {
let inbox = inbox.clone();
async move {
let conn = pool.get().await?;
remove_listener(&conn, &inbox).await.map_err(|e| {
error!("Error removing listener, {}", e);
e
})
}
}));
}
pub fn add_listener(&self, inbox: XsdAnyUri) {
self.actor.do_send(DbQuery(move |pool: Pool| {
let inbox = inbox.clone();
async move {
let conn = pool.get().await?;
add_listener(&conn, &inbox).await.map_err(|e| {
error!("Error adding listener, {}", e);
e
})
}
}));
} }
} }

View file

@ -1,8 +1,10 @@
use activitystreams::primitives::XsdAnyUriError; use activitystreams::primitives::XsdAnyUriError;
use actix::MailboxError;
use actix_web::{error::ResponseError, http::StatusCode, HttpResponse}; use actix_web::{error::ResponseError, http::StatusCode, HttpResponse};
use log::error; use log::error;
use rsa_pem::KeyError; use rsa_pem::KeyError;
use std::{convert::Infallible, io::Error}; use std::{convert::Infallible, io::Error};
use tokio::sync::oneshot::error::RecvError;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum MyError { pub enum MyError {
@ -27,6 +29,9 @@ pub enum MyError {
#[error("Couldn't parse the signature header")] #[error("Couldn't parse the signature header")]
HeaderValidation(#[from] actix_web::http::header::InvalidHeaderValue), HeaderValidation(#[from] actix_web::http::header::InvalidHeaderValue),
#[error("Failed to get output of db operation")]
Oneshot(#[from] RecvError),
#[error("Couldn't decode base64")] #[error("Couldn't decode base64")]
Base64(#[from] base64::DecodeError), Base64(#[from] base64::DecodeError),
@ -42,12 +47,18 @@ pub enum MyError {
#[error("Actor ({0}) tried to submit another actor's ({1}) payload")] #[error("Actor ({0}) tried to submit another actor's ({1}) payload")]
BadActor(String, String), BadActor(String, String),
#[error("Wrong ActivityPub kind, {0}")]
Kind(String),
#[error("The requested actor's mailbox is closed")]
MailboxClosed,
#[error("The requested actor's mailbox has timed out")]
MailboxTimeout,
#[error("Invalid algorithm provided to verifier")] #[error("Invalid algorithm provided to verifier")]
Algorithm, Algorithm,
#[error("Wrong ActivityPub kind")]
Kind,
#[error("Object has already been relayed")] #[error("Object has already been relayed")]
Duplicate, Duplicate,
@ -87,3 +98,12 @@ impl From<rsa::errors::Error> for MyError {
MyError::Rsa(e) MyError::Rsa(e)
} }
} }
impl From<MailboxError> for MyError {
fn from(m: MailboxError) -> MyError {
match m {
MailboxError::Closed => MyError::MailboxClosed,
MailboxError::Timeout => MyError::MailboxTimeout,
}
}
}

View file

@ -1,9 +1,8 @@
use crate::{ use crate::{
apub::{AcceptedActors, AcceptedObjects, ValidTypes}, apub::{AcceptedActors, AcceptedObjects, ValidTypes},
db::{add_listener, remove_listener}, db_actor::Db,
db_actor::{DbActor, DbQuery, Pool},
error::MyError, error::MyError,
requests::{deliver, deliver_many, fetch_actor}, requests::Requests,
response, response,
state::{State, UrlKind}, state::{State, UrlKind},
}; };
@ -12,27 +11,20 @@ use activitystreams::{
context, context,
primitives::XsdAnyUri, primitives::XsdAnyUri,
}; };
use actix::Addr; use actix_web::{web, HttpResponse};
use actix_web::{client::Client, web, HttpResponse};
use futures::join; use futures::join;
use http_signature_normalization_actix::middleware::SignatureVerified; use http_signature_normalization_actix::middleware::SignatureVerified;
use log::error;
pub async fn inbox( pub async fn inbox(
db_actor: web::Data<Addr<DbActor>>, db: web::Data<Db>,
state: web::Data<State>, state: web::Data<State>,
client: web::Data<Client>, client: web::Data<Requests>,
input: web::Json<AcceptedObjects>, input: web::Json<AcceptedObjects>,
verified: SignatureVerified, verified: SignatureVerified,
) -> Result<HttpResponse, MyError> { ) -> Result<HttpResponse, MyError> {
let input = input.into_inner(); let input = input.into_inner();
let actor = fetch_actor( let actor = client.fetch_actor(&input.actor).await?;
state.clone().into_inner(),
client.clone().into_inner(),
&input.actor,
)
.await?;
let (is_blocked, is_whitelisted) = let (is_blocked, is_whitelisted) =
join!(state.is_blocked(&actor.id), state.is_whitelisted(&actor.id),); join!(state.is_blocked(&actor.id), state.is_whitelisted(&actor.id),);
@ -46,6 +38,7 @@ pub async fn inbox(
} }
if actor.public_key.id.as_str() != verified.key_id() { if actor.public_key.id.as_str() != verified.key_id() {
log::error!("Bad actor, more info: {:?}", input);
return Err(MyError::BadActor( return Err(MyError::BadActor(
actor.public_key.id.to_string(), actor.public_key.id.to_string(),
verified.key_id().to_owned(), verified.key_id().to_owned(),
@ -54,81 +47,68 @@ pub async fn inbox(
match input.kind { match input.kind {
ValidTypes::Announce | ValidTypes::Create => { ValidTypes::Announce | ValidTypes::Create => {
handle_relay(state, client, input, actor).await handle_relay(&state, &client, input, actor).await
} }
ValidTypes::Follow => handle_follow(db_actor, state, client, input, actor).await, ValidTypes::Follow => handle_follow(&db, &state, &client, input, actor).await,
ValidTypes::Delete | ValidTypes::Update => { ValidTypes::Delete | ValidTypes::Update => {
handle_forward(state, client, input, actor).await handle_forward(&state, &client, input, actor).await
} }
ValidTypes::Undo => handle_undo(db_actor, state, client, input, actor).await, ValidTypes::Undo => handle_undo(&db, &state, &client, input, actor).await,
} }
} }
async fn handle_undo( async fn handle_undo(
db_actor: web::Data<Addr<DbActor>>, db: &Db,
state: web::Data<State>, state: &State,
client: web::Data<Client>, client: &Requests,
input: AcceptedObjects, input: AcceptedObjects,
actor: AcceptedActors, actor: AcceptedActors,
) -> Result<HttpResponse, MyError> { ) -> Result<HttpResponse, MyError> {
if !input.object.is_kind("Follow") { if !input.object.is_kind("Follow") {
return Err(MyError::Kind); return Err(MyError::Kind(
input.object.kind().unwrap_or("unknown").to_owned(),
));
} }
let my_id: XsdAnyUri = state.generate_url(UrlKind::Actor).parse()?; let my_id: XsdAnyUri = state.generate_url(UrlKind::Actor).parse()?;
if !input.object.child_object_is(&my_id) { if !input.object.child_object_is(&my_id) {
log::error!("Wrong actor, more info: {:?}", input);
return Err(MyError::WrongActor(input.object.id().to_string())); return Err(MyError::WrongActor(input.object.id().to_string()));
} }
let inbox = actor.inbox().to_owned(); let inbox = actor.inbox().to_owned();
db.remove_listener(inbox);
db_actor.do_send(DbQuery(move |pool: Pool| { let undo = generate_undo_follow(state, &actor.id, &my_id)?;
let inbox = inbox.clone();
async move { let client2 = client.clone();
let conn = pool.get().await?; let inbox = actor.inbox().clone();
remove_listener(&conn, &inbox).await.map_err(|e| {
error!("Error removing listener, {}", e);
e
})
}
}));
let actor_inbox = actor.inbox().clone();
let undo = generate_undo_follow(&state, &actor.id, &my_id)?;
let undo2 = undo.clone(); let undo2 = undo.clone();
actix::Arbiter::spawn(async move { actix::Arbiter::spawn(async move {
let _ = deliver( let _ = client2.deliver(inbox, &undo2).await;
&state.into_inner(),
&client.into_inner(),
actor_inbox,
&undo2,
)
.await;
}); });
Ok(response(undo)) Ok(response(undo))
} }
async fn handle_forward( async fn handle_forward(
state: web::Data<State>, state: &State,
client: web::Data<Client>, client: &Requests,
input: AcceptedObjects, input: AcceptedObjects,
actor: AcceptedActors, actor: AcceptedActors,
) -> Result<HttpResponse, MyError> { ) -> Result<HttpResponse, MyError> {
let object_id = input.object.id(); let object_id = input.object.id();
let inboxes = get_inboxes(&state, &actor, &object_id).await?; let inboxes = get_inboxes(state, &actor, &object_id).await?;
deliver_many(&state, &client, inboxes, input.clone()); client.deliver_many(inboxes, input.clone());
Ok(response(input)) Ok(response(input))
} }
async fn handle_relay( async fn handle_relay(
state: web::Data<State>, state: &State,
client: web::Data<Client>, client: &Requests,
input: AcceptedObjects, input: AcceptedObjects,
actor: AcceptedActors, actor: AcceptedActors,
) -> Result<HttpResponse, MyError> { ) -> Result<HttpResponse, MyError> {
@ -140,9 +120,9 @@ async fn handle_relay(
let activity_id: XsdAnyUri = state.generate_url(UrlKind::Activity).parse()?; let activity_id: XsdAnyUri = state.generate_url(UrlKind::Activity).parse()?;
let announce = generate_announce(&state, &activity_id, object_id)?; let announce = generate_announce(state, &activity_id, object_id)?;
let inboxes = get_inboxes(&state, &actor, &object_id).await?; let inboxes = get_inboxes(state, &actor, &object_id).await?;
deliver_many(&state, &client, inboxes, announce.clone()); client.deliver_many(inboxes, announce.clone());
state.cache(object_id.to_owned(), activity_id).await; state.cache(object_id.to_owned(), activity_id).await;
@ -150,9 +130,9 @@ async fn handle_relay(
} }
async fn handle_follow( async fn handle_follow(
db_actor: web::Data<Addr<DbActor>>, db: &Db,
state: web::Data<State>, state: &State,
client: web::Data<Client>, client: &Requests,
input: AcceptedObjects, input: AcceptedObjects,
actor: AcceptedActors, actor: AcceptedActors,
) -> Result<HttpResponse, MyError> { ) -> Result<HttpResponse, MyError> {
@ -165,46 +145,26 @@ async fn handle_follow(
let is_listener = state.is_listener(&actor.id).await; let is_listener = state.is_listener(&actor.id).await;
if !is_listener { if !is_listener {
let follow = generate_follow(state, &actor.id, &my_id)?;
let inbox = actor.inbox().to_owned(); let inbox = actor.inbox().to_owned();
db_actor.do_send(DbQuery(move |pool: Pool| { db.add_listener(inbox);
let inbox = inbox.clone();
async move {
let conn = pool.get().await?;
add_listener(&conn, &inbox).await.map_err(|e| {
error!("Error adding listener, {}", e);
e
})
}
}));
let actor_inbox = actor.inbox().clone();
let follow = generate_follow(&state, &actor.id, &my_id)?;
let state2 = state.clone();
let client2 = client.clone(); let client2 = client.clone();
let inbox = actor.inbox().clone();
let follow2 = follow.clone();
actix::Arbiter::spawn(async move { actix::Arbiter::spawn(async move {
let _ = deliver( let _ = client2.deliver(inbox, &follow2).await;
&state2.into_inner(),
&client2.into_inner(),
actor_inbox,
&follow,
)
.await;
}); });
} }
let actor_inbox = actor.inbox().clone(); let accept = generate_accept_follow(state, &actor.id, &input.id, &my_id)?;
let accept = generate_accept_follow(&state, &actor.id, &input.id, &my_id)?;
let client2 = client.clone();
let inbox = actor.inbox().clone();
let accept2 = accept.clone(); let accept2 = accept.clone();
actix::Arbiter::spawn(async move { actix::Arbiter::spawn(async move {
let _ = deliver( let _ = client2.deliver(inbox, &accept2).await;
&state.into_inner(),
&client.into_inner(),
actor_inbox,
&accept2,
)
.await;
}); });
Ok(response(accept)) Ok(response(accept))
@ -212,7 +172,7 @@ async fn handle_follow(
// Generate a type that says "I want to stop following you" // Generate a type that says "I want to stop following you"
fn generate_undo_follow( fn generate_undo_follow(
state: &web::Data<State>, state: &State,
actor_id: &XsdAnyUri, actor_id: &XsdAnyUri,
my_id: &XsdAnyUri, my_id: &XsdAnyUri,
) -> Result<Undo, MyError> { ) -> Result<Undo, MyError> {
@ -240,7 +200,7 @@ fn generate_undo_follow(
// Generate a type that says "Look at this object" // Generate a type that says "Look at this object"
fn generate_announce( fn generate_announce(
state: &web::Data<State>, state: &State,
activity_id: &XsdAnyUri, activity_id: &XsdAnyUri,
object_id: &XsdAnyUri, object_id: &XsdAnyUri,
) -> Result<Announce, MyError> { ) -> Result<Announce, MyError> {
@ -262,7 +222,7 @@ fn generate_announce(
// Generate a type that says "I want to follow you" // Generate a type that says "I want to follow you"
fn generate_follow( fn generate_follow(
state: &web::Data<State>, state: &State,
actor_id: &XsdAnyUri, actor_id: &XsdAnyUri,
my_id: &XsdAnyUri, my_id: &XsdAnyUri,
) -> Result<Follow, MyError> { ) -> Result<Follow, MyError> {
@ -284,7 +244,7 @@ fn generate_follow(
// Generate a type that says "I accept your follow request" // Generate a type that says "I accept your follow request"
fn generate_accept_follow( fn generate_accept_follow(
state: &web::Data<State>, state: &State,
actor_id: &XsdAnyUri, actor_id: &XsdAnyUri,
input_id: &XsdAnyUri, input_id: &XsdAnyUri,
my_id: &XsdAnyUri, my_id: &XsdAnyUri,
@ -311,7 +271,7 @@ fn generate_accept_follow(
} }
async fn get_inboxes( async fn get_inboxes(
state: &web::Data<State>, state: &State,
actor: &AcceptedActors, actor: &AcceptedActors,
object_id: &XsdAnyUri, object_id: &XsdAnyUri,
) -> Result<Vec<XsdAnyUri>, MyError> { ) -> Result<Vec<XsdAnyUri>, MyError> {

View file

@ -1,7 +1,5 @@
use activitystreams::{actor::apub::Application, context, endpoint::EndpointProperties}; use activitystreams::{actor::apub::Application, context, endpoint::EndpointProperties};
use actix_web::{ use actix_web::{middleware::Logger, web, App, HttpResponse, HttpServer, Responder};
client::Client, middleware::Logger, web, App, HttpResponse, HttpServer, Responder,
};
use bb8_postgres::tokio_postgres; use bb8_postgres::tokio_postgres;
use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature}; use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature};
use rsa_pem::KeyExt; use rsa_pem::KeyExt;
@ -21,7 +19,7 @@ mod webfinger;
use self::{ use self::{
apub::PublicKey, apub::PublicKey,
db_actor::DbActor, db_actor::Db,
error::MyError, error::MyError,
label::ArbiterLabelFactory, label::ArbiterLabelFactory,
state::{State, UrlKind}, state::{State, UrlKind},
@ -86,34 +84,31 @@ async fn main() -> Result<(), anyhow::Error> {
let arbiter_labeler = ArbiterLabelFactory::new(); let arbiter_labeler = ArbiterLabelFactory::new();
let db_actor = DbActor::new(pg_config.clone()); let db = Db::new(pg_config.clone());
arbiter_labeler.clone().set_label(); arbiter_labeler.clone().set_label();
let state: State = db_actor let state: State = db
.send(db_actor::DbQuery(move |pool| { .execute_inline(move |pool| State::hydrate(use_https, use_whitelist, hostname, pool))
State::hydrate(use_https, use_whitelist, hostname, pool)
}))
.await?
.await??; .await??;
let _ = notify::NotifyHandler::start_handler(state.clone(), pg_config.clone()); let _ = notify::NotifyHandler::start_handler(state.clone(), pg_config.clone());
HttpServer::new(move || { HttpServer::new(move || {
let actor = DbActor::new(pg_config.clone());
arbiter_labeler.clone().set_label(); arbiter_labeler.clone().set_label();
let client = Client::default(); let state = state.clone();
let actor = Db::new(pg_config.clone());
App::new() App::new()
.wrap(Logger::default()) .wrap(Logger::default())
.data(actor) .data(actor)
.data(state.clone()) .data(state.clone())
.data(client.clone()) .data(state.requests())
.service(web::resource("/").route(web::get().to(index))) .service(web::resource("/").route(web::get().to(index)))
.service( .service(
web::resource("/inbox") web::resource("/inbox")
.wrap(VerifyDigest::new(Sha256::new())) .wrap(VerifyDigest::new(Sha256::new()))
.wrap(VerifySignature::new( .wrap(VerifySignature::new(
MyVerify(state.clone(), client), MyVerify(state.requests()),
Default::default(), Default::default(),
)) ))
.route(web::post().to(inbox::inbox)), .route(web::post().to(inbox::inbox)),

View file

@ -1,131 +1,157 @@
use crate::{ use crate::{apub::AcceptedActors, error::MyError, state::ActorCache};
apub::AcceptedActors,
error::MyError,
state::{State, UrlKind},
};
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use actix_web::{client::Client, web}; use actix::Arbiter;
use actix_web::client::Client;
use futures::stream::StreamExt;
use http_signature_normalization_actix::prelude::*;
use log::error; use log::error;
use rsa::{hash::Hashes, padding::PaddingScheme, RSAPrivateKey};
use sha2::{Digest, Sha256};
pub async fn fetch_actor( #[derive(Clone)]
state: std::sync::Arc<State>, pub struct Requests {
client: std::sync::Arc<Client>, client: Client,
actor_id: &XsdAnyUri, key_id: String,
) -> Result<AcceptedActors, MyError> { private_key: RSAPrivateKey,
use http_signature_normalization_actix::prelude::*; actor_cache: ActorCache,
config: Config,
}
if let Some(actor) = state.get_actor(actor_id).await { impl Requests {
return Ok(actor); pub fn new(key_id: String, private_key: RSAPrivateKey, actor_cache: ActorCache) -> Self {
} Requests {
client: Client::default(),
let key_id = state.generate_url(UrlKind::MainKey);
let mut res = client
.get(actor_id.as_str())
.header("Accept", "application/activity+json")
.signature(
&Config::default().dont_use_created_field(),
key_id, key_id,
|signing_string| state.sign(signing_string), private_key,
)? actor_cache,
.send() config: Config::default().dont_use_created_field(),
.await
.map_err(|e| {
error!("Couldn't send request to {} for actor, {}", actor_id, e);
MyError::SendRequest
})?;
if !res.status().is_success() {
error!("Invalid status code for actor fetch, {}", res.status());
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
error!("Response, {}", s);
}
} }
return Err(MyError::Status);
} }
let actor: AcceptedActors = res.json().await.map_err(|e| { pub async fn fetch_actor(&self, actor_id: &XsdAnyUri) -> Result<AcceptedActors, MyError> {
error!("Coudn't fetch actor from {}, {}", actor_id, e); if let Some(actor) = self.get_actor(actor_id).await {
MyError::ReceiveResponse return Ok(actor);
})?;
state.cache_actor(actor_id.to_owned(), actor.clone()).await;
Ok(actor)
}
pub fn deliver_many<T>(
state: &web::Data<State>,
client: &web::Data<Client>,
inboxes: Vec<XsdAnyUri>,
item: T,
) where
T: serde::ser::Serialize + 'static,
{
let client = client.clone().into_inner();
let state = state.clone().into_inner();
actix::Arbiter::spawn(async move {
use futures::stream::StreamExt;
let mut unordered = futures::stream::FuturesUnordered::new();
for inbox in inboxes {
unordered.push(deliver(&state, &client, inbox, &item));
} }
while let Some(_) = unordered.next().await {} let actor: AcceptedActors = self.fetch(actor_id.as_str()).await?;
});
}
pub async fn deliver<T>( self.cache_actor(actor_id.to_owned(), actor.clone()).await;
state: &std::sync::Arc<State>,
client: &std::sync::Arc<Client>,
inbox: XsdAnyUri,
item: &T,
) -> Result<(), MyError>
where
T: serde::ser::Serialize,
{
use http_signature_normalization_actix::prelude::*;
use sha2::{Digest, Sha256};
let mut digest = Sha256::new(); Ok(actor)
let key_id = state.generate_url(UrlKind::MainKey);
let item_string = serde_json::to_string(item)?;
let mut res = client
.post(inbox.as_str())
.header("Accept", "application/activity+json")
.header("Content-Type", "application/activity+json")
.header("User-Agent", "Aode Relay v0.1.0")
.signature_with_digest(
&Config::default().dont_use_created_field(),
&key_id,
&mut digest,
item_string,
|signing_string| state.sign(signing_string),
)?
.send()
.await
.map_err(|e| {
error!("Couldn't send deliver request to {}, {}", inbox, e);
MyError::SendRequest
})?;
if !res.status().is_success() {
error!("Invalid response status from {}, {}", inbox, res.status());
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
error!("Response, {}", s);
}
}
return Err(MyError::Status);
} }
Ok(()) pub async fn fetch<T>(&self, url: &str) -> Result<T, MyError>
where
T: serde::de::DeserializeOwned,
{
let mut res = self
.client
.get(url)
.header("Accept", "application/activity+json")
.signature(&self.config, &self.key_id, |signing_string| {
self.sign(signing_string)
})?
.send()
.await
.map_err(|e| {
error!("Couldn't send request to {}, {}", url, e);
MyError::SendRequest
})?;
if !res.status().is_success() {
error!("Invalid status code for fetch, {}", res.status());
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
error!("Response, {}", s);
}
}
return Err(MyError::Status);
}
res.json().await.map_err(|e| {
error!("Coudn't fetch json from {}, {}", url, e);
MyError::ReceiveResponse
})
}
pub fn deliver_many<T>(&self, inboxes: Vec<XsdAnyUri>, item: T)
where
T: serde::ser::Serialize + 'static,
{
let this = self.clone();
Arbiter::spawn(async move {
let mut unordered = futures::stream::FuturesUnordered::new();
for inbox in inboxes {
unordered.push(this.deliver(inbox, &item));
}
while let Some(_) = unordered.next().await {}
});
}
pub async fn deliver<T>(&self, inbox: XsdAnyUri, item: &T) -> Result<(), MyError>
where
T: serde::ser::Serialize,
{
let mut digest = Sha256::new();
let item_string = serde_json::to_string(item)?;
let mut res = self
.client
.post(inbox.as_str())
.header("Accept", "application/activity+json")
.header("Content-Type", "application/activity+json")
.header("User-Agent", "Aode Relay v0.1.0")
.signature_with_digest(
&self.config,
&self.key_id,
&mut digest,
item_string,
|signing_string| self.sign(signing_string),
)?
.send()
.await
.map_err(|e| {
error!("Couldn't send deliver request to {}, {}", inbox, e);
MyError::SendRequest
})?;
if !res.status().is_success() {
error!("Invalid response status from {}, {}", inbox, res.status());
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
error!("Response, {}", s);
}
}
return Err(MyError::Status);
}
Ok(())
}
fn sign(&self, signing_string: &str) -> Result<String, crate::error::MyError> {
let hashed = Sha256::digest(signing_string.as_bytes());
let bytes =
self.private_key
.sign(PaddingScheme::PKCS1v15, Some(&Hashes::SHA2_256), &hashed)?;
Ok(base64::encode(bytes))
}
async fn get_actor(&self, actor_id: &XsdAnyUri) -> Option<AcceptedActors> {
let cache = self.actor_cache.clone();
let read_guard = cache.read().await;
read_guard.get(actor_id).cloned()
}
async fn cache_actor(&self, actor_id: XsdAnyUri, actor: AcceptedActors) {
let cache = self.actor_cache.clone();
let mut write_guard = cache.write().await;
write_guard.insert(actor_id, actor, std::time::Duration::from_secs(3600));
}
} }

View file

@ -1,3 +1,4 @@
use crate::{apub::AcceptedActors, db_actor::Pool, requests::Requests};
use activitystreams::primitives::XsdAnyUri; use activitystreams::primitives::XsdAnyUri;
use anyhow::Error; use anyhow::Error;
use bb8_postgres::tokio_postgres::Client; use bb8_postgres::tokio_postgres::Client;
@ -11,12 +12,12 @@ use tokio::sync::RwLock;
use ttl_cache::TtlCache; use ttl_cache::TtlCache;
use uuid::Uuid; use uuid::Uuid;
use crate::{apub::AcceptedActors, db_actor::Pool}; pub type ActorCache = Arc<RwLock<TtlCache<XsdAnyUri, AcceptedActors>>>;
#[derive(Clone)] #[derive(Clone)]
pub struct State { pub struct State {
pub settings: Settings, pub settings: Settings,
actor_cache: Arc<RwLock<TtlCache<XsdAnyUri, AcceptedActors>>>, actor_cache: ActorCache,
actor_id_cache: Arc<RwLock<LruCache<XsdAnyUri, XsdAnyUri>>>, actor_id_cache: Arc<RwLock<LruCache<XsdAnyUri, XsdAnyUri>>>,
blocks: Arc<RwLock<HashSet<String>>>, blocks: Arc<RwLock<HashSet<String>>>,
whitelists: Arc<RwLock<HashSet<String>>>, whitelists: Arc<RwLock<HashSet<String>>>,
@ -98,19 +99,17 @@ impl Settings {
fn generate_resource(&self) -> String { fn generate_resource(&self) -> String {
format!("relay@{}", self.hostname) format!("relay@{}", self.hostname)
} }
fn sign(&self, signing_string: &str) -> Result<String, crate::error::MyError> {
use rsa::{hash::Hashes, padding::PaddingScheme};
use sha2::{Digest, Sha256};
let hashed = Sha256::digest(signing_string.as_bytes());
let bytes =
self.private_key
.sign(PaddingScheme::PKCS1v15, Some(&Hashes::SHA2_256), &hashed)?;
Ok(base64::encode(bytes))
}
} }
impl State { impl State {
pub fn requests(&self) -> Requests {
Requests::new(
self.generate_url(UrlKind::MainKey),
self.settings.private_key.clone(),
self.actor_cache.clone(),
)
}
pub fn generate_url(&self, kind: UrlKind) -> String { pub fn generate_url(&self, kind: UrlKind) -> String {
self.settings.generate_url(kind) self.settings.generate_url(kind)
} }
@ -119,10 +118,6 @@ impl State {
self.settings.generate_resource() self.settings.generate_resource()
} }
pub fn sign(&self, signing_string: &str) -> Result<String, crate::error::MyError> {
self.settings.sign(signing_string)
}
pub async fn bust_whitelist(&self, whitelist: &str) { pub async fn bust_whitelist(&self, whitelist: &str) {
let hs = self.whitelists.clone(); let hs = self.whitelists.clone();
@ -196,20 +191,6 @@ impl State {
read_guard.contains(actor_id) read_guard.contains(actor_id)
} }
pub async fn get_actor(&self, actor_id: &XsdAnyUri) -> Option<AcceptedActors> {
let cache = self.actor_cache.clone();
let read_guard = cache.read().await;
read_guard.get(actor_id).cloned()
}
pub async fn cache_actor(&self, actor_id: XsdAnyUri, actor: AcceptedActors) {
let cache = self.actor_cache.clone();
let mut write_guard = cache.write().await;
write_guard.insert(actor_id, actor, std::time::Duration::from_secs(3600));
}
pub async fn is_cached(&self, object_id: &XsdAnyUri) -> bool { pub async fn is_cached(&self, object_id: &XsdAnyUri) -> bool {
let cache = self.actor_id_cache.clone(); let cache = self.actor_id_cache.clone();

View file

@ -1,14 +1,13 @@
use crate::{error::MyError, requests::fetch_actor, state::State}; use crate::{error::MyError, requests::Requests};
use actix_web::client::Client;
use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm}; use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm};
use log::{debug, error, warn}; use log::{debug, error, warn};
use rsa::{hash::Hashes, padding::PaddingScheme, PublicKey, RSAPublicKey}; use rsa::{hash::Hashes, padding::PaddingScheme, PublicKey, RSAPublicKey};
use rsa_pem::KeyExt; use rsa_pem::KeyExt;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::{future::Future, pin::Pin, sync::Arc}; use std::{future::Future, pin::Pin};
#[derive(Clone)] #[derive(Clone)]
pub struct MyVerify(pub State, pub Client); pub struct MyVerify(pub Requests);
impl SignatureVerify for MyVerify { impl SignatureVerify for MyVerify {
type Error = MyError; type Error = MyError;
@ -25,11 +24,10 @@ impl SignatureVerify for MyVerify {
let signature = signature.to_owned(); let signature = signature.to_owned();
let signing_string = signing_string.to_owned(); let signing_string = signing_string.to_owned();
let state = Arc::new(self.0.clone()); let client = self.0.clone();
let client = Arc::new(self.1.clone());
Box::pin(async move { Box::pin(async move {
verify(state, client, algorithm, key_id, signature, signing_string) verify(client, algorithm, key_id, signature, signing_string)
.await .await
.map_err(|e| { .map_err(|e| {
error!("Failed to verify, {}", e); error!("Failed to verify, {}", e);
@ -40,15 +38,14 @@ impl SignatureVerify for MyVerify {
} }
async fn verify( async fn verify(
state: Arc<State>, client: Requests,
client: Arc<Client>,
algorithm: Option<Algorithm>, algorithm: Option<Algorithm>,
key_id: String, key_id: String,
signature: String, signature: String,
signing_string: String, signing_string: String,
) -> Result<bool, MyError> { ) -> Result<bool, MyError> {
debug!("Fetching actor"); debug!("Fetching actor");
let actor = fetch_actor(state, client, &key_id.parse()?).await?; let actor = client.fetch_actor(&key_id.parse()?).await?;
debug!("Parsing public key"); debug!("Parsing public key");
let public_key = RSAPublicKey::from_pem_pkcs8(&actor.public_key.public_key_pem)?; let public_key = RSAPublicKey::from_pem_pkcs8(&actor.public_key.public_key_pem)?;