relay/src/requests.rs

269 lines
7.8 KiB
Rust
Raw Normal View History

2020-03-23 22:17:53 +00:00
use crate::error::MyError;
2020-06-20 04:11:02 +00:00
use activitystreams_new::url::Url;
use actix_web::{client::Client, http::header::Date};
use bytes::Bytes;
2020-03-18 04:35:20 +00:00
use http_signature_normalization_actix::prelude::*;
use log::{debug, info, warn};
2020-07-25 14:33:35 +00:00
use rsa::{hash::Hash, padding::PaddingScheme, RSAPrivateKey};
2020-03-18 04:35:20 +00:00
use sha2::{Digest, Sha256};
use std::{
cell::RefCell,
rc::Rc,
sync::atomic::{AtomicUsize, Ordering},
time::SystemTime,
};
2020-03-18 04:35:20 +00:00
#[derive(Clone)]
pub struct Requests {
client: Rc<RefCell<Client>>,
consecutive_errors: Rc<AtomicUsize>,
error_limit: usize,
2020-03-18 04:35:20 +00:00
key_id: String,
user_agent: String,
2020-03-18 04:35:20 +00:00
private_key: RSAPrivateKey,
config: Config,
}
2020-03-18 04:35:20 +00:00
impl Requests {
2020-03-23 22:17:53 +00:00
pub fn new(key_id: String, private_key: RSAPrivateKey, user_agent: String) -> Self {
2020-03-18 04:35:20 +00:00
Requests {
client: Rc::new(RefCell::new(
Client::build()
.header("User-Agent", user_agent.clone())
.finish(),
)),
consecutive_errors: Rc::new(AtomicUsize::new(0)),
error_limit: 3,
2020-03-18 04:35:20 +00:00
key_id,
user_agent,
2020-03-18 04:35:20 +00:00
private_key,
config: Config::default().dont_use_created_field(),
}
}
fn count_err(&self) {
let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
if count + 1 >= self.error_limit {
warn!("{} consecutive errors, rebuilding http client", count);
*self.client.borrow_mut() = Client::build()
.header("User-Agent", self.user_agent.clone())
.finish();
self.reset_err();
}
}
fn reset_err(&self) {
self.consecutive_errors.swap(0, Ordering::Relaxed);
}
pub async fn fetch_json<T>(&self, url: &str) -> Result<T, MyError>
where
T: serde::de::DeserializeOwned,
{
self.do_fetch(url, "application/json").await
}
2020-03-18 04:35:20 +00:00
pub async fn fetch<T>(&self, url: &str) -> Result<T, MyError>
where
T: serde::de::DeserializeOwned,
{
self.do_fetch(url, "application/activity+json").await
}
async fn do_fetch<T>(&self, url: &str, accept: &str) -> Result<T, MyError>
2020-03-18 04:35:20 +00:00
where
T: serde::de::DeserializeOwned,
{
2020-03-30 06:06:13 +00:00
let signer = self.signer();
let client: Client = self.client.borrow().clone();
2020-09-07 18:34:29 +00:00
let req = client.get(url);
let host = req
.get_uri()
.host()
.ok_or(MyError::Host(url.to_string()))?
.to_string();
2020-09-07 20:28:58 +00:00
debug!("Host: {}", host);
2020-09-07 18:34:29 +00:00
let res = req
.header("Host", host)
.header("Accept", accept)
.set(Date(SystemTime::now().into()))
2020-03-30 06:06:13 +00:00
.signature(
self.config.clone(),
self.key_id.clone(),
move |signing_string| signer.sign(signing_string),
)
.await?
2020-03-18 04:35:20 +00:00
.send()
.await;
if res.is_err() {
self.count_err();
}
let mut res = res.map_err(|e| MyError::SendRequest(url.to_string(), e.to_string()))?;
self.reset_err();
2020-03-18 04:35:20 +00:00
if !res.status().is_success() {
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
debug!("Response from {}, {}", url, s);
}
2020-03-18 04:35:20 +00:00
}
}
2020-03-18 04:35:20 +00:00
2020-07-25 15:13:00 +00:00
return Err(MyError::Status(url.to_string(), res.status()));
}
let body = res
.body()
.await
.map_err(|e| MyError::ReceiveResponse(url.to_string(), e.to_string()))?;
Ok(serde_json::from_slice(body.as_ref())?)
}
pub async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), MyError> {
info!("Fetching bytes for {}", url);
2020-03-30 06:06:13 +00:00
let signer = self.signer();
let client: Client = self.client.borrow().clone();
2020-09-07 18:34:29 +00:00
let req = client.get(url);
let host = req
.get_uri()
.host()
.ok_or(MyError::Host(url.to_string()))?
.to_string();
2020-09-07 20:28:58 +00:00
debug!("Host: {}", host);
2020-09-07 18:34:29 +00:00
let res = req
.header("Host", host)
2020-04-21 19:18:18 +00:00
.header("Accept", "*/*")
.set(Date(SystemTime::now().into()))
2020-03-30 06:06:13 +00:00
.signature(
self.config.clone(),
self.key_id.clone(),
move |signing_string| signer.sign(signing_string),
)
.await?
.send()
.await;
if res.is_err() {
self.count_err();
}
let mut res = res.map_err(|e| MyError::SendRequest(url.to_string(), e.to_string()))?;
self.reset_err();
let content_type = if let Some(content_type) = res.headers().get("content-type") {
if let Ok(s) = content_type.to_str() {
s.to_owned()
} else {
return Err(MyError::ContentType);
}
} else {
return Err(MyError::ContentType);
};
if !res.status().is_success() {
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
debug!("Response from {}, {}", url, s);
}
}
}
2020-07-25 15:13:00 +00:00
return Err(MyError::Status(url.to_string(), res.status()));
}
let bytes = match res.body().limit(1024 * 1024 * 4).await {
Err(e) => {
return Err(MyError::ReceiveResponse(url.to_string(), e.to_string()));
}
Ok(bytes) => bytes,
};
Ok((content_type, bytes))
}
2020-06-20 04:11:02 +00:00
pub async fn deliver<T>(&self, inbox: Url, item: &T) -> Result<(), MyError>
2020-03-18 04:35:20 +00:00
where
T: serde::ser::Serialize,
{
2020-03-30 06:06:13 +00:00
let signer = self.signer();
2020-03-18 04:35:20 +00:00
let item_string = serde_json::to_string(item)?;
let client: Client = self.client.borrow().clone();
2020-09-07 18:34:29 +00:00
let req = client.post(inbox.as_str());
let host = req
.get_uri()
.host()
.ok_or(MyError::Host(inbox.to_string()))?
.to_string();
2020-09-07 20:28:58 +00:00
debug!("Host: {}", host);
2020-09-07 18:34:29 +00:00
let res = req
.header("Host", host)
2020-03-18 04:35:20 +00:00
.header("Accept", "application/activity+json")
.header("Content-Type", "application/activity+json")
.set(Date(SystemTime::now().into()))
2020-03-18 04:35:20 +00:00
.signature_with_digest(
2020-03-30 06:06:13 +00:00
self.config.clone(),
self.key_id.clone(),
Sha256::new(),
2020-03-18 04:35:20 +00:00
item_string,
2020-03-30 06:06:13 +00:00
move |signing_string| signer.sign(signing_string),
)
.await?
2020-03-18 04:35:20 +00:00
.send()
.await;
if res.is_err() {
self.count_err();
}
let mut res = res.map_err(|e| MyError::SendRequest(inbox.to_string(), e.to_string()))?;
self.reset_err();
2020-03-18 04:35:20 +00:00
if !res.status().is_success() {
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
debug!("Response from {}, {}", inbox.as_str(), s);
}
2020-03-18 04:35:20 +00:00
}
}
2020-07-25 15:13:00 +00:00
return Err(MyError::Status(inbox.to_string(), res.status()));
2020-03-18 04:35:20 +00:00
}
2020-03-18 04:35:20 +00:00
Ok(())
}
2020-03-30 06:06:13 +00:00
fn signer(&self) -> Signer {
Signer {
private_key: self.private_key.clone(),
}
}
}
struct Signer {
private_key: RSAPrivateKey,
}
impl Signer {
fn sign(&self, signing_string: &str) -> Result<String, MyError> {
2020-03-18 04:35:20 +00:00
let hashed = Sha256::digest(signing_string.as_bytes());
2020-07-25 14:33:35 +00:00
let bytes = self.private_key.sign(
PaddingScheme::PKCS1v15Sign {
hash: Some(Hash::SHA2_256),
},
&hashed,
)?;
2020-03-18 04:35:20 +00:00
Ok(base64::encode(bytes))
}
}