forked from mirrors/relay
Clean tracing a bit more
This commit is contained in:
parent
2df34c9e55
commit
5011e05c3d
11 changed files with 56 additions and 28 deletions
|
@ -37,7 +37,7 @@ impl ActorCache {
|
||||||
ActorCache { db }
|
ActorCache { db }
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))]
|
#[tracing::instrument(level = "debug" name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))]
|
||||||
pub(crate) async fn get(
|
pub(crate) async fn get(
|
||||||
&self,
|
&self,
|
||||||
id: &IriString,
|
id: &IriString,
|
||||||
|
@ -54,7 +54,7 @@ impl ActorCache {
|
||||||
.map(MaybeCached::Fetched)
|
.map(MaybeCached::Fetched)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Add Connection", skip(self))]
|
#[tracing::instrument(level = "debug", name = "Add Connection", skip(self))]
|
||||||
pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> {
|
pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> {
|
||||||
let add_connection = self.db.add_connection(actor.id.clone());
|
let add_connection = self.db.add_connection(actor.id.clone());
|
||||||
let save_actor = self.db.save_actor(actor);
|
let save_actor = self.db.save_actor(actor);
|
||||||
|
@ -64,12 +64,12 @@ impl ActorCache {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Remove Connection", skip(self))]
|
#[tracing::instrument(level = "debug", name = "Remove Connection", skip(self))]
|
||||||
pub(crate) async fn remove_connection(&self, actor: &Actor) -> Result<(), Error> {
|
pub(crate) async fn remove_connection(&self, actor: &Actor) -> Result<(), Error> {
|
||||||
self.db.remove_connection(actor.id.clone()).await
|
self.db.remove_connection(actor.id.clone()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))]
|
#[tracing::instrument(level = "debug", name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))]
|
||||||
pub(crate) async fn get_no_cache(
|
pub(crate) async fn get_no_cache(
|
||||||
&self,
|
&self,
|
||||||
id: &IriString,
|
id: &IriString,
|
||||||
|
|
|
@ -19,17 +19,17 @@ impl MediaCache {
|
||||||
MediaCache { db }
|
MediaCache { db }
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Get media uuid", skip_all, fields(url = url.to_string().as_str()))]
|
#[tracing::instrument(level = "debug", name = "Get media uuid", skip_all, fields(url = url.to_string().as_str()))]
|
||||||
pub(crate) async fn get_uuid(&self, url: IriString) -> Result<Option<Uuid>, Error> {
|
pub(crate) async fn get_uuid(&self, url: IriString) -> Result<Option<Uuid>, Error> {
|
||||||
self.db.media_id(url).await
|
self.db.media_id(url).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Get media url", skip(self))]
|
#[tracing::instrument(level = "debug", name = "Get media url", skip(self))]
|
||||||
pub(crate) async fn get_url(&self, uuid: Uuid) -> Result<Option<IriString>, Error> {
|
pub(crate) async fn get_url(&self, uuid: Uuid) -> Result<Option<IriString>, Error> {
|
||||||
self.db.media_url(uuid).await
|
self.db.media_url(uuid).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Is media outdated", skip(self))]
|
#[tracing::instrument(level = "debug", name = "Is media outdated", skip(self))]
|
||||||
pub(crate) async fn is_outdated(&self, uuid: Uuid) -> Result<bool, Error> {
|
pub(crate) async fn is_outdated(&self, uuid: Uuid) -> Result<bool, Error> {
|
||||||
if let Some(meta) = self.db.media_meta(uuid).await? {
|
if let Some(meta) = self.db.media_meta(uuid).await? {
|
||||||
if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
|
if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
|
||||||
|
@ -40,7 +40,7 @@ impl MediaCache {
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Get media bytes", skip(self))]
|
#[tracing::instrument(level = "debug", name = "Get media bytes", skip(self))]
|
||||||
pub(crate) async fn get_bytes(&self, uuid: Uuid) -> Result<Option<(String, Bytes)>, Error> {
|
pub(crate) async fn get_bytes(&self, uuid: Uuid) -> Result<Option<(String, Bytes)>, Error> {
|
||||||
if let Some(meta) = self.db.media_meta(uuid).await? {
|
if let Some(meta) = self.db.media_meta(uuid).await? {
|
||||||
if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
|
if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
|
||||||
|
|
|
@ -34,7 +34,7 @@ impl NodeCache {
|
||||||
NodeCache { db }
|
NodeCache { db }
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Get nodes", skip(self))]
|
#[tracing::instrument(level = "debug", name = "Get nodes", skip(self))]
|
||||||
pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> {
|
pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> {
|
||||||
let infos = self.db.connected_info();
|
let infos = self.db.connected_info();
|
||||||
let instances = self.db.connected_instance();
|
let instances = self.db.connected_instance();
|
||||||
|
@ -59,7 +59,7 @@ impl NodeCache {
|
||||||
Ok(vec)
|
Ok(vec)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Is NodeInfo Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))]
|
#[tracing::instrument(level = "debug", name = "Is NodeInfo Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))]
|
||||||
pub(crate) async fn is_nodeinfo_outdated(&self, actor_id: IriString) -> bool {
|
pub(crate) async fn is_nodeinfo_outdated(&self, actor_id: IriString) -> bool {
|
||||||
self.db
|
self.db
|
||||||
.info(actor_id)
|
.info(actor_id)
|
||||||
|
@ -68,7 +68,7 @@ impl NodeCache {
|
||||||
.unwrap_or(true)
|
.unwrap_or(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Is Contact Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))]
|
#[tracing::instrument(level = "debug", name = "Is Contact Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))]
|
||||||
pub(crate) async fn is_contact_outdated(&self, actor_id: IriString) -> bool {
|
pub(crate) async fn is_contact_outdated(&self, actor_id: IriString) -> bool {
|
||||||
self.db
|
self.db
|
||||||
.contact(actor_id)
|
.contact(actor_id)
|
||||||
|
@ -77,7 +77,7 @@ impl NodeCache {
|
||||||
.unwrap_or(true)
|
.unwrap_or(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Is Instance Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))]
|
#[tracing::instrument(level = "debug", name = "Is Instance Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))]
|
||||||
pub(crate) async fn is_instance_outdated(&self, actor_id: IriString) -> bool {
|
pub(crate) async fn is_instance_outdated(&self, actor_id: IriString) -> bool {
|
||||||
self.db
|
self.db
|
||||||
.instance(actor_id)
|
.instance(actor_id)
|
||||||
|
@ -86,7 +86,7 @@ impl NodeCache {
|
||||||
.unwrap_or(true)
|
.unwrap_or(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Save node info", skip_all, fields(actor_id = actor_id.to_string().as_str(), software, version, reg))]
|
#[tracing::instrument(level = "debug", name = "Save node info", skip_all, fields(actor_id = actor_id.to_string().as_str(), software, version, reg))]
|
||||||
pub(crate) async fn set_info(
|
pub(crate) async fn set_info(
|
||||||
&self,
|
&self,
|
||||||
actor_id: IriString,
|
actor_id: IriString,
|
||||||
|
@ -108,6 +108,7 @@ impl NodeCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
|
level = "debug",
|
||||||
name = "Save instance info",
|
name = "Save instance info",
|
||||||
skip_all,
|
skip_all,
|
||||||
fields(
|
fields(
|
||||||
|
@ -144,6 +145,7 @@ impl NodeCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
|
level = "debug",
|
||||||
name = "Save contact info",
|
name = "Save contact info",
|
||||||
skip_all,
|
skip_all,
|
||||||
fields(
|
fields(
|
||||||
|
|
|
@ -48,6 +48,7 @@ impl State {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
|
level = "debug",
|
||||||
name = "Get inboxes for other domains",
|
name = "Get inboxes for other domains",
|
||||||
skip_all,
|
skip_all,
|
||||||
fields(
|
fields(
|
||||||
|
@ -85,10 +86,10 @@ impl State {
|
||||||
self.object_cache.write().await.put(object_id, actor_id);
|
self.object_cache.write().await.put(object_id, actor_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Building state", skip_all)]
|
#[tracing::instrument(level = "debug", name = "Building state", skip_all)]
|
||||||
pub(crate) async fn build(db: Db) -> Result<Self, Error> {
|
pub(crate) async fn build(db: Db) -> Result<Self, Error> {
|
||||||
let private_key = if let Ok(Some(key)) = db.private_key().await {
|
let private_key = if let Ok(Some(key)) = db.private_key().await {
|
||||||
tracing::info!("Using existing key");
|
tracing::debug!("Using existing key");
|
||||||
key
|
key
|
||||||
} else {
|
} else {
|
||||||
tracing::info!("Generating new keys");
|
tracing::info!("Generating new keys");
|
||||||
|
|
|
@ -7,7 +7,6 @@ use actix_web::{
|
||||||
};
|
};
|
||||||
use http_signature_normalization_actix::PrepareSignError;
|
use http_signature_normalization_actix::PrepareSignError;
|
||||||
use std::{convert::Infallible, fmt::Debug, io};
|
use std::{convert::Infallible, fmt::Debug, io};
|
||||||
use tracing::error;
|
|
||||||
use tracing_error::SpanTrace;
|
use tracing_error::SpanTrace;
|
||||||
|
|
||||||
pub(crate) struct Error {
|
pub(crate) struct Error {
|
||||||
|
@ -15,6 +14,12 @@ pub(crate) struct Error {
|
||||||
kind: ErrorKind,
|
kind: ErrorKind,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Error {
|
||||||
|
pub(crate) fn is_breaker(&self) -> bool {
|
||||||
|
matches!(self.kind, ErrorKind::Breaker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for Error {
|
impl std::fmt::Debug for Error {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
writeln!(f, "{:?}", self.kind)
|
writeln!(f, "{:?}", self.kind)
|
||||||
|
|
|
@ -35,7 +35,13 @@ impl Deliver {
|
||||||
|
|
||||||
#[tracing::instrument(name = "Deliver", skip(state))]
|
#[tracing::instrument(name = "Deliver", skip(state))]
|
||||||
async fn permform(self, state: JobState) -> Result<(), Error> {
|
async fn permform(self, state: JobState) -> Result<(), Error> {
|
||||||
state.requests.deliver(self.to, &self.data).await?;
|
if let Err(e) = state.requests.deliver(self.to, &self.data).await {
|
||||||
|
if e.is_breaker() {
|
||||||
|
tracing::debug!("Not trying due to failed breaker");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,10 +47,18 @@ impl QueryInstance {
|
||||||
let scheme = self.actor_id.scheme_str();
|
let scheme = self.actor_id.scheme_str();
|
||||||
let instance_uri = iri!(format!("{}://{}/api/v1/instance", scheme, authority));
|
let instance_uri = iri!(format!("{}://{}/api/v1/instance", scheme, authority));
|
||||||
|
|
||||||
let instance = state
|
let instance = match state
|
||||||
.requests
|
.requests
|
||||||
.fetch_json::<Instance>(instance_uri.as_str())
|
.fetch_json::<Instance>(instance_uri.as_str())
|
||||||
.await?;
|
.await
|
||||||
|
{
|
||||||
|
Ok(instance) => instance,
|
||||||
|
Err(e) if e.is_breaker() => {
|
||||||
|
tracing::debug!("Not retrying due to failed breaker");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
};
|
||||||
|
|
||||||
let description = instance.short_description.unwrap_or(instance.description);
|
let description = instance.short_description.unwrap_or(instance.description);
|
||||||
|
|
||||||
|
|
|
@ -52,7 +52,14 @@ impl QueryNodeinfo {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
|
||||||
let nodeinfo = state.requests.fetch_json::<Nodeinfo>(&href).await?;
|
let nodeinfo = match state.requests.fetch_json::<Nodeinfo>(&href).await {
|
||||||
|
Ok(nodeinfo) => nodeinfo,
|
||||||
|
Err(e) if e.is_breaker() => {
|
||||||
|
tracing::debug!("Not retrying due to failed breaker");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
};
|
||||||
|
|
||||||
state
|
state
|
||||||
.node_cache
|
.node_cache
|
||||||
|
|
|
@ -16,7 +16,7 @@ use std::{future::Future, pin::Pin};
|
||||||
pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State);
|
pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State);
|
||||||
|
|
||||||
impl MyVerify {
|
impl MyVerify {
|
||||||
#[tracing::instrument("Verify signature", skip(self))]
|
#[tracing::instrument("Verify signature", skip(self, signature))]
|
||||||
async fn verify(
|
async fn verify(
|
||||||
&self,
|
&self,
|
||||||
algorithm: Option<Algorithm>,
|
algorithm: Option<Algorithm>,
|
||||||
|
|
|
@ -20,7 +20,6 @@ use std::{
|
||||||
},
|
},
|
||||||
time::{Duration, SystemTime},
|
time::{Duration, SystemTime},
|
||||||
};
|
};
|
||||||
use tracing::{debug, info, warn};
|
|
||||||
use tracing_awc::Tracing;
|
use tracing_awc::Tracing;
|
||||||
|
|
||||||
const ONE_SECOND: u64 = 1;
|
const ONE_SECOND: u64 = 1;
|
||||||
|
@ -193,7 +192,7 @@ impl Requests {
|
||||||
fn count_err(&self) {
|
fn count_err(&self) {
|
||||||
let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
|
let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
|
||||||
if count + 1 >= self.error_limit {
|
if count + 1 >= self.error_limit {
|
||||||
warn!("{} consecutive errors, rebuilding http client", count);
|
tracing::warn!("{} consecutive errors, rebuilding http client", count);
|
||||||
*self.client.borrow_mut() = Client::builder()
|
*self.client.borrow_mut() = Client::builder()
|
||||||
.wrap(Tracing)
|
.wrap(Tracing)
|
||||||
.add_default_header(("User-Agent", self.user_agent.clone()))
|
.add_default_header(("User-Agent", self.user_agent.clone()))
|
||||||
|
@ -261,7 +260,7 @@ impl Requests {
|
||||||
if let Ok(bytes) = res.body().await {
|
if let Ok(bytes) = res.body().await {
|
||||||
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
|
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
|
||||||
if !s.is_empty() {
|
if !s.is_empty() {
|
||||||
debug!("Response from {}, {}", url, s);
|
tracing::warn!("Response from {}, {}", url, s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -289,7 +288,7 @@ impl Requests {
|
||||||
return Err(ErrorKind::Breaker.into());
|
return Err(ErrorKind::Breaker.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Fetching bytes for {}", url);
|
tracing::info!("Fetching bytes for {}", url);
|
||||||
let signer = self.signer();
|
let signer = self.signer();
|
||||||
|
|
||||||
let client: Client = self.client.borrow().clone();
|
let client: Client = self.client.borrow().clone();
|
||||||
|
@ -329,7 +328,7 @@ impl Requests {
|
||||||
if let Ok(bytes) = res.body().await {
|
if let Ok(bytes) = res.body().await {
|
||||||
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
|
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
|
||||||
if !s.is_empty() {
|
if !s.is_empty() {
|
||||||
debug!("Response from {}, {}", url, s);
|
tracing::warn!("Response from {}, {}", url, s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -400,7 +399,7 @@ impl Requests {
|
||||||
if let Ok(bytes) = res.body().await {
|
if let Ok(bytes) = res.body().await {
|
||||||
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
|
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
|
||||||
if !s.is_empty() {
|
if !s.is_empty() {
|
||||||
warn!("Response from {}, {}", inbox.as_str(), s);
|
tracing::warn!("Response from {}, {}", inbox.as_str(), s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ use activitystreams::{
|
||||||
use actix_web::{web, HttpResponse};
|
use actix_web::{web, HttpResponse};
|
||||||
use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};
|
use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};
|
||||||
|
|
||||||
#[tracing::instrument(name = "Inbox", skip(actors, client, jobs, config, state))]
|
#[tracing::instrument(name = "Inbox", skip(actors, client, jobs, config, state, verified))]
|
||||||
pub(crate) async fn route(
|
pub(crate) async fn route(
|
||||||
state: web::Data<State>,
|
state: web::Data<State>,
|
||||||
actors: web::Data<ActorCache>,
|
actors: web::Data<ActorCache>,
|
||||||
|
|
Loading…
Reference in a new issue