diff --git a/src/main.rs b/src/main.rs index 10f764c..64945ae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -259,10 +259,12 @@ async fn do_server_main( let bind_address = config.bind_address(); let server = HttpServer::new(move || { + let requests = state.requests(&config); + let app = App::new() .app_data(web::Data::new(db.clone())) .app_data(web::Data::new(state.clone())) - .app_data(web::Data::new(state.requests(&config))) + .app_data(web::Data::new(requests.clone())) .app_data(web::Data::new(actors.clone())) .app_data(web::Data::new(config.clone())) .app_data(web::Data::new(job_server.clone())) @@ -285,7 +287,7 @@ async fn do_server_main( web::resource("/inbox") .wrap(config.digest_middleware()) .wrap(VerifySignature::new( - MyVerify(state.requests(&config), actors.clone(), state.clone()), + MyVerify(requests, actors.clone(), state.clone()), Default::default(), )) .wrap(DebugPayload(config.debug())) diff --git a/src/requests.rs b/src/requests.rs index efd3a99..b71e6d1 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -16,12 +16,7 @@ use rsa::{ RsaPrivateKey, }; use std::{ - cell::RefCell, - rc::Rc, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, + sync::Arc, time::{Duration, SystemTime}, }; use tracing_awc::Tracing; @@ -146,9 +141,7 @@ impl Default for Breaker { #[derive(Clone)] pub(crate) struct Requests { pool_size: usize, - client: Rc>, - consecutive_errors: Rc, - error_limit: usize, + client: Client, key_id: String, user_agent: String, private_key: RsaPrivateKey, @@ -161,7 +154,6 @@ impl std::fmt::Debug for Requests { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Requests") .field("pool_size", &self.pool_size) - .field("error_limit", &self.error_limit) .field("key_id", &self.key_id) .field("user_agent", &self.user_agent) .field("config", &self.config) @@ -192,9 +184,7 @@ impl Requests { ) -> Self { Requests { pool_size, - client: Rc::new(RefCell::new(build_client(&user_agent, pool_size))), - consecutive_errors: Rc::new(AtomicUsize::new(0)), - error_limit: 3, + client: build_client(&user_agent, pool_size), key_id, user_agent, private_key, @@ -208,34 +198,18 @@ impl Requests { self.breakers.succeed(iri); } - fn count_err(&self) { - let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed); - if count + 1 >= self.error_limit { - tracing::warn!("{} consecutive errors, rebuilding http client", count + 1); - *self.client.borrow_mut() = build_client(&self.user_agent, self.pool_size); - self.reset_err(); - } - } - - fn reset_err(&self) { - self.consecutive_errors.swap(0, Ordering::Relaxed); - } - async fn check_response( &self, parsed_url: &IriString, res: Result, ) -> Result { if res.is_err() { - self.count_err(); self.breakers.fail(&parsed_url); } let mut res = res.map_err(|e| ErrorKind::SendRequest(parsed_url.to_string(), e.to_string()))?; - self.reset_err(); - if res.status().is_server_error() { self.breakers.fail(&parsed_url); @@ -325,8 +299,8 @@ impl Requests { let signer = self.signer(); let span = tracing::Span::current(); - let client: Client = self.client.borrow().clone(); - let res = client + let res = self + .client .get(url.as_str()) .insert_header(("Accept", accept)) .insert_header(Date(SystemTime::now().into())) @@ -385,8 +359,8 @@ impl Requests { let span = tracing::Span::current(); let item_string = serde_json::to_string(item)?; - let client: Client = self.client.borrow().clone(); - let (req, body) = client + let (req, body) = self + .client .post(inbox.as_str()) .insert_header(("Accept", accept)) .insert_header(("Content-Type", content_type))