Simplify signature thread

This commit is contained in:
asonix 2023-07-27 10:19:20 -05:00
parent fff9bf112d
commit 769f7451f9
7 changed files with 170 additions and 192 deletions

View file

@ -4,6 +4,7 @@ use crate::{
db::Db, db::Db,
error::Error, error::Error,
requests::{Breakers, Requests}, requests::{Breakers, Requests},
spawner::Spawner,
}; };
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_web::web; use actix_web::web;
@ -40,7 +41,7 @@ impl State {
self.node_cache.clone() self.node_cache.clone()
} }
pub(crate) fn requests(&self, config: &Config, spawner: crate::requests::Spawner) -> Requests { pub(crate) fn requests(&self, config: &Config, spawner: Spawner) -> Requests {
Requests::new( Requests::new(
config.generate_url(UrlKind::MainKey).to_string(), config.generate_url(UrlKind::MainKey).to_string(),
self.private_key.clone(), self.private_key.clone(),

View file

@ -14,7 +14,7 @@ use http_signature_normalization_actix::{prelude::InvalidHeaderValue, Canceled,
use std::{convert::Infallible, str::FromStr, time::Instant}; use std::{convert::Infallible, str::FromStr, time::Instant};
use tracing_error::SpanTrace; use tracing_error::SpanTrace;
use crate::{db::Db, requests::Spawner}; use crate::{db::Db, spawner::Spawner};
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct AdminConfig { pub(crate) struct AdminConfig {

View file

@ -17,7 +17,8 @@ use crate::{
data::{ActorCache, MediaCache, NodeCache, State}, data::{ActorCache, MediaCache, NodeCache, State},
error::{Error, ErrorKind}, error::{Error, ErrorKind},
jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline}, jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
requests::{Requests, Spawner}, requests::Requests,
spawner::Spawner,
}; };
use background_jobs::{ use background_jobs::{
memory_storage::{ActixTimer, Storage}, memory_storage::{ActixTimer, Storage},

View file

@ -31,10 +31,9 @@ mod jobs;
mod middleware; mod middleware;
mod requests; mod requests;
mod routes; mod routes;
mod spawner;
mod telegram; mod telegram;
use crate::requests::Spawner;
use self::{ use self::{
args::Args, args::Args,
config::Config, config::Config,
@ -43,6 +42,7 @@ use self::{
jobs::create_workers, jobs::create_workers,
middleware::{DebugPayload, MyVerify, RelayResolver, Timings}, middleware::{DebugPayload, MyVerify, RelayResolver, Timings},
routes::{actor, healthz, inbox, index, nodeinfo, nodeinfo_meta, statics}, routes::{actor, healthz, inbox, index, nodeinfo, nodeinfo_meta, statics},
spawner::Spawner,
}; };
fn init_subscriber( fn init_subscriber(

View file

@ -2,7 +2,8 @@ use crate::{
apub::AcceptedActors, apub::AcceptedActors,
data::{ActorCache, State}, data::{ActorCache, State},
error::{Error, ErrorKind}, error::{Error, ErrorKind},
requests::{Requests, Spawner}, requests::Requests,
spawner::Spawner,
}; };
use activitystreams::{base::BaseExt, iri, iri_string::types::IriString}; use activitystreams::{base::BaseExt, iri, iri_string::types::IriString};
use base64::{engine::general_purpose::STANDARD, Engine}; use base64::{engine::general_purpose::STANDARD, Engine};

View file

@ -1,13 +1,14 @@
use crate::{ use crate::{
data::LastOnline, data::LastOnline,
error::{Error, ErrorKind}, error::{Error, ErrorKind},
spawner::Spawner,
}; };
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_web::http::header::Date; use actix_web::http::header::Date;
use awc::{error::SendRequestError, Client, ClientResponse, Connector}; use awc::{error::SendRequestError, Client, ClientResponse, Connector};
use base64::{engine::general_purpose::STANDARD, Engine}; use base64::{engine::general_purpose::STANDARD, Engine};
use dashmap::DashMap; use dashmap::DashMap;
use http_signature_normalization_actix::{prelude::*, Canceled, Spawn}; use http_signature_normalization_actix::prelude::*;
use rand::thread_rng; use rand::thread_rng;
use rsa::{ use rsa::{
pkcs1v15::SigningKey, pkcs1v15::SigningKey,
@ -16,13 +17,8 @@ use rsa::{
RsaPrivateKey, RsaPrivateKey,
}; };
use std::{ use std::{
panic::AssertUnwindSafe, sync::Arc,
sync::{ time::{Duration, SystemTime},
atomic::{AtomicBool, Ordering},
Arc,
},
thread::JoinHandle,
time::{Duration, Instant, SystemTime},
}; };
use tracing_awc::Tracing; use tracing_awc::Tracing;
@ -422,181 +418,3 @@ impl Signer {
Ok(STANDARD.encode(signature.to_bytes().as_ref())) Ok(STANDARD.encode(signature.to_bytes().as_ref()))
} }
} }
fn signature_thread(
receiver: flume::Receiver<Box<dyn FnOnce() + Send>>,
shutdown: flume::Receiver<()>,
id: usize,
) {
let guard = MetricsGuard::guard(id);
let stopping = AtomicBool::new(false);
while !stopping.load(Ordering::Acquire) {
flume::Selector::new()
.recv(&receiver, |res| match res {
Ok(f) => {
let start = Instant::now();
metrics::increment_counter!("relay.signature-thread.operation.start", "id" => id.to_string());
let res = std::panic::catch_unwind(AssertUnwindSafe(move || {
(f)();
}));
metrics::increment_counter!("relay.signature-thread.operation.end", "complete" => res.is_ok().to_string(), "id" => id.to_string());
metrics::histogram!("relay.signature-thread.operation.duration", start.elapsed().as_secs_f64(), "id" => id.to_string());
if let Err(e) = res {
tracing::warn!("Signature fn panicked: {e:?}");
}
}
Err(_) => {
tracing::warn!("Receive error, stopping");
stopping.store(true, Ordering::Release)
}
})
.recv(&shutdown, |res| {
if res.is_ok() {
tracing::warn!("Stopping");
} else {
tracing::warn!("Shutdown receive error, stopping");
}
stopping.store(true, Ordering::Release)
})
.wait();
}
guard.disarm();
}
#[derive(Clone, Debug)]
pub(crate) struct Spawner {
sender: flume::Sender<Box<dyn FnOnce() + Send>>,
threads: Option<Arc<Vec<JoinHandle<()>>>>,
shutdown: flume::Sender<()>,
}
struct MetricsGuard {
id: usize,
start: Instant,
armed: bool,
}
impl MetricsGuard {
fn guard(id: usize) -> Self {
metrics::increment_counter!("relay.signature-thread.launched", "id" => id.to_string());
Self {
id,
start: Instant::now(),
armed: true,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for MetricsGuard {
fn drop(&mut self) {
metrics::increment_counter!("relay.signature-thread.closed", "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
metrics::histogram!("relay.signature-thread.duration", self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
}
}
impl Spawner {
pub(crate) fn build(threads: usize) -> std::io::Result<Self> {
let (sender, receiver) = flume::bounded(8);
let (shutdown, shutdown_rx) = flume::bounded(threads);
tracing::warn!("Launching {threads} signature threads");
let threads = (0..threads)
.map(|i| {
let receiver = receiver.clone();
let shutdown_rx = shutdown_rx.clone();
std::thread::Builder::new()
.name(format!("signature-thread-{i}"))
.spawn(move || {
signature_thread(receiver, shutdown_rx, i);
})
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Spawner {
sender,
threads: Some(Arc::new(threads)),
shutdown,
})
}
}
impl Drop for Spawner {
fn drop(&mut self) {
if let Some(threads) = self.threads.take().and_then(Arc::into_inner) {
for _ in &threads {
let _ = self.shutdown.send(());
}
for thread in threads {
let _ = thread.join();
}
}
}
}
async fn timer<Fut>(fut: Fut) -> Fut::Output
where
Fut: std::future::Future,
{
let id = uuid::Uuid::new_v4();
metrics::increment_counter!("relay.spawner.wait-timer.start");
let mut interval = actix_rt::time::interval(Duration::from_secs(5));
// pass the first tick (instant)
interval.tick().await;
let mut fut = std::pin::pin!(fut);
let mut counter = 0;
loop {
tokio::select! {
out = &mut fut => {
metrics::increment_counter!("relay.spawner.wait-timer.end");
return out;
}
_ = interval.tick() => {
counter += 1;
metrics::increment_counter!("relay.spawner.wait-timer.pending");
tracing::warn!("Blocking operation {id} is taking a long time, {} seconds", counter * 5);
}
}
}
}
impl Spawn for Spawner {
type Future<T> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, Canceled>>>>;
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
where
Func: FnOnce() -> Out + Send + 'static,
Out: Send + 'static,
{
let sender = self.sender.clone();
Box::pin(async move {
let (tx, rx) = flume::bounded(1);
let _ = sender
.send_async(Box::new(move || {
if tx.send((func)()).is_err() {
tracing::warn!("Requestor hung up");
metrics::increment_counter!("relay.spawner.disconnected");
}
}))
.await;
timer(rx.recv_async()).await.map_err(|_| Canceled)
})
}
}

157
src/spawner.rs Normal file
View file

@ -0,0 +1,157 @@
use http_signature_normalization_actix::{Canceled, Spawn};
use std::{
panic::AssertUnwindSafe,
sync::Arc,
thread::JoinHandle,
time::{Duration, Instant},
};
fn signature_thread(receiver: flume::Receiver<Box<dyn FnOnce() + Send>>, id: usize) {
let guard = MetricsGuard::guard(id);
while let Ok(f) = receiver.recv() {
let start = Instant::now();
metrics::increment_counter!("relay.signature-thread.operation.start", "id" => id.to_string());
let res = std::panic::catch_unwind(AssertUnwindSafe(move || {
(f)();
}));
metrics::increment_counter!("relay.signature-thread.operation.end", "complete" => res.is_ok().to_string(), "id" => id.to_string());
metrics::histogram!("relay.signature-thread.operation.duration", start.elapsed().as_secs_f64(), "id" => id.to_string());
if let Err(e) = res {
tracing::warn!("Signature fn panicked: {e:?}");
}
}
guard.disarm();
}
#[derive(Clone, Debug)]
pub(crate) struct Spawner {
sender: Option<flume::Sender<Box<dyn FnOnce() + Send>>>,
threads: Option<Arc<Vec<JoinHandle<()>>>>,
}
struct MetricsGuard {
id: usize,
start: Instant,
armed: bool,
}
impl MetricsGuard {
fn guard(id: usize) -> Self {
metrics::increment_counter!("relay.signature-thread.launched", "id" => id.to_string());
Self {
id,
start: Instant::now(),
armed: true,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for MetricsGuard {
fn drop(&mut self) {
metrics::increment_counter!("relay.signature-thread.closed", "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
metrics::histogram!("relay.signature-thread.duration", self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
tracing::warn!("Stopping signature thread");
}
}
impl Spawner {
pub(crate) fn build(threads: usize) -> std::io::Result<Self> {
let (sender, receiver) = flume::bounded(8);
tracing::warn!("Launching {threads} signature threads");
let threads = (0..threads)
.map(|i| {
let receiver = receiver.clone();
std::thread::Builder::new()
.name(format!("signature-thread-{i}"))
.spawn(move || {
signature_thread(receiver, i);
})
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Spawner {
sender: Some(sender),
threads: Some(Arc::new(threads)),
})
}
}
impl Drop for Spawner {
fn drop(&mut self) {
self.sender.take();
if let Some(threads) = self.threads.take().and_then(Arc::into_inner) {
for thread in threads {
let _ = thread.join();
}
}
}
}
async fn timer<Fut>(fut: Fut) -> Fut::Output
where
Fut: std::future::Future,
{
let id = uuid::Uuid::new_v4();
metrics::increment_counter!("relay.spawner.wait-timer.start");
let mut interval = actix_rt::time::interval(Duration::from_secs(5));
// pass the first tick (instant)
interval.tick().await;
let mut fut = std::pin::pin!(fut);
let mut counter = 0;
loop {
tokio::select! {
out = &mut fut => {
metrics::increment_counter!("relay.spawner.wait-timer.end");
return out;
}
_ = interval.tick() => {
counter += 1;
metrics::increment_counter!("relay.spawner.wait-timer.pending");
tracing::warn!("Blocking operation {id} is taking a long time, {} seconds", counter * 5);
}
}
}
}
impl Spawn for Spawner {
type Future<T> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, Canceled>>>>;
fn spawn_blocking<Func, Out>(&self, func: Func) -> Self::Future<Out>
where
Func: FnOnce() -> Out + Send + 'static,
Out: Send + 'static,
{
let sender = self.sender.as_ref().expect("Sender exists").clone();
Box::pin(async move {
let (tx, rx) = flume::bounded(1);
let _ = sender
.send_async(Box::new(move || {
if tx.try_send((func)()).is_err() {
tracing::warn!("Requestor hung up");
metrics::increment_counter!("relay.spawner.disconnected");
}
}))
.await;
timer(rx.recv_async()).await.map_err(|_| Canceled)
})
}
}