Move requests to requests module, respond with 200

This commit is contained in:
asonix 2020-03-17 12:15:16 -05:00
parent 179a12d01e
commit 9fc507210a
4 changed files with 132 additions and 120 deletions

View file

@ -1,7 +1,10 @@
use crate::{
apub::{AcceptedActors, AcceptedObjects, ValidTypes},
db::{add_listener, remove_listener},
db_actor::{DbActor, DbQuery, Pool},
error::MyError,
requests::{deliver, deliver_many, fetch_actor},
response,
state::{State, UrlKind},
};
use activitystreams::{
@ -41,15 +44,6 @@ pub async fn inbox(
}
}
pub fn response<T>(item: T) -> HttpResponse
where
T: serde::ser::Serialize,
{
HttpResponse::Accepted()
.content_type("application/activity+json")
.json(item)
}
async fn handle_undo(
db_actor: web::Data<Addr<DbActor>>,
state: web::Data<State>,
@ -69,9 +63,7 @@ async fn handle_undo(
async move {
let conn = pool.get().await?;
crate::db::remove_listener(&conn, &inbox)
.await
.map_err(|e| {
remove_listener(&conn, &inbox).await.map_err(|e| {
error!("Error removing listener, {}", e);
e
})
@ -189,7 +181,7 @@ async fn handle_follow(
async move {
let conn = pool.get().await?;
crate::db::add_listener(&conn, &inbox).await.map_err(|e| {
add_listener(&conn, &inbox).await.map_err(|e| {
error!("Error adding listener, {}", e);
e
})
@ -225,106 +217,6 @@ async fn handle_follow(
Ok(response(accept))
}
pub async fn fetch_actor(
state: std::sync::Arc<State>,
client: std::sync::Arc<Client>,
actor_id: &XsdAnyUri,
) -> Result<AcceptedActors, MyError> {
if let Some(actor) = state.get_actor(actor_id).await {
return Ok(actor);
}
let actor: AcceptedActors = client
.get(actor_id.as_str())
.header("Accept", "application/activity+json")
.send()
.await
.map_err(|e| {
error!("Couldn't send request to {} for actor, {}", actor_id, e);
MyError::SendRequest
})?
.json()
.await
.map_err(|e| {
error!("Coudn't fetch actor from {}, {}", actor_id, e);
MyError::ReceiveResponse
})?;
state.cache_actor(actor_id.to_owned(), actor.clone()).await;
Ok(actor)
}
fn deliver_many<T>(
state: web::Data<State>,
client: web::Data<Client>,
inboxes: Vec<XsdAnyUri>,
item: T,
) where
T: serde::ser::Serialize + 'static,
{
let client = client.into_inner();
let state = state.into_inner();
actix::Arbiter::spawn(async move {
use futures::stream::StreamExt;
let mut unordered = futures::stream::FuturesUnordered::new();
for inbox in inboxes {
unordered.push(deliver(&state, &client, inbox, &item));
}
while let Some(_) = unordered.next().await {}
});
}
async fn deliver<T>(
state: &std::sync::Arc<State>,
client: &std::sync::Arc<Client>,
inbox: XsdAnyUri,
item: &T,
) -> Result<(), MyError>
where
T: serde::ser::Serialize,
{
use http_signature_normalization_actix::prelude::*;
use sha2::{Digest, Sha256};
let config = Config::default();
let mut digest = Sha256::new();
let key_id = state.generate_url(UrlKind::Actor);
let item_string = serde_json::to_string(item)?;
let res = client
.post(inbox.as_str())
.header("Accept", "application/activity+json")
.header("Content-Type", "application/activity+json")
.header("User-Agent", "Aode Relay v0.1.0")
.signature_with_digest(
&config,
&key_id,
&mut digest,
item_string,
|signing_string| state.sign(signing_string),
)?
.send()
.await
.map_err(|e| {
error!("Couldn't send deliver request to {}, {}", inbox, e);
MyError::SendRequest
})?;
if !res.status().is_success() {
error!("Invalid response status from {}, {}", inbox, res.status());
return Err(MyError::Status);
}
Ok(())
}
async fn get_inboxes(
state: &web::Data<State>,
actor: &AcceptedActors,

View file

@ -1,5 +1,7 @@
use activitystreams::{actor::apub::Application, context, endpoint::EndpointProperties};
use actix_web::{client::Client, middleware::Logger, web, App, HttpServer, Responder};
use actix_web::{
client::Client, middleware::Logger, web, App, HttpResponse, HttpServer, Responder,
};
use bb8_postgres::tokio_postgres;
use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature};
use rsa_pem::KeyExt;
@ -12,6 +14,7 @@ mod error;
mod inbox;
mod label;
mod notify;
mod requests;
mod state;
mod verifier;
mod webfinger;
@ -26,6 +29,15 @@ use self::{
webfinger::RelayResolver,
};
pub fn response<T>(item: T) -> HttpResponse
where
T: serde::ser::Serialize,
{
HttpResponse::Ok()
.content_type("application/activity+json")
.json(item)
}
async fn index() -> impl Responder {
"hewwo, mr obama"
}
@ -59,7 +71,7 @@ async fn actor_route(state: web::Data<State>) -> Result<impl Responder, MyError>
public_key_pem: state.settings.public_key.to_pem_pkcs8()?,
};
Ok(inbox::response(public_key.extend(application)))
Ok(response(public_key.extend(application)))
}
#[actix_rt::main]

108
src/requests.rs Normal file
View file

@ -0,0 +1,108 @@
use crate::{
apub::AcceptedActors,
error::MyError,
state::{State, UrlKind},
};
use activitystreams::primitives::XsdAnyUri;
use actix_web::{client::Client, web};
use log::error;
pub async fn fetch_actor(
state: std::sync::Arc<State>,
client: std::sync::Arc<Client>,
actor_id: &XsdAnyUri,
) -> Result<AcceptedActors, MyError> {
if let Some(actor) = state.get_actor(actor_id).await {
return Ok(actor);
}
let actor: AcceptedActors = client
.get(actor_id.as_str())
.header("Accept", "application/activity+json")
.send()
.await
.map_err(|e| {
error!("Couldn't send request to {} for actor, {}", actor_id, e);
MyError::SendRequest
})?
.json()
.await
.map_err(|e| {
error!("Coudn't fetch actor from {}, {}", actor_id, e);
MyError::ReceiveResponse
})?;
state.cache_actor(actor_id.to_owned(), actor.clone()).await;
Ok(actor)
}
pub fn deliver_many<T>(
state: web::Data<State>,
client: web::Data<Client>,
inboxes: Vec<XsdAnyUri>,
item: T,
) where
T: serde::ser::Serialize + 'static,
{
let client = client.into_inner();
let state = state.into_inner();
actix::Arbiter::spawn(async move {
use futures::stream::StreamExt;
let mut unordered = futures::stream::FuturesUnordered::new();
for inbox in inboxes {
unordered.push(deliver(&state, &client, inbox, &item));
}
while let Some(_) = unordered.next().await {}
});
}
pub async fn deliver<T>(
state: &std::sync::Arc<State>,
client: &std::sync::Arc<Client>,
inbox: XsdAnyUri,
item: &T,
) -> Result<(), MyError>
where
T: serde::ser::Serialize,
{
use http_signature_normalization_actix::prelude::*;
use sha2::{Digest, Sha256};
let config = Config::default();
let mut digest = Sha256::new();
let key_id = state.generate_url(UrlKind::Actor);
let item_string = serde_json::to_string(item)?;
let res = client
.post(inbox.as_str())
.header("Accept", "application/activity+json")
.header("Content-Type", "application/activity+json")
.header("User-Agent", "Aode Relay v0.1.0")
.signature_with_digest(
&config,
&key_id,
&mut digest,
item_string,
|signing_string| state.sign(signing_string),
)?
.send()
.await
.map_err(|e| {
error!("Couldn't send deliver request to {}, {}", inbox, e);
MyError::SendRequest
})?;
if !res.status().is_success() {
error!("Invalid response status from {}, {}", inbox, res.status());
return Err(MyError::Status);
}
Ok(())
}

View file

@ -1,4 +1,4 @@
use crate::{error::MyError, state::State};
use crate::{error::MyError, requests::fetch_actor, state::State};
use actix_web::client::Client;
use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm};
use rsa::{hash::Hashes, padding::PaddingScheme, PublicKey, RSAPublicKey};
@ -28,7 +28,7 @@ impl SignatureVerify for MyVerify {
let client = Arc::new(self.1.clone());
Box::pin(async move {
let actor = crate::inbox::fetch_actor(state, client, &key_id.parse()?).await?;
let actor = fetch_actor(state, client, &key_id.parse()?).await?;
let public_key = actor.public_key.ok_or(MyError::MissingKey)?;