Instrument with tracing

This commit is contained in:
Aode (lion) 2021-09-18 12:55:39 -05:00
parent ebba8e3f60
commit 43e5b6d873
34 changed files with 748 additions and 707 deletions

725
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -15,26 +15,22 @@ build = "src/build.rs"
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
actix-rt = "2.0.2" actix-rt = "2.0.2"
actix-web = { version = "4.0.0-beta.7", default-features = false, features = ["compress-brotli", "compress-gzip", "compress-zstd"] } actix-web = { version = "4.0.0-beta.7", default-features = false }
actix-webfinger = "0.4.0-beta.3" actix-webfinger = "0.4.0-beta.3"
activitystreams = "0.7.0-alpha.10" activitystreams = "0.7.0-alpha.10"
activitystreams-ext = "0.1.0-alpha.2" activitystreams-ext = "0.1.0-alpha.2"
ammonia = "3.1.0" ammonia = "3.1.0"
async-mutex = "1.0.1" async-mutex = "1.0.1"
async-rwlock = "1.3.0" async-rwlock = "1.3.0"
awc = { version = "3.0.0-beta.6", default-features = false, features = ["compress-brotli", "compress-gzip", "compress-zstd", "rustls"] } awc = { version = "3.0.0-beta.6", default-features = false, features = ["rustls"] }
background-jobs = "0.9.0"
base64 = "0.13" base64 = "0.13"
chrono = "0.4.19" chrono = "0.4.19"
config = "0.11.0" config = "0.11.0"
dotenv = "0.15.0" dotenv = "0.15.0"
env_logger = "0.9.0" futures-util = "0.3.17"
futures = "0.3.12" http-signature-normalization-actix = { version = "0.5.0-beta.7", default-features = false, features = ["sha-2"], git = "https://git.asonix.dog/asonix/http-signature-normalization" }
http-signature-normalization-actix = { version = "0.5.0-beta.6", default-features = false, features = ["sha-2"] }
log = "0.4"
lru = "0.6.0" lru = "0.6.0"
mime = "0.3.16" mime = "0.3.16"
pretty_env_logger = "0.4.0"
rand = "0.8" rand = "0.8"
rsa = "0.5" rsa = "0.5"
rsa-magic-public-key = "0.4.0" rsa-magic-public-key = "0.4.0"
@ -44,8 +40,24 @@ sha2 = "0.9"
sled = "0.34.6" sled = "0.34.6"
structopt = "0.3.12" structopt = "0.3.12"
thiserror = "1.0" thiserror = "1.0"
tracing = "0.1"
tracing-actix-web = { version = "0.4.0-beta.12", git = "https://github.com/asonix/tracing-actix-web", branch = "asonix/tracing-error-work-around" }
tracing-error = "0.1"
tracing-futures = "0.2"
tracing-log = "0.1"
tracing-subscriber = { version = "0.2", features = ["ansi", "fmt"] }
uuid = { version = "0.8", features = ["v4", "serde"] } uuid = { version = "0.8", features = ["v4", "serde"] }
[dependencies.background-jobs]
version = "0.10.0"
git = "https://git.asonix.dog/asonix/background-jobs"
default-features = false
features = [
"background-jobs-actix",
"error-logging"
]
[build-dependencies] [build-dependencies]
anyhow = "1.0" anyhow = "1.0"
dotenv = "0.15.0" dotenv = "0.15.0"

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
data::{ActorCache, State}, data::{ActorCache, State},
error::MyError, error::Error,
middleware::MyVerify, middleware::MyVerify,
requests::Requests, requests::Requests,
}; };
@ -20,7 +20,6 @@ pub(crate) struct ParsedConfig {
restricted_mode: bool, restricted_mode: bool,
validate_signatures: bool, validate_signatures: bool,
https: bool, https: bool,
pretty_log: bool,
publish_blocks: bool, publish_blocks: bool,
sled_path: PathBuf, sled_path: PathBuf,
source_repo: Url, source_repo: Url,
@ -34,7 +33,6 @@ pub struct Config {
debug: bool, debug: bool,
restricted_mode: bool, restricted_mode: bool,
validate_signatures: bool, validate_signatures: bool,
pretty_log: bool,
publish_blocks: bool, publish_blocks: bool,
base_uri: Url, base_uri: Url,
sled_path: PathBuf, sled_path: PathBuf,
@ -55,7 +53,7 @@ pub enum UrlKind {
} }
impl Config { impl Config {
pub(crate) fn build() -> Result<Self, MyError> { pub(crate) fn build() -> Result<Self, Error> {
let mut config = config::Config::new(); let mut config = config::Config::new();
config config
.set_default("hostname", "localhost:8080")? .set_default("hostname", "localhost:8080")?
@ -65,7 +63,6 @@ impl Config {
.set_default("restricted_mode", false)? .set_default("restricted_mode", false)?
.set_default("validate_signatures", false)? .set_default("validate_signatures", false)?
.set_default("https", false)? .set_default("https", false)?
.set_default("pretty_log", true)?
.set_default("publish_blocks", false)? .set_default("publish_blocks", false)?
.set_default("sled_path", "./sled/db-0-34")? .set_default("sled_path", "./sled/db-0-34")?
.set_default("source_repo", "https://git.asonix.dog/asonix/relay")? .set_default("source_repo", "https://git.asonix.dog/asonix/relay")?
@ -83,7 +80,6 @@ impl Config {
debug: config.debug, debug: config.debug,
restricted_mode: config.restricted_mode, restricted_mode: config.restricted_mode,
validate_signatures: config.validate_signatures, validate_signatures: config.validate_signatures,
pretty_log: config.pretty_log,
publish_blocks: config.publish_blocks, publish_blocks: config.publish_blocks,
base_uri, base_uri,
sled_path: config.sled_path, sled_path: config.sled_path,
@ -95,10 +91,6 @@ impl Config {
&self.sled_path &self.sled_path
} }
pub(crate) fn pretty_log(&self) -> bool {
self.pretty_log
}
pub(crate) fn validate_signatures(&self) -> bool { pub(crate) fn validate_signatures(&self) -> bool {
self.validate_signatures self.validate_signatures
} }

View file

@ -1,7 +1,7 @@
use crate::{ use crate::{
apub::AcceptedActors, apub::AcceptedActors,
db::{Actor, Db}, db::{Actor, Db},
error::MyError, error::{Error, ErrorKind},
requests::Requests, requests::Requests,
}; };
use activitystreams::{prelude::*, url::Url}; use activitystreams::{prelude::*, url::Url};
@ -30,7 +30,7 @@ impl<T> MaybeCached<T> {
} }
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct ActorCache { pub struct ActorCache {
db: Db, db: Db,
} }
@ -40,11 +40,12 @@ impl ActorCache {
ActorCache { db } ActorCache { db }
} }
#[tracing::instrument(name = "Get Actor", skip(requests))]
pub(crate) async fn get( pub(crate) async fn get(
&self, &self,
id: &Url, id: &Url,
requests: &Requests, requests: &Requests,
) -> Result<MaybeCached<Actor>, MyError> { ) -> Result<MaybeCached<Actor>, Error> {
if let Some(actor) = self.db.actor(id.clone()).await? { if let Some(actor) = self.db.actor(id.clone()).await? {
if actor.saved_at + REFETCH_DURATION > SystemTime::now() { if actor.saved_at + REFETCH_DURATION > SystemTime::now() {
return Ok(MaybeCached::Cached(actor)); return Ok(MaybeCached::Cached(actor));
@ -56,26 +57,25 @@ impl ActorCache {
.map(MaybeCached::Fetched) .map(MaybeCached::Fetched)
} }
pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), MyError> { #[tracing::instrument(name = "Add Connection")]
pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> {
self.db.add_connection(actor.id.clone()).await?; self.db.add_connection(actor.id.clone()).await?;
self.db.save_actor(actor).await self.db.save_actor(actor).await
} }
pub(crate) async fn remove_connection(&self, actor: &Actor) -> Result<(), MyError> { #[tracing::instrument(name = "Remove Connection")]
pub(crate) async fn remove_connection(&self, actor: &Actor) -> Result<(), Error> {
self.db.remove_connection(actor.id.clone()).await self.db.remove_connection(actor.id.clone()).await
} }
pub(crate) async fn get_no_cache( #[tracing::instrument(name = "Fetch remote actor", skip(requests))]
&self, pub(crate) async fn get_no_cache(&self, id: &Url, requests: &Requests) -> Result<Actor, Error> {
id: &Url,
requests: &Requests,
) -> Result<Actor, MyError> {
let accepted_actor = requests.fetch::<AcceptedActors>(id.as_str()).await?; let accepted_actor = requests.fetch::<AcceptedActors>(id.as_str()).await?;
let input_domain = id.domain().ok_or(MyError::MissingDomain)?; let input_domain = id.domain().ok_or(ErrorKind::MissingDomain)?;
let accepted_actor_id = accepted_actor let accepted_actor_id = accepted_actor
.id(&input_domain)? .id(&input_domain)?
.ok_or(MyError::MissingId)?; .ok_or(ErrorKind::MissingId)?;
let inbox = get_inbox(&accepted_actor)?.clone(); let inbox = get_inbox(&accepted_actor)?.clone();
@ -93,7 +93,7 @@ impl ActorCache {
} }
} }
fn get_inbox(actor: &AcceptedActors) -> Result<&Url, MyError> { fn get_inbox(actor: &AcceptedActors) -> Result<&Url, Error> {
Ok(actor Ok(actor
.endpoints()? .endpoints()?
.and_then(|e| e.shared_inbox) .and_then(|e| e.shared_inbox)

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
db::{Db, MediaMeta}, db::{Db, MediaMeta},
error::MyError, error::Error,
}; };
use activitystreams::url::Url; use activitystreams::url::Url;
use actix_web::web::Bytes; use actix_web::web::Bytes;
@ -9,7 +9,7 @@ use uuid::Uuid;
static MEDIA_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 2); static MEDIA_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 2);
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct MediaCache { pub struct MediaCache {
db: Db, db: Db,
} }
@ -19,15 +19,18 @@ impl MediaCache {
MediaCache { db } MediaCache { db }
} }
pub(crate) async fn get_uuid(&self, url: Url) -> Result<Option<Uuid>, MyError> { #[tracing::instrument(name = "Get media uuid")]
pub(crate) async fn get_uuid(&self, url: Url) -> Result<Option<Uuid>, Error> {
self.db.media_id(url).await self.db.media_id(url).await
} }
pub(crate) async fn get_url(&self, uuid: Uuid) -> Result<Option<Url>, MyError> { #[tracing::instrument(name = "Get media url")]
pub(crate) async fn get_url(&self, uuid: Uuid) -> Result<Option<Url>, Error> {
self.db.media_url(uuid).await self.db.media_url(uuid).await
} }
pub(crate) async fn is_outdated(&self, uuid: Uuid) -> Result<bool, MyError> { #[tracing::instrument(name = "Is media outdated")]
pub(crate) async fn is_outdated(&self, uuid: Uuid) -> Result<bool, Error> {
if let Some(meta) = self.db.media_meta(uuid).await? { if let Some(meta) = self.db.media_meta(uuid).await? {
if meta.saved_at + MEDIA_DURATION > SystemTime::now() { if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
return Ok(false); return Ok(false);
@ -37,7 +40,8 @@ impl MediaCache {
Ok(true) Ok(true)
} }
pub(crate) async fn get_bytes(&self, uuid: Uuid) -> Result<Option<(String, Bytes)>, MyError> { #[tracing::instrument(name = "Get media bytes")]
pub(crate) async fn get_bytes(&self, uuid: Uuid) -> Result<Option<(String, Bytes)>, Error> {
if let Some(meta) = self.db.media_meta(uuid).await? { if let Some(meta) = self.db.media_meta(uuid).await? {
if meta.saved_at + MEDIA_DURATION > SystemTime::now() { if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
return self return self
@ -51,7 +55,8 @@ impl MediaCache {
Ok(None) Ok(None)
} }
pub(crate) async fn store_url(&self, url: Url) -> Result<Uuid, MyError> { #[tracing::instrument(name = "Store media url")]
pub(crate) async fn store_url(&self, url: Url) -> Result<Uuid, Error> {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
self.db.save_url(url, uuid).await?; self.db.save_url(url, uuid).await?;
@ -59,12 +64,13 @@ impl MediaCache {
Ok(uuid) Ok(uuid)
} }
#[tracing::instrument(name = "store media bytes", skip(bytes))]
pub(crate) async fn store_bytes( pub(crate) async fn store_bytes(
&self, &self,
uuid: Uuid, uuid: Uuid,
media_type: String, media_type: String,
bytes: Bytes, bytes: Bytes,
) -> Result<(), MyError> { ) -> Result<(), Error> {
self.db self.db
.save_bytes( .save_bytes(
uuid, uuid,

View file

@ -1,11 +1,11 @@
use crate::{ use crate::{
db::{Contact, Db, Info, Instance}, db::{Contact, Db, Info, Instance},
error::MyError, error::Error,
}; };
use activitystreams::url::Url; use activitystreams::url::Url;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct NodeCache { pub struct NodeCache {
db: Db, db: Db,
} }
@ -23,7 +23,8 @@ impl NodeCache {
NodeCache { db } NodeCache { db }
} }
pub(crate) async fn nodes(&self) -> Result<Vec<Node>, MyError> { #[tracing::instrument(name = "Get nodes")]
pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> {
let infos = self.db.connected_info().await?; let infos = self.db.connected_info().await?;
let instances = self.db.connected_instance().await?; let instances = self.db.connected_instance().await?;
let contacts = self.db.connected_contact().await?; let contacts = self.db.connected_contact().await?;
@ -48,6 +49,7 @@ impl NodeCache {
Ok(vec) Ok(vec)
} }
#[tracing::instrument(name = "Is NodeInfo Outdated")]
pub(crate) async fn is_nodeinfo_outdated(&self, actor_id: Url) -> bool { pub(crate) async fn is_nodeinfo_outdated(&self, actor_id: Url) -> bool {
self.db self.db
.info(actor_id) .info(actor_id)
@ -56,6 +58,7 @@ impl NodeCache {
.unwrap_or(true) .unwrap_or(true)
} }
#[tracing::instrument(name = "Is Contact Outdated")]
pub(crate) async fn is_contact_outdated(&self, actor_id: Url) -> bool { pub(crate) async fn is_contact_outdated(&self, actor_id: Url) -> bool {
self.db self.db
.contact(actor_id) .contact(actor_id)
@ -64,6 +67,7 @@ impl NodeCache {
.unwrap_or(true) .unwrap_or(true)
} }
#[tracing::instrument(name = "Is Instance Outdated")]
pub(crate) async fn is_instance_outdated(&self, actor_id: Url) -> bool { pub(crate) async fn is_instance_outdated(&self, actor_id: Url) -> bool {
self.db self.db
.instance(actor_id) .instance(actor_id)
@ -72,13 +76,14 @@ impl NodeCache {
.unwrap_or(true) .unwrap_or(true)
} }
#[tracing::instrument(name = "Save node info")]
pub(crate) async fn set_info( pub(crate) async fn set_info(
&self, &self,
actor_id: Url, actor_id: Url,
software: String, software: String,
version: String, version: String,
reg: bool, reg: bool,
) -> Result<(), MyError> { ) -> Result<(), Error> {
self.db self.db
.save_info( .save_info(
actor_id, actor_id,
@ -92,6 +97,7 @@ impl NodeCache {
.await .await
} }
#[tracing::instrument(name = "Save instance info")]
pub(crate) async fn set_instance( pub(crate) async fn set_instance(
&self, &self,
actor_id: Url, actor_id: Url,
@ -100,7 +106,7 @@ impl NodeCache {
version: String, version: String,
reg: bool, reg: bool,
requires_approval: bool, requires_approval: bool,
) -> Result<(), MyError> { ) -> Result<(), Error> {
self.db self.db
.save_instance( .save_instance(
actor_id, actor_id,
@ -116,6 +122,7 @@ impl NodeCache {
.await .await
} }
#[tracing::instrument(name = "Save contact info")]
pub(crate) async fn set_contact( pub(crate) async fn set_contact(
&self, &self,
actor_id: Url, actor_id: Url,
@ -123,7 +130,7 @@ impl NodeCache {
display_name: String, display_name: String,
url: Url, url: Url,
avatar: Url, avatar: Url,
) -> Result<(), MyError> { ) -> Result<(), Error> {
self.db self.db
.save_contact( .save_contact(
actor_id, actor_id,

View file

@ -2,17 +2,17 @@ use crate::{
config::{Config, UrlKind}, config::{Config, UrlKind},
data::NodeCache, data::NodeCache,
db::Db, db::Db,
error::MyError, error::Error,
requests::{Breakers, Requests}, requests::{Breakers, Requests},
}; };
use activitystreams::url::Url; use activitystreams::url::Url;
use actix_web::web; use actix_web::web;
use async_rwlock::RwLock; use async_rwlock::RwLock;
use log::info;
use lru::LruCache; use lru::LruCache;
use rand::thread_rng; use rand::thread_rng;
use rsa::{RsaPrivateKey, RsaPublicKey}; use rsa::{RsaPrivateKey, RsaPublicKey};
use std::sync::Arc; use std::sync::Arc;
use tracing::info;
#[derive(Clone)] #[derive(Clone)]
pub struct State { pub struct State {
@ -25,6 +25,20 @@ pub struct State {
pub(crate) db: Db, pub(crate) db: Db,
} }
impl std::fmt::Debug for State {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("State")
.field("public_key", &"PublicKey")
.field("private_key", &"[redacted]")
.field("config", &self.config)
.field("object_cache", &"Object Cache")
.field("node_cache", &self.node_cache)
.field("breakers", &self.breakers)
.field("db", &self.db)
.finish()
}
}
impl State { impl State {
pub(crate) fn node_cache(&self) -> NodeCache { pub(crate) fn node_cache(&self) -> NodeCache {
self.node_cache.clone() self.node_cache.clone()
@ -44,11 +58,12 @@ impl State {
) )
} }
#[tracing::instrument(name = "Get inboxes for other domains")]
pub(crate) async fn inboxes_without( pub(crate) async fn inboxes_without(
&self, &self,
existing_inbox: &Url, existing_inbox: &Url,
domain: &str, domain: &str,
) -> Result<Vec<Url>, MyError> { ) -> Result<Vec<Url>, Error> {
Ok(self Ok(self
.db .db
.inboxes() .inboxes()
@ -74,8 +89,10 @@ impl State {
self.object_cache.write().await.put(object_id, actor_id); self.object_cache.write().await.put(object_id, actor_id);
} }
pub(crate) async fn build(config: Config, db: Db) -> Result<Self, MyError> { #[tracing::instrument(name = "Building state")]
pub(crate) async fn build(config: Config, db: Db) -> Result<Self, Error> {
let private_key = if let Ok(Some(key)) = db.private_key().await { let private_key = if let Ok(Some(key)) = db.private_key().await {
info!("Using existing key");
key key
} else { } else {
info!("Generating new keys"); info!("Generating new keys");

View file

@ -1,4 +1,4 @@
use crate::{config::Config, error::MyError}; use crate::{config::Config, error::Error};
use activitystreams::url::Url; use activitystreams::url::Url;
use actix_web::web::Bytes; use actix_web::web::Bytes;
use rsa::{ use rsa::{
@ -9,7 +9,7 @@ use sled::Tree;
use std::{collections::HashMap, sync::Arc, time::SystemTime}; use std::{collections::HashMap, sync::Arc, time::SystemTime};
use uuid::Uuid; use uuid::Uuid;
#[derive(Clone)] #[derive(Clone, Debug)]
pub(crate) struct Db { pub(crate) struct Db {
inner: Arc<Inner>, inner: Arc<Inner>,
} }
@ -31,6 +31,14 @@ struct Inner {
restricted_mode: bool, restricted_mode: bool,
} }
impl std::fmt::Debug for Inner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Inner")
.field("restricted_mode", &self.restricted_mode)
.finish()
}
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Actor { pub struct Actor {
pub(crate) id: Url, pub(crate) id: Url,
@ -194,12 +202,12 @@ impl Inner {
} }
impl Db { impl Db {
pub(crate) fn build(config: &Config) -> Result<Self, MyError> { pub(crate) fn build(config: &Config) -> Result<Self, Error> {
let db = sled::open(config.sled_path())?; let db = sled::open(config.sled_path())?;
Self::build_inner(config.restricted_mode(), db) Self::build_inner(config.restricted_mode(), db)
} }
fn build_inner(restricted_mode: bool, db: sled::Db) -> Result<Self, MyError> { fn build_inner(restricted_mode: bool, db: sled::Db) -> Result<Self, Error> {
Ok(Db { Ok(Db {
inner: Arc::new(Inner { inner: Arc::new(Inner {
actor_id_actor: db.open_tree("actor-id-actor")?, actor_id_actor: db.open_tree("actor-id-actor")?,
@ -222,8 +230,8 @@ impl Db {
async fn unblock<T>( async fn unblock<T>(
&self, &self,
f: impl Fn(&Inner) -> Result<T, MyError> + Send + 'static, f: impl Fn(&Inner) -> Result<T, Error> + Send + 'static,
) -> Result<T, MyError> ) -> Result<T, Error>
where where
T: Send + 'static, T: Send + 'static,
{ {
@ -234,11 +242,11 @@ impl Db {
Ok(t) Ok(t)
} }
pub(crate) async fn connected_ids(&self) -> Result<Vec<Url>, MyError> { pub(crate) async fn connected_ids(&self) -> Result<Vec<Url>, Error> {
self.unblock(|inner| Ok(inner.connected().collect())).await self.unblock(|inner| Ok(inner.connected().collect())).await
} }
pub(crate) async fn save_info(&self, actor_id: Url, info: Info) -> Result<(), MyError> { pub(crate) async fn save_info(&self, actor_id: Url, info: Info) -> Result<(), Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
let vec = serde_json::to_vec(&info)?; let vec = serde_json::to_vec(&info)?;
@ -251,7 +259,7 @@ impl Db {
.await .await
} }
pub(crate) async fn info(&self, actor_id: Url) -> Result<Option<Info>, MyError> { pub(crate) async fn info(&self, actor_id: Url) -> Result<Option<Info>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_info.get(actor_id.as_str().as_bytes())? { if let Some(ivec) = inner.actor_id_info.get(actor_id.as_str().as_bytes())? {
let info = serde_json::from_slice(&ivec)?; let info = serde_json::from_slice(&ivec)?;
@ -263,7 +271,7 @@ impl Db {
.await .await
} }
pub(crate) async fn connected_info(&self) -> Result<HashMap<Url, Info>, MyError> { pub(crate) async fn connected_info(&self) -> Result<HashMap<Url, Info>, Error> {
self.unblock(|inner| Ok(inner.connected_info().collect())) self.unblock(|inner| Ok(inner.connected_info().collect()))
.await .await
} }
@ -272,7 +280,7 @@ impl Db {
&self, &self,
actor_id: Url, actor_id: Url,
instance: Instance, instance: Instance,
) -> Result<(), MyError> { ) -> Result<(), Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
let vec = serde_json::to_vec(&instance)?; let vec = serde_json::to_vec(&instance)?;
@ -285,7 +293,7 @@ impl Db {
.await .await
} }
pub(crate) async fn instance(&self, actor_id: Url) -> Result<Option<Instance>, MyError> { pub(crate) async fn instance(&self, actor_id: Url) -> Result<Option<Instance>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_instance.get(actor_id.as_str().as_bytes())? { if let Some(ivec) = inner.actor_id_instance.get(actor_id.as_str().as_bytes())? {
let instance = serde_json::from_slice(&ivec)?; let instance = serde_json::from_slice(&ivec)?;
@ -297,16 +305,12 @@ impl Db {
.await .await
} }
pub(crate) async fn connected_instance(&self) -> Result<HashMap<Url, Instance>, MyError> { pub(crate) async fn connected_instance(&self) -> Result<HashMap<Url, Instance>, Error> {
self.unblock(|inner| Ok(inner.connected_instance().collect())) self.unblock(|inner| Ok(inner.connected_instance().collect()))
.await .await
} }
pub(crate) async fn save_contact( pub(crate) async fn save_contact(&self, actor_id: Url, contact: Contact) -> Result<(), Error> {
&self,
actor_id: Url,
contact: Contact,
) -> Result<(), MyError> {
self.unblock(move |inner| { self.unblock(move |inner| {
let vec = serde_json::to_vec(&contact)?; let vec = serde_json::to_vec(&contact)?;
@ -319,7 +323,7 @@ impl Db {
.await .await
} }
pub(crate) async fn contact(&self, actor_id: Url) -> Result<Option<Contact>, MyError> { pub(crate) async fn contact(&self, actor_id: Url) -> Result<Option<Contact>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_contact.get(actor_id.as_str().as_bytes())? { if let Some(ivec) = inner.actor_id_contact.get(actor_id.as_str().as_bytes())? {
let contact = serde_json::from_slice(&ivec)?; let contact = serde_json::from_slice(&ivec)?;
@ -331,12 +335,12 @@ impl Db {
.await .await
} }
pub(crate) async fn connected_contact(&self) -> Result<HashMap<Url, Contact>, MyError> { pub(crate) async fn connected_contact(&self) -> Result<HashMap<Url, Contact>, Error> {
self.unblock(|inner| Ok(inner.connected_contact().collect())) self.unblock(|inner| Ok(inner.connected_contact().collect()))
.await .await
} }
pub(crate) async fn save_url(&self, url: Url, id: Uuid) -> Result<(), MyError> { pub(crate) async fn save_url(&self, url: Url, id: Uuid) -> Result<(), Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
inner inner
.media_id_media_url .media_id_media_url
@ -354,7 +358,7 @@ impl Db {
id: Uuid, id: Uuid,
meta: MediaMeta, meta: MediaMeta,
bytes: Bytes, bytes: Bytes,
) -> Result<(), MyError> { ) -> Result<(), Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
let vec = serde_json::to_vec(&meta)?; let vec = serde_json::to_vec(&meta)?;
@ -368,7 +372,7 @@ impl Db {
.await .await
} }
pub(crate) async fn media_id(&self, url: Url) -> Result<Option<Uuid>, MyError> { pub(crate) async fn media_id(&self, url: Url) -> Result<Option<Uuid>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.media_url_media_id.get(url.as_str().as_bytes())? { if let Some(ivec) = inner.media_url_media_id.get(url.as_str().as_bytes())? {
Ok(uuid_from_ivec(ivec)) Ok(uuid_from_ivec(ivec))
@ -379,7 +383,7 @@ impl Db {
.await .await
} }
pub(crate) async fn media_url(&self, id: Uuid) -> Result<Option<Url>, MyError> { pub(crate) async fn media_url(&self, id: Uuid) -> Result<Option<Url>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_url.get(id.as_bytes())? { if let Some(ivec) = inner.media_id_media_url.get(id.as_bytes())? {
Ok(url_from_ivec(ivec)) Ok(url_from_ivec(ivec))
@ -390,7 +394,7 @@ impl Db {
.await .await
} }
pub(crate) async fn media_bytes(&self, id: Uuid) -> Result<Option<Bytes>, MyError> { pub(crate) async fn media_bytes(&self, id: Uuid) -> Result<Option<Bytes>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_bytes.get(id.as_bytes())? { if let Some(ivec) = inner.media_id_media_bytes.get(id.as_bytes())? {
Ok(Some(Bytes::copy_from_slice(&ivec))) Ok(Some(Bytes::copy_from_slice(&ivec)))
@ -401,7 +405,7 @@ impl Db {
.await .await
} }
pub(crate) async fn media_meta(&self, id: Uuid) -> Result<Option<MediaMeta>, MyError> { pub(crate) async fn media_meta(&self, id: Uuid) -> Result<Option<MediaMeta>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_meta.get(id.as_bytes())? { if let Some(ivec) = inner.media_id_media_meta.get(id.as_bytes())? {
let meta = serde_json::from_slice(&ivec)?; let meta = serde_json::from_slice(&ivec)?;
@ -413,16 +417,16 @@ impl Db {
.await .await
} }
pub(crate) async fn blocks(&self) -> Result<Vec<String>, MyError> { pub(crate) async fn blocks(&self) -> Result<Vec<String>, Error> {
self.unblock(|inner| Ok(inner.blocks().collect())).await self.unblock(|inner| Ok(inner.blocks().collect())).await
} }
pub(crate) async fn inboxes(&self) -> Result<Vec<Url>, MyError> { pub(crate) async fn inboxes(&self) -> Result<Vec<Url>, Error> {
self.unblock(|inner| Ok(inner.connected_actors().map(|actor| actor.inbox).collect())) self.unblock(|inner| Ok(inner.connected_actors().map(|actor| actor.inbox).collect()))
.await .await
} }
pub(crate) async fn is_connected(&self, mut id: Url) -> Result<bool, MyError> { pub(crate) async fn is_connected(&self, mut id: Url) -> Result<bool, Error> {
id.set_path(""); id.set_path("");
id.set_query(None); id.set_query(None);
id.set_fragment(None); id.set_fragment(None);
@ -444,7 +448,7 @@ impl Db {
pub(crate) async fn actor_id_from_public_key_id( pub(crate) async fn actor_id_from_public_key_id(
&self, &self,
public_key_id: Url, public_key_id: Url,
) -> Result<Option<Url>, MyError> { ) -> Result<Option<Url>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner if let Some(ivec) = inner
.public_key_id_actor_id .public_key_id_actor_id
@ -458,7 +462,7 @@ impl Db {
.await .await
} }
pub(crate) async fn actor(&self, actor_id: Url) -> Result<Option<Actor>, MyError> { pub(crate) async fn actor(&self, actor_id: Url) -> Result<Option<Actor>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_actor.get(actor_id.as_str().as_bytes())? { if let Some(ivec) = inner.actor_id_actor.get(actor_id.as_str().as_bytes())? {
let actor = serde_json::from_slice(&ivec)?; let actor = serde_json::from_slice(&ivec)?;
@ -470,7 +474,7 @@ impl Db {
.await .await
} }
pub(crate) async fn save_actor(&self, actor: Actor) -> Result<(), MyError> { pub(crate) async fn save_actor(&self, actor: Actor) -> Result<(), Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
let vec = serde_json::to_vec(&actor)?; let vec = serde_json::to_vec(&actor)?;
@ -486,8 +490,8 @@ impl Db {
.await .await
} }
pub(crate) async fn remove_connection(&self, actor_id: Url) -> Result<(), MyError> { pub(crate) async fn remove_connection(&self, actor_id: Url) -> Result<(), Error> {
log::debug!("Removing Connection: {}", actor_id); tracing::debug!("Removing Connection: {}", actor_id);
self.unblock(move |inner| { self.unblock(move |inner| {
inner inner
.connected_actor_ids .connected_actor_ids
@ -498,8 +502,8 @@ impl Db {
.await .await
} }
pub(crate) async fn add_connection(&self, actor_id: Url) -> Result<(), MyError> { pub(crate) async fn add_connection(&self, actor_id: Url) -> Result<(), Error> {
log::debug!("Adding Connection: {}", actor_id); tracing::debug!("Adding Connection: {}", actor_id);
self.unblock(move |inner| { self.unblock(move |inner| {
inner inner
.connected_actor_ids .connected_actor_ids
@ -510,7 +514,7 @@ impl Db {
.await .await
} }
pub(crate) async fn add_blocks(&self, domains: Vec<String>) -> Result<(), MyError> { pub(crate) async fn add_blocks(&self, domains: Vec<String>) -> Result<(), Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
for connected in inner.connected_by_domain(&domains) { for connected in inner.connected_by_domain(&domains) {
inner inner
@ -530,7 +534,7 @@ impl Db {
.await .await
} }
pub(crate) async fn remove_blocks(&self, domains: Vec<String>) -> Result<(), MyError> { pub(crate) async fn remove_blocks(&self, domains: Vec<String>) -> Result<(), Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
for domain in &domains { for domain in &domains {
inner.blocked_domains.remove(domain_key(domain))?; inner.blocked_domains.remove(domain_key(domain))?;
@ -541,7 +545,7 @@ impl Db {
.await .await
} }
pub(crate) async fn add_allows(&self, domains: Vec<String>) -> Result<(), MyError> { pub(crate) async fn add_allows(&self, domains: Vec<String>) -> Result<(), Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
for domain in &domains { for domain in &domains {
inner inner
@ -554,7 +558,7 @@ impl Db {
.await .await
} }
pub(crate) async fn remove_allows(&self, domains: Vec<String>) -> Result<(), MyError> { pub(crate) async fn remove_allows(&self, domains: Vec<String>) -> Result<(), Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if inner.restricted_mode { if inner.restricted_mode {
for connected in inner.connected_by_domain(&domains) { for connected in inner.connected_by_domain(&domains) {
@ -573,7 +577,7 @@ impl Db {
.await .await
} }
pub(crate) async fn is_allowed(&self, url: Url) -> Result<bool, MyError> { pub(crate) async fn is_allowed(&self, url: Url) -> Result<bool, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(domain) = url.domain() { if let Some(domain) = url.domain() {
Ok(inner.is_allowed(domain)) Ok(inner.is_allowed(domain))
@ -584,7 +588,7 @@ impl Db {
.await .await
} }
pub(crate) async fn private_key(&self) -> Result<Option<RsaPrivateKey>, MyError> { pub(crate) async fn private_key(&self) -> Result<Option<RsaPrivateKey>, Error> {
self.unblock(|inner| { self.unblock(|inner| {
if let Some(ivec) = inner.settings.get("private-key")? { if let Some(ivec) = inner.settings.get("private-key")? {
let key_str = String::from_utf8_lossy(&ivec); let key_str = String::from_utf8_lossy(&ivec);
@ -601,7 +605,7 @@ impl Db {
pub(crate) async fn update_private_key( pub(crate) async fn update_private_key(
&self, &self,
private_key: &RsaPrivateKey, private_key: &RsaPrivateKey,
) -> Result<(), MyError> { ) -> Result<(), Error> {
let pem_pkcs8 = private_key.to_pkcs8_pem()?; let pem_pkcs8 = private_key.to_pkcs8_pem()?;
self.unblock(move |inner| { self.unblock(move |inner| {

View file

@ -5,11 +5,43 @@ use actix_web::{
HttpResponse, HttpResponse,
}; };
use http_signature_normalization_actix::PrepareSignError; use http_signature_normalization_actix::PrepareSignError;
use log::error; use std::{convert::Infallible, fmt::Debug, io};
use std::{convert::Infallible, fmt::Debug, io::Error}; use tracing::error;
use tracing_error::SpanTrace;
#[derive(Debug)]
pub(crate) struct Error {
context: SpanTrace,
kind: ErrorKind,
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.kind)?;
std::fmt::Display::fmt(&self.context, f)
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.kind.source()
}
}
impl<T> From<T> for Error
where
ErrorKind: From<T>,
{
fn from(error: T) -> Self {
Error {
context: SpanTrace::capture(),
kind: error.into(),
}
}
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum MyError { pub(crate) enum ErrorKind {
#[error("Error queueing job, {0}")] #[error("Error queueing job, {0}")]
Queue(anyhow::Error), Queue(anyhow::Error),
@ -23,7 +55,7 @@ pub(crate) enum MyError {
Uri(#[from] ParseError), Uri(#[from] ParseError),
#[error("Couldn't perform IO, {0}")] #[error("Couldn't perform IO, {0}")]
Io(#[from] Error), Io(#[from] io::Error),
#[error("Couldn't sign string, {0}")] #[error("Couldn't sign string, {0}")]
Rsa(rsa::errors::Error), Rsa(rsa::errors::Error),
@ -111,19 +143,23 @@ pub(crate) enum MyError {
#[error("Not trying request due to failed breaker")] #[error("Not trying request due to failed breaker")]
Breaker, Breaker,
#[error("Failed to extract fields from {0}")]
Extract(&'static str)
} }
impl ResponseError for MyError { impl ResponseError for Error {
fn status_code(&self) -> StatusCode { fn status_code(&self) -> StatusCode {
match self { match self.kind {
MyError::NotAllowed(_) | MyError::WrongActor(_) | MyError::BadActor(_, _) => { ErrorKind::NotAllowed(_) | ErrorKind::WrongActor(_) | ErrorKind::BadActor(_, _) => {
StatusCode::FORBIDDEN StatusCode::FORBIDDEN
} }
MyError::NotSubscribed(_) => StatusCode::UNAUTHORIZED, ErrorKind::NotSubscribed(_) => StatusCode::UNAUTHORIZED,
MyError::Duplicate => StatusCode::ACCEPTED, ErrorKind::Duplicate => StatusCode::ACCEPTED,
MyError::Kind(_) | MyError::MissingKind | MyError::MissingId | MyError::ObjectCount => { ErrorKind::Kind(_)
StatusCode::BAD_REQUEST | ErrorKind::MissingKind
} | ErrorKind::MissingId
| ErrorKind::ObjectCount => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::INTERNAL_SERVER_ERROR,
} }
} }
@ -133,27 +169,27 @@ impl ResponseError for MyError {
.insert_header(("Content-Type", "application/activity+json")) .insert_header(("Content-Type", "application/activity+json"))
.body( .body(
serde_json::to_string(&serde_json::json!({ serde_json::to_string(&serde_json::json!({
"error": self.to_string(), "error": self.kind.to_string(),
})) }))
.unwrap_or("{}".to_string()), .unwrap_or("{}".to_string()),
) )
} }
} }
impl From<BlockingError> for MyError { impl From<BlockingError> for ErrorKind {
fn from(_: BlockingError) -> Self { fn from(_: BlockingError) -> Self {
MyError::Canceled ErrorKind::Canceled
} }
} }
impl From<Infallible> for MyError { impl From<Infallible> for ErrorKind {
fn from(i: Infallible) -> Self { fn from(i: Infallible) -> Self {
match i {} match i {}
} }
} }
impl From<rsa::errors::Error> for MyError { impl From<rsa::errors::Error> for ErrorKind {
fn from(e: rsa::errors::Error) -> Self { fn from(e: rsa::errors::Error) -> Self {
MyError::Rsa(e) ErrorKind::Rsa(e)
} }
} }

View file

@ -1,7 +1,7 @@
use crate::{ use crate::{
config::{Config, UrlKind}, config::{Config, UrlKind},
db::Actor, db::Actor,
error::MyError, error::Error,
jobs::{ jobs::{
apub::{get_inboxes, prepare_activity}, apub::{get_inboxes, prepare_activity},
DeliverMany, JobState, DeliverMany, JobState,
@ -22,7 +22,8 @@ impl Announce {
Announce { object_id, actor } Announce { object_id, actor }
} }
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { #[tracing::instrument(name = "Announce")]
async fn perform(self, state: JobState) -> Result<(), Error> {
let activity_id = state.config.generate_url(UrlKind::Activity); let activity_id = state.config.generate_url(UrlKind::Activity);
let announce = generate_announce(&state.config, &activity_id, &self.object_id)?; let announce = generate_announce(&state.config, &activity_id, &self.object_id)?;
@ -41,7 +42,7 @@ fn generate_announce(
config: &Config, config: &Config,
activity_id: &Url, activity_id: &Url,
object_id: &Url, object_id: &Url,
) -> Result<AsAnnounce, MyError> { ) -> Result<AsAnnounce, Error> {
let announce = AsAnnounce::new(config.generate_url(UrlKind::Actor), object_id.clone()); let announce = AsAnnounce::new(config.generate_url(UrlKind::Actor), object_id.clone());
prepare_activity( prepare_activity(
@ -58,6 +59,6 @@ impl ActixJob for Announce {
const NAME: &'static str = "relay::jobs::apub::Announce"; const NAME: &'static str = "relay::jobs::apub::Announce";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View file

@ -2,7 +2,7 @@ use crate::{
apub::AcceptedActivities, apub::AcceptedActivities,
config::{Config, UrlKind}, config::{Config, UrlKind},
db::Actor, db::Actor,
error::MyError, error::{Error, ErrorKind},
jobs::{apub::prepare_activity, Deliver, JobState, QueryInstance, QueryNodeinfo}, jobs::{apub::prepare_activity, Deliver, JobState, QueryInstance, QueryNodeinfo},
}; };
use activitystreams::{ use activitystreams::{
@ -24,7 +24,8 @@ impl Follow {
Follow { input, actor } Follow { input, actor }
} }
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { #[tracing::instrument(name = "Follow")]
async fn perform(self, state: JobState) -> Result<(), Error> {
let my_id = state.config.generate_url(UrlKind::Actor); let my_id = state.config.generate_url(UrlKind::Actor);
// if following relay directly, not just following 'public', followback // if following relay directly, not just following 'public', followback
@ -42,7 +43,7 @@ impl Follow {
let accept = generate_accept_follow( let accept = generate_accept_follow(
&state.config, &state.config,
&self.actor.id, &self.actor.id,
self.input.id_unchecked().ok_or(MyError::MissingId)?, self.input.id_unchecked().ok_or(ErrorKind::MissingId)?,
&my_id, &my_id,
)?; )?;
@ -61,7 +62,7 @@ impl Follow {
} }
// Generate a type that says "I want to follow you" // Generate a type that says "I want to follow you"
fn generate_follow(config: &Config, actor_id: &Url, my_id: &Url) -> Result<AsFollow, MyError> { fn generate_follow(config: &Config, actor_id: &Url, my_id: &Url) -> Result<AsFollow, Error> {
let follow = AsFollow::new(my_id.clone(), actor_id.clone()); let follow = AsFollow::new(my_id.clone(), actor_id.clone());
prepare_activity( prepare_activity(
@ -77,7 +78,7 @@ fn generate_accept_follow(
actor_id: &Url, actor_id: &Url,
input_id: &Url, input_id: &Url,
my_id: &Url, my_id: &Url,
) -> Result<AsAccept, MyError> { ) -> Result<AsAccept, Error> {
let mut follow = AsFollow::new(actor_id.clone(), my_id.clone()); let mut follow = AsFollow::new(actor_id.clone(), my_id.clone());
follow.set_id(input_id.clone()); follow.set_id(input_id.clone());
@ -98,6 +99,6 @@ impl ActixJob for Follow {
const NAME: &'static str = "relay::jobs::apub::Follow"; const NAME: &'static str = "relay::jobs::apub::Follow";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View file

@ -1,7 +1,7 @@
use crate::{ use crate::{
apub::AcceptedActivities, apub::AcceptedActivities,
db::Actor, db::Actor,
error::MyError, error::{Error, ErrorKind},
jobs::{apub::get_inboxes, DeliverMany, JobState}, jobs::{apub::get_inboxes, DeliverMany, JobState},
}; };
use activitystreams::prelude::*; use activitystreams::prelude::*;
@ -19,12 +19,13 @@ impl Forward {
Forward { input, actor } Forward { input, actor }
} }
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { #[tracing::instrument(name = "Forward")]
async fn perform(self, state: JobState) -> Result<(), Error> {
let object_id = self let object_id = self
.input .input
.object() .object()
.as_single_id() .as_single_id()
.ok_or(MyError::MissingId)?; .ok_or(ErrorKind::MissingId)?;
let inboxes = get_inboxes(&state.state, &self.actor, object_id).await?; let inboxes = get_inboxes(&state.state, &self.actor, object_id).await?;
@ -43,6 +44,6 @@ impl ActixJob for Forward {
const NAME: &'static str = "relay::jobs::apub::Forward"; const NAME: &'static str = "relay::jobs::apub::Forward";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View file

@ -2,7 +2,7 @@ use crate::{
config::{Config, UrlKind}, config::{Config, UrlKind},
data::State, data::State,
db::Actor, db::Actor,
error::MyError, error::{Error, ErrorKind},
}; };
use activitystreams::{ use activitystreams::{
activity::{Follow as AsFollow, Undo as AsUndo}, activity::{Follow as AsFollow, Undo as AsUndo},
@ -23,8 +23,8 @@ pub(crate) use self::{
announce::Announce, follow::Follow, forward::Forward, reject::Reject, undo::Undo, announce::Announce, follow::Follow, forward::Forward, reject::Reject, undo::Undo,
}; };
async fn get_inboxes(state: &State, actor: &Actor, object_id: &Url) -> Result<Vec<Url>, MyError> { async fn get_inboxes(state: &State, actor: &Actor, object_id: &Url) -> Result<Vec<Url>, Error> {
let domain = object_id.host().ok_or(MyError::Domain)?.to_string(); let domain = object_id.host().ok_or(ErrorKind::Domain)?.to_string();
state.inboxes_without(&actor.inbox, &domain).await state.inboxes_without(&actor.inbox, &domain).await
} }
@ -33,10 +33,10 @@ fn prepare_activity<T, U, V, Kind>(
mut t: T, mut t: T,
id: impl TryInto<Url, Error = U>, id: impl TryInto<Url, Error = U>,
to: impl TryInto<Url, Error = V>, to: impl TryInto<Url, Error = V>,
) -> Result<T, MyError> ) -> Result<T, Error>
where where
T: ObjectExt<Kind> + BaseExt<Kind>, T: ObjectExt<Kind> + BaseExt<Kind>,
MyError: From<U> + From<V>, Error: From<U> + From<V>,
{ {
t.set_id(id.try_into()?) t.set_id(id.try_into()?)
.set_many_tos(vec![to.try_into()?]) .set_many_tos(vec![to.try_into()?])
@ -45,7 +45,7 @@ where
} }
// Generate a type that says "I want to stop following you" // Generate a type that says "I want to stop following you"
fn generate_undo_follow(config: &Config, actor_id: &Url, my_id: &Url) -> Result<AsUndo, MyError> { fn generate_undo_follow(config: &Config, actor_id: &Url, my_id: &Url) -> Result<AsUndo, Error> {
let mut follow = AsFollow::new(my_id.clone(), actor_id.clone()); let mut follow = AsFollow::new(my_id.clone(), actor_id.clone());
follow.set_id(config.generate_url(UrlKind::Activity)); follow.set_id(config.generate_url(UrlKind::Activity));

View file

@ -1,6 +1,7 @@
use crate::{ use crate::{
config::UrlKind, config::UrlKind,
db::Actor, db::Actor,
error::Error,
jobs::{apub::generate_undo_follow, Deliver, JobState}, jobs::{apub::generate_undo_follow, Deliver, JobState},
}; };
use background_jobs::ActixJob; use background_jobs::ActixJob;
@ -10,7 +11,8 @@ use std::{future::Future, pin::Pin};
pub(crate) struct Reject(pub(crate) Actor); pub(crate) struct Reject(pub(crate) Actor);
impl Reject { impl Reject {
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { #[tracing::instrument(name = "Reject")]
async fn perform(self, state: JobState) -> Result<(), Error> {
state.actors.remove_connection(&self.0).await?; state.actors.remove_connection(&self.0).await?;
let my_id = state.config.generate_url(UrlKind::Actor); let my_id = state.config.generate_url(UrlKind::Actor);
@ -29,6 +31,6 @@ impl ActixJob for Reject {
const NAME: &'static str = "relay::jobs::apub::Reject"; const NAME: &'static str = "relay::jobs::apub::Reject";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View file

@ -2,6 +2,7 @@ use crate::{
apub::AcceptedActivities, apub::AcceptedActivities,
config::UrlKind, config::UrlKind,
db::Actor, db::Actor,
error::Error,
jobs::{apub::generate_undo_follow, Deliver, JobState}, jobs::{apub::generate_undo_follow, Deliver, JobState},
}; };
use background_jobs::ActixJob; use background_jobs::ActixJob;
@ -18,7 +19,8 @@ impl Undo {
Undo { input, actor } Undo { input, actor }
} }
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { #[tracing::instrument(name = "Undo")]
async fn perform(self, state: JobState) -> Result<(), Error> {
let was_following = state.state.db.is_connected(self.actor.id.clone()).await?; let was_following = state.state.db.is_connected(self.actor.id.clone()).await?;
state.actors.remove_connection(&self.actor).await?; state.actors.remove_connection(&self.actor).await?;
@ -42,6 +44,6 @@ impl ActixJob for Undo {
const NAME: &'static str = "relay::jobs::apub::Undo"; const NAME: &'static str = "relay::jobs::apub::Undo";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View file

@ -1,5 +1,4 @@
use crate::jobs::JobState; use crate::{error::Error, jobs::JobState};
use anyhow::Error;
use background_jobs::ActixJob; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
use uuid::Uuid; use uuid::Uuid;
@ -14,6 +13,7 @@ impl CacheMedia {
CacheMedia { uuid } CacheMedia { uuid }
} }
#[tracing::instrument(name = "Cache media")]
async fn perform(self, state: JobState) -> Result<(), Error> { async fn perform(self, state: JobState) -> Result<(), Error> {
if !state.media.is_outdated(self.uuid).await? { if !state.media.is_outdated(self.uuid).await? {
return Ok(()); return Ok(());
@ -34,11 +34,11 @@ impl CacheMedia {
impl ActixJob for CacheMedia { impl ActixJob for CacheMedia {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::CacheMedia"; const NAME: &'static str = "relay::jobs::CacheMedia";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View file

@ -1,6 +1,9 @@
use crate::{apub::AcceptedActors, jobs::JobState}; use crate::{
apub::AcceptedActors,
error::{Error, ErrorKind},
jobs::JobState,
};
use activitystreams::{object::Image, prelude::*, url::Url}; use activitystreams::{object::Image, prelude::*, url::Url};
use anyhow::Error;
use background_jobs::ActixJob; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
@ -33,8 +36,8 @@ impl QueryContact {
.fetch::<AcceptedActors>(self.contact_id.as_str()) .fetch::<AcceptedActors>(self.contact_id.as_str())
.await?; .await?;
let (username, display_name, url, avatar) = to_contact(contact) let (username, display_name, url, avatar) =
.ok_or_else(|| anyhow::anyhow!("Failed to extract fields from contact"))?; to_contact(contact).ok_or_else(|| ErrorKind::Extract("contact"))?;
state state
.node_cache .node_cache
@ -63,12 +66,12 @@ fn to_contact(contact: AcceptedActors) -> Option<(String, String, Url, Url)> {
impl ActixJob for QueryContact { impl ActixJob for QueryContact {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::QueryContact"; const NAME: &'static str = "relay::jobs::QueryContact";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View file

@ -1,6 +1,5 @@
use crate::{error::MyError, jobs::JobState}; use crate::{error::Error, jobs::JobState};
use activitystreams::url::Url; use activitystreams::url::Url;
use anyhow::Error;
use background_jobs::{ActixJob, Backoff}; use background_jobs::{ActixJob, Backoff};
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
@ -11,7 +10,7 @@ pub(crate) struct Deliver {
} }
impl Deliver { impl Deliver {
pub(crate) fn new<T>(to: Url, data: T) -> Result<Self, MyError> pub(crate) fn new<T>(to: Url, data: T) -> Result<Self, Error>
where where
T: serde::ser::Serialize, T: serde::ser::Serialize,
{ {
@ -20,20 +19,22 @@ impl Deliver {
data: serde_json::to_value(data)?, data: serde_json::to_value(data)?,
}) })
} }
#[tracing::instrument(name = "Deliver")]
async fn permform(self, state: JobState) -> Result<(), Error> {
state.requests.deliver(self.to, &self.data).await?;
Ok(())
}
} }
impl ActixJob for Deliver { impl ActixJob for Deliver {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::Deliver"; const NAME: &'static str = "relay::jobs::Deliver";
const BACKOFF: Backoff = Backoff::Exponential(8); const BACKOFF: Backoff = Backoff::Exponential(8);
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move { Box::pin(async move { self.permform(state).await.map_err(Into::into) })
state.requests.deliver(self.to, &self.data).await?;
Ok(())
})
} }
} }

View file

@ -1,11 +1,10 @@
use crate::{ use crate::{
error::MyError, error::Error,
jobs::{Deliver, JobState}, jobs::{Deliver, JobState},
}; };
use activitystreams::url::Url; use activitystreams::url::Url;
use anyhow::Error;
use background_jobs::ActixJob; use background_jobs::ActixJob;
use futures::future::{ready, Ready}; use std::future::{ready, Ready};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct DeliverMany { pub(crate) struct DeliverMany {
@ -14,7 +13,7 @@ pub(crate) struct DeliverMany {
} }
impl DeliverMany { impl DeliverMany {
pub(crate) fn new<T>(to: Vec<Url>, data: T) -> Result<Self, MyError> pub(crate) fn new<T>(to: Vec<Url>, data: T) -> Result<Self, Error>
where where
T: serde::ser::Serialize, T: serde::ser::Serialize,
{ {
@ -24,6 +23,7 @@ impl DeliverMany {
}) })
} }
#[tracing::instrument(name = "Deliver many")]
fn perform(self, state: JobState) -> Result<(), Error> { fn perform(self, state: JobState) -> Result<(), Error> {
for inbox in self.to { for inbox in self.to {
state state
@ -37,11 +37,11 @@ impl DeliverMany {
impl ActixJob for DeliverMany { impl ActixJob for DeliverMany {
type State = JobState; type State = JobState;
type Future = Ready<Result<(), Error>>; type Future = Ready<Result<(), anyhow::Error>>;
const NAME: &'static str = "relay::jobs::DeliverMany"; const NAME: &'static str = "relay::jobs::DeliverMany";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
ready(self.perform(state)) ready(self.perform(state).map_err(Into::into))
} }
} }

View file

@ -1,9 +1,9 @@
use crate::{ use crate::{
config::UrlKind, config::UrlKind,
error::Error,
jobs::{cache_media::CacheMedia, JobState}, jobs::{cache_media::CacheMedia, JobState},
}; };
use activitystreams::url::Url; use activitystreams::url::Url;
use anyhow::Error;
use background_jobs::ActixJob; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
@ -17,6 +17,7 @@ impl QueryInstance {
QueryInstance { actor_id } QueryInstance { actor_id }
} }
#[tracing::instrument(name = "Query instance")]
async fn perform(self, state: JobState) -> Result<(), Error> { async fn perform(self, state: JobState) -> Result<(), Error> {
let contact_outdated = state let contact_outdated = state
.node_cache .node_cache
@ -91,12 +92,12 @@ impl QueryInstance {
impl ActixJob for QueryInstance { impl ActixJob for QueryInstance {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::QueryInstance"; const NAME: &'static str = "relay::jobs::QueryInstance";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View file

@ -16,7 +16,7 @@ use crate::{
config::Config, config::Config,
data::{ActorCache, MediaCache, NodeCache, State}, data::{ActorCache, MediaCache, NodeCache, State},
db::Db, db::Db,
error::MyError, error::{Error, ErrorKind},
jobs::process_listeners::Listeners, jobs::process_listeners::Listeners,
requests::Requests, requests::Requests,
}; };
@ -67,7 +67,7 @@ pub(crate) fn create_workers(
.start(remote_handle); .start(remote_handle);
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub(crate) struct JobState { pub(crate) struct JobState {
db: Db, db: Db,
requests: Requests, requests: Requests,
@ -84,6 +84,14 @@ pub(crate) struct JobServer {
remote: QueueHandle, remote: QueueHandle,
} }
impl std::fmt::Debug for JobServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JobServer")
.field("queue_handle", &"QueueHandle")
.finish()
}
}
impl JobState { impl JobState {
fn new( fn new(
db: Db, db: Db,
@ -113,10 +121,13 @@ impl JobServer {
} }
} }
pub(crate) fn queue<J>(&self, job: J) -> Result<(), MyError> pub(crate) fn queue<J>(&self, job: J) -> Result<(), Error>
where where
J: Job, J: Job,
{ {
self.remote.queue(job).map_err(MyError::Queue) self.remote
.queue(job)
.map_err(ErrorKind::Queue)
.map_err(Into::into)
} }
} }

View file

@ -1,6 +1,8 @@
use crate::jobs::{JobState, QueryContact}; use crate::{
error::Error,
jobs::{JobState, QueryContact},
};
use activitystreams::url::Url; use activitystreams::url::Url;
use anyhow::Error;
use background_jobs::ActixJob; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
@ -14,6 +16,7 @@ impl QueryNodeinfo {
QueryNodeinfo { actor_id } QueryNodeinfo { actor_id }
} }
#[tracing::instrument(name = "Query node info")]
async fn perform(self, state: JobState) -> Result<(), Error> { async fn perform(self, state: JobState) -> Result<(), Error> {
if !state if !state
.node_cache .node_cache
@ -65,12 +68,12 @@ impl QueryNodeinfo {
impl ActixJob for QueryNodeinfo { impl ActixJob for QueryNodeinfo {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::QueryNodeinfo"; const NAME: &'static str = "relay::jobs::QueryNodeinfo";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View file

@ -1,5 +1,7 @@
use crate::jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState}; use crate::{
use anyhow::Error; error::Error,
jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState},
};
use background_jobs::ActixJob; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
@ -7,6 +9,7 @@ use std::{future::Future, pin::Pin};
pub(crate) struct Listeners; pub(crate) struct Listeners;
impl Listeners { impl Listeners {
#[tracing::instrument(name = "Spawn query instances")]
async fn perform(self, state: JobState) -> Result<(), Error> { async fn perform(self, state: JobState) -> Result<(), Error> {
for actor_id in state.state.db.connected_ids().await? { for actor_id in state.state.db.connected_ids().await? {
state state
@ -21,11 +24,11 @@ impl Listeners {
impl ActixJob for Listeners { impl ActixJob for Listeners {
type State = JobState; type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::Listeners"; const NAME: &'static str = "relay::jobs::Listeners";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View file

@ -1,7 +1,8 @@
use actix_web::{ use actix_web::{web, App, HttpServer};
middleware::{Compress, Logger}, use tracing_actix_web::TracingLogger;
web, App, HttpServer, use tracing_error::ErrorLayer;
}; use tracing_log::LogTracer;
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, EnvFilter};
mod apub; mod apub;
mod args; mod args;
@ -28,23 +29,23 @@ use self::{
async fn main() -> Result<(), anyhow::Error> { async fn main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
LogTracer::init()?;
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let format_layer = tracing_subscriber::fmt::layer()
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.pretty();
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter)
.with(ErrorLayer::default())
.with(format_layer);
tracing::subscriber::set_global_default(subscriber)?;
let config = Config::build()?; let config = Config::build()?;
if config.debug() {
std::env::set_var(
"RUST_LOG",
"debug,h2=info,trust_dns_resolver=info,trust_dns_proto=info,rustls=info,html5ever=info",
)
} else {
std::env::set_var("RUST_LOG", "info")
}
if config.pretty_log() {
pretty_env_logger::init();
} else {
env_logger::init();
}
let db = Db::build(&config)?; let db = Db::build(&config)?;
let args = Args::new(); let args = Args::new();
@ -77,8 +78,7 @@ async fn main() -> Result<(), anyhow::Error> {
let bind_address = config.bind_address(); let bind_address = config.bind_address();
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.wrap(Compress::default()) .wrap(TracingLogger::default())
.wrap(Logger::default())
.app_data(web::Data::new(db.clone())) .app_data(web::Data::new(db.clone()))
.app_data(web::Data::new(state.clone())) .app_data(web::Data::new(state.clone()))
.app_data(web::Data::new(state.requests())) .app_data(web::Data::new(state.requests()))

View file

@ -1,15 +1,18 @@
use actix_web::{ use actix_web::{
dev::{Payload, Service, ServiceRequest, Transform}, dev::{Payload, Service, ServiceRequest, Transform},
http::{Method, StatusCode}, http::Method,
web::BytesMut, web::BytesMut,
HttpMessage, HttpResponse, ResponseError, HttpMessage,
}; };
use futures::{ use futures_util::{
future::{ok, LocalBoxFuture, Ready, TryFutureExt}, future::{LocalBoxFuture, TryFutureExt},
stream::{once, TryStreamExt}, stream::{once, TryStreamExt},
}; };
use log::{error, info}; use std::{
use std::task::{Context, Poll}; future::{ready, Ready},
task::{Context, Poll},
};
use tracing::info;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) struct DebugPayload(pub bool); pub(crate) struct DebugPayload(pub bool);
@ -18,20 +21,6 @@ pub(crate) struct DebugPayload(pub bool);
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) struct DebugPayloadMiddleware<S>(bool, S); pub(crate) struct DebugPayloadMiddleware<S>(bool, S);
#[derive(Clone, Debug, thiserror::Error)]
#[error("Failed to read payload")]
pub(crate) struct DebugError;
impl ResponseError for DebugError {
fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST
}
fn error_response(&self) -> HttpResponse {
HttpResponse::new(self.status_code())
}
}
impl<S> Transform<S, ServiceRequest> for DebugPayload impl<S> Transform<S, ServiceRequest> for DebugPayload
where where
S: Service<ServiceRequest, Error = actix_web::Error>, S: Service<ServiceRequest, Error = actix_web::Error>,
@ -45,7 +34,7 @@ where
type Future = Ready<Result<Self::Transform, Self::InitError>>; type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future { fn new_transform(&self, service: S) -> Self::Future {
ok(DebugPayloadMiddleware(self.0, service)) ready(Ok(DebugPayloadMiddleware(self.0, service)))
} }
} }

View file

@ -1,28 +1,28 @@
use crate::{ use crate::{
apub::AcceptedActors, apub::AcceptedActors,
data::{ActorCache, State}, data::{ActorCache, State},
error::MyError, error::{Error, ErrorKind},
requests::Requests, requests::Requests,
}; };
use activitystreams::{base::BaseExt, uri, url::Url}; use activitystreams::{base::BaseExt, uri, url::Url};
use actix_web::web; use actix_web::web;
use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm}; use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm};
use log::error;
use rsa::{hash::Hash, padding::PaddingScheme, pkcs8::FromPublicKey, PublicKey, RsaPublicKey}; use rsa::{hash::Hash, padding::PaddingScheme, pkcs8::FromPublicKey, PublicKey, RsaPublicKey};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone)] #[derive(Clone, Debug)]
pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State); pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State);
impl MyVerify { impl MyVerify {
#[tracing::instrument("Verify signature")]
async fn verify( async fn verify(
&self, &self,
algorithm: Option<Algorithm>, algorithm: Option<Algorithm>,
key_id: String, key_id: String,
signature: String, signature: String,
signing_string: String, signing_string: String,
) -> Result<bool, MyError> { ) -> Result<bool, Error> {
let public_key_id = uri!(key_id); let public_key_id = uri!(key_id);
let actor_id = if let Some(mut actor_id) = self let actor_id = if let Some(mut actor_id) = self
@ -32,7 +32,7 @@ impl MyVerify {
.await? .await?
{ {
if !self.2.db.is_allowed(actor_id.clone()).await? { if !self.2.db.is_allowed(actor_id.clone()).await? {
return Err(MyError::NotAllowed(key_id)); return Err(ErrorKind::NotAllowed(key_id).into());
} }
actor_id.set_fragment(None); actor_id.set_fragment(None);
@ -44,7 +44,7 @@ impl MyVerify {
Some(Algorithm::Hs2019) => (), Some(Algorithm::Hs2019) => (),
Some(Algorithm::Deprecated(DeprecatedAlgorithm::RsaSha256)) => (), Some(Algorithm::Deprecated(DeprecatedAlgorithm::RsaSha256)) => (),
Some(other) => { Some(other) => {
return Err(MyError::Algorithm(other.to_string())); return Err(ErrorKind::Algorithm(other.to_string()).into());
} }
None => (), None => (),
}; };
@ -65,7 +65,7 @@ impl MyVerify {
.fetch::<PublicKeyResponse>(public_key_id.as_str()) .fetch::<PublicKeyResponse>(public_key_id.as_str())
.await? .await?
.actor_id() .actor_id()
.ok_or_else(|| MyError::MissingId)? .ok_or_else(|| ErrorKind::MissingId)?
}; };
// Previously we verified the sig from an actor's local cache // Previously we verified the sig from an actor's local cache
@ -106,7 +106,7 @@ async fn do_verify(
public_key: &str, public_key: &str,
signature: String, signature: String,
signing_string: String, signing_string: String,
) -> Result<(), MyError> { ) -> Result<(), Error> {
let public_key = RsaPublicKey::from_public_key_pem(public_key)?; let public_key = RsaPublicKey::from_public_key_pem(public_key)?;
web::block(move || { web::block(move || {
@ -121,7 +121,7 @@ async fn do_verify(
&decoded, &decoded,
)?; )?;
Ok(()) as Result<(), MyError> Ok(()) as Result<(), Error>
}) })
.await??; .await??;
@ -129,7 +129,7 @@ async fn do_verify(
} }
impl SignatureVerify for MyVerify { impl SignatureVerify for MyVerify {
type Error = MyError; type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<bool, Self::Error>>>>; type Future = Pin<Box<dyn Future<Output = Result<bool, Self::Error>>>>;
fn signature_verify( fn signature_verify(
@ -144,10 +144,6 @@ impl SignatureVerify for MyVerify {
Box::pin(async move { Box::pin(async move {
this.verify(algorithm, key_id, signature, signing_string) this.verify(algorithm, key_id, signature, signing_string)
.await .await
.map_err(|e| {
error!("Failed to verify, {}", e);
e
})
}) })
} }
} }

View file

@ -4,8 +4,8 @@ use crate::{
}; };
use actix_web::web::Data; use actix_web::web::Data;
use actix_webfinger::{Resolver, Webfinger}; use actix_webfinger::{Resolver, Webfinger};
use futures_util::future::LocalBoxFuture;
use rsa_magic_public_key::AsMagicPublicKey; use rsa_magic_public_key::AsMagicPublicKey;
use std::{future::Future, pin::Pin};
pub(crate) struct RelayResolver; pub(crate) struct RelayResolver;
@ -13,8 +13,6 @@ pub(crate) struct RelayResolver;
#[error("Error resolving webfinger data")] #[error("Error resolving webfinger data")]
pub(crate) struct RelayError; pub(crate) struct RelayError;
type FutResult<T, E> = dyn Future<Output = Result<T, E>>;
impl Resolver for RelayResolver { impl Resolver for RelayResolver {
type State = (Data<State>, Data<Config>); type State = (Data<State>, Data<Config>);
type Error = RelayError; type Error = RelayError;
@ -23,7 +21,7 @@ impl Resolver for RelayResolver {
account: &str, account: &str,
domain: &str, domain: &str,
(state, config): Self::State, (state, config): Self::State,
) -> Pin<Box<FutResult<Option<Webfinger>, Self::Error>>> { ) -> LocalBoxFuture<'static, Result<Option<Webfinger>, Self::Error>> {
let domain = domain.to_owned(); let domain = domain.to_owned();
let account = account.to_owned(); let account = account.to_owned();

View file

@ -1,4 +1,4 @@
use crate::error::MyError; use crate::error::{Error, ErrorKind};
use activitystreams::url::Url; use activitystreams::url::Url;
use actix_web::{http::header::Date, web::Bytes}; use actix_web::{http::header::Date, web::Bytes};
use async_mutex::Mutex; use async_mutex::Mutex;
@ -6,7 +6,6 @@ use async_rwlock::RwLock;
use awc::Client; use awc::Client;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use http_signature_normalization_actix::prelude::*; use http_signature_normalization_actix::prelude::*;
use log::{debug, info, warn};
use rsa::{hash::Hash, padding::PaddingScheme, RsaPrivateKey}; use rsa::{hash::Hash, padding::PaddingScheme, RsaPrivateKey};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::{ use std::{
@ -19,12 +18,19 @@ use std::{
}, },
time::SystemTime, time::SystemTime,
}; };
use tracing::{debug, info, warn};
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct Breakers { pub(crate) struct Breakers {
inner: Arc<RwLock<HashMap<String, Arc<Mutex<Breaker>>>>>, inner: Arc<RwLock<HashMap<String, Arc<Mutex<Breaker>>>>>,
} }
impl std::fmt::Debug for Breakers {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Breakers").finish()
}
}
impl Breakers { impl Breakers {
async fn should_try(&self, url: &Url) -> bool { async fn should_try(&self, url: &Url) -> bool {
if let Some(domain) = url.domain() { if let Some(domain) = url.domain() {
@ -97,6 +103,7 @@ impl Default for Breakers {
} }
} }
#[derive(Debug)]
struct Breaker { struct Breaker {
failures: usize, failures: usize,
last_attempt: DateTime<Utc>, last_attempt: DateTime<Utc>,
@ -153,6 +160,21 @@ pub(crate) struct Requests {
breakers: Breakers, breakers: Breakers,
} }
impl std::fmt::Debug for Requests {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Requests")
.field("client", &"Client")
.field("consecutive_errors", &"AtomicUsize")
.field("error_limit", &self.error_limit)
.field("key_id", &self.key_id)
.field("user_agent", &self.user_agent)
.field("private_key", &"[redacted]")
.field("config", &self.config)
.field("breakers", &self.breakers)
.finish()
}
}
impl Requests { impl Requests {
pub(crate) fn new( pub(crate) fn new(
key_id: String, key_id: String,
@ -191,28 +213,28 @@ impl Requests {
self.consecutive_errors.swap(0, Ordering::Relaxed); self.consecutive_errors.swap(0, Ordering::Relaxed);
} }
pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, MyError> pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
self.do_fetch(url, "application/json").await self.do_fetch(url, "application/json").await
} }
pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, MyError> pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
self.do_fetch(url, "application/activity+json").await self.do_fetch(url, "application/activity+json").await
} }
async fn do_fetch<T>(&self, url: &str, accept: &str) -> Result<T, MyError> async fn do_fetch<T>(&self, url: &str, accept: &str) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
let parsed_url = url.parse::<Url>()?; let parsed_url = url.parse::<Url>()?;
if !self.breakers.should_try(&parsed_url).await { if !self.breakers.should_try(&parsed_url).await {
return Err(MyError::Breaker); return Err(ErrorKind::Breaker.into());
} }
let signer = self.signer(); let signer = self.signer();
@ -236,7 +258,7 @@ impl Requests {
self.breakers.fail(&parsed_url).await; self.breakers.fail(&parsed_url).await;
} }
let mut res = res.map_err(|e| MyError::SendRequest(url.to_string(), e.to_string()))?; let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?;
self.reset_err(); self.reset_err();
@ -251,7 +273,7 @@ impl Requests {
self.breakers.fail(&parsed_url).await; self.breakers.fail(&parsed_url).await;
return Err(MyError::Status(url.to_string(), res.status())); return Err(ErrorKind::Status(url.to_string(), res.status()).into());
} }
self.breakers.succeed(&parsed_url).await; self.breakers.succeed(&parsed_url).await;
@ -259,16 +281,16 @@ impl Requests {
let body = res let body = res
.body() .body()
.await .await
.map_err(|e| MyError::ReceiveResponse(url.to_string(), e.to_string()))?; .map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?;
Ok(serde_json::from_slice(body.as_ref())?) Ok(serde_json::from_slice(body.as_ref())?)
} }
pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), MyError> { pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> {
let parsed_url = url.parse::<Url>()?; let parsed_url = url.parse::<Url>()?;
if !self.breakers.should_try(&parsed_url).await { if !self.breakers.should_try(&parsed_url).await {
return Err(MyError::Breaker); return Err(ErrorKind::Breaker.into());
} }
info!("Fetching bytes for {}", url); info!("Fetching bytes for {}", url);
@ -293,7 +315,7 @@ impl Requests {
self.count_err(); self.count_err();
} }
let mut res = res.map_err(|e| MyError::SendRequest(url.to_string(), e.to_string()))?; let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?;
self.reset_err(); self.reset_err();
@ -301,10 +323,10 @@ impl Requests {
if let Ok(s) = content_type.to_str() { if let Ok(s) = content_type.to_str() {
s.to_owned() s.to_owned()
} else { } else {
return Err(MyError::ContentType); return Err(ErrorKind::ContentType.into());
} }
} else { } else {
return Err(MyError::ContentType); return Err(ErrorKind::ContentType.into());
}; };
if !res.status().is_success() { if !res.status().is_success() {
@ -318,14 +340,14 @@ impl Requests {
self.breakers.fail(&parsed_url).await; self.breakers.fail(&parsed_url).await;
return Err(MyError::Status(url.to_string(), res.status())); return Err(ErrorKind::Status(url.to_string(), res.status()).into());
} }
self.breakers.succeed(&parsed_url).await; self.breakers.succeed(&parsed_url).await;
let bytes = match res.body().limit(1024 * 1024 * 4).await { let bytes = match res.body().limit(1024 * 1024 * 4).await {
Err(e) => { Err(e) => {
return Err(MyError::ReceiveResponse(url.to_string(), e.to_string())); return Err(ErrorKind::ReceiveResponse(url.to_string(), e.to_string()).into());
} }
Ok(bytes) => bytes, Ok(bytes) => bytes,
}; };
@ -333,12 +355,12 @@ impl Requests {
Ok((content_type, bytes)) Ok((content_type, bytes))
} }
pub(crate) async fn deliver<T>(&self, inbox: Url, item: &T) -> Result<(), MyError> pub(crate) async fn deliver<T>(&self, inbox: Url, item: &T) -> Result<(), Error>
where where
T: serde::ser::Serialize, T: serde::ser::Serialize,
{ {
if !self.breakers.should_try(&inbox).await { if !self.breakers.should_try(&inbox).await {
return Err(MyError::Breaker); return Err(ErrorKind::Breaker.into());
} }
let signer = self.signer(); let signer = self.signer();
@ -366,7 +388,7 @@ impl Requests {
self.breakers.fail(&inbox).await; self.breakers.fail(&inbox).await;
} }
let mut res = res.map_err(|e| MyError::SendRequest(inbox.to_string(), e.to_string()))?; let mut res = res.map_err(|e| ErrorKind::SendRequest(inbox.to_string(), e.to_string()))?;
self.reset_err(); self.reset_err();
@ -380,7 +402,7 @@ impl Requests {
} }
self.breakers.fail(&inbox).await; self.breakers.fail(&inbox).await;
return Err(MyError::Status(inbox.to_string(), res.status())); return Err(ErrorKind::Status(inbox.to_string(), res.status()).into());
} }
self.breakers.succeed(&inbox).await; self.breakers.succeed(&inbox).await;
@ -400,7 +422,7 @@ struct Signer {
} }
impl Signer { impl Signer {
fn sign(&self, signing_string: &str) -> Result<String, MyError> { fn sign(&self, signing_string: &str) -> Result<String, Error> {
let hashed = Sha256::digest(signing_string.as_bytes()); let hashed = Sha256::digest(signing_string.as_bytes());
let bytes = self.private_key.sign( let bytes = self.private_key.sign(
PaddingScheme::PKCS1v15Sign { PaddingScheme::PKCS1v15Sign {

View file

@ -2,7 +2,7 @@ use crate::{
apub::{PublicKey, PublicKeyInner}, apub::{PublicKey, PublicKeyInner},
config::{Config, UrlKind}, config::{Config, UrlKind},
data::State, data::State,
error::MyError, error::Error,
routes::ok, routes::ok,
}; };
use activitystreams::{ use activitystreams::{
@ -15,10 +15,11 @@ use activitystreams_ext::Ext1;
use actix_web::{web, Responder}; use actix_web::{web, Responder};
use rsa::pkcs8::ToPublicKey; use rsa::pkcs8::ToPublicKey;
#[tracing::instrument(name = "Actor")]
pub(crate) async fn route( pub(crate) async fn route(
state: web::Data<State>, state: web::Data<State>,
config: web::Data<Config>, config: web::Data<Config>,
) -> Result<impl Responder, MyError> { ) -> Result<impl Responder, Error> {
let mut application = Ext1::new( let mut application = Ext1::new(
ApActor::new(config.generate_url(UrlKind::Inbox), Application::new()), ApActor::new(config.generate_url(UrlKind::Inbox), Application::new()),
PublicKey { PublicKey {

View file

@ -3,7 +3,7 @@ use crate::{
config::{Config, UrlKind}, config::{Config, UrlKind},
data::{ActorCache, State}, data::{ActorCache, State},
db::Actor, db::Actor,
error::MyError, error::{Error, ErrorKind},
jobs::apub::{Announce, Follow, Forward, Reject, Undo}, jobs::apub::{Announce, Follow, Forward, Reject, Undo},
jobs::JobServer, jobs::JobServer,
requests::Requests, requests::Requests,
@ -14,8 +14,9 @@ use activitystreams::{
}; };
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified}; use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};
use log::error; use tracing::error;
#[tracing::instrument(name = "Inbox")]
pub(crate) async fn route( pub(crate) async fn route(
state: web::Data<State>, state: web::Data<State>,
actors: web::Data<ActorCache>, actors: web::Data<ActorCache>,
@ -24,12 +25,12 @@ pub(crate) async fn route(
jobs: web::Data<JobServer>, jobs: web::Data<JobServer>,
input: web::Json<AcceptedActivities>, input: web::Json<AcceptedActivities>,
verified: Option<(SignatureVerified, DigestVerified)>, verified: Option<(SignatureVerified, DigestVerified)>,
) -> Result<HttpResponse, MyError> { ) -> Result<HttpResponse, Error> {
let input = input.into_inner(); let input = input.into_inner();
let actor = actors let actor = actors
.get( .get(
input.actor()?.as_single_id().ok_or(MyError::MissingId)?, input.actor()?.as_single_id().ok_or(ErrorKind::MissingId)?,
&client, &client,
) )
.await? .await?
@ -39,28 +40,29 @@ pub(crate) async fn route(
let is_connected = state.db.is_connected(actor.id.clone()).await?; let is_connected = state.db.is_connected(actor.id.clone()).await?;
if !is_allowed { if !is_allowed {
return Err(MyError::NotAllowed(actor.id.to_string())); return Err(ErrorKind::NotAllowed(actor.id.to_string()).into());
} }
if !is_connected && !valid_without_listener(&input)? { if !is_connected && !valid_without_listener(&input)? {
return Err(MyError::NotSubscribed(actor.id.to_string())); return Err(ErrorKind::NotSubscribed(actor.id.to_string()).into());
} }
if config.validate_signatures() && verified.is_none() { if config.validate_signatures() && verified.is_none() {
return Err(MyError::NoSignature(actor.public_key_id.to_string())); return Err(ErrorKind::NoSignature(actor.public_key_id.to_string()).into());
} else if config.validate_signatures() { } else if config.validate_signatures() {
if let Some((verified, _)) = verified { if let Some((verified, _)) = verified {
if actor.public_key_id.as_str() != verified.key_id() { if actor.public_key_id.as_str() != verified.key_id() {
error!("Bad actor, more info: {:?}", input); error!("Bad actor, more info: {:?}", input);
return Err(MyError::BadActor( return Err(ErrorKind::BadActor(
actor.public_key_id.to_string(), actor.public_key_id.to_string(),
verified.key_id().to_owned(), verified.key_id().to_owned(),
)); )
.into());
} }
} }
} }
match input.kind().ok_or(MyError::MissingKind)? { match input.kind().ok_or(ErrorKind::MissingKind)? {
ValidTypes::Accept => handle_accept(&config, input).await?, ValidTypes::Accept => handle_accept(&config, input).await?,
ValidTypes::Reject => handle_reject(&config, &jobs, input, actor).await?, ValidTypes::Reject => handle_reject(&config, &jobs, input, actor).await?,
ValidTypes::Announce | ValidTypes::Create => { ValidTypes::Announce | ValidTypes::Create => {
@ -74,7 +76,7 @@ pub(crate) async fn route(
Ok(accepted(serde_json::json!({}))) Ok(accepted(serde_json::json!({})))
} }
fn valid_without_listener(input: &AcceptedActivities) -> Result<bool, MyError> { fn valid_without_listener(input: &AcceptedActivities) -> Result<bool, Error> {
match input.kind() { match input.kind() {
Some(ValidTypes::Follow) => Ok(true), Some(ValidTypes::Follow) => Ok(true),
Some(ValidTypes::Undo) => Ok(single_object(input.object())?.is_kind("Follow")), Some(ValidTypes::Undo) => Ok(single_object(input.object())?.is_kind("Follow")),
@ -82,32 +84,32 @@ fn valid_without_listener(input: &AcceptedActivities) -> Result<bool, MyError> {
} }
} }
fn kind_str(base: &AnyBase) -> Result<&str, MyError> { fn kind_str(base: &AnyBase) -> Result<&str, Error> {
base.kind_str().ok_or(MyError::MissingKind) base.kind_str()
.ok_or(ErrorKind::MissingKind)
.map_err(Into::into)
} }
fn id_string(id: Option<&Url>) -> Result<String, MyError> { fn id_string(id: Option<&Url>) -> Result<String, Error> {
id.map(|s| s.to_string()).ok_or(MyError::MissingId) id.map(|s| s.to_string())
.ok_or(ErrorKind::MissingId)
.map_err(Into::into)
} }
fn single_object(o: &OneOrMany<AnyBase>) -> Result<&AnyBase, MyError> { fn single_object(o: &OneOrMany<AnyBase>) -> Result<&AnyBase, Error> {
o.as_one().ok_or(MyError::ObjectCount) o.as_one().ok_or(ErrorKind::ObjectCount).map_err(Into::into)
} }
async fn handle_accept(config: &Config, input: AcceptedActivities) -> Result<(), MyError> { async fn handle_accept(config: &Config, input: AcceptedActivities) -> Result<(), Error> {
let base = single_object(input.object())?.clone(); let base = single_object(input.object())?.clone();
let follow = if let Some(follow) = activity::Follow::from_any_base(base)? { let follow = if let Some(follow) = activity::Follow::from_any_base(base)? {
follow follow
} else { } else {
return Err(MyError::Kind( return Err(ErrorKind::Kind(kind_str(single_object(input.object())?)?.to_owned()).into());
kind_str(single_object(input.object())?)?.to_owned(),
));
}; };
if !follow.actor_is(&config.generate_url(UrlKind::Actor)) { if !follow.actor_is(&config.generate_url(UrlKind::Actor)) {
return Err(MyError::WrongActor(id_string( return Err(ErrorKind::WrongActor(id_string(follow.actor()?.as_single_id())?).into());
follow.actor()?.as_single_id(),
)?));
} }
Ok(()) Ok(())
@ -118,20 +120,16 @@ async fn handle_reject(
jobs: &JobServer, jobs: &JobServer,
input: AcceptedActivities, input: AcceptedActivities,
actor: Actor, actor: Actor,
) -> Result<(), MyError> { ) -> Result<(), Error> {
let base = single_object(input.object())?.clone(); let base = single_object(input.object())?.clone();
let follow = if let Some(follow) = activity::Follow::from_any_base(base)? { let follow = if let Some(follow) = activity::Follow::from_any_base(base)? {
follow follow
} else { } else {
return Err(MyError::Kind( return Err(ErrorKind::Kind(kind_str(single_object(input.object())?)?.to_owned()).into());
kind_str(single_object(input.object())?)?.to_owned(),
));
}; };
if !follow.actor_is(&config.generate_url(UrlKind::Actor)) { if !follow.actor_is(&config.generate_url(UrlKind::Actor)) {
return Err(MyError::WrongActor(id_string( return Err(ErrorKind::WrongActor(id_string(follow.actor()?.as_single_id())?).into());
follow.actor()?.as_single_id(),
)?));
} }
jobs.queue(Reject(actor))?; jobs.queue(Reject(actor))?;
@ -145,26 +143,26 @@ async fn handle_undo(
input: AcceptedActivities, input: AcceptedActivities,
actor: Actor, actor: Actor,
is_listener: bool, is_listener: bool,
) -> Result<(), MyError> { ) -> Result<(), Error> {
let any_base = single_object(input.object())?.clone(); let any_base = single_object(input.object())?.clone();
let undone_object = let undone_object =
AcceptedUndoObjects::from_any_base(any_base)?.ok_or(MyError::ObjectFormat)?; AcceptedUndoObjects::from_any_base(any_base)?.ok_or(ErrorKind::ObjectFormat)?;
if !undone_object.is_kind(&UndoTypes::Follow) { if !undone_object.is_kind(&UndoTypes::Follow) {
if is_listener { if is_listener {
jobs.queue(Forward::new(input, actor))?; jobs.queue(Forward::new(input, actor))?;
return Ok(()); return Ok(());
} else { } else {
return Err(MyError::NotSubscribed(actor.id.to_string())); return Err(ErrorKind::NotSubscribed(actor.id.to_string()).into());
} }
} }
let my_id: Url = config.generate_url(UrlKind::Actor); let my_id: Url = config.generate_url(UrlKind::Actor);
if !undone_object.object_is(&my_id) && !undone_object.object_is(&public()) { if !undone_object.object_is(&my_id) && !undone_object.object_is(&public()) {
return Err(MyError::WrongActor(id_string( return Err(
undone_object.object().as_single_id(), ErrorKind::WrongActor(id_string(undone_object.object().as_single_id())?).into(),
)?)); );
} }
if !is_listener { if !is_listener {
@ -179,7 +177,7 @@ async fn handle_forward(
jobs: &JobServer, jobs: &JobServer,
input: AcceptedActivities, input: AcceptedActivities,
actor: Actor, actor: Actor,
) -> Result<(), MyError> { ) -> Result<(), Error> {
jobs.queue(Forward::new(input, actor))?; jobs.queue(Forward::new(input, actor))?;
Ok(()) Ok(())
@ -190,11 +188,11 @@ async fn handle_announce(
jobs: &JobServer, jobs: &JobServer,
input: AcceptedActivities, input: AcceptedActivities,
actor: Actor, actor: Actor,
) -> Result<(), MyError> { ) -> Result<(), Error> {
let object_id = input.object().as_single_id().ok_or(MyError::MissingId)?; let object_id = input.object().as_single_id().ok_or(ErrorKind::MissingId)?;
if state.is_cached(object_id).await { if state.is_cached(object_id).await {
return Err(MyError::Duplicate); return Err(ErrorKind::Duplicate.into());
} }
jobs.queue(Announce::new(object_id.to_owned(), actor))?; jobs.queue(Announce::new(object_id.to_owned(), actor))?;
@ -207,13 +205,11 @@ async fn handle_follow(
jobs: &JobServer, jobs: &JobServer,
input: AcceptedActivities, input: AcceptedActivities,
actor: Actor, actor: Actor,
) -> Result<(), MyError> { ) -> Result<(), Error> {
let my_id: Url = config.generate_url(UrlKind::Actor); let my_id: Url = config.generate_url(UrlKind::Actor);
if !input.object_is(&my_id) && !input.object_is(&public()) { if !input.object_is(&my_id) && !input.object_is(&public()) {
return Err(MyError::WrongActor(id_string( return Err(ErrorKind::WrongActor(id_string(input.object().as_single_id())?).into());
input.object().as_single_id(),
)?));
} }
jobs.queue(Follow::new(input, actor))?; jobs.queue(Follow::new(input, actor))?;

View file

@ -1,13 +1,18 @@
use crate::{config::Config, data::State, error::MyError}; use crate::{
config::Config,
data::State,
error::{Error, ErrorKind},
};
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use log::error;
use rand::{seq::SliceRandom, thread_rng}; use rand::{seq::SliceRandom, thread_rng};
use std::io::BufWriter; use std::io::BufWriter;
use tracing::error;
#[tracing::instrument(name = "Index")]
pub(crate) async fn route( pub(crate) async fn route(
state: web::Data<State>, state: web::Data<State>,
config: web::Data<Config>, config: web::Data<Config>,
) -> Result<HttpResponse, MyError> { ) -> Result<HttpResponse, Error> {
let mut nodes = state.node_cache().nodes().await?; let mut nodes = state.node_cache().nodes().await?;
nodes.shuffle(&mut thread_rng()); nodes.shuffle(&mut thread_rng());
let mut buf = BufWriter::new(Vec::new()); let mut buf = BufWriter::new(Vec::new());
@ -15,7 +20,7 @@ pub(crate) async fn route(
crate::templates::index(&mut buf, &nodes, &config)?; crate::templates::index(&mut buf, &nodes, &config)?;
let buf = buf.into_inner().map_err(|e| { let buf = buf.into_inner().map_err(|e| {
error!("Error rendering template, {}", e.error()); error!("Error rendering template, {}", e.error());
MyError::FlushBuffer ErrorKind::FlushBuffer
})?; })?;
Ok(HttpResponse::Ok().content_type("text/html").body(buf)) Ok(HttpResponse::Ok().content_type("text/html").body(buf))

View file

@ -1,15 +1,16 @@
use crate::{data::MediaCache, error::MyError, requests::Requests}; use crate::{data::MediaCache, error::Error, requests::Requests};
use actix_web::{ use actix_web::{
http::header::{CacheControl, CacheDirective}, http::header::{CacheControl, CacheDirective},
web, HttpResponse, web, HttpResponse,
}; };
use uuid::Uuid; use uuid::Uuid;
#[tracing::instrument(name = "Media")]
pub(crate) async fn route( pub(crate) async fn route(
media: web::Data<MediaCache>, media: web::Data<MediaCache>,
requests: web::Data<Requests>, requests: web::Data<Requests>,
uuid: web::Path<Uuid>, uuid: web::Path<Uuid>,
) -> Result<HttpResponse, MyError> { ) -> Result<HttpResponse, Error> {
let uuid = uuid.into_inner(); let uuid = uuid.into_inner();
if let Some((content_type, bytes)) = media.get_bytes(uuid).await? { if let Some((content_type, bytes)) = media.get_bytes(uuid).await? {

View file

@ -5,6 +5,7 @@ use crate::{
use actix_web::{web, Responder}; use actix_web::{web, Responder};
use actix_webfinger::Link; use actix_webfinger::Link;
#[tracing::instrument(name = "Well Known NodeInfo")]
pub(crate) async fn well_known(config: web::Data<Config>) -> impl Responder { pub(crate) async fn well_known(config: web::Data<Config>) -> impl Responder {
web::Json(Links { web::Json(Links {
links: vec![Link { links: vec![Link {
@ -22,6 +23,7 @@ struct Links {
links: Vec<Link>, links: Vec<Link>,
} }
#[tracing::instrument(name = "NodeInfo")]
pub(crate) async fn route( pub(crate) async fn route(
config: web::Data<Config>, config: web::Data<Config>,
state: web::Data<State>, state: web::Data<State>,

View file

@ -4,6 +4,7 @@ use actix_web::{
web, HttpResponse, web, HttpResponse,
}; };
#[tracing::instrument(name = "Statistics")]
pub(crate) async fn route(filename: web::Path<String>) -> HttpResponse { pub(crate) async fn route(filename: web::Path<String>) -> HttpResponse {
if let Some(data) = StaticFile::get(&filename.into_inner()) { if let Some(data) = StaticFile::get(&filename.into_inner()) {
HttpResponse::Ok() HttpResponse::Ok()