forked from mirrors/relay
Log cleanup
This commit is contained in:
parent
e1e77b0bdf
commit
bc263701e2
22 changed files with 70 additions and 49 deletions
|
@ -229,11 +229,12 @@ impl Config {
|
|||
self.do_generate_url(kind).expect("Generated valid IRI")
|
||||
}
|
||||
|
||||
#[tracing::instrument(fields(base_uri = tracing::field::debug(&self.base_uri), kind = tracing::field::debug(&kind)))]
|
||||
#[tracing::instrument(level = "debug", skip_all, fields(base_uri = tracing::field::debug(&self.base_uri), kind = tracing::field::debug(&kind)))]
|
||||
fn do_generate_url(&self, kind: UrlKind) -> Result<IriString, Error> {
|
||||
let iri = match kind {
|
||||
UrlKind::Activity => FixedBaseResolver::new(self.base_uri.as_ref())
|
||||
.try_resolve(IriRelativeStr::new(&format!("activity/{}", Uuid::new_v4()))?.as_ref())?,
|
||||
UrlKind::Activity => FixedBaseResolver::new(self.base_uri.as_ref()).try_resolve(
|
||||
IriRelativeStr::new(&format!("activity/{}", Uuid::new_v4()))?.as_ref(),
|
||||
)?,
|
||||
UrlKind::Actor => FixedBaseResolver::new(self.base_uri.as_ref())
|
||||
.try_resolve(IriRelativeStr::new("actor")?.as_ref())?,
|
||||
UrlKind::Followers => FixedBaseResolver::new(self.base_uri.as_ref())
|
||||
|
|
|
@ -37,7 +37,7 @@ impl ActorCache {
|
|||
ActorCache { db }
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Get Actor", fields(id = id.to_string().as_str(), requests))]
|
||||
#[tracing::instrument(name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))]
|
||||
pub(crate) async fn get(
|
||||
&self,
|
||||
id: &IriString,
|
||||
|
@ -54,18 +54,18 @@ impl ActorCache {
|
|||
.map(MaybeCached::Fetched)
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Add Connection")]
|
||||
#[tracing::instrument(name = "Add Connection", skip(self))]
|
||||
pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> {
|
||||
self.db.add_connection(actor.id.clone()).await?;
|
||||
self.db.save_actor(actor).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Remove Connection")]
|
||||
#[tracing::instrument(name = "Remove Connection", skip(self))]
|
||||
pub(crate) async fn remove_connection(&self, actor: &Actor) -> Result<(), Error> {
|
||||
self.db.remove_connection(actor.id.clone()).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Fetch remote actor", fields(id = id.to_string().as_str(), requests))]
|
||||
#[tracing::instrument(name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))]
|
||||
pub(crate) async fn get_no_cache(
|
||||
&self,
|
||||
id: &IriString,
|
||||
|
|
|
@ -19,17 +19,17 @@ impl MediaCache {
|
|||
MediaCache { db }
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Get media uuid", fields(url = url.to_string().as_str()))]
|
||||
#[tracing::instrument(name = "Get media uuid", skip_all, fields(url = url.to_string().as_str()))]
|
||||
pub(crate) async fn get_uuid(&self, url: IriString) -> Result<Option<Uuid>, Error> {
|
||||
self.db.media_id(url).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Get media url")]
|
||||
#[tracing::instrument(name = "Get media url", skip(self))]
|
||||
pub(crate) async fn get_url(&self, uuid: Uuid) -> Result<Option<IriString>, Error> {
|
||||
self.db.media_url(uuid).await
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Is media outdated")]
|
||||
#[tracing::instrument(name = "Is media outdated", skip(self))]
|
||||
pub(crate) async fn is_outdated(&self, uuid: Uuid) -> Result<bool, Error> {
|
||||
if let Some(meta) = self.db.media_meta(uuid).await? {
|
||||
if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
|
||||
|
@ -40,7 +40,7 @@ impl MediaCache {
|
|||
Ok(true)
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Get media bytes")]
|
||||
#[tracing::instrument(name = "Get media bytes", skip(self))]
|
||||
pub(crate) async fn get_bytes(&self, uuid: Uuid) -> Result<Option<(String, Bytes)>, Error> {
|
||||
if let Some(meta) = self.db.media_meta(uuid).await? {
|
||||
if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
|
||||
|
@ -55,7 +55,7 @@ impl MediaCache {
|
|||
Ok(None)
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Store media url", fields(url = url.to_string().as_str()))]
|
||||
#[tracing::instrument(name = "Store media url", skip_all, fields(url = url.to_string().as_str()))]
|
||||
pub(crate) async fn store_url(&self, url: IriString) -> Result<Uuid, Error> {
|
||||
let uuid = Uuid::new_v4();
|
||||
|
||||
|
@ -64,7 +64,7 @@ impl MediaCache {
|
|||
Ok(uuid)
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "store media bytes", skip(bytes))]
|
||||
#[tracing::instrument(name = "store media bytes", skip(self, bytes))]
|
||||
pub(crate) async fn store_bytes(
|
||||
&self,
|
||||
uuid: Uuid,
|
||||
|
|
|
@ -34,7 +34,7 @@ impl NodeCache {
|
|||
NodeCache { db }
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Get nodes")]
|
||||
#[tracing::instrument(name = "Get nodes", skip(self))]
|
||||
pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> {
|
||||
let infos = self.db.connected_info().await?;
|
||||
let instances = self.db.connected_instance().await?;
|
||||
|
@ -57,7 +57,7 @@ impl NodeCache {
|
|||
Ok(vec)
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Is NodeInfo Outdated", fields(actor_id = actor_id.to_string().as_str()))]
|
||||
#[tracing::instrument(name = "Is NodeInfo Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))]
|
||||
pub(crate) async fn is_nodeinfo_outdated(&self, actor_id: IriString) -> bool {
|
||||
self.db
|
||||
.info(actor_id)
|
||||
|
@ -66,7 +66,7 @@ impl NodeCache {
|
|||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Is Contact Outdated", fields(actor_id = actor_id.to_string().as_str()))]
|
||||
#[tracing::instrument(name = "Is Contact Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))]
|
||||
pub(crate) async fn is_contact_outdated(&self, actor_id: IriString) -> bool {
|
||||
self.db
|
||||
.contact(actor_id)
|
||||
|
@ -75,7 +75,7 @@ impl NodeCache {
|
|||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Is Instance Outdated", fields(actor_id = actor_id.to_string().as_str()))]
|
||||
#[tracing::instrument(name = "Is Instance Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))]
|
||||
pub(crate) async fn is_instance_outdated(&self, actor_id: IriString) -> bool {
|
||||
self.db
|
||||
.instance(actor_id)
|
||||
|
@ -84,7 +84,7 @@ impl NodeCache {
|
|||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Save node info", fields(actor_id = actor_id.to_string().as_str(), software, version, reg))]
|
||||
#[tracing::instrument(name = "Save node info", skip_all, fields(actor_id = actor_id.to_string().as_str(), software, version, reg))]
|
||||
pub(crate) async fn set_info(
|
||||
&self,
|
||||
actor_id: IriString,
|
||||
|
@ -107,6 +107,7 @@ impl NodeCache {
|
|||
|
||||
#[tracing::instrument(
|
||||
name = "Save instance info",
|
||||
skip_all,
|
||||
fields(
|
||||
actor_id = actor_id.to_string().as_str(),
|
||||
title,
|
||||
|
@ -142,6 +143,7 @@ impl NodeCache {
|
|||
|
||||
#[tracing::instrument(
|
||||
name = "Save contact info",
|
||||
skip_all,
|
||||
fields(
|
||||
actor_id = actor_id.to_string().as_str(),
|
||||
username,
|
||||
|
|
|
@ -50,6 +50,7 @@ impl State {
|
|||
|
||||
#[tracing::instrument(
|
||||
name = "Get inboxes for other domains",
|
||||
skip_all,
|
||||
fields(
|
||||
existing_inbox = existing_inbox.to_string().as_str(),
|
||||
authority
|
||||
|
@ -85,7 +86,7 @@ impl State {
|
|||
self.object_cache.write().await.put(object_id, actor_id);
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Building state")]
|
||||
#[tracing::instrument(name = "Building state", skip_all)]
|
||||
pub(crate) async fn build(db: Db) -> Result<Self, Error> {
|
||||
let private_key = if let Ok(Some(key)) = db.private_key().await {
|
||||
info!("Using existing key");
|
||||
|
|
|
@ -31,7 +31,7 @@ impl Announce {
|
|||
Announce { object_id, actor }
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Announce")]
|
||||
#[tracing::instrument(name = "Announce", skip(state))]
|
||||
async fn perform(self, state: JobState) -> Result<(), Error> {
|
||||
let activity_id = state.config.generate_url(UrlKind::Activity);
|
||||
|
||||
|
@ -39,7 +39,8 @@ impl Announce {
|
|||
let inboxes = get_inboxes(&state.state, &self.actor, &self.object_id).await?;
|
||||
state
|
||||
.job_server
|
||||
.queue(DeliverMany::new(inboxes, announce)?).await?;
|
||||
.queue(DeliverMany::new(inboxes, announce)?)
|
||||
.await?;
|
||||
|
||||
state.state.cache(self.object_id, activity_id).await;
|
||||
Ok(())
|
||||
|
|
|
@ -7,8 +7,8 @@ use crate::{
|
|||
};
|
||||
use activitystreams::{
|
||||
activity::{Accept as AsAccept, Follow as AsFollow},
|
||||
prelude::*,
|
||||
iri_string::types::IriString,
|
||||
prelude::*,
|
||||
};
|
||||
use background_jobs::ActixJob;
|
||||
use std::{future::Future, pin::Pin};
|
||||
|
@ -24,7 +24,7 @@ impl Follow {
|
|||
Follow { input, actor }
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Follow")]
|
||||
#[tracing::instrument(name = "Follow", skip(state))]
|
||||
async fn perform(self, state: JobState) -> Result<(), Error> {
|
||||
let my_id = state.config.generate_url(UrlKind::Actor);
|
||||
|
||||
|
@ -35,7 +35,8 @@ impl Follow {
|
|||
let follow = generate_follow(&state.config, &self.actor.id, &my_id)?;
|
||||
state
|
||||
.job_server
|
||||
.queue(Deliver::new(self.actor.inbox.clone(), follow)?).await?;
|
||||
.queue(Deliver::new(self.actor.inbox.clone(), follow)?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
state.actors.add_connection(self.actor.clone()).await?;
|
||||
|
@ -49,20 +50,29 @@ impl Follow {
|
|||
|
||||
state
|
||||
.job_server
|
||||
.queue(Deliver::new(self.actor.inbox, accept)?).await?;
|
||||
.queue(Deliver::new(self.actor.inbox, accept)?)
|
||||
.await?;
|
||||
|
||||
state
|
||||
.job_server
|
||||
.queue(QueryInstance::new(self.actor.id.clone())).await?;
|
||||
.queue(QueryInstance::new(self.actor.id.clone()))
|
||||
.await?;
|
||||
|
||||
state.job_server.queue(QueryNodeinfo::new(self.actor.id)).await?;
|
||||
state
|
||||
.job_server
|
||||
.queue(QueryNodeinfo::new(self.actor.id))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// Generate a type that says "I want to follow you"
|
||||
fn generate_follow(config: &Config, actor_id: &IriString, my_id: &IriString) -> Result<AsFollow, Error> {
|
||||
fn generate_follow(
|
||||
config: &Config,
|
||||
actor_id: &IriString,
|
||||
my_id: &IriString,
|
||||
) -> Result<AsFollow, Error> {
|
||||
let follow = AsFollow::new(my_id.clone(), actor_id.clone());
|
||||
|
||||
prepare_activity(
|
||||
|
|
|
@ -19,7 +19,7 @@ impl Forward {
|
|||
Forward { input, actor }
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Forward")]
|
||||
#[tracing::instrument(name = "Forward", skip(state))]
|
||||
async fn perform(self, state: JobState) -> Result<(), Error> {
|
||||
let object_id = self
|
||||
.input
|
||||
|
|
|
@ -11,14 +11,17 @@ use std::{future::Future, pin::Pin};
|
|||
pub(crate) struct Reject(pub(crate) Actor);
|
||||
|
||||
impl Reject {
|
||||
#[tracing::instrument(name = "Reject")]
|
||||
#[tracing::instrument(name = "Reject", skip(state))]
|
||||
async fn perform(self, state: JobState) -> Result<(), Error> {
|
||||
state.actors.remove_connection(&self.0).await?;
|
||||
|
||||
let my_id = state.config.generate_url(UrlKind::Actor);
|
||||
let undo = generate_undo_follow(&state.config, &self.0.id, &my_id)?;
|
||||
|
||||
state.job_server.queue(Deliver::new(self.0.inbox, undo)?).await?;
|
||||
state
|
||||
.job_server
|
||||
.queue(Deliver::new(self.0.inbox, undo)?)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ impl Undo {
|
|||
Undo { input, actor }
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Undo")]
|
||||
#[tracing::instrument(name = "Undo", skip(state))]
|
||||
async fn perform(self, state: JobState) -> Result<(), Error> {
|
||||
let was_following = state.state.db.is_connected(self.actor.id.clone()).await?;
|
||||
|
||||
|
@ -30,7 +30,8 @@ impl Undo {
|
|||
let undo = generate_undo_follow(&state.config, &self.actor.id, &my_id)?;
|
||||
state
|
||||
.job_server
|
||||
.queue(Deliver::new(self.actor.inbox, undo)?).await?;
|
||||
.queue(Deliver::new(self.actor.inbox, undo)?)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -13,7 +13,7 @@ impl CacheMedia {
|
|||
CacheMedia { uuid }
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Cache media")]
|
||||
#[tracing::instrument(name = "Cache media", skip(state))]
|
||||
async fn perform(self, state: JobState) -> Result<(), Error> {
|
||||
if !state.media.is_outdated(self.uuid).await? {
|
||||
return Ok(());
|
||||
|
|
|
@ -29,7 +29,7 @@ impl Deliver {
|
|||
})
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Deliver")]
|
||||
#[tracing::instrument(name = "Deliver", skip(state))]
|
||||
async fn permform(self, state: JobState) -> Result<(), Error> {
|
||||
state.requests.deliver(self.to, &self.data).await?;
|
||||
Ok(())
|
||||
|
|
|
@ -40,7 +40,7 @@ impl DeliverMany {
|
|||
})
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Deliver many")]
|
||||
#[tracing::instrument(name = "Deliver many", skip(state))]
|
||||
async fn perform(self, state: JobState) -> Result<(), Error> {
|
||||
for inbox in self.to {
|
||||
state
|
||||
|
|
|
@ -25,7 +25,7 @@ impl QueryInstance {
|
|||
QueryInstance { actor_id }
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Query instance")]
|
||||
#[tracing::instrument(name = "Query instance", skip(state))]
|
||||
async fn perform(self, state: JobState) -> Result<(), Error> {
|
||||
let contact_outdated = state
|
||||
.node_cache
|
||||
|
|
|
@ -24,7 +24,7 @@ impl QueryNodeinfo {
|
|||
QueryNodeinfo { actor_id }
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Query node info")]
|
||||
#[tracing::instrument(name = "Query node info", skip(state))]
|
||||
async fn perform(self, state: JobState) -> Result<(), Error> {
|
||||
if !state
|
||||
.node_cache
|
||||
|
|
|
@ -9,12 +9,13 @@ use std::{future::Future, pin::Pin};
|
|||
pub(crate) struct Listeners;
|
||||
|
||||
impl Listeners {
|
||||
#[tracing::instrument(name = "Spawn query instances")]
|
||||
#[tracing::instrument(name = "Spawn query instances", skip(state))]
|
||||
async fn perform(self, state: JobState) -> Result<(), Error> {
|
||||
for actor_id in state.state.db.connected_ids().await? {
|
||||
state
|
||||
.job_server
|
||||
.queue(QueryInstance::new(actor_id.clone())).await?;
|
||||
.queue(QueryInstance::new(actor_id.clone()))
|
||||
.await?;
|
||||
state.job_server.queue(QueryNodeinfo::new(actor_id)).await?;
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ use std::{future::Future, pin::Pin};
|
|||
pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State);
|
||||
|
||||
impl MyVerify {
|
||||
#[tracing::instrument("Verify signature")]
|
||||
#[tracing::instrument("Verify signature", skip(self))]
|
||||
async fn verify(
|
||||
&self,
|
||||
algorithm: Option<Algorithm>,
|
||||
|
|
|
@ -203,7 +203,7 @@ impl Requests {
|
|||
self.consecutive_errors.swap(0, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Fetch Json")]
|
||||
#[tracing::instrument(name = "Fetch Json", skip(self))]
|
||||
pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
|
@ -211,7 +211,7 @@ impl Requests {
|
|||
self.do_fetch(url, "application/json").await
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Fetch Activity+Json")]
|
||||
#[tracing::instrument(name = "Fetch Activity+Json", skip(self))]
|
||||
pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
|
@ -278,7 +278,7 @@ impl Requests {
|
|||
Ok(serde_json::from_slice(body.as_ref())?)
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "Fetch Bytes")]
|
||||
#[tracing::instrument(name = "Fetch Bytes", skip(self))]
|
||||
pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> {
|
||||
let parsed_url = url.parse::<IriString>()?;
|
||||
|
||||
|
@ -350,7 +350,8 @@ impl Requests {
|
|||
|
||||
#[tracing::instrument(
|
||||
"Deliver to Inbox",
|
||||
fields(self, inbox = inbox.to_string().as_str(), item)
|
||||
skip_all,
|
||||
fields(inbox = inbox.to_string().as_str(), item)
|
||||
)]
|
||||
pub(crate) async fn deliver<T>(&self, inbox: IriString, item: &T) -> Result<(), Error>
|
||||
where
|
||||
|
|
|
@ -15,7 +15,7 @@ use activitystreams_ext::Ext1;
|
|||
use actix_web::{web, Responder};
|
||||
use rsa::pkcs8::EncodePublicKey;
|
||||
|
||||
#[tracing::instrument(name = "Actor")]
|
||||
#[tracing::instrument(name = "Actor", skip(config, state))]
|
||||
pub(crate) async fn route(
|
||||
state: web::Data<State>,
|
||||
config: web::Data<Config>,
|
||||
|
|
|
@ -17,7 +17,7 @@ use actix_web::{web, HttpResponse};
|
|||
use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};
|
||||
use tracing::error;
|
||||
|
||||
#[tracing::instrument(name = "Inbox")]
|
||||
#[tracing::instrument(name = "Inbox", skip(config, state))]
|
||||
pub(crate) async fn route(
|
||||
state: web::Data<State>,
|
||||
actors: web::Data<ActorCache>,
|
||||
|
|
|
@ -8,7 +8,7 @@ use rand::{seq::SliceRandom, thread_rng};
|
|||
use std::io::BufWriter;
|
||||
use tracing::error;
|
||||
|
||||
#[tracing::instrument(name = "Index")]
|
||||
#[tracing::instrument(name = "Index", skip(config, state))]
|
||||
pub(crate) async fn route(
|
||||
state: web::Data<State>,
|
||||
config: web::Data<Config>,
|
||||
|
|
|
@ -5,7 +5,7 @@ use crate::{
|
|||
use actix_web::{web, Responder};
|
||||
use actix_webfinger::Link;
|
||||
|
||||
#[tracing::instrument(name = "Well Known NodeInfo")]
|
||||
#[tracing::instrument(name = "Well Known NodeInfo", skip(config))]
|
||||
pub(crate) async fn well_known(config: web::Data<Config>) -> impl Responder {
|
||||
web::Json(Links {
|
||||
links: vec![Link {
|
||||
|
|
Loading…
Reference in a new issue