mirror of
https://git.asonix.dog/asonix/relay.git
synced 2024-12-18 14:16:48 +00:00
Don't drop and rebuild clients, share clients better
This commit is contained in:
parent
246e79b261
commit
d40db33eb5
2 changed files with 11 additions and 35 deletions
|
@ -259,10 +259,12 @@ async fn do_server_main(
|
||||||
|
|
||||||
let bind_address = config.bind_address();
|
let bind_address = config.bind_address();
|
||||||
let server = HttpServer::new(move || {
|
let server = HttpServer::new(move || {
|
||||||
|
let requests = state.requests(&config);
|
||||||
|
|
||||||
let app = App::new()
|
let app = App::new()
|
||||||
.app_data(web::Data::new(db.clone()))
|
.app_data(web::Data::new(db.clone()))
|
||||||
.app_data(web::Data::new(state.clone()))
|
.app_data(web::Data::new(state.clone()))
|
||||||
.app_data(web::Data::new(state.requests(&config)))
|
.app_data(web::Data::new(requests.clone()))
|
||||||
.app_data(web::Data::new(actors.clone()))
|
.app_data(web::Data::new(actors.clone()))
|
||||||
.app_data(web::Data::new(config.clone()))
|
.app_data(web::Data::new(config.clone()))
|
||||||
.app_data(web::Data::new(job_server.clone()))
|
.app_data(web::Data::new(job_server.clone()))
|
||||||
|
@ -285,7 +287,7 @@ async fn do_server_main(
|
||||||
web::resource("/inbox")
|
web::resource("/inbox")
|
||||||
.wrap(config.digest_middleware())
|
.wrap(config.digest_middleware())
|
||||||
.wrap(VerifySignature::new(
|
.wrap(VerifySignature::new(
|
||||||
MyVerify(state.requests(&config), actors.clone(), state.clone()),
|
MyVerify(requests, actors.clone(), state.clone()),
|
||||||
Default::default(),
|
Default::default(),
|
||||||
))
|
))
|
||||||
.wrap(DebugPayload(config.debug()))
|
.wrap(DebugPayload(config.debug()))
|
||||||
|
|
|
@ -16,12 +16,7 @@ use rsa::{
|
||||||
RsaPrivateKey,
|
RsaPrivateKey,
|
||||||
};
|
};
|
||||||
use std::{
|
use std::{
|
||||||
cell::RefCell,
|
sync::Arc,
|
||||||
rc::Rc,
|
|
||||||
sync::{
|
|
||||||
atomic::{AtomicUsize, Ordering},
|
|
||||||
Arc,
|
|
||||||
},
|
|
||||||
time::{Duration, SystemTime},
|
time::{Duration, SystemTime},
|
||||||
};
|
};
|
||||||
use tracing_awc::Tracing;
|
use tracing_awc::Tracing;
|
||||||
|
@ -146,9 +141,7 @@ impl Default for Breaker {
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub(crate) struct Requests {
|
pub(crate) struct Requests {
|
||||||
pool_size: usize,
|
pool_size: usize,
|
||||||
client: Rc<RefCell<Client>>,
|
client: Client,
|
||||||
consecutive_errors: Rc<AtomicUsize>,
|
|
||||||
error_limit: usize,
|
|
||||||
key_id: String,
|
key_id: String,
|
||||||
user_agent: String,
|
user_agent: String,
|
||||||
private_key: RsaPrivateKey,
|
private_key: RsaPrivateKey,
|
||||||
|
@ -161,7 +154,6 @@ impl std::fmt::Debug for Requests {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
f.debug_struct("Requests")
|
f.debug_struct("Requests")
|
||||||
.field("pool_size", &self.pool_size)
|
.field("pool_size", &self.pool_size)
|
||||||
.field("error_limit", &self.error_limit)
|
|
||||||
.field("key_id", &self.key_id)
|
.field("key_id", &self.key_id)
|
||||||
.field("user_agent", &self.user_agent)
|
.field("user_agent", &self.user_agent)
|
||||||
.field("config", &self.config)
|
.field("config", &self.config)
|
||||||
|
@ -192,9 +184,7 @@ impl Requests {
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Requests {
|
Requests {
|
||||||
pool_size,
|
pool_size,
|
||||||
client: Rc::new(RefCell::new(build_client(&user_agent, pool_size))),
|
client: build_client(&user_agent, pool_size),
|
||||||
consecutive_errors: Rc::new(AtomicUsize::new(0)),
|
|
||||||
error_limit: 3,
|
|
||||||
key_id,
|
key_id,
|
||||||
user_agent,
|
user_agent,
|
||||||
private_key,
|
private_key,
|
||||||
|
@ -208,34 +198,18 @@ impl Requests {
|
||||||
self.breakers.succeed(iri);
|
self.breakers.succeed(iri);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn count_err(&self) {
|
|
||||||
let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
|
|
||||||
if count + 1 >= self.error_limit {
|
|
||||||
tracing::warn!("{} consecutive errors, rebuilding http client", count + 1);
|
|
||||||
*self.client.borrow_mut() = build_client(&self.user_agent, self.pool_size);
|
|
||||||
self.reset_err();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn reset_err(&self) {
|
|
||||||
self.consecutive_errors.swap(0, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn check_response(
|
async fn check_response(
|
||||||
&self,
|
&self,
|
||||||
parsed_url: &IriString,
|
parsed_url: &IriString,
|
||||||
res: Result<ClientResponse, SendRequestError>,
|
res: Result<ClientResponse, SendRequestError>,
|
||||||
) -> Result<ClientResponse, Error> {
|
) -> Result<ClientResponse, Error> {
|
||||||
if res.is_err() {
|
if res.is_err() {
|
||||||
self.count_err();
|
|
||||||
self.breakers.fail(&parsed_url);
|
self.breakers.fail(&parsed_url);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut res =
|
let mut res =
|
||||||
res.map_err(|e| ErrorKind::SendRequest(parsed_url.to_string(), e.to_string()))?;
|
res.map_err(|e| ErrorKind::SendRequest(parsed_url.to_string(), e.to_string()))?;
|
||||||
|
|
||||||
self.reset_err();
|
|
||||||
|
|
||||||
if res.status().is_server_error() {
|
if res.status().is_server_error() {
|
||||||
self.breakers.fail(&parsed_url);
|
self.breakers.fail(&parsed_url);
|
||||||
|
|
||||||
|
@ -325,8 +299,8 @@ impl Requests {
|
||||||
let signer = self.signer();
|
let signer = self.signer();
|
||||||
let span = tracing::Span::current();
|
let span = tracing::Span::current();
|
||||||
|
|
||||||
let client: Client = self.client.borrow().clone();
|
let res = self
|
||||||
let res = client
|
.client
|
||||||
.get(url.as_str())
|
.get(url.as_str())
|
||||||
.insert_header(("Accept", accept))
|
.insert_header(("Accept", accept))
|
||||||
.insert_header(Date(SystemTime::now().into()))
|
.insert_header(Date(SystemTime::now().into()))
|
||||||
|
@ -385,8 +359,8 @@ impl Requests {
|
||||||
let span = tracing::Span::current();
|
let span = tracing::Span::current();
|
||||||
let item_string = serde_json::to_string(item)?;
|
let item_string = serde_json::to_string(item)?;
|
||||||
|
|
||||||
let client: Client = self.client.borrow().clone();
|
let (req, body) = self
|
||||||
let (req, body) = client
|
.client
|
||||||
.post(inbox.as_str())
|
.post(inbox.as_str())
|
||||||
.insert_header(("Accept", accept))
|
.insert_header(("Accept", accept))
|
||||||
.insert_header(("Content-Type", content_type))
|
.insert_header(("Content-Type", content_type))
|
||||||
|
|
Loading…
Reference in a new issue