From 5011e05c3d22cca7a11386054d5df7b4ae9544a1 Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 15 Nov 2022 19:56:13 -0600 Subject: [PATCH] Clean tracing a bit more --- src/data/actor.rs | 8 ++++---- src/data/media.rs | 8 ++++---- src/data/node.rs | 12 +++++++----- src/data/state.rs | 5 +++-- src/error.rs | 7 ++++++- src/jobs/deliver.rs | 8 +++++++- src/jobs/instance.rs | 12 ++++++++++-- src/jobs/nodeinfo.rs | 9 ++++++++- src/middleware/verifier.rs | 2 +- src/requests.rs | 11 +++++------ src/routes/inbox.rs | 2 +- 11 files changed, 56 insertions(+), 28 deletions(-) diff --git a/src/data/actor.rs b/src/data/actor.rs index 50f0195..c0b3ddd 100644 --- a/src/data/actor.rs +++ b/src/data/actor.rs @@ -37,7 +37,7 @@ impl ActorCache { ActorCache { db } } - #[tracing::instrument(name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))] + #[tracing::instrument(level = "debug" name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))] pub(crate) async fn get( &self, id: &IriString, @@ -54,7 +54,7 @@ impl ActorCache { .map(MaybeCached::Fetched) } - #[tracing::instrument(name = "Add Connection", skip(self))] + #[tracing::instrument(level = "debug", name = "Add Connection", skip(self))] pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> { let add_connection = self.db.add_connection(actor.id.clone()); let save_actor = self.db.save_actor(actor); @@ -64,12 +64,12 @@ impl ActorCache { Ok(()) } - #[tracing::instrument(name = "Remove Connection", skip(self))] + #[tracing::instrument(level = "debug", name = "Remove Connection", skip(self))] pub(crate) async fn remove_connection(&self, actor: &Actor) -> Result<(), Error> { self.db.remove_connection(actor.id.clone()).await } - #[tracing::instrument(name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))] + #[tracing::instrument(level = "debug", name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))] pub(crate) async fn get_no_cache( &self, id: &IriString, diff --git a/src/data/media.rs b/src/data/media.rs index 18bc6e4..4a12f41 100644 --- a/src/data/media.rs +++ b/src/data/media.rs @@ -19,17 +19,17 @@ impl MediaCache { MediaCache { db } } - #[tracing::instrument(name = "Get media uuid", skip_all, fields(url = url.to_string().as_str()))] + #[tracing::instrument(level = "debug", name = "Get media uuid", skip_all, fields(url = url.to_string().as_str()))] pub(crate) async fn get_uuid(&self, url: IriString) -> Result, Error> { self.db.media_id(url).await } - #[tracing::instrument(name = "Get media url", skip(self))] + #[tracing::instrument(level = "debug", name = "Get media url", skip(self))] pub(crate) async fn get_url(&self, uuid: Uuid) -> Result, Error> { self.db.media_url(uuid).await } - #[tracing::instrument(name = "Is media outdated", skip(self))] + #[tracing::instrument(level = "debug", name = "Is media outdated", skip(self))] pub(crate) async fn is_outdated(&self, uuid: Uuid) -> Result { if let Some(meta) = self.db.media_meta(uuid).await? { if meta.saved_at + MEDIA_DURATION > SystemTime::now() { @@ -40,7 +40,7 @@ impl MediaCache { Ok(true) } - #[tracing::instrument(name = "Get media bytes", skip(self))] + #[tracing::instrument(level = "debug", name = "Get media bytes", skip(self))] pub(crate) async fn get_bytes(&self, uuid: Uuid) -> Result, Error> { if let Some(meta) = self.db.media_meta(uuid).await? { if meta.saved_at + MEDIA_DURATION > SystemTime::now() { diff --git a/src/data/node.rs b/src/data/node.rs index 730da30..815cc7c 100644 --- a/src/data/node.rs +++ b/src/data/node.rs @@ -34,7 +34,7 @@ impl NodeCache { NodeCache { db } } - #[tracing::instrument(name = "Get nodes", skip(self))] + #[tracing::instrument(level = "debug", name = "Get nodes", skip(self))] pub(crate) async fn nodes(&self) -> Result, Error> { let infos = self.db.connected_info(); let instances = self.db.connected_instance(); @@ -59,7 +59,7 @@ impl NodeCache { Ok(vec) } - #[tracing::instrument(name = "Is NodeInfo Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] + #[tracing::instrument(level = "debug", name = "Is NodeInfo Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] pub(crate) async fn is_nodeinfo_outdated(&self, actor_id: IriString) -> bool { self.db .info(actor_id) @@ -68,7 +68,7 @@ impl NodeCache { .unwrap_or(true) } - #[tracing::instrument(name = "Is Contact Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] + #[tracing::instrument(level = "debug", name = "Is Contact Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] pub(crate) async fn is_contact_outdated(&self, actor_id: IriString) -> bool { self.db .contact(actor_id) @@ -77,7 +77,7 @@ impl NodeCache { .unwrap_or(true) } - #[tracing::instrument(name = "Is Instance Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] + #[tracing::instrument(level = "debug", name = "Is Instance Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] pub(crate) async fn is_instance_outdated(&self, actor_id: IriString) -> bool { self.db .instance(actor_id) @@ -86,7 +86,7 @@ impl NodeCache { .unwrap_or(true) } - #[tracing::instrument(name = "Save node info", skip_all, fields(actor_id = actor_id.to_string().as_str(), software, version, reg))] + #[tracing::instrument(level = "debug", name = "Save node info", skip_all, fields(actor_id = actor_id.to_string().as_str(), software, version, reg))] pub(crate) async fn set_info( &self, actor_id: IriString, @@ -108,6 +108,7 @@ impl NodeCache { } #[tracing::instrument( + level = "debug", name = "Save instance info", skip_all, fields( @@ -144,6 +145,7 @@ impl NodeCache { } #[tracing::instrument( + level = "debug", name = "Save contact info", skip_all, fields( diff --git a/src/data/state.rs b/src/data/state.rs index f583281..93a3f9a 100644 --- a/src/data/state.rs +++ b/src/data/state.rs @@ -48,6 +48,7 @@ impl State { } #[tracing::instrument( + level = "debug", name = "Get inboxes for other domains", skip_all, fields( @@ -85,10 +86,10 @@ impl State { self.object_cache.write().await.put(object_id, actor_id); } - #[tracing::instrument(name = "Building state", skip_all)] + #[tracing::instrument(level = "debug", name = "Building state", skip_all)] pub(crate) async fn build(db: Db) -> Result { let private_key = if let Ok(Some(key)) = db.private_key().await { - tracing::info!("Using existing key"); + tracing::debug!("Using existing key"); key } else { tracing::info!("Generating new keys"); diff --git a/src/error.rs b/src/error.rs index 473ccf1..4e251ec 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,7 +7,6 @@ use actix_web::{ }; use http_signature_normalization_actix::PrepareSignError; use std::{convert::Infallible, fmt::Debug, io}; -use tracing::error; use tracing_error::SpanTrace; pub(crate) struct Error { @@ -15,6 +14,12 @@ pub(crate) struct Error { kind: ErrorKind, } +impl Error { + pub(crate) fn is_breaker(&self) -> bool { + matches!(self.kind, ErrorKind::Breaker) + } +} + impl std::fmt::Debug for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "{:?}", self.kind) diff --git a/src/jobs/deliver.rs b/src/jobs/deliver.rs index 15ce3cf..6916b98 100644 --- a/src/jobs/deliver.rs +++ b/src/jobs/deliver.rs @@ -35,7 +35,13 @@ impl Deliver { #[tracing::instrument(name = "Deliver", skip(state))] async fn permform(self, state: JobState) -> Result<(), Error> { - state.requests.deliver(self.to, &self.data).await?; + if let Err(e) = state.requests.deliver(self.to, &self.data).await { + if e.is_breaker() { + tracing::debug!("Not trying due to failed breaker"); + return Ok(()); + } + return Err(e); + } Ok(()) } } diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index 063a1ea..040897f 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -47,10 +47,18 @@ impl QueryInstance { let scheme = self.actor_id.scheme_str(); let instance_uri = iri!(format!("{}://{}/api/v1/instance", scheme, authority)); - let instance = state + let instance = match state .requests .fetch_json::(instance_uri.as_str()) - .await?; + .await + { + Ok(instance) => instance, + Err(e) if e.is_breaker() => { + tracing::debug!("Not retrying due to failed breaker"); + return Ok(()); + } + Err(e) => return Err(e), + }; let description = instance.short_description.unwrap_or(instance.description); diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs index 60fe60c..77f10c8 100644 --- a/src/jobs/nodeinfo.rs +++ b/src/jobs/nodeinfo.rs @@ -52,7 +52,14 @@ impl QueryNodeinfo { return Ok(()); }; - let nodeinfo = state.requests.fetch_json::(&href).await?; + let nodeinfo = match state.requests.fetch_json::(&href).await { + Ok(nodeinfo) => nodeinfo, + Err(e) if e.is_breaker() => { + tracing::debug!("Not retrying due to failed breaker"); + return Ok(()); + } + Err(e) => return Err(e), + }; state .node_cache diff --git a/src/middleware/verifier.rs b/src/middleware/verifier.rs index b371fbd..66df9b6 100644 --- a/src/middleware/verifier.rs +++ b/src/middleware/verifier.rs @@ -16,7 +16,7 @@ use std::{future::Future, pin::Pin}; pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State); impl MyVerify { - #[tracing::instrument("Verify signature", skip(self))] + #[tracing::instrument("Verify signature", skip(self, signature))] async fn verify( &self, algorithm: Option, diff --git a/src/requests.rs b/src/requests.rs index 98e3a2d..f4da6bc 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -20,7 +20,6 @@ use std::{ }, time::{Duration, SystemTime}, }; -use tracing::{debug, info, warn}; use tracing_awc::Tracing; const ONE_SECOND: u64 = 1; @@ -193,7 +192,7 @@ impl Requests { fn count_err(&self) { let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed); if count + 1 >= self.error_limit { - warn!("{} consecutive errors, rebuilding http client", count); + tracing::warn!("{} consecutive errors, rebuilding http client", count); *self.client.borrow_mut() = Client::builder() .wrap(Tracing) .add_default_header(("User-Agent", self.user_agent.clone())) @@ -261,7 +260,7 @@ impl Requests { if let Ok(bytes) = res.body().await { if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { if !s.is_empty() { - debug!("Response from {}, {}", url, s); + tracing::warn!("Response from {}, {}", url, s); } } } @@ -289,7 +288,7 @@ impl Requests { return Err(ErrorKind::Breaker.into()); } - info!("Fetching bytes for {}", url); + tracing::info!("Fetching bytes for {}", url); let signer = self.signer(); let client: Client = self.client.borrow().clone(); @@ -329,7 +328,7 @@ impl Requests { if let Ok(bytes) = res.body().await { if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { if !s.is_empty() { - debug!("Response from {}, {}", url, s); + tracing::warn!("Response from {}, {}", url, s); } } } @@ -400,7 +399,7 @@ impl Requests { if let Ok(bytes) = res.body().await { if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { if !s.is_empty() { - warn!("Response from {}, {}", inbox.as_str(), s); + tracing::warn!("Response from {}, {}", inbox.as_str(), s); } } } diff --git a/src/routes/inbox.rs b/src/routes/inbox.rs index 32d874e..1e1193c 100644 --- a/src/routes/inbox.rs +++ b/src/routes/inbox.rs @@ -16,7 +16,7 @@ use activitystreams::{ use actix_web::{web, HttpResponse}; use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified}; -#[tracing::instrument(name = "Inbox", skip(actors, client, jobs, config, state))] +#[tracing::instrument(name = "Inbox", skip(actors, client, jobs, config, state, verified))] pub(crate) async fn route( state: web::Data, actors: web::Data,