2022-12-21 22:51:17 +00:00
|
|
|
use crate::{
|
|
|
|
data::LastOnline,
|
|
|
|
error::{Error, ErrorKind},
|
|
|
|
};
|
2022-01-17 22:54:45 +00:00
|
|
|
use activitystreams::iri_string::types::IriString;
|
2022-11-18 04:39:26 +00:00
|
|
|
use actix_web::http::header::Date;
|
2023-06-23 18:46:13 +00:00
|
|
|
use awc::{error::SendRequestError, Client, ClientResponse, Connector};
|
2023-01-23 14:38:55 +00:00
|
|
|
use base64::{engine::general_purpose::STANDARD, Engine};
|
2021-11-23 22:19:59 +00:00
|
|
|
use dashmap::DashMap;
|
2023-07-26 23:03:21 +00:00
|
|
|
use http_signature_normalization_actix::{prelude::*, Canceled, Spawn};
|
2022-10-29 17:22:13 +00:00
|
|
|
use rand::thread_rng;
|
2023-01-23 14:56:18 +00:00
|
|
|
use rsa::{
|
|
|
|
pkcs1v15::SigningKey,
|
|
|
|
sha2::{Digest, Sha256},
|
2023-04-28 00:34:23 +00:00
|
|
|
signature::{RandomizedSigner, SignatureEncoding},
|
2023-01-23 14:56:18 +00:00
|
|
|
RsaPrivateKey,
|
|
|
|
};
|
2020-05-23 23:56:27 +00:00
|
|
|
use std::{
|
2023-07-26 23:03:21 +00:00
|
|
|
panic::AssertUnwindSafe,
|
|
|
|
sync::{
|
|
|
|
atomic::{AtomicBool, Ordering},
|
|
|
|
Arc,
|
|
|
|
},
|
|
|
|
thread::JoinHandle,
|
2023-07-27 14:26:16 +00:00
|
|
|
time::{Duration, Instant, SystemTime},
|
2020-05-23 23:56:27 +00:00
|
|
|
};
|
2021-12-03 22:17:25 +00:00
|
|
|
use tracing_awc::Tracing;
|
2020-03-18 04:35:20 +00:00
|
|
|
|
2022-01-17 23:57:06 +00:00
|
|
|
const ONE_SECOND: u64 = 1;
|
|
|
|
const ONE_MINUTE: u64 = 60 * ONE_SECOND;
|
|
|
|
const ONE_HOUR: u64 = 60 * ONE_MINUTE;
|
|
|
|
const ONE_DAY: u64 = 24 * ONE_HOUR;
|
|
|
|
|
2020-12-23 18:06:15 +00:00
|
|
|
#[derive(Clone)]
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) struct Breakers {
|
2021-11-23 22:19:59 +00:00
|
|
|
inner: Arc<DashMap<String, Breaker>>,
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
|
2021-09-18 17:55:39 +00:00
|
|
|
impl std::fmt::Debug for Breakers {
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
f.debug_struct("Breakers").finish()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-23 18:06:15 +00:00
|
|
|
impl Breakers {
|
2022-01-17 22:54:45 +00:00
|
|
|
fn should_try(&self, url: &IriString) -> bool {
|
|
|
|
if let Some(authority) = url.authority_str() {
|
|
|
|
if let Some(breaker) = self.inner.get(authority) {
|
2021-11-23 22:19:59 +00:00
|
|
|
breaker.should_try()
|
2020-12-29 17:27:14 +00:00
|
|
|
} else {
|
|
|
|
true
|
|
|
|
}
|
2020-12-23 18:06:15 +00:00
|
|
|
} else {
|
|
|
|
false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-01-17 22:54:45 +00:00
|
|
|
fn fail(&self, url: &IriString) {
|
|
|
|
if let Some(authority) = url.authority_str() {
|
2021-02-10 05:45:13 +00:00
|
|
|
let should_write = {
|
2022-01-17 22:54:45 +00:00
|
|
|
if let Some(mut breaker) = self.inner.get_mut(authority) {
|
2021-11-23 22:19:59 +00:00
|
|
|
breaker.fail();
|
2022-11-16 17:23:36 +00:00
|
|
|
if !breaker.should_try() {
|
2023-01-29 19:21:36 +00:00
|
|
|
tracing::warn!("Failed breaker for {authority}");
|
2022-11-16 17:23:36 +00:00
|
|
|
}
|
2021-02-10 05:45:13 +00:00
|
|
|
false
|
|
|
|
} else {
|
|
|
|
true
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if should_write {
|
2022-01-17 22:54:45 +00:00
|
|
|
let mut breaker = self.inner.entry(authority.to_owned()).or_default();
|
2021-11-23 22:19:59 +00:00
|
|
|
breaker.fail();
|
2020-12-29 17:27:14 +00:00
|
|
|
}
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-01-17 22:54:45 +00:00
|
|
|
fn succeed(&self, url: &IriString) {
|
|
|
|
if let Some(authority) = url.authority_str() {
|
2021-02-10 05:45:13 +00:00
|
|
|
let should_write = {
|
2022-01-17 22:54:45 +00:00
|
|
|
if let Some(mut breaker) = self.inner.get_mut(authority) {
|
2021-11-23 22:19:59 +00:00
|
|
|
breaker.succeed();
|
2021-02-10 05:45:13 +00:00
|
|
|
false
|
|
|
|
} else {
|
|
|
|
true
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if should_write {
|
2022-01-17 22:54:45 +00:00
|
|
|
let mut breaker = self.inner.entry(authority.to_owned()).or_default();
|
2021-11-23 22:19:59 +00:00
|
|
|
breaker.succeed();
|
2020-12-29 17:27:14 +00:00
|
|
|
}
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for Breakers {
|
|
|
|
fn default() -> Self {
|
|
|
|
Breakers {
|
2021-11-23 22:19:59 +00:00
|
|
|
inner: Arc::new(DashMap::new()),
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-18 17:55:39 +00:00
|
|
|
#[derive(Debug)]
|
2020-12-23 18:06:15 +00:00
|
|
|
struct Breaker {
|
|
|
|
failures: usize,
|
2022-01-17 23:57:06 +00:00
|
|
|
last_attempt: SystemTime,
|
|
|
|
last_success: SystemTime,
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Breaker {
|
2022-11-16 17:23:36 +00:00
|
|
|
const FAILURE_WAIT: Duration = Duration::from_secs(ONE_DAY);
|
|
|
|
const FAILURE_THRESHOLD: usize = 10;
|
2020-12-23 18:06:15 +00:00
|
|
|
|
|
|
|
fn should_try(&self) -> bool {
|
2022-11-16 17:23:36 +00:00
|
|
|
self.failures < Self::FAILURE_THRESHOLD
|
|
|
|
|| self.last_attempt + Self::FAILURE_WAIT < SystemTime::now()
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn fail(&mut self) {
|
|
|
|
self.failures += 1;
|
2022-01-17 23:57:06 +00:00
|
|
|
self.last_attempt = SystemTime::now();
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn succeed(&mut self) {
|
|
|
|
self.failures = 0;
|
2022-01-17 23:57:06 +00:00
|
|
|
self.last_attempt = SystemTime::now();
|
|
|
|
self.last_success = SystemTime::now();
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for Breaker {
|
|
|
|
fn default() -> Self {
|
2022-01-17 23:57:06 +00:00
|
|
|
let now = SystemTime::now();
|
2020-12-23 18:06:15 +00:00
|
|
|
|
|
|
|
Breaker {
|
|
|
|
failures: 0,
|
|
|
|
last_attempt: now,
|
|
|
|
last_success: now,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-18 04:35:20 +00:00
|
|
|
#[derive(Clone)]
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) struct Requests {
|
2023-06-23 18:46:13 +00:00
|
|
|
pool_size: usize,
|
2023-06-23 19:27:20 +00:00
|
|
|
client: Client,
|
2020-03-18 04:35:20 +00:00
|
|
|
key_id: String,
|
2020-05-23 23:56:27 +00:00
|
|
|
user_agent: String,
|
2021-08-01 20:12:06 +00:00
|
|
|
private_key: RsaPrivateKey,
|
2023-07-26 23:03:21 +00:00
|
|
|
config: Config<Spawner>,
|
2020-12-23 18:06:15 +00:00
|
|
|
breakers: Breakers,
|
2022-12-21 22:51:17 +00:00
|
|
|
last_online: Arc<LastOnline>,
|
2020-03-18 04:35:20 +00:00
|
|
|
}
|
2020-03-17 17:15:16 +00:00
|
|
|
|
2021-09-18 17:55:39 +00:00
|
|
|
impl std::fmt::Debug for Requests {
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
f.debug_struct("Requests")
|
2023-06-23 18:46:13 +00:00
|
|
|
.field("pool_size", &self.pool_size)
|
2021-09-18 17:55:39 +00:00
|
|
|
.field("key_id", &self.key_id)
|
|
|
|
.field("user_agent", &self.user_agent)
|
|
|
|
.field("config", &self.config)
|
|
|
|
.field("breakers", &self.breakers)
|
|
|
|
.finish()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-06-23 20:01:56 +00:00
|
|
|
thread_local! {
|
|
|
|
static CLIENT: std::cell::OnceCell<Client> = std::cell::OnceCell::new();
|
|
|
|
}
|
|
|
|
|
2023-07-25 21:06:56 +00:00
|
|
|
pub(crate) fn build_client(user_agent: &str, pool_size: usize, timeout_seconds: u64) -> Client {
|
2023-06-23 20:01:56 +00:00
|
|
|
CLIENT.with(|client| {
|
|
|
|
client
|
|
|
|
.get_or_init(|| {
|
|
|
|
let connector = Connector::new().limit(pool_size);
|
|
|
|
|
|
|
|
Client::builder()
|
|
|
|
.connector(connector)
|
|
|
|
.wrap(Tracing)
|
|
|
|
.add_default_header(("User-Agent", user_agent.to_string()))
|
2023-07-25 21:06:56 +00:00
|
|
|
.timeout(Duration::from_secs(timeout_seconds))
|
2023-06-23 20:01:56 +00:00
|
|
|
.finish()
|
|
|
|
})
|
|
|
|
.clone()
|
|
|
|
})
|
2022-11-16 04:56:38 +00:00
|
|
|
}
|
|
|
|
|
2020-03-18 04:35:20 +00:00
|
|
|
impl Requests {
|
2023-07-26 23:04:09 +00:00
|
|
|
#[allow(clippy::too_many_arguments)]
|
2021-02-10 04:17:20 +00:00
|
|
|
pub(crate) fn new(
|
2020-12-23 18:06:15 +00:00
|
|
|
key_id: String,
|
2021-08-01 20:12:06 +00:00
|
|
|
private_key: RsaPrivateKey,
|
2020-12-23 18:06:15 +00:00
|
|
|
user_agent: String,
|
|
|
|
breakers: Breakers,
|
2022-12-21 22:51:17 +00:00
|
|
|
last_online: Arc<LastOnline>,
|
2023-06-23 18:46:13 +00:00
|
|
|
pool_size: usize,
|
2023-07-25 21:06:56 +00:00
|
|
|
timeout_seconds: u64,
|
2023-07-26 23:03:21 +00:00
|
|
|
spawner: Spawner,
|
2020-12-23 18:06:15 +00:00
|
|
|
) -> Self {
|
2020-03-18 04:35:20 +00:00
|
|
|
Requests {
|
2023-06-23 18:46:13 +00:00
|
|
|
pool_size,
|
2023-07-25 21:06:56 +00:00
|
|
|
client: build_client(&user_agent, pool_size, timeout_seconds),
|
2020-03-18 04:35:20 +00:00
|
|
|
key_id,
|
2020-05-23 23:56:27 +00:00
|
|
|
user_agent,
|
2020-03-18 04:35:20 +00:00
|
|
|
private_key,
|
2023-07-26 23:03:21 +00:00
|
|
|
config: Config::new().mastodon_compat().spawner(spawner),
|
2020-12-23 18:06:15 +00:00
|
|
|
breakers,
|
2022-12-21 22:51:17 +00:00
|
|
|
last_online,
|
2020-03-18 04:35:20 +00:00
|
|
|
}
|
2020-03-17 17:15:16 +00:00
|
|
|
}
|
|
|
|
|
2022-11-16 20:29:57 +00:00
|
|
|
pub(crate) fn reset_breaker(&self, iri: &IriString) {
|
|
|
|
self.breakers.succeed(iri);
|
|
|
|
}
|
|
|
|
|
2022-11-16 17:23:36 +00:00
|
|
|
async fn check_response(
|
|
|
|
&self,
|
|
|
|
parsed_url: &IriString,
|
2022-11-16 18:38:34 +00:00
|
|
|
res: Result<ClientResponse, SendRequestError>,
|
|
|
|
) -> Result<ClientResponse, Error> {
|
|
|
|
if res.is_err() {
|
|
|
|
self.breakers.fail(&parsed_url);
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut res =
|
|
|
|
res.map_err(|e| ErrorKind::SendRequest(parsed_url.to_string(), e.to_string()))?;
|
|
|
|
|
2023-02-25 21:02:16 +00:00
|
|
|
if res.status().is_server_error() {
|
2022-11-16 17:23:36 +00:00
|
|
|
self.breakers.fail(&parsed_url);
|
|
|
|
|
|
|
|
if let Ok(bytes) = res.body().await {
|
|
|
|
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
|
|
|
|
if !s.is_empty() {
|
2023-07-27 14:26:16 +00:00
|
|
|
tracing::debug!("Response from {parsed_url}, {s}");
|
2022-11-16 17:23:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into());
|
|
|
|
}
|
|
|
|
|
2022-12-21 22:51:17 +00:00
|
|
|
self.last_online.mark_seen(&parsed_url);
|
2022-11-16 17:23:36 +00:00
|
|
|
self.breakers.succeed(&parsed_url);
|
|
|
|
|
2022-11-16 18:38:34 +00:00
|
|
|
Ok(res)
|
2022-11-16 17:23:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
|
2023-02-25 21:02:16 +00:00
|
|
|
pub(crate) async fn fetch_json<T>(&self, url: &IriString) -> Result<T, Error>
|
2020-07-10 22:24:47 +00:00
|
|
|
where
|
|
|
|
T: serde::de::DeserializeOwned,
|
|
|
|
{
|
|
|
|
self.do_fetch(url, "application/json").await
|
|
|
|
}
|
|
|
|
|
2022-12-17 11:57:37 +00:00
|
|
|
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
|
2023-02-25 21:02:16 +00:00
|
|
|
pub(crate) async fn fetch_json_msky<T>(&self, url: &IriString) -> Result<T, Error>
|
2022-12-17 11:57:37 +00:00
|
|
|
where
|
|
|
|
T: serde::de::DeserializeOwned,
|
|
|
|
{
|
2023-02-25 21:02:16 +00:00
|
|
|
let mut res = self
|
|
|
|
.do_deliver(
|
|
|
|
url,
|
|
|
|
&serde_json::json!({}),
|
|
|
|
"application/json",
|
|
|
|
"application/json",
|
|
|
|
)
|
|
|
|
.await?;
|
2022-12-17 11:57:37 +00:00
|
|
|
|
2023-02-25 21:02:16 +00:00
|
|
|
let body = res
|
|
|
|
.body()
|
|
|
|
.await
|
|
|
|
.map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?;
|
2020-07-10 22:24:47 +00:00
|
|
|
|
2023-02-25 21:02:16 +00:00
|
|
|
Ok(serde_json::from_slice(body.as_ref())?)
|
2022-12-17 11:57:37 +00:00
|
|
|
}
|
|
|
|
|
2023-02-25 21:02:16 +00:00
|
|
|
#[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))]
|
|
|
|
pub(crate) async fn fetch<T>(&self, url: &IriString) -> Result<T, Error>
|
2022-12-17 11:57:37 +00:00
|
|
|
where
|
|
|
|
T: serde::de::DeserializeOwned,
|
|
|
|
{
|
2023-02-25 21:02:16 +00:00
|
|
|
self.do_fetch(url, "application/activity+json").await
|
2022-12-17 11:57:37 +00:00
|
|
|
}
|
|
|
|
|
2023-02-25 21:02:16 +00:00
|
|
|
async fn do_fetch<T>(&self, url: &IriString, accept: &str) -> Result<T, Error>
|
2020-03-18 04:35:20 +00:00
|
|
|
where
|
|
|
|
T: serde::de::DeserializeOwned,
|
|
|
|
{
|
2023-02-25 21:02:16 +00:00
|
|
|
let mut res = self.do_fetch_response(url, accept).await?;
|
2020-12-23 18:06:15 +00:00
|
|
|
|
2020-07-10 22:47:41 +00:00
|
|
|
let body = res
|
|
|
|
.body()
|
2020-04-21 21:39:02 +00:00
|
|
|
.await
|
2021-09-18 17:55:39 +00:00
|
|
|
.map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?;
|
2020-07-10 22:47:41 +00:00
|
|
|
|
|
|
|
Ok(serde_json::from_slice(body.as_ref())?)
|
2020-03-17 21:53:31 +00:00
|
|
|
}
|
|
|
|
|
2022-11-18 04:39:26 +00:00
|
|
|
#[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))]
|
2023-02-25 21:02:16 +00:00
|
|
|
pub(crate) async fn fetch_response(&self, url: &IriString) -> Result<ClientResponse, Error> {
|
|
|
|
self.do_fetch_response(url, "*/*").await
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) async fn do_fetch_response(
|
|
|
|
&self,
|
|
|
|
url: &IriString,
|
|
|
|
accept: &str,
|
|
|
|
) -> Result<ClientResponse, Error> {
|
|
|
|
if !self.breakers.should_try(url) {
|
2021-09-18 17:55:39 +00:00
|
|
|
return Err(ErrorKind::Breaker.into());
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
|
2020-03-30 06:06:13 +00:00
|
|
|
let signer = self.signer();
|
2022-11-16 17:23:36 +00:00
|
|
|
let span = tracing::Span::current();
|
2020-03-30 06:06:13 +00:00
|
|
|
|
2023-06-23 19:27:20 +00:00
|
|
|
let res = self
|
|
|
|
.client
|
2022-11-18 04:39:26 +00:00
|
|
|
.get(url.as_str())
|
2023-02-25 21:02:16 +00:00
|
|
|
.insert_header(("Accept", accept))
|
2021-02-11 00:00:11 +00:00
|
|
|
.insert_header(Date(SystemTime::now().into()))
|
2022-11-18 04:39:26 +00:00
|
|
|
.no_decompress()
|
2020-03-30 06:06:13 +00:00
|
|
|
.signature(
|
|
|
|
self.config.clone(),
|
|
|
|
self.key_id.clone(),
|
2022-11-16 17:23:36 +00:00
|
|
|
move |signing_string| {
|
|
|
|
span.record("signing_string", signing_string);
|
|
|
|
span.in_scope(|| signer.sign(signing_string))
|
|
|
|
},
|
2020-03-30 06:06:13 +00:00
|
|
|
)
|
|
|
|
.await?
|
2020-03-26 03:26:45 +00:00
|
|
|
.send()
|
2020-05-23 23:56:27 +00:00
|
|
|
.await;
|
|
|
|
|
2023-02-25 21:02:16 +00:00
|
|
|
let res = self.check_response(url, res).await?;
|
2020-03-26 03:26:45 +00:00
|
|
|
|
2022-11-18 04:39:26 +00:00
|
|
|
Ok(res)
|
2020-03-26 03:26:45 +00:00
|
|
|
}
|
|
|
|
|
2021-09-21 19:32:25 +00:00
|
|
|
#[tracing::instrument(
|
|
|
|
"Deliver to Inbox",
|
2022-11-01 20:57:33 +00:00
|
|
|
skip_all,
|
2022-11-16 17:23:36 +00:00
|
|
|
fields(inbox = inbox.to_string().as_str(), signing_string)
|
2021-09-21 19:32:25 +00:00
|
|
|
)]
|
2023-02-25 21:02:16 +00:00
|
|
|
pub(crate) async fn deliver<T>(&self, inbox: &IriString, item: &T) -> Result<(), Error>
|
|
|
|
where
|
|
|
|
T: serde::ser::Serialize + std::fmt::Debug,
|
|
|
|
{
|
|
|
|
self.do_deliver(
|
|
|
|
inbox,
|
|
|
|
item,
|
|
|
|
"application/activity+json",
|
|
|
|
"application/activity+json",
|
|
|
|
)
|
|
|
|
.await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn do_deliver<T>(
|
|
|
|
&self,
|
|
|
|
inbox: &IriString,
|
|
|
|
item: &T,
|
|
|
|
content_type: &str,
|
|
|
|
accept: &str,
|
|
|
|
) -> Result<ClientResponse, Error>
|
2020-03-18 04:35:20 +00:00
|
|
|
where
|
2021-09-21 18:26:31 +00:00
|
|
|
T: serde::ser::Serialize + std::fmt::Debug,
|
2020-03-18 04:35:20 +00:00
|
|
|
{
|
2021-11-23 22:19:59 +00:00
|
|
|
if !self.breakers.should_try(&inbox) {
|
2021-09-18 17:55:39 +00:00
|
|
|
return Err(ErrorKind::Breaker.into());
|
2020-12-23 18:06:15 +00:00
|
|
|
}
|
|
|
|
|
2020-03-30 06:06:13 +00:00
|
|
|
let signer = self.signer();
|
2022-11-16 17:23:36 +00:00
|
|
|
let span = tracing::Span::current();
|
2020-03-18 04:35:20 +00:00
|
|
|
let item_string = serde_json::to_string(item)?;
|
|
|
|
|
2023-06-23 19:27:20 +00:00
|
|
|
let (req, body) = self
|
|
|
|
.client
|
2020-09-07 21:51:02 +00:00
|
|
|
.post(inbox.as_str())
|
2023-02-25 21:02:16 +00:00
|
|
|
.insert_header(("Accept", accept))
|
|
|
|
.insert_header(("Content-Type", content_type))
|
2021-02-11 00:00:11 +00:00
|
|
|
.insert_header(Date(SystemTime::now().into()))
|
2020-03-18 04:35:20 +00:00
|
|
|
.signature_with_digest(
|
2020-03-30 06:06:13 +00:00
|
|
|
self.config.clone(),
|
|
|
|
self.key_id.clone(),
|
|
|
|
Sha256::new(),
|
2020-03-18 04:35:20 +00:00
|
|
|
item_string,
|
2022-11-16 17:23:36 +00:00
|
|
|
move |signing_string| {
|
|
|
|
span.record("signing_string", signing_string);
|
|
|
|
span.in_scope(|| signer.sign(signing_string))
|
|
|
|
},
|
2020-03-30 06:06:13 +00:00
|
|
|
)
|
|
|
|
.await?
|
2021-09-21 16:21:06 +00:00
|
|
|
.split();
|
|
|
|
|
2021-12-03 22:17:25 +00:00
|
|
|
let res = req.send_body(body).await;
|
2020-05-23 23:56:27 +00:00
|
|
|
|
2023-02-25 21:02:16 +00:00
|
|
|
let res = self.check_response(inbox, res).await?;
|
2020-12-23 18:06:15 +00:00
|
|
|
|
2023-02-25 21:02:16 +00:00
|
|
|
Ok(res)
|
2020-03-18 04:35:20 +00:00
|
|
|
}
|
2020-03-17 17:15:16 +00:00
|
|
|
|
2020-03-30 06:06:13 +00:00
|
|
|
fn signer(&self) -> Signer {
|
|
|
|
Signer {
|
|
|
|
private_key: self.private_key.clone(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
struct Signer {
|
2021-08-01 20:12:06 +00:00
|
|
|
private_key: RsaPrivateKey,
|
2020-03-30 06:06:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Signer {
|
2021-09-18 17:55:39 +00:00
|
|
|
fn sign(&self, signing_string: &str) -> Result<String, Error> {
|
2023-04-28 00:34:23 +00:00
|
|
|
let signing_key = SigningKey::<Sha256>::new(self.private_key.clone());
|
2023-01-23 14:56:18 +00:00
|
|
|
let signature =
|
|
|
|
signing_key.try_sign_with_rng(&mut thread_rng(), signing_string.as_bytes())?;
|
2023-04-28 00:34:23 +00:00
|
|
|
Ok(STANDARD.encode(signature.to_bytes().as_ref()))
|
2020-03-18 04:35:20 +00:00
|
|
|
}
|
2020-03-17 17:15:16 +00:00
|
|
|
}
|
2023-07-26 23:03:21 +00:00
|
|
|
|
|
|
|
fn signature_thread(
|
|
|
|
receiver: flume::Receiver<Box<dyn FnOnce() + Send>>,
|
|
|
|
shutdown: flume::Receiver<()>,
|
2023-07-27 14:26:16 +00:00
|
|
|
id: usize,
|
2023-07-26 23:03:21 +00:00
|
|
|
) {
|
2023-07-27 14:26:16 +00:00
|
|
|
let guard = MetricsGuard::guard(id);
|
2023-07-26 23:03:21 +00:00
|
|
|
let stopping = AtomicBool::new(false);
|
2023-07-27 14:26:16 +00:00
|
|
|
|
2023-07-26 23:03:21 +00:00
|
|
|
while !stopping.load(Ordering::Acquire) {
|
|
|
|
flume::Selector::new()
|
|
|
|
.recv(&receiver, |res| match res {
|
|
|
|
Ok(f) => {
|
2023-07-27 14:26:16 +00:00
|
|
|
let start = Instant::now();
|
|
|
|
metrics::increment_counter!("relay.signature-thread.operation.start", "id" => id.to_string());
|
2023-07-26 23:03:21 +00:00
|
|
|
let res = std::panic::catch_unwind(AssertUnwindSafe(move || {
|
|
|
|
(f)();
|
|
|
|
}));
|
2023-07-27 14:26:16 +00:00
|
|
|
metrics::increment_counter!("relay.signature-thread.operation.end", "complete" => res.is_ok().to_string(), "id" => id.to_string());
|
|
|
|
metrics::histogram!("relay.signature-thread.operation.duration", start.elapsed().as_secs_f64(), "id" => id.to_string());
|
2023-07-26 23:03:21 +00:00
|
|
|
|
|
|
|
if let Err(e) = res {
|
|
|
|
tracing::warn!("Signature fn panicked: {e:?}");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(_) => {
|
|
|
|
tracing::warn!("Receive error, stopping");
|
|
|
|
stopping.store(true, Ordering::Release)
|
|
|
|
}
|
|
|
|
})
|
2023-07-27 14:26:16 +00:00
|
|
|
.recv(&shutdown, |res| {
|
|
|
|
if res.is_ok() {
|
|
|
|
tracing::warn!("Stopping");
|
|
|
|
} else {
|
|
|
|
tracing::warn!("Shutdown receive error, stopping");
|
|
|
|
}
|
2023-07-26 23:03:21 +00:00
|
|
|
stopping.store(true, Ordering::Release)
|
|
|
|
})
|
|
|
|
.wait();
|
|
|
|
}
|
2023-07-27 14:26:16 +00:00
|
|
|
|
|
|
|
guard.disarm();
|
2023-07-26 23:03:21 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
pub(crate) struct Spawner {
|
|
|
|
sender: flume::Sender<Box<dyn FnOnce() + Send>>,
|
|
|
|
threads: Option<Arc<Vec<JoinHandle<()>>>>,
|
|
|
|
shutdown: flume::Sender<()>,
|
|
|
|
}
|
|
|
|
|
2023-07-27 14:26:16 +00:00
|
|
|
struct MetricsGuard {
|
|
|
|
id: usize,
|
|
|
|
start: Instant,
|
|
|
|
armed: bool,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl MetricsGuard {
|
|
|
|
fn guard(id: usize) -> Self {
|
|
|
|
metrics::increment_counter!("relay.signature-thread.launched", "id" => id.to_string());
|
|
|
|
|
|
|
|
Self {
|
|
|
|
id,
|
|
|
|
start: Instant::now(),
|
|
|
|
armed: true,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn disarm(mut self) {
|
|
|
|
self.armed = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Drop for MetricsGuard {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
metrics::increment_counter!("relay.signature-thread.closed", "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
|
|
|
|
metrics::histogram!("relay.signature-thread.duration", self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-26 23:03:21 +00:00
|
|
|
impl Spawner {
|
2023-07-27 04:04:04 +00:00
|
|
|
pub(crate) fn build(threads: usize) -> std::io::Result<Self> {
|
2023-07-26 23:03:21 +00:00
|
|
|
let (sender, receiver) = flume::bounded(8);
|
|
|
|
let (shutdown, shutdown_rx) = flume::bounded(threads);
|
|
|
|
|
2023-07-27 03:51:07 +00:00
|
|
|
tracing::warn!("Launching {threads} signature threads");
|
|
|
|
|
2023-07-26 23:03:21 +00:00
|
|
|
let threads = (0..threads)
|
|
|
|
.map(|i| {
|
|
|
|
let receiver = receiver.clone();
|
|
|
|
let shutdown_rx = shutdown_rx.clone();
|
|
|
|
std::thread::Builder::new()
|
|
|
|
.name(format!("signature-thread-{i}"))
|
|
|
|
.spawn(move || {
|
2023-07-27 14:26:16 +00:00
|
|
|
signature_thread(receiver, shutdown_rx, i);
|
2023-07-26 23:03:21 +00:00
|
|
|
})
|
|
|
|
})
|
|
|
|
.collect::<Result<Vec<_>, _>>()?;
|
|
|
|
|
|
|
|
Ok(Spawner {
|
|
|
|
sender,
|
|
|
|
threads: Some(Arc::new(threads)),
|
|
|
|
shutdown,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Drop for Spawner {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
if let Some(threads) = self.threads.take().and_then(Arc::into_inner) {
|
|
|
|
for _ in &threads {
|
|
|
|
let _ = self.shutdown.send(());
|
|
|
|
}
|
|
|
|
|
|
|
|
for thread in threads {
|
|
|
|
let _ = thread.join();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-27 14:26:16 +00:00
|
|
|
async fn timer<Fut>(fut: Fut) -> Fut::Output
|
|
|
|
where
|
|
|
|
Fut: std::future::Future,
|
|
|
|
{
|
|
|
|
let id = uuid::Uuid::new_v4();
|
|
|
|
|
|
|
|
metrics::increment_counter!("relay.spawner.wait-timer.start");
|
|
|
|
|
|
|
|
let mut interval = actix_rt::time::interval(Duration::from_secs(5));
|
|
|
|
|
|
|
|
// pass the first tick (instant)
|
|
|
|
interval.tick().await;
|
|
|
|
|
|
|
|
let mut fut = std::pin::pin!(fut);
|
|
|
|
|
|
|
|
let mut counter = 0;
|
|
|
|
loop {
|
|
|
|
tokio::select! {
|
|
|
|
out = &mut fut => {
|
|
|
|
metrics::increment_counter!("relay.spawner.wait-timer.end");
|
|
|
|
return out;
|
|
|
|
}
|
|
|
|
_ = interval.tick() => {
|
|
|
|
counter += 1;
|
|
|
|
metrics::increment_counter!("relay.spawner.wait-timer.pending");
|
|
|
|
tracing::warn!("Blocking operation {id} is taking a long time, {} seconds", counter * 5);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-26 23:03:21 +00:00
|
|
|
impl Spawn for Spawner {
|
|
|
|
type Future<T> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, Canceled>>>>;
|
|
|
|
|
|
|
|
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
|
|
|
|
where
|
|
|
|
Func: FnOnce() -> Out + Send + 'static,
|
|
|
|
Out: Send + 'static,
|
|
|
|
{
|
|
|
|
let sender = self.sender.clone();
|
|
|
|
|
|
|
|
Box::pin(async move {
|
|
|
|
let (tx, rx) = flume::bounded(1);
|
|
|
|
|
|
|
|
let _ = sender
|
|
|
|
.send_async(Box::new(move || {
|
|
|
|
if tx.send((func)()).is_err() {
|
|
|
|
tracing::warn!("Requestor hung up");
|
2023-07-27 14:26:16 +00:00
|
|
|
metrics::increment_counter!("relay.spawner.disconnected");
|
2023-07-26 23:03:21 +00:00
|
|
|
}
|
|
|
|
}))
|
|
|
|
.await;
|
|
|
|
|
2023-07-27 14:26:16 +00:00
|
|
|
timer(rx.recv_async()).await.map_err(|_| Canceled)
|
2023-07-26 23:03:21 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|