From 69e77dfa742978560e37ed1f049bc0da2a304778 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Sun, 19 Feb 2023 21:26:01 +0900 Subject: [PATCH] Various improvements for usability, examples and docs --- Cargo.lock | 142 ++----- Cargo.toml | 24 +- .../activities/accept.rs | 8 +- .../activities/create_post.rs} | 12 +- .../activities/follow.rs | 12 +- .../activities/mod.rs | 2 +- examples/local_federation/actix_web/http.rs | 58 +++ .../actix_web/mod.rs | 0 examples/local_federation/axum/http.rs | 64 +++ .../axum/mod.rs | 0 .../error.rs | 0 examples/local_federation/instance.rs | 63 +++ .../main.rs | 24 +- .../objects/mod.rs | 2 +- .../objects/person.rs | 71 ++-- .../objects/post.rs} | 32 +- .../utils.rs | 0 examples/simple_federation/actix_web/http.rs | 66 --- examples/simple_federation/axum/http.rs | 93 ----- examples/simple_federation/instance.rs | 68 ---- src/config.rs | 230 +++++++++++ src/core/activity_queue.rs | 47 ++- src/core/actix_web/inbox.rs | 112 +++++- src/core/actix_web/middleware.rs | 15 +- src/core/axum/digest.rs | 45 --- src/core/axum/inbox.rs | 29 +- src/core/axum/json.rs | 18 +- src/core/axum/middleware.rs | 16 +- src/core/axum/mod.rs | 87 ++-- src/core/object_id.rs | 120 +++--- src/core/signatures.rs | 70 +++- src/deser/helpers.rs | 83 ---- src/lib.rs | 179 +-------- src/{deser => protocol}/context.rs | 29 +- src/protocol/helpers.rs | 115 ++++++ src/{deser => protocol}/mod.rs | 0 src/{deser => protocol}/values.rs | 2 +- src/request_data.rs | 73 +--- src/traits.rs | 375 +++++++++++++++--- src/{utils.rs => utils/mod.rs} | 42 +- 40 files changed, 1409 insertions(+), 1019 deletions(-) rename examples/{simple_federation => local_federation}/activities/accept.rs (84%) rename examples/{simple_federation/activities/create_note.rs => local_federation/activities/create_post.rs} (83%) rename examples/{simple_federation => local_federation}/activities/follow.rs (85%) rename examples/{simple_federation => local_federation}/activities/mod.rs (60%) create mode 100644 examples/local_federation/actix_web/http.rs rename examples/{simple_federation => local_federation}/actix_web/mod.rs (100%) create mode 100644 examples/local_federation/axum/http.rs rename examples/{simple_federation => local_federation}/axum/mod.rs (100%) rename examples/{simple_federation => local_federation}/error.rs (100%) create mode 100644 examples/local_federation/instance.rs rename examples/{simple_federation => local_federation}/main.rs (57%) rename examples/{simple_federation => local_federation}/objects/mod.rs (53%) rename examples/{simple_federation => local_federation}/objects/person.rs (71%) rename examples/{simple_federation/objects/note.rs => local_federation/objects/post.rs} (74%) rename examples/{simple_federation => local_federation}/utils.rs (100%) delete mode 100644 examples/simple_federation/actix_web/http.rs delete mode 100644 examples/simple_federation/axum/http.rs delete mode 100644 examples/simple_federation/instance.rs create mode 100644 src/config.rs delete mode 100644 src/core/axum/digest.rs delete mode 100644 src/deser/helpers.rs rename src/{deser => protocol}/context.rs (54%) create mode 100644 src/protocol/helpers.rs rename src/{deser => protocol}/mod.rs (100%) rename src/{deser => protocol}/values.rs (96%) rename src/{utils.rs => utils/mod.rs} (55%) diff --git a/Cargo.lock b/Cargo.lock index 4bd97c9..a3d1492 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,6 +12,7 @@ dependencies = [ "anyhow", "async-trait", "axum", + "axum-macros", "background-jobs", "base64", "bytes", @@ -37,11 +38,9 @@ dependencies = [ "serde_json", "sha2", "thiserror", - "tokio", "tower", "tower-http", "tracing", - "tracing-subscriber", "url", ] @@ -272,6 +271,17 @@ dependencies = [ "syn", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -332,9 +342,9 @@ dependencies = [ [[package]] name = "axum-macros" -version = "0.3.0" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4df0fc33ada14a338b799002f7e8657711422b25d4e16afb032708d6b185621" +checksum = "5fbf955307ff8addb48d2399393c9e2740dd491537ec562b66ab364fc4a38841" dependencies = [ "heck", "proc-macro2", @@ -554,9 +564,9 @@ dependencies = [ [[package]] name = "darling" -version = "0.14.2" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0dd3cd20dc6b5a876612a6e5accfe7f3dd883db6d07acfbf14c128f61550dfa" +checksum = "c0808e1bd8671fb44a113a14e13497557533369847788fa2ae912b6ebfce9fa8" dependencies = [ "darling_core", "darling_macro", @@ -564,9 +574,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.14.2" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a784d2ccaf7c98501746bf0be29b2022ba41fd62a2e622af997a03e9f972859f" +checksum = "001d80444f28e193f30c2f293455da62dcf9a6b29918a4253152ae2b1de592cb" dependencies = [ "fnv", "ident_case", @@ -578,9 +588,9 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.14.2" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7618812407e9402654622dd402b0a89dff9ba93badd6540781526117b92aab7e" +checksum = "b36230598a2d5de7ec1c6f51f72d8a99a9208daff41de2084d06e3fd3ea56685" dependencies = [ "darling_core", "quote", @@ -589,18 +599,18 @@ dependencies = [ [[package]] name = "derive_builder" -version = "0.11.2" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d07adf7be193b71cc36b193d0f5fe60b918a3a9db4dad0449f57bcfd519704a3" +checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8" dependencies = [ "derive_builder_macro", ] [[package]] name = "derive_builder_core" -version = "0.11.2" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f91d4cfa921f1c05904dc3c57b4a32c38aed3340cce209f3a6fd1478babafc4" +checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" dependencies = [ "darling", "proc-macro2", @@ -610,9 +620,9 @@ dependencies = [ [[package]] name = "derive_builder_macro" -version = "0.11.2" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f0314b72bed045f3a68671b3c86328386762c93f82d98c65c3cb5e5f573dd68" +checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e" dependencies = [ "derive_builder_core", "syn", @@ -692,7 +702,11 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7" dependencies = [ + "atty", + "humantime", "log", + "regex", + "termcolor", ] [[package]] @@ -930,6 +944,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.23" @@ -1120,15 +1140,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "matchers" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" -dependencies = [ - "regex-automata", -] - [[package]] name = "matchit" version = "0.6.0" @@ -1187,16 +1198,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "nu-ansi-term" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" -dependencies = [ - "overload", - "winapi", -] - [[package]] name = "num-integer" version = "0.1.45" @@ -1277,12 +1278,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "parking_lot" version = "0.12.1" @@ -1430,15 +1425,6 @@ dependencies = [ "regex-syntax", ] -[[package]] -name = "regex-automata" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" -dependencies = [ - "regex-syntax", -] - [[package]] name = "regex-syntax" version = "0.6.28" @@ -1655,15 +1641,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sharded-slab" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" -dependencies = [ - "lazy_static", -] - [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -1773,15 +1750,6 @@ dependencies = [ "syn", ] -[[package]] -name = "thread_local" -version = "1.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" -dependencies = [ - "once_cell", -] - [[package]] name = "time" version = "0.3.17" @@ -1835,7 +1803,6 @@ dependencies = [ "libc", "memchr", "mio", - "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", @@ -1912,7 +1879,6 @@ dependencies = [ "tower", "tower-layer", "tower-service", - "tracing", ] [[package]] @@ -1958,7 +1924,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ "once_cell", - "valuable", ] [[package]] @@ -1971,35 +1936,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tracing-log" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - -[[package]] -name = "tracing-subscriber" -version = "0.3.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" -dependencies = [ - "matchers", - "nu-ansi-term", - "once_cell", - "regex", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", - "tracing-core", - "tracing-log", -] - [[package]] name = "try-lock" version = "0.2.3" @@ -2070,12 +2006,6 @@ dependencies = [ "serde", ] -[[package]] -name = "valuable" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" - [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 6d17e80..8533c4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,17 +25,18 @@ http = "0.2.8" sha2 = "0.10.6" background-jobs = "0.13.0" thiserror = "1.0.37" -derive_builder = "0.11.2" +derive_builder = "0.12.0" itertools = "0.10.5" dyn-clone = "1.0.9" enum_delegate = "0.2.0" httpdate = "1.0.2" http-signature-normalization-reqwest = { version = "0.7.1", default-features = false, features = ["sha-2", "middleware"] } http-signature-normalization = "0.6.0" -actix-rt = { version = "2.7.0" } +actix-rt = "2.7.0" bytes = "1.3.0" futures-core = { version = "0.3.25", default-features = false } pin-project-lite = "0.2.9" +activitystreams-kinds = "0.2.1" # Actix-web actix-web = { version = "4.2.1", default-features = false, optional = true } @@ -48,25 +49,18 @@ hyper = { version = "0.14", optional = true } [features] default = [] actix-web = ["dep:actix-web"] -axum = [ - "dep:axum", - "dep:tower", - "dep:hyper", -] +axum = ["dep:axum", "dep:tower", "dep:hyper"] [dev-dependencies] -activitystreams-kinds = "0.2.1" rand = "0.8.5" -actix-rt = "2.7.0" -tokio = { version = "1.21.2", features = ["full"] } -env_logger = { version = "0.9.3", default-features = false } -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -tower-http = { version = "0.3", features = ["map-request-body", "util", "trace"] } +env_logger = "0.9.3" +tower-http = { version = "0.3", features = ["map-request-body", "util"] } +axum-macros = "0.3.4" [profile.dev] strip = "symbols" debug = 0 [[example]] -name = "simple_federation" -path = "examples/simple_federation/main.rs" \ No newline at end of file +name = "local_federation" +path = "examples/local_federation/main.rs" diff --git a/examples/simple_federation/activities/accept.rs b/examples/local_federation/activities/accept.rs similarity index 84% rename from examples/simple_federation/activities/accept.rs rename to examples/local_federation/activities/accept.rs index f3e1243..d89e20a 100644 --- a/examples/simple_federation/activities/accept.rs +++ b/examples/local_federation/activities/accept.rs @@ -1,17 +1,17 @@ -use crate::{activities::follow::Follow, instance::DatabaseHandle, objects::person::MyUser}; +use crate::{activities::follow::Follow, instance::DatabaseHandle, objects::person::DbUser}; use activitypub_federation::{ core::object_id::ObjectId, + kinds::activity::AcceptType, request_data::RequestData, traits::ActivityHandler, }; -use activitystreams_kinds::activity::AcceptType; use serde::{Deserialize, Serialize}; use url::Url; #[derive(Deserialize, Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct Accept { - actor: ObjectId, + actor: ObjectId, object: Follow, #[serde(rename = "type")] kind: AcceptType, @@ -19,7 +19,7 @@ pub struct Accept { } impl Accept { - pub fn new(actor: ObjectId, object: Follow, id: Url) -> Accept { + pub fn new(actor: ObjectId, object: Follow, id: Url) -> Accept { Accept { actor, object, diff --git a/examples/simple_federation/activities/create_note.rs b/examples/local_federation/activities/create_post.rs similarity index 83% rename from examples/simple_federation/activities/create_note.rs rename to examples/local_federation/activities/create_post.rs index ac35979..14eb94c 100644 --- a/examples/simple_federation/activities/create_note.rs +++ b/examples/local_federation/activities/create_post.rs @@ -1,22 +1,22 @@ use crate::{ instance::DatabaseHandle, - objects::{note::Note, person::MyUser}, - MyPost, + objects::{person::DbUser, post::Note}, + DbPost, }; use activitypub_federation::{ core::object_id::ObjectId, - deser::helpers::deserialize_one_or_many, + kinds::activity::CreateType, + protocol::helpers::deserialize_one_or_many, request_data::RequestData, traits::{ActivityHandler, ApubObject}, }; -use activitystreams_kinds::activity::CreateType; use serde::{Deserialize, Serialize}; use url::Url; #[derive(Deserialize, Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct CreateNote { - pub(crate) actor: ObjectId, + pub(crate) actor: ObjectId, #[serde(deserialize_with = "deserialize_one_or_many")] pub(crate) to: Vec, pub(crate) object: Note, @@ -51,7 +51,7 @@ impl ActivityHandler for CreateNote { } async fn receive(self, data: &RequestData) -> Result<(), Self::Error> { - MyPost::from_apub(self.object, data).await?; + DbPost::from_apub(self.object, data).await?; Ok(()) } } diff --git a/examples/simple_federation/activities/follow.rs b/examples/local_federation/activities/follow.rs similarity index 85% rename from examples/simple_federation/activities/follow.rs rename to examples/local_federation/activities/follow.rs index b430299..54880a9 100644 --- a/examples/simple_federation/activities/follow.rs +++ b/examples/local_federation/activities/follow.rs @@ -2,29 +2,29 @@ use crate::{ activities::accept::Accept, generate_object_id, instance::DatabaseHandle, - objects::person::MyUser, + objects::person::DbUser, }; use activitypub_federation::{ core::object_id::ObjectId, + kinds::activity::FollowType, request_data::RequestData, traits::{ActivityHandler, Actor}, }; -use activitystreams_kinds::activity::FollowType; use serde::{Deserialize, Serialize}; use url::Url; #[derive(Deserialize, Serialize, Clone, Debug)] #[serde(rename_all = "camelCase")] pub struct Follow { - pub(crate) actor: ObjectId, - pub(crate) object: ObjectId, + pub(crate) actor: ObjectId, + pub(crate) object: ObjectId, #[serde(rename = "type")] kind: FollowType, id: Url, } impl Follow { - pub fn new(actor: ObjectId, object: ObjectId, id: Url) -> Follow { + pub fn new(actor: ObjectId, object: ObjectId, id: Url) -> Follow { Follow { actor, object, @@ -60,7 +60,7 @@ impl ActivityHandler for Follow { // send back an accept let follower = self.actor.dereference(data).await?; - let id = generate_object_id(data.local_instance().hostname())?; + let id = generate_object_id(data.hostname())?; let accept = Accept::new(local_user.ap_id.clone(), self, id.clone()); local_user .send(accept, vec![follower.shared_inbox_or_inbox()], data) diff --git a/examples/simple_federation/activities/mod.rs b/examples/local_federation/activities/mod.rs similarity index 60% rename from examples/simple_federation/activities/mod.rs rename to examples/local_federation/activities/mod.rs index 59d2fb0..73f5dd6 100644 --- a/examples/simple_federation/activities/mod.rs +++ b/examples/local_federation/activities/mod.rs @@ -1,3 +1,3 @@ pub mod accept; -pub mod create_note; +pub mod create_post; pub mod follow; diff --git a/examples/local_federation/actix_web/http.rs b/examples/local_federation/actix_web/http.rs new file mode 100644 index 0000000..02956b6 --- /dev/null +++ b/examples/local_federation/actix_web/http.rs @@ -0,0 +1,58 @@ +use crate::{ + error::Error, + instance::DatabaseHandle, + objects::person::{DbUser, PersonAcceptedActivities}, +}; +use activitypub_federation::{ + config::FederationConfig, + core::actix_web::inbox::receive_activity, + protocol::context::WithContext, + request_data::{ApubMiddleware, RequestData}, + traits::ApubObject, + APUB_JSON_CONTENT_TYPE, +}; +use actix_web::{web, web::Bytes, App, HttpRequest, HttpResponse, HttpServer}; +use anyhow::anyhow; + +pub fn listen(data: &FederationConfig) -> Result<(), Error> { + let hostname = data.hostname(); + let data = data.clone(); + let server = HttpServer::new(move || { + App::new() + .wrap(ApubMiddleware::new(data.clone())) + .route("/{user}", web::get().to(http_get_user)) + .route("/{user}/inbox", web::post().to(http_post_user_inbox)) + }) + .bind(hostname)? + .run(); + actix_rt::spawn(server); + Ok(()) +} + +/// Handles requests to fetch user json over HTTP +pub async fn http_get_user( + user_name: web::Path, + data: RequestData, +) -> Result { + let db_user = data.local_user(); + if user_name.into_inner() == db_user.name { + let apub_user = db_user.into_apub(&data).await?; + Ok(HttpResponse::Ok() + .content_type(APUB_JSON_CONTENT_TYPE) + .json(WithContext::new_default(apub_user))) + } else { + Err(anyhow!("Invalid user").into()) + } +} + +/// Handles messages received in user inbox +pub async fn http_post_user_inbox( + request: HttpRequest, + body: Bytes, + data: RequestData, +) -> Result { + receive_activity::, DbUser, DatabaseHandle>( + request, body, &data, + ) + .await +} diff --git a/examples/simple_federation/actix_web/mod.rs b/examples/local_federation/actix_web/mod.rs similarity index 100% rename from examples/simple_federation/actix_web/mod.rs rename to examples/local_federation/actix_web/mod.rs diff --git a/examples/local_federation/axum/http.rs b/examples/local_federation/axum/http.rs new file mode 100644 index 0000000..062251f --- /dev/null +++ b/examples/local_federation/axum/http.rs @@ -0,0 +1,64 @@ +use crate::{ + error::Error, + instance::DatabaseHandle, + objects::person::{DbUser, Person, PersonAcceptedActivities}, +}; +use activitypub_federation::{ + config::FederationConfig, + core::axum::{inbox::receive_activity, json::ApubJson, ActivityData}, + protocol::context::WithContext, + request_data::{ApubMiddleware, RequestData}, + traits::ApubObject, +}; +use anyhow::anyhow; +use axum::{ + extract::Path, + response::IntoResponse, + routing::{get, post}, + Router, +}; +use std::net::ToSocketAddrs; + +pub fn listen(data: &FederationConfig) -> Result<(), Error> { + let hostname = data.hostname(); + let data = data.clone(); + let app = Router::new() + .route("/:user/inbox", post(http_post_user_inbox)) + .route("/:user", get(http_get_user)) + .layer(ApubMiddleware::new(data)); + + let addr = hostname + .to_socket_addrs()? + .next() + .expect("Failed to lookup domain name"); + let server = axum::Server::bind(&addr).serve(app.into_make_service()); + + actix_rt::spawn(server); + Ok(()) +} + +#[axum_macros::debug_handler] +async fn http_get_user( + Path(user): Path, + data: RequestData, +) -> Result>, Error> { + let db_user = data.local_user(); + if user == db_user.name { + let apub_user = db_user.into_apub(&data).await?; + Ok(ApubJson(WithContext::new_default(apub_user))) + } else { + Err(anyhow!("Invalid user {user}").into()) + } +} + +#[axum_macros::debug_handler] +async fn http_post_user_inbox( + data: RequestData, + activity_data: ActivityData, +) -> impl IntoResponse { + receive_activity::, DbUser, DatabaseHandle>( + activity_data, + &data, + ) + .await +} diff --git a/examples/simple_federation/axum/mod.rs b/examples/local_federation/axum/mod.rs similarity index 100% rename from examples/simple_federation/axum/mod.rs rename to examples/local_federation/axum/mod.rs diff --git a/examples/simple_federation/error.rs b/examples/local_federation/error.rs similarity index 100% rename from examples/simple_federation/error.rs rename to examples/local_federation/error.rs diff --git a/examples/local_federation/instance.rs b/examples/local_federation/instance.rs new file mode 100644 index 0000000..404934b --- /dev/null +++ b/examples/local_federation/instance.rs @@ -0,0 +1,63 @@ +use crate::{ + objects::{person::DbUser, post::DbPost}, + Error, +}; +use activitypub_federation::config::{FederationConfig, UrlVerifier}; +use async_trait::async_trait; +use std::sync::{Arc, Mutex}; +use url::Url; + +pub type DatabaseHandle = Arc; + +/// Our "database" which contains all known posts users (local and federated) +pub struct Database { + pub users: Mutex>, + pub posts: Mutex>, +} + +/// Use this to store your federation blocklist, or a database connection needed to retrieve it. +#[derive(Clone)] +struct MyUrlVerifier(); + +#[async_trait] +impl UrlVerifier for MyUrlVerifier { + async fn verify(&self, url: &Url) -> Result<(), &'static str> { + if url.domain() == Some("malicious.com") { + Err("malicious domain") + } else { + Ok(()) + } + } +} + +pub fn listen(data: &FederationConfig) -> Result<(), Error> { + if cfg!(feature = "actix-web") == cfg!(feature = "axum") { + panic!("Exactly one of features \"actix-web\" and \"axum\" must be enabled"); + } + #[cfg(feature = "actix-web")] + crate::actix_web::http::listen(data)?; + #[cfg(feature = "axum")] + crate::axum::http::listen(data)?; + Ok(()) +} + +impl Database { + pub fn new(hostname: &str, name: String) -> Result, Error> { + let local_user = DbUser::new(hostname, name)?; + let database = Arc::new(Database { + users: Mutex::new(vec![local_user]), + posts: Mutex::new(vec![]), + }); + let config = FederationConfig::builder() + .hostname(hostname) + .app_data(database) + .debug(true) + .build()?; + Ok(config) + } + + pub fn local_user(&self) -> DbUser { + let lock = self.users.lock().unwrap(); + lock.first().unwrap().clone() + } +} diff --git a/examples/simple_federation/main.rs b/examples/local_federation/main.rs similarity index 57% rename from examples/simple_federation/main.rs rename to examples/local_federation/main.rs index 2bf5b1b..baa55a2 100644 --- a/examples/simple_federation/main.rs +++ b/examples/local_federation/main.rs @@ -1,10 +1,10 @@ use crate::{ instance::{listen, Database}, - objects::note::MyPost, + objects::post::DbPost, utils::generate_object_id, }; use error::Error; -use tracing::log::LevelFilter; +use tracing::log::{info, LevelFilter}; mod activities; #[cfg(feature = "actix-web")] @@ -19,35 +19,41 @@ mod utils; #[actix_rt::main] async fn main() -> Result<(), Error> { env_logger::builder() - .filter_level(LevelFilter::Debug) + .filter_level(LevelFilter::Warn) + .filter_module("local_federation", LevelFilter::Info) + .format_timestamp(None) .init(); - let alpha = Database::new("localhost:8001".to_string())?; - let beta = Database::new("localhost:8002".to_string())?; + info!("Starting local instances alpha and beta on localhost:8001, localhost:8002"); + let alpha = Database::new("localhost:8001", "alpha".to_string())?; + let beta = Database::new("localhost:8002", "beta".to_string())?; listen(&alpha)?; listen(&beta)?; + info!("Local instances started"); - // alpha user follows beta user + info!("Alpha user follows beta user"); alpha .local_user() .follow(&beta.local_user(), &alpha.to_request_data()) .await?; - // assert that follow worked correctly assert_eq!( beta.local_user().followers(), &vec![alpha.local_user().ap_id.inner().clone()] ); + info!("Follow was successful"); - // beta sends a post to its followers - let sent_post = MyPost::new("hello world!".to_string(), beta.local_user().ap_id); + info!("Beta sends a post to its followers"); + let sent_post = DbPost::new("Hello world!".to_string(), beta.local_user().ap_id)?; beta.local_user() .post(sent_post.clone(), &beta.to_request_data()) .await?; let received_post = alpha.posts.lock().unwrap().first().cloned().unwrap(); + info!("Alpha received post: {}", received_post.text); // assert that alpha received the post assert_eq!(received_post.text, sent_post.text); assert_eq!(received_post.ap_id.inner(), sent_post.ap_id.inner()); assert_eq!(received_post.creator.inner(), sent_post.creator.inner()); + info!("Test completed"); Ok(()) } diff --git a/examples/simple_federation/objects/mod.rs b/examples/local_federation/objects/mod.rs similarity index 53% rename from examples/simple_federation/objects/mod.rs rename to examples/local_federation/objects/mod.rs index 20b1a49..b5239ab 100644 --- a/examples/simple_federation/objects/mod.rs +++ b/examples/local_federation/objects/mod.rs @@ -1,2 +1,2 @@ -pub mod note; pub mod person; +pub mod post; diff --git a/examples/simple_federation/objects/person.rs b/examples/local_federation/objects/person.rs similarity index 71% rename from examples/simple_federation/objects/person.rs rename to examples/local_federation/objects/person.rs index 078cf74..9af7e07 100644 --- a/examples/simple_federation/objects/person.rs +++ b/examples/local_federation/objects/person.rs @@ -1,27 +1,29 @@ use crate::{ - activities::{accept::Accept, create_note::CreateNote, follow::Follow}, + activities::{accept::Accept, create_post::CreateNote, follow::Follow}, error::Error, instance::DatabaseHandle, - objects::note::MyPost, + objects::post::DbPost, utils::generate_object_id, }; use activitypub_federation::{ core::{ activity_queue::send_activity, object_id::ObjectId, - signatures::{Keypair, PublicKey}, + signatures::{generate_actor_keypair, PublicKey}, }, - deser::context::WithContext, + kinds::actor::PersonType, + protocol::context::WithContext, request_data::RequestData, traits::{ActivityHandler, Actor, ApubObject}, }; -use activitystreams_kinds::actor::PersonType; use serde::{Deserialize, Serialize}; +use std::fmt::Debug; use url::Url; #[derive(Debug, Clone)] -pub struct MyUser { - pub ap_id: ObjectId, +pub struct DbUser { + pub name: String, + pub ap_id: ObjectId, pub inbox: Url, // exists for all users (necessary to verify http signatures) public_key: String, @@ -41,19 +43,20 @@ pub enum PersonAcceptedActivities { CreateNote(CreateNote), } -impl MyUser { - pub fn new(ap_id: Url, keypair: Keypair) -> MyUser { - let mut inbox = ap_id.clone(); - inbox.set_path("/inbox"); - let ap_id = ObjectId::new(ap_id); - MyUser { +impl DbUser { + pub fn new(hostname: &str, name: String) -> Result { + let ap_id = Url::parse(&format!("http://{}/{}", hostname, &name))?.into(); + let inbox = Url::parse(&format!("http://{}/{}/inbox", hostname, &name))?; + let keypair = generate_actor_keypair()?; + Ok(DbUser { + name, ap_id, inbox, public_key: keypair.public_key, private_key: Some(keypair.private_key), followers: vec![], local: true, - } + }) } } @@ -62,12 +65,13 @@ impl MyUser { pub struct Person { #[serde(rename = "type")] kind: PersonType, - id: ObjectId, + preferred_username: String, + id: ObjectId, inbox: Url, public_key: PublicKey, } -impl MyUser { +impl DbUser { pub fn followers(&self) -> &Vec { &self.followers } @@ -82,29 +86,29 @@ impl MyUser { pub async fn follow( &self, - other: &MyUser, - instance: &RequestData, + other: &DbUser, + data: &RequestData, ) -> Result<(), Error> { - let id = generate_object_id(instance.local_instance().hostname())?; + let id = generate_object_id(data.hostname())?; let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone()); - self.send(follow, vec![other.shared_inbox_or_inbox()], instance) + self.send(follow, vec![other.shared_inbox_or_inbox()], data) .await?; Ok(()) } pub async fn post( &self, - post: MyPost, - instance: &RequestData, + post: DbPost, + data: &RequestData, ) -> Result<(), Error> { - let id = generate_object_id(instance.local_instance().hostname())?; - let create = CreateNote::new(post.into_apub(instance).await?, id.clone()); + let id = generate_object_id(data.hostname())?; + let create = CreateNote::new(post.into_apub(data).await?, id.clone()); let mut inboxes = vec![]; for f in self.followers.clone() { - let user: MyUser = ObjectId::new(f).dereference(instance).await?; + let user: DbUser = ObjectId::new(f).dereference(data).await?; inboxes.push(user.shared_inbox_or_inbox()); } - self.send(create, inboxes, instance).await?; + self.send(create, inboxes, data).await?; Ok(()) } @@ -115,16 +119,16 @@ impl MyUser { data: &RequestData, ) -> Result<(), ::Error> where - Activity: ActivityHandler + Serialize + Send + Sync, + Activity: ActivityHandler + Serialize + Debug + Send + Sync, ::Error: From + From, { let activity = WithContext::new_default(activity); send_activity( activity, self.public_key(), - self.private_key.clone().expect("has private key"), + self.private_key.clone().unwrap(), recipients, - data.local_instance(), + data, ) .await?; Ok(()) @@ -132,10 +136,9 @@ impl MyUser { } #[async_trait::async_trait] -impl ApubObject for MyUser { +impl ApubObject for DbUser { type DataType = DatabaseHandle; type ApubType = Person; - type DbType = MyUser; type Error = Error; async fn read_from_apub_id( @@ -155,6 +158,7 @@ impl ApubObject for MyUser { _data: &RequestData, ) -> Result { Ok(Person { + preferred_username: self.name.clone(), kind: Default::default(), id: self.ap_id.clone(), inbox: self.inbox.clone(), @@ -166,7 +170,8 @@ impl ApubObject for MyUser { apub: Self::ApubType, _data: &RequestData, ) -> Result { - Ok(MyUser { + Ok(DbUser { + name: apub.preferred_username, ap_id: apub.id, inbox: apub.inbox, public_key: apub.public_key.public_key_pem, @@ -177,7 +182,7 @@ impl ApubObject for MyUser { } } -impl Actor for MyUser { +impl Actor for DbUser { fn public_key(&self) -> &str { &self.public_key } diff --git a/examples/simple_federation/objects/note.rs b/examples/local_federation/objects/post.rs similarity index 74% rename from examples/simple_federation/objects/note.rs rename to examples/local_federation/objects/post.rs index 51bcee8..18a49a7 100644 --- a/examples/simple_federation/objects/note.rs +++ b/examples/local_federation/objects/post.rs @@ -1,30 +1,31 @@ -use crate::{error::Error, generate_object_id, instance::DatabaseHandle, objects::person::MyUser}; +use crate::{error::Error, generate_object_id, instance::DatabaseHandle, objects::person::DbUser}; use activitypub_federation::{ core::object_id::ObjectId, - deser::helpers::deserialize_one_or_many, + kinds::{object::NoteType, public}, + protocol::helpers::deserialize_one_or_many, request_data::RequestData, traits::ApubObject, }; -use activitystreams_kinds::{object::NoteType, public}; use serde::{Deserialize, Serialize}; use url::Url; #[derive(Clone, Debug)] -pub struct MyPost { +pub struct DbPost { pub text: String, - pub ap_id: ObjectId, - pub creator: ObjectId, + pub ap_id: ObjectId, + pub creator: ObjectId, pub local: bool, } -impl MyPost { - pub fn new(text: String, creator: ObjectId) -> MyPost { - MyPost { +impl DbPost { + pub fn new(text: String, creator: ObjectId) -> Result { + let ap_id = generate_object_id(creator.inner().domain().unwrap())?.into(); + Ok(DbPost { text, - ap_id: ObjectId::new(generate_object_id(creator.inner().domain().unwrap()).unwrap()), + ap_id, creator, local: true, - } + }) } } @@ -33,18 +34,17 @@ impl MyPost { pub struct Note { #[serde(rename = "type")] kind: NoteType, - id: ObjectId, - pub(crate) attributed_to: ObjectId, + id: ObjectId, + pub(crate) attributed_to: ObjectId, #[serde(deserialize_with = "deserialize_one_or_many")] pub(crate) to: Vec, content: String, } #[async_trait::async_trait] -impl ApubObject for MyPost { +impl ApubObject for DbPost { type DataType = DatabaseHandle; type ApubType = Note; - type DbType = (); type Error = Error; async fn read_from_apub_id( @@ -72,7 +72,7 @@ impl ApubObject for MyPost { apub: Self::ApubType, data: &RequestData, ) -> Result { - let post = MyPost { + let post = DbPost { text: apub.content, ap_id: apub.id, creator: apub.attributed_to, diff --git a/examples/simple_federation/utils.rs b/examples/local_federation/utils.rs similarity index 100% rename from examples/simple_federation/utils.rs rename to examples/local_federation/utils.rs diff --git a/examples/simple_federation/actix_web/http.rs b/examples/simple_federation/actix_web/http.rs deleted file mode 100644 index 1c2850d..0000000 --- a/examples/simple_federation/actix_web/http.rs +++ /dev/null @@ -1,66 +0,0 @@ -use crate::{ - error::Error, - instance::DatabaseHandle, - objects::person::{MyUser, PersonAcceptedActivities}, -}; -use activitypub_federation::{ - core::{actix_web::inbox::receive_activity, object_id::ObjectId}, - deser::context::WithContext, - request_data::{ApubContext, ApubMiddleware, RequestData}, - traits::ApubObject, - APUB_JSON_CONTENT_TYPE, -}; -use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer}; -use tokio::task; -use url::Url; - -pub fn listen(data: &ApubContext) -> Result<(), Error> { - let hostname = data.local_instance().hostname(); - let data = data.clone(); - let server = HttpServer::new(move || { - App::new() - .wrap(ApubMiddleware::new(data.clone())) - .route("/objects/{user_name}", web::get().to(http_get_user)) - .service( - web::scope("") - // Just a single, global inbox for simplicity - .route("/inbox", web::post().to(http_post_user_inbox)), - ) - }) - .bind(hostname)? - .run(); - task::spawn(server); - Ok(()) -} - -/// Handles requests to fetch user json over HTTP -pub async fn http_get_user( - request: HttpRequest, - data: RequestData, -) -> Result { - let hostname: String = data.local_instance().hostname().to_string(); - let request_url = format!("http://{}{}", hostname, &request.uri().to_string()); - let url = Url::parse(&request_url)?; - let user = ObjectId::::new(url) - .dereference_local(&data) - .await? - .into_apub(&data) - .await?; - - Ok(HttpResponse::Ok() - .content_type(APUB_JSON_CONTENT_TYPE) - .json(WithContext::new_default(user))) -} - -/// Handles messages received in user inbox -pub async fn http_post_user_inbox( - request: HttpRequest, - payload: String, - data: RequestData, -) -> Result { - let activity = serde_json::from_str(&payload)?; - receive_activity::, MyUser, DatabaseHandle>( - request, activity, &data, - ) - .await -} diff --git a/examples/simple_federation/axum/http.rs b/examples/simple_federation/axum/http.rs deleted file mode 100644 index 7040f83..0000000 --- a/examples/simple_federation/axum/http.rs +++ /dev/null @@ -1,93 +0,0 @@ -use crate::{ - error::Error, - instance::DatabaseHandle, - objects::person::{MyUser, Person, PersonAcceptedActivities}, -}; -use activitypub_federation::{ - core::{ - axum::{inbox::receive_activity, json::ApubJson, verify_request_payload, DigestVerified}, - object_id::ObjectId, - }, - deser::context::WithContext, - request_data::{ApubContext, ApubMiddleware, RequestData}, - traits::ApubObject, -}; -use axum::{ - body, - extract::OriginalUri, - middleware, - response::IntoResponse, - routing::{get, post}, - Extension, - Json, - Router, -}; -use http::{HeaderMap, Method, Request}; -use hyper::Body; -use std::net::ToSocketAddrs; -use tokio::task; -use tower::ServiceBuilder; -use tower_http::{trace::TraceLayer, ServiceBuilderExt}; -use url::Url; - -pub fn listen(data: &ApubContext) -> Result<(), Error> { - let hostname = data.local_instance().hostname(); - let data = data.clone(); - let app = Router::new() - .route("/inbox", post(http_post_user_inbox)) - .layer( - ServiceBuilder::new() - .map_request_body(body::boxed) - .layer(middleware::from_fn(verify_request_payload)), - ) - .route("/objects/:user_name", get(http_get_user)) - .layer(ApubMiddleware::new(data)) - .layer(TraceLayer::new_for_http()); - - // run it - let addr = hostname - .to_socket_addrs()? - .next() - .expect("Failed to lookup domain name"); - let server = axum::Server::bind(&addr).serve(app.into_make_service()); - - task::spawn(server); - Ok(()) -} - -async fn http_get_user( - data: RequestData, - request: Request, -) -> Result>, Error> { - let hostname: String = data.local_instance().hostname().to_string(); - let request_url = format!("http://{}{}", hostname, &request.uri()); - - let url = Url::parse(&request_url).expect("Failed to parse url"); - - let user = ObjectId::::new(url) - .dereference_local(&data) - .await? - .into_apub(&data) - .await?; - - Ok(ApubJson(WithContext::new_default(user))) -} - -async fn http_post_user_inbox( - headers: HeaderMap, - method: Method, - OriginalUri(uri): OriginalUri, - data: RequestData, - Extension(digest_verified): Extension, - Json(activity): Json>, -) -> impl IntoResponse { - receive_activity::, MyUser, DatabaseHandle>( - digest_verified, - activity, - &data, - headers, - method, - uri, - ) - .await -} diff --git a/examples/simple_federation/instance.rs b/examples/simple_federation/instance.rs deleted file mode 100644 index f7cb37f..0000000 --- a/examples/simple_federation/instance.rs +++ /dev/null @@ -1,68 +0,0 @@ -use crate::{ - generate_object_id, - objects::{note::MyPost, person::MyUser}, - Error, -}; -use activitypub_federation::{ - core::signatures::generate_actor_keypair, - request_data::ApubContext, - FederationSettings, - InstanceConfig, - UrlVerifier, -}; -use async_trait::async_trait; -use reqwest::Client; -use std::sync::{Arc, Mutex}; -use url::Url; - -pub type DatabaseHandle = Arc; - -/// Our "database" which contains all known posts users (local and federated) -pub struct Database { - pub users: Mutex>, - pub posts: Mutex>, -} - -/// Use this to store your federation blocklist, or a database connection needed to retrieve it. -#[derive(Clone)] -struct MyUrlVerifier(); - -#[async_trait] -impl UrlVerifier for MyUrlVerifier { - async fn verify(&self, url: &Url) -> Result<(), &'static str> { - if url.domain() == Some("malicious.com") { - Err("malicious domain") - } else { - Ok(()) - } - } -} - -pub fn listen(data: &ApubContext) -> Result<(), Error> { - #[cfg(feature = "actix-web")] - crate::actix_web::http::listen(data)?; - #[cfg(feature = "axum")] - crate::axum::http::listen(data)?; - Ok(()) -} - -impl Database { - pub fn new(hostname: String) -> Result, Error> { - let settings = FederationSettings::builder() - .debug(true) - .url_verifier(Box::new(MyUrlVerifier())) - .build()?; - let local_instance = - InstanceConfig::new(hostname.clone(), Client::default().into(), settings); - let local_user = MyUser::new(generate_object_id(&hostname)?, generate_actor_keypair()?); - let instance = Arc::new(Database { - users: Mutex::new(vec![local_user]), - posts: Mutex::new(vec![]), - }); - Ok(ApubContext::new(instance, local_instance)) - } - - pub fn local_user(&self) -> MyUser { - self.users.lock().unwrap().first().cloned().unwrap() - } -} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..62818d8 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,230 @@ +use crate::{ + core::activity_queue::create_activity_queue, + request_data::RequestData, + traits::ActivityHandler, + utils::verify_domains_match, + Error, +}; +use async_trait::async_trait; +use background_jobs::Manager; +use derive_builder::Builder; +use dyn_clone::{clone_trait_object, DynClone}; +use reqwest_middleware::ClientWithMiddleware; +use serde::de::DeserializeOwned; +use std::{ops::Deref, sync::Arc, time::Duration}; +use url::Url; + +/// Various settings related to Activitypub federation. +/// +/// Use [FederationSettings.builder()] to initialize this. +/// +/// ``` +/// # use activitypub_federation::config::FederationConfig; +/// # let _ = actix_rt::System::new(); +/// let settings = FederationConfig::builder() +/// .hostname("example.com") +/// .app_data(()) +/// .http_fetch_limit(50) +/// .worker_count(16) +/// .build()?; +/// # Ok::<(), anyhow::Error>(()) +/// ``` +#[derive(Builder, Clone)] +#[builder(build_fn(private, name = "partial_build"))] +pub struct FederationConfig { + /// The domain where this federated instance is running + #[builder(setter(into))] + pub(crate) hostname: String, + /// Data which the application requires in handlers, such as database connection + /// or configuration. + pub(crate) app_data: T, + /// Maximum number of outgoing HTTP requests per incoming HTTP request. See + /// [crate::utils::fetch_object_http] for more details. + #[builder(default = "20")] + pub(crate) http_fetch_limit: i32, + #[builder(default = "reqwest::Client::default().into()")] + /// HTTP client used for all outgoing requests. Middleware can be used to add functionality + /// like log tracing or retry of failed requests. + pub(crate) client: ClientWithMiddleware, + /// Number of worker threads for sending outgoing activities + #[builder(default = "64")] + pub(crate) worker_count: u64, + /// Run library in debug mode. This allows usage of http and localhost urls. It also sends + /// outgoing activities synchronously, not in background thread. This helps to make tests + /// more consistent. + /// Do not use for production. + #[builder(default = "false")] + pub(crate) debug: bool, + /// Timeout for all HTTP requests. HTTP signatures are valid for 10s, so it makes sense to + /// use the same as timeout when sending + #[builder(default = "Duration::from_secs(10)")] + pub(crate) request_timeout: Duration, + /// Function used to verify that urls are valid, See [UrlVerifier] for details. + #[builder(default = "Box::new(DefaultUrlVerifier())")] + pub(crate) url_verifier: Box, + /// Enable to sign HTTP signatures according to draft 10, which does not include (created) and + /// (expires) fields. This is required for compatibility with some software like Pleroma. + /// + /// + #[builder(default = "false")] + pub(crate) http_signature_compat: bool, + /// Queue for sending outgoing activities. Only optional to make builder work, its always + /// present once constructed. + #[builder(setter(skip))] + pub(crate) activity_queue: Option>, +} + +impl FederationConfig { + pub fn builder() -> FederationConfigBuilder { + FederationConfigBuilder::default() + } + pub(crate) async fn verify_url_and_domain( + &self, + activity: &Activity, + ) -> Result<(), Error> + where + Activity: ActivityHandler + DeserializeOwned + Send + 'static, + { + verify_domains_match(activity.id(), activity.actor())?; + self.verify_url_valid(activity.id()).await?; + if self.is_local_url(activity.id()) { + return Err(Error::UrlVerificationError( + "Activity was sent from local instance", + )); + } + + Ok(()) + } + + /// Create new [RequestData] from this. You should prefer to use a middleware if possible. + pub fn to_request_data(&self) -> RequestData { + RequestData { + config: self.clone(), + request_counter: Default::default(), + } + } + + /// Perform some security checks on URLs as mentioned in activitypub spec, and call user-supplied + /// [`InstanceSettings.verify_url_function`]. + /// + /// https://www.w3.org/TR/activitypub/#security-considerations + pub(crate) async fn verify_url_valid(&self, url: &Url) -> Result<(), Error> { + match url.scheme() { + "https" => {} + "http" => { + if !self.debug { + return Err(Error::UrlVerificationError( + "Http urls are only allowed in debug mode", + )); + } + } + _ => return Err(Error::UrlVerificationError("Invalid url scheme")), + }; + + if url.domain().is_none() { + return Err(Error::UrlVerificationError("Url must have a domain")); + } + + if url.domain() == Some("localhost") && !self.debug { + return Err(Error::UrlVerificationError( + "Localhost is only allowed in debug mode", + )); + } + + self.url_verifier + .verify(url) + .await + .map_err(Error::UrlVerificationError)?; + + Ok(()) + } + + /// Returns true if the url refers to this instance. Handles hostnames like `localhost:8540` for + /// local debugging. + pub(crate) fn is_local_url(&self, url: &Url) -> bool { + let mut domain = url.domain().expect("id has domain").to_string(); + if let Some(port) = url.port() { + domain = format!("{}:{}", domain, port); + } + domain == self.hostname + } + + /// Returns the local hostname + pub fn hostname(&self) -> &str { + &self.hostname + } +} + +impl FederationConfigBuilder { + pub fn build(&mut self) -> Result, FederationConfigBuilderError> { + let mut config = self.partial_build()?; + let queue = create_activity_queue( + config.client.clone(), + config.worker_count, + config.request_timeout, + config.debug, + ); + config.activity_queue = Some(Arc::new(queue)); + Ok(config) + } +} + +impl Deref for FederationConfig { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.app_data + } +} + +/// Handler for validating URLs. +/// +/// This is used for implementing domain blocklists and similar functionality. It is called +/// with the ID of newly received activities, when fetching remote data from a given URL +/// and before sending an activity to a given inbox URL. If processing for this domain/URL should +/// be aborted, return an error. In case of `Ok(())`, processing continues. +/// +/// ``` +/// # use async_trait::async_trait; +/// use url::Url; +/// # use activitypub_federation::config::UrlVerifier; +/// # #[derive(Clone)] +/// # struct DatabaseConnection(); +/// # async fn get_blocklist(_: &DatabaseConnection) -> Vec { +/// # vec![] +/// # } +/// #[derive(Clone)] +/// struct Verifier { +/// db_connection: DatabaseConnection, +/// } +/// +/// #[async_trait] +/// impl UrlVerifier for Verifier { +/// async fn verify(&self, url: &Url) -> Result<(), &'static str> { +/// let blocklist = get_blocklist(&self.db_connection).await; +/// let domain = url.domain().unwrap().to_string(); +/// if blocklist.contains(&domain) { +/// Err("Domain is blocked") +/// } else { +/// Ok(()) +/// } +/// } +/// } +/// ``` +#[async_trait] +pub trait UrlVerifier: DynClone + Send { + async fn verify(&self, url: &Url) -> Result<(), &'static str>; +} + +/// Default URL verifier which does nothing. +#[derive(Clone)] +struct DefaultUrlVerifier(); + +#[async_trait] +impl UrlVerifier for DefaultUrlVerifier { + async fn verify(&self, _url: &Url) -> Result<(), &'static str> { + Ok(()) + } +} + +clone_trait_object!(UrlVerifier); diff --git a/src/core/activity_queue.rs b/src/core/activity_queue.rs index 32ce6bd..ba0e6e2 100644 --- a/src/core/activity_queue.rs +++ b/src/core/activity_queue.rs @@ -1,10 +1,9 @@ use crate::{ core::signatures::{sign_request, PublicKey}, + request_data::RequestData, traits::ActivityHandler, utils::reqwest_shim::ResponseExt, Error, - FederationSettings, - InstanceConfig, APUB_JSON_CONTENT_TYPE, }; use anyhow::anyhow; @@ -30,8 +29,10 @@ use std::{ use tracing::{debug, info, warn}; use url::Url; -/// Send out the given activity to all inboxes, automatically generating the HTTP signatures. By -/// default, sending is done on a background thread, and automatically retried on failure with +/// Hands off an activity for delivery to remote actors. +/// +/// Send out the given activity to all recipient inboxes, automatically generating the HTTP +/// signatures. In production builds, sending is done on a background thread, and automatically retried on failure with /// exponential backoff. /// /// - `activity`: The activity to be sent, gets converted to json @@ -39,28 +40,35 @@ use url::Url; /// - `private_key`: The sending actor's private key for signing HTTP signature /// - `recipients`: List of actors who should receive the activity. This gets deduplicated, and /// local/invalid inbox urls removed -pub async fn send_activity( +/// +/// TODO: how can this only take a single pubkey? seems completely wrong, should be one per recipient +/// TODO: example +/// TODO: consider reading privkey from activity +pub async fn send_activity( activity: Activity, public_key: PublicKey, private_key: String, recipients: Vec, - instance: &InstanceConfig, + data: &RequestData, ) -> Result<(), ::Error> where Activity: ActivityHandler + Serialize, ::Error: From + From, + T: Clone, { + let config = &data.config; let activity_id = activity.id(); let activity_serialized = serde_json::to_string_pretty(&activity)?; let inboxes: Vec = recipients .into_iter() .unique() - .filter(|i| !instance.is_local_url(i)) + .filter(|i| !config.is_local_url(i)) .collect(); - let activity_queue = &instance.activity_queue; + // This field is only optional to make builder work, its always present at this point + let activity_queue = config.activity_queue.as_ref().unwrap(); for inbox in inboxes { - if instance.verify_url_valid(&inbox).await.is_err() { + if config.verify_url_valid(&inbox).await.is_err() { continue; } @@ -70,10 +78,10 @@ where activity: activity_serialized.clone(), public_key: public_key.clone(), private_key: private_key.clone(), - http_signature_compat: instance.settings.http_signature_compat, + http_signature_compat: config.http_signature_compat, }; - if instance.settings.debug { - let res = do_send(message, &instance.client, instance.settings.request_timeout).await; + if config.debug { + let res = do_send(message, &config.client, config.request_timeout).await; // Don't fail on error, as we intentionally do some invalid actions in tests, to verify that // they are rejected on the receiving side. These errors shouldn't bubble up to make the API // call fail. This matches the behaviour in production. @@ -90,7 +98,7 @@ where stats.dead.this_hour(), stats.complete.this_hour() ); - if stats.running as u64 == instance.settings.worker_count { + if stats.running as u64 == config.worker_count { warn!("Maximum number of activitypub workers reached. Consider increasing worker count to avoid federation delays"); } } @@ -211,20 +219,17 @@ fn generate_request_headers(inbox_url: &Url) -> HeaderMap { pub(crate) fn create_activity_queue( client: ClientWithMiddleware, - settings: &FederationSettings, + worker_count: u64, + request_timeout: Duration, + debug: bool, ) -> Manager { // queue is not used in debug mod, so dont create any workers to avoid log spam - let worker_count = if settings.debug { - 0 - } else { - settings.worker_count - }; - let timeout = settings.request_timeout; + let worker_count = if debug { 0 } else { worker_count }; // Configure and start our workers WorkerConfig::new_managed(Storage::new(ActixTimer), move |_| MyState { client: client.clone(), - timeout, + timeout: request_timeout, }) .register::() .set_worker_count("default", worker_count) diff --git a/src/core/actix_web/inbox.rs b/src/core/actix_web/inbox.rs index c47031f..12f6c2d 100644 --- a/src/core/actix_web/inbox.rs +++ b/src/core/actix_web/inbox.rs @@ -1,17 +1,20 @@ use crate::{ - core::{object_id::ObjectId, signatures::verify_signature}, + core::{ + object_id::ObjectId, + signatures::{verify_inbox_hash, verify_signature}, + }, request_data::RequestData, traits::{ActivityHandler, Actor, ApubObject}, Error, }; -use actix_web::{HttpRequest, HttpResponse}; +use actix_web::{web::Bytes, HttpRequest, HttpResponse}; use serde::de::DeserializeOwned; use tracing::debug; /// Receive an activity and perform some basic checks, including HTTP signature verification. pub async fn receive_activity( request: HttpRequest, - activity: Activity, + body: Bytes, data: &RequestData, ) -> Result::Error> where @@ -23,11 +26,12 @@ where + From<::Error> + From, ::Error: From + From, + Datatype: Clone, { - data.local_instance() - .verify_url_and_domain(&activity) - .await?; + verify_inbox_hash(request.headers().get("Digest"), &body)?; + let activity: Activity = serde_json::from_slice(&body)?; + data.config.verify_url_and_domain(&activity).await?; let actor = ObjectId::::new(activity.actor().clone()) .dereference(data) .await?; @@ -43,3 +47,99 @@ where activity.receive(data).await?; Ok(HttpResponse::Ok().finish()) } + +#[cfg(test)] +mod test { + use super::*; + use crate::{ + config::FederationConfig, + core::signatures::{sign_request, PublicKey}, + traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR}, + }; + use actix_web::test::TestRequest; + use reqwest::Client; + use reqwest_middleware::ClientWithMiddleware; + use url::Url; + + #[actix_rt::test] + async fn test_receive_activity() { + let (body, incoming_request, config) = setup_receive_test().await; + receive_activity::( + incoming_request.to_http_request(), + body.into(), + &config.to_request_data(), + ) + .await + .unwrap(); + } + + #[actix_rt::test] + async fn test_receive_activity_invalid_body_signature() { + let (_, incoming_request, config) = setup_receive_test().await; + let err = receive_activity::( + incoming_request.to_http_request(), + "invalid".into(), + &config.to_request_data(), + ) + .await + .err() + .unwrap(); + + let e = err.root_cause().downcast_ref::().unwrap(); + assert_eq!(e, &Error::ActivityBodyDigestInvalid) + } + + #[actix_rt::test] + async fn test_receive_activity_invalid_path() { + let (body, incoming_request, config) = setup_receive_test().await; + let incoming_request = incoming_request.uri("/wrong"); + let err = receive_activity::( + incoming_request.to_http_request(), + body.into(), + &config.to_request_data(), + ) + .await + .err() + .unwrap(); + + let e = err.root_cause().downcast_ref::().unwrap(); + assert_eq!(e, &Error::ActivitySignatureInvalid) + } + + async fn setup_receive_test() -> (String, TestRequest, FederationConfig) { + let request_builder = + ClientWithMiddleware::from(Client::default()).post("https://example.com/inbox"); + let public_key = PublicKey::new_main_key( + Url::parse("https://example.com").unwrap(), + DB_USER_KEYPAIR.public_key.clone(), + ); + let activity = Follow { + actor: "http://localhost:123".try_into().unwrap(), + object: "http://localhost:124".try_into().unwrap(), + kind: Default::default(), + id: "http://localhost:123/1".try_into().unwrap(), + }; + let body = serde_json::to_string(&activity).unwrap(); + let outgoing_request = sign_request( + request_builder, + body.to_string(), + public_key, + DB_USER_KEYPAIR.private_key.clone(), + false, + ) + .await + .unwrap(); + let mut incoming_request = TestRequest::post().uri(outgoing_request.url().path()); + for h in outgoing_request.headers() { + incoming_request = incoming_request.append_header(h); + } + + let config = FederationConfig::builder() + .hostname("localhost:8002") + .app_data(DbConnection) + .debug(true) + .build() + .unwrap(); + (body, incoming_request, config) + } +} diff --git a/src/core/actix_web/middleware.rs b/src/core/actix_web/middleware.rs index 58e4af9..1b60c8a 100644 --- a/src/core/actix_web/middleware.rs +++ b/src/core/actix_web/middleware.rs @@ -1,4 +1,7 @@ -use crate::request_data::{ApubContext, ApubMiddleware, RequestData}; +use crate::{ + config::FederationConfig, + request_data::{ApubMiddleware, RequestData}, +}; use actix_web::{ dev::{forward_ready, Payload, Service, ServiceRequest, ServiceResponse, Transform}, Error, @@ -24,19 +27,19 @@ where fn new_transform(&self, service: S) -> Self::Future { ready(Ok(ApubService { service, - context: self.0.clone(), + config: self.0.clone(), })) } } -pub struct ApubService +pub struct ApubService where S: Service, S::Future: 'static, T: Sync, { service: S, - context: ApubContext, + config: FederationConfig, } impl Service for ApubService @@ -53,7 +56,7 @@ where forward_ready!(service); fn call(&self, req: ServiceRequest) -> Self::Future { - req.extensions_mut().insert(self.context.clone()); + req.extensions_mut().insert(self.config.clone()); self.service.call(req) } @@ -64,7 +67,7 @@ impl FromRequest for RequestData { type Future = Ready>; fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future { - ready(match req.extensions().get::>() { + ready(match req.extensions().get::>() { Some(c) => Ok(c.to_request_data()), None => Err(actix_web::error::ErrorBadRequest( "Missing extension, did you register ApubMiddleware?", diff --git a/src/core/axum/digest.rs b/src/core/axum/digest.rs deleted file mode 100644 index 2046cbd..0000000 --- a/src/core/axum/digest.rs +++ /dev/null @@ -1,45 +0,0 @@ -use axum::http::HeaderValue; -use sha2::{Digest, Sha256}; - -#[derive(Clone, Debug)] -pub struct DigestPart { - pub algorithm: String, - pub digest: String, -} - -impl DigestPart { - pub fn try_from_header(h: &HeaderValue) -> Option> { - let h = h.to_str().ok()?.split(';').next()?; - let v: Vec<_> = h - .split(',') - .filter_map(|p| { - let mut iter = p.splitn(2, '='); - iter.next() - .and_then(|alg| iter.next().map(|value| (alg, value))) - }) - .map(|(alg, value)| DigestPart { - algorithm: alg.to_owned(), - digest: value.to_owned(), - }) - .collect(); - - if v.is_empty() { - None - } else { - Some(v) - } - } -} - -pub fn verify_sha256(digests: &[DigestPart], payload: &[u8]) -> bool { - let mut hasher = Sha256::new(); - - for part in digests { - hasher.update(payload); - if base64::encode(hasher.finalize_reset()) != part.digest { - return false; - } - } - - true -} diff --git a/src/core/axum/inbox.rs b/src/core/axum/inbox.rs index ecf4f9c..b9cde82 100644 --- a/src/core/axum/inbox.rs +++ b/src/core/axum/inbox.rs @@ -1,21 +1,20 @@ use crate::{ - core::{axum::DigestVerified, object_id::ObjectId, signatures::verify_signature}, + core::{ + axum::ActivityData, + object_id::ObjectId, + signatures::{verify_inbox_hash, verify_signature}, + }, request_data::RequestData, traits::{ActivityHandler, Actor, ApubObject}, Error, }; -use http::{HeaderMap, Method, Uri}; use serde::de::DeserializeOwned; use tracing::debug; /// Receive an activity and perform some basic checks, including HTTP signature verification. pub async fn receive_activity( - _digest_verified: DigestVerified, - activity: Activity, + activity_data: ActivityData, data: &RequestData, - headers: HeaderMap, - method: Method, - uri: Uri, ) -> Result<(), ::Error> where Activity: ActivityHandler + DeserializeOwned + Send + 'static, @@ -26,18 +25,26 @@ where + From<::Error> + From, ::Error: From + From, + Datatype: Clone, { - data.local_instance() - .verify_url_and_domain(&activity) - .await?; + verify_inbox_hash(activity_data.headers.get("Digest"), &activity_data.body)?; + let activity: Activity = serde_json::from_slice(&activity_data.body)?; + data.config.verify_url_and_domain(&activity).await?; let actor = ObjectId::::new(activity.actor().clone()) .dereference(data) .await?; - verify_signature(&headers, &method, &uri, actor.public_key())?; + verify_signature( + &activity_data.headers, + &activity_data.method, + &activity_data.uri, + actor.public_key(), + )?; debug!("Receiving activity {}", activity.id().to_string()); activity.receive(data).await?; Ok(()) } + +// TODO: copy tests from actix-web inbox and implement for axum as well diff --git a/src/core/axum/json.rs b/src/core/axum/json.rs index 6962942..ae9a23a 100644 --- a/src/core/axum/json.rs +++ b/src/core/axum/json.rs @@ -6,13 +6,19 @@ use serde::Serialize; /// A wrapper struct to respond with [`APUB_JSON_CONTENT_TYPE`] /// in axum handlers /// -/// ## Example: -/// ```rust, no_run -/// use activitypub_federation::deser::context::WithContext; -/// async fn http_get_user() -> Result>, Error> { -/// let user = WithContext::new_default(M); +/// ``` +/// # use anyhow::Error; +/// # use axum::extract::Path; +/// # use activitypub_federation::core::axum::json::ApubJson; +/// # use activitypub_federation::protocol::context::WithContext; +/// # use activitypub_federation::request_data::RequestData; +/// # use activitypub_federation::traits::ApubObject; +/// # use activitypub_federation::traits::tests::{DbConnection, DbUser, Person}; +/// async fn http_get_user(Path(name): Path, data: RequestData) -> Result>, Error> { +/// let user: DbUser = data.read_local_user(name).await?; +/// let person = user.into_apub(&data).await?; /// -/// Ok(ApubJson(WithContext::new_default(MyUser::default()))) +/// Ok(ApubJson(WithContext::new_default(person))) /// } /// ``` #[derive(Debug, Clone, Copy, Default)] diff --git a/src/core/axum/middleware.rs b/src/core/axum/middleware.rs index 2a2d9e6..797a58b 100644 --- a/src/core/axum/middleware.rs +++ b/src/core/axum/middleware.rs @@ -1,4 +1,7 @@ -use crate::request_data::{ApubContext, ApubMiddleware, RequestData}; +use crate::{ + config::FederationConfig, + request_data::{ApubMiddleware, RequestData}, +}; use axum::{async_trait, body::Body, extract::FromRequestParts, http::Request, response::Response}; use http::{request::Parts, StatusCode}; use std::task::{Context, Poll}; @@ -10,15 +13,15 @@ impl Layer for ApubMiddleware { fn layer(&self, inner: S) -> Self::Service { ApubService { inner, - context: self.0.clone(), + config: self.0.clone(), } } } #[derive(Clone)] -pub struct ApubService { +pub struct ApubService { inner: S, - context: ApubContext, + config: FederationConfig, } impl Service> for ApubService @@ -36,7 +39,7 @@ where } fn call(&mut self, mut request: Request) -> Self::Future { - request.extensions_mut().insert(self.context.clone()); + request.extensions_mut().insert(self.config.clone()); self.inner.call(request) } } @@ -50,8 +53,7 @@ where type Rejection = (StatusCode, &'static str); async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { - // TODO: need to set this extension from middleware - match parts.extensions.get::>() { + match parts.extensions.get::>() { Some(c) => Ok(c.to_request_data()), None => Err(( StatusCode::INTERNAL_SERVER_ERROR, diff --git a/src/core/axum/mod.rs b/src/core/axum/mod.rs index 90c205c..21c74b0 100644 --- a/src/core/axum/mod.rs +++ b/src/core/axum/mod.rs @@ -1,81 +1,50 @@ use axum::{ async_trait, - body::{self, BoxBody, Bytes, Full}, + body::{Bytes, HttpBody}, extract::FromRequest, http::{Request, StatusCode}, - middleware::Next, response::{IntoResponse, Response}, }; -use digest::{verify_sha256, DigestPart}; +use http::{HeaderMap, Method, Uri}; -mod digest; pub mod inbox; pub mod json; pub mod middleware; -/// A request guard to ensure digest has been verified request has been -/// see [`receive_activity`] -#[derive(Clone)] -pub struct DigestVerified; - -pub struct BufferRequestBody(pub Bytes); - -pub async fn verify_request_payload( - request: Request, - next: Next, -) -> Result { - let mut request = verify_payload(request).await?; - request.extensions_mut().insert(DigestVerified); - Ok(next.run(request).await) -} - -async fn verify_payload(request: Request) -> Result, Response> { - let (parts, body) = request.into_parts(); - - // this wont work if the body is an long running stream - let bytes = hyper::body::to_bytes(body) - .await - .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?; - - match parts.headers.get("Digest") { - None => Err(( - StatusCode::UNAUTHORIZED, - "Missing digest header".to_string(), - ) - .into_response()), - Some(digest) => match DigestPart::try_from_header(digest) { - None => Err(( - StatusCode::UNAUTHORIZED, - "Malformed digest header".to_string(), - ) - .into_response()), - Some(digests) => { - if !verify_sha256(&digests, bytes.as_ref()) { - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "Digest does not match payload".to_string(), - ) - .into_response()) - } else { - Ok(Request::from_parts(parts, body::boxed(Full::from(bytes)))) - } - } - }, - } +/// Contains everything that is necessary to verify HTTP signatures and receive an +/// activity, including the request body. +#[derive(Debug)] +pub struct ActivityData { + headers: HeaderMap, + method: Method, + uri: Uri, + body: Vec, } #[async_trait] -impl FromRequest for BufferRequestBody +impl FromRequest for ActivityData where + Bytes: FromRequest, + B: HttpBody + Send + 'static, S: Send + Sync, + ::Error: std::fmt::Display, + ::Data: Send, { type Rejection = Response; - async fn from_request(req: Request, state: &S) -> Result { - let body = Bytes::from_request(req, state) - .await - .map_err(IntoResponse::into_response)?; + async fn from_request(req: Request, _state: &S) -> Result { + let (parts, body) = req.into_parts(); - Ok(Self(body)) + // this wont work if the body is an long running stream + let bytes = hyper::body::to_bytes(body) + .await + .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?; + + Ok(Self { + headers: parts.headers, + method: parts.method, + uri: parts.uri, + body: bytes.to_vec(), + }) } } diff --git a/src/core/object_id.rs b/src/core/object_id.rs index ec3010f..f427aa1 100644 --- a/src/core/object_id.rs +++ b/src/core/object_id.rs @@ -8,8 +8,41 @@ use std::{ }; use url::Url; -/// We store Url on the heap because it is quite large (88 bytes). -#[derive(Serialize, Deserialize, Debug)] +/// Typed wrapper for Activitypub Object ID. +/// +/// It provides convenient methods for fetching the object from remote server or local database. +/// Objects are automatically cached locally, so they don't have to be fetched every time. Much of +/// the crate functionality relies on this wrapper. +/// +/// Every time an object is fetched via HTTP, [RequestData.request_counter] is incremented by one. +/// If the value exceeds [FederationSettings.http_fetch_limit], the request is aborted with +/// [Error::RequestLimit]. This prevents denial of service attacks where an attack triggers +/// infinite, recursive fetching of data. +/// +/// ``` +/// # use activitypub_federation::core::object_id::ObjectId; +/// # use activitypub_federation::config::FederationConfig; +/// # use activitypub_federation::Error::NotFound; +/// # use activitypub_federation::traits::tests::{DbConnection, DbUser}; +/// # let _ = actix_rt::System::new(); +/// # actix_rt::Runtime::new().unwrap().block_on(async { +/// # let db_connection = DbConnection; +/// let config = FederationConfig::builder() +/// .hostname("example.com") +/// .app_data(db_connection) +/// .build()?; +/// let request_data = config.to_request_data(); +/// let object_id: ObjectId:: = "https://lemmy.ml/u/nutomic".try_into()?; +/// // Attempt to fetch object from local database or fall back to remote server +/// let user = object_id.dereference(&request_data).await; +/// assert!(user.is_ok()); +/// // Now you can also read the object from local database without network requests +/// let user = object_id.dereference_local(&request_data).await; +/// assert!(user.is_ok()); +/// # Ok::<(), anyhow::Error>(()) +/// # }).unwrap(); +/// ``` +#[derive(Serialize, Deserialize)] #[serde(transparent)] pub struct ObjectId(Box, PhantomData) where @@ -21,6 +54,7 @@ where Kind: ApubObject + Send + 'static, for<'de2> ::ApubType: serde::Deserialize<'de2>, { + /// Construct a new objectid instance pub fn new(url: T) -> Self where T: Into, @@ -47,7 +81,7 @@ where let db_object = self.dereference_from_db(data).await?; // if its a local object, only fetch it from the database and not over http - if data.local_instance().is_local_url(&self.0) { + if data.config.is_local_url(&self.0) { return match db_object { None => Err(Error::NotFound.into()), Some(o) => Ok(o), @@ -132,9 +166,6 @@ static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 20; /// Determines when a remote actor should be refetched from its instance. In release builds, this is /// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds /// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`. -/// -/// TODO it won't pick up new avatars, summaries etc until a day after. -/// Actors need an "update" activity pushed to other servers to fix this. fn should_refetch_object(last_refreshed: NaiveDateTime) -> bool { let update_interval = if cfg!(debug_assertions) { // avoid infinite loop when fetching community outbox @@ -151,10 +182,18 @@ where Kind: ApubObject, for<'de2> ::ApubType: serde::Deserialize<'de2>, { - #[allow(clippy::recursive_format_impl)] fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - // Use to_string here because Url.display is not useful for us - write!(f, "{}", self.0) + write!(f, "{}", self.0.as_str()) + } +} + +impl Debug for ObjectId +where + Kind: ApubObject, + for<'de2> ::ApubType: serde::Deserialize<'de2>, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.as_str()) } } @@ -178,6 +217,18 @@ where } } +impl<'a, Kind> TryFrom<&'a str> for ObjectId +where + Kind: ApubObject + Send + 'static, + for<'de2> ::ApubType: serde::Deserialize<'de2>, +{ + type Error = url::ParseError; + + fn try_from(value: &'a str) -> Result { + Ok(ObjectId::new(Url::parse(value)?)) + } +} + impl PartialEq for ObjectId where Kind: ApubObject, @@ -189,67 +240,28 @@ where } #[cfg(test)] -mod tests { +pub mod tests { use super::*; - use crate::core::object_id::should_refetch_object; - use anyhow::Error; - - #[derive(Debug, Clone)] - struct TestObject {} - - #[async_trait::async_trait] - impl ApubObject for TestObject { - type DataType = TestObject; - type ApubType = (); - type DbType = (); - type Error = Error; - - async fn read_from_apub_id( - _object_id: Url, - _data: &RequestData, - ) -> Result, Self::Error> - where - Self: Sized, - { - todo!() - } - - async fn into_apub( - self, - _data: &RequestData, - ) -> Result { - todo!() - } - - async fn from_apub( - _apub: Self::ApubType, - _data: &RequestData, - ) -> Result - where - Self: Sized, - { - todo!() - } - } + use crate::{core::object_id::should_refetch_object, traits::tests::DbUser}; #[test] fn test_deserialize() { let url = Url::parse("http://test.com/").unwrap(); - let id = ObjectId::::new(url); + let id = ObjectId::::new(url); let string = serde_json::to_string(&id).unwrap(); assert_eq!("\"http://test.com/\"", string); - let parsed: ObjectId = serde_json::from_str(&string).unwrap(); + let parsed: ObjectId = serde_json::from_str(&string).unwrap(); assert_eq!(parsed, id); } #[test] fn test_should_refetch_object() { let one_second_ago = Utc::now().naive_utc() - ChronoDuration::seconds(1); - assert!(!should_refetch_object(one_second_ago)); + assert_eq!(false, should_refetch_object(one_second_ago)); let two_days_ago = Utc::now().naive_utc() - ChronoDuration::days(2); - assert!(should_refetch_object(two_days_ago)); + assert_eq!(true, should_refetch_object(two_days_ago)); } } diff --git a/src/core/signatures.rs b/src/core/signatures.rs index 4d22c4e..4171c26 100644 --- a/src/core/signatures.rs +++ b/src/core/signatures.rs @@ -1,4 +1,4 @@ -use crate::utils::header_to_map; +use crate::{utils::header_to_map, Error, Error::ActivitySignatureInvalid}; use anyhow::anyhow; use http::{header::HeaderName, uri::PathAndQuery, HeaderValue, Method, Uri}; use http_signature_normalization_reqwest::prelude::{Config, SignExt}; @@ -13,7 +13,7 @@ use reqwest::Request; use reqwest_middleware::RequestBuilder; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; -use std::io::{Error, ErrorKind}; +use std::io::ErrorKind; use tracing::debug; use url::Url; @@ -26,15 +26,15 @@ pub struct Keypair { pub public_key: String, } -/// Generate the asymmetric keypair for ActivityPub HTTP signatures. -pub fn generate_actor_keypair() -> Result { +/// Generate a random asymmetric keypair for ActivityPub HTTP signatures. +pub fn generate_actor_keypair() -> Result { let rsa = Rsa::generate(2048)?; let pkey = PKey::from_rsa(rsa)?; let public_key = pkey.public_key_to_pem()?; let private_key = pkey.private_key_to_pem_pkcs8()?; let key_to_string = |key| match String::from_utf8(key) { Ok(s) => Ok(s), - Err(e) => Err(Error::new( + Err(e) => Err(std::io::Error::new( ErrorKind::Other, format!("Failed converting key to string: {}", e), )), @@ -79,6 +79,8 @@ pub(crate) async fn sign_request( .await } +/// Public key of actors which is used for HTTP signatures. This needs to be federated in the +/// `public_key` field of all actors. #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct PublicKey { @@ -113,7 +115,7 @@ pub fn verify_signature<'a, H>( method: &Method, uri: &Uri, public_key: &str, -) -> Result<(), anyhow::Error> +) -> Result<(), Error> where H: IntoIterator, { @@ -121,7 +123,8 @@ where let path_and_query = uri.path_and_query().map(PathAndQuery::as_str).unwrap_or(""); let verified = CONFIG2 - .begin_verify(method.as_str(), path_and_query, headers)? + .begin_verify(method.as_str(), path_and_query, headers) + .map_err(Error::conv)? .verify(|signature, signing_string| -> anyhow::Result { debug!( "Verifying with key {}, message {}", @@ -131,12 +134,61 @@ where let mut verifier = Verifier::new(MessageDigest::sha256(), &public_key)?; verifier.update(signing_string.as_bytes())?; Ok(verifier.verify(&base64::decode(signature)?)?) - })?; + }) + .map_err(Error::conv)?; if verified { debug!("verified signature for {}", uri); Ok(()) } else { - Err(anyhow!("Invalid signature on request: {}", uri)) + Err(ActivitySignatureInvalid) } } + +#[derive(Clone, Debug)] +struct DigestPart { + pub algorithm: String, + pub digest: String, +} + +impl DigestPart { + fn try_from_header(h: &HeaderValue) -> Option> { + let h = h.to_str().ok()?.split(';').next()?; + let v: Vec<_> = h + .split(',') + .filter_map(|p| { + let mut iter = p.splitn(2, '='); + iter.next() + .and_then(|alg| iter.next().map(|value| (alg, value))) + }) + .map(|(alg, value)| DigestPart { + algorithm: alg.to_owned(), + digest: value.to_owned(), + }) + .collect(); + + if v.is_empty() { + None + } else { + Some(v) + } + } +} + +/// Verify body of an inbox request against the hash provided in `Digest` header. +pub(crate) fn verify_inbox_hash( + digest_header: Option<&HeaderValue>, + body: &[u8], +) -> Result<(), crate::Error> { + let digests = DigestPart::try_from_header(digest_header.unwrap()).unwrap(); + let mut hasher = Sha256::new(); + + for part in digests { + hasher.update(body); + if base64::encode(hasher.finalize_reset()) != part.digest { + return Err(crate::Error::ActivityBodyDigestInvalid); + } + } + + Ok(()) +} diff --git a/src/deser/helpers.rs b/src/deser/helpers.rs deleted file mode 100644 index f5d4cf1..0000000 --- a/src/deser/helpers.rs +++ /dev/null @@ -1,83 +0,0 @@ -use serde::{Deserialize, Deserializer}; - -/// Deserialize either a single json value, or a json array. In either case, the items are returned -/// as an array. -/// -/// Usage: -/// `#[serde(deserialize_with = "deserialize_one_or_many")]` -pub fn deserialize_one_or_many<'de, T, D>(deserializer: D) -> Result, D::Error> -where - T: Deserialize<'de>, - D: Deserializer<'de>, -{ - #[derive(Deserialize)] - #[serde(untagged)] - enum OneOrMany { - One(T), - Many(Vec), - } - - let result: OneOrMany = Deserialize::deserialize(deserializer)?; - Ok(match result { - OneOrMany::Many(list) => list, - OneOrMany::One(value) => vec![value], - }) -} - -/// Deserialize either a single json value, or a json array with one element. In both cases it -/// returns an array with a single element. -/// -/// Usage: -/// `#[serde(deserialize_with = "deserialize_one")]` -pub fn deserialize_one<'de, T, D>(deserializer: D) -> Result<[T; 1], D::Error> -where - T: Deserialize<'de>, - D: Deserializer<'de>, -{ - #[derive(Deserialize)] - #[serde(untagged)] - enum MaybeArray { - Simple(T), - Array([T; 1]), - } - - let result: MaybeArray = Deserialize::deserialize(deserializer)?; - Ok(match result { - MaybeArray::Simple(value) => [value], - MaybeArray::Array(value) => value, - }) -} - -/// Attempts to deserialize the item. If any error happens, its ignored and the type's default -/// value is returned. -/// -/// Usage: -/// `#[serde(deserialize_with = "deserialize_skip_error")]` -pub fn deserialize_skip_error<'de, T, D>(deserializer: D) -> Result -where - T: Deserialize<'de> + Default, - D: Deserializer<'de>, -{ - let value = serde_json::Value::deserialize(deserializer)?; - let inner = T::deserialize(value).unwrap_or_default(); - Ok(inner) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_deserialize_skip_error() { - #[derive(Debug, Deserialize)] - pub struct MyData { - #[serde(deserialize_with = "deserialize_skip_error")] - pub data: Option, - } - // data has type object - let _: MyData = serde_json::from_str(r#"{ "data": {} }"#).unwrap(); - - // data has type array - let _: MyData = serde_json::from_str(r#"{"data": []}"#).unwrap(); - } -} diff --git a/src/lib.rs b/src/lib.rs index 378f904..7f2bf72 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,19 +1,9 @@ -use crate::{ - core::activity_queue::create_activity_queue, - traits::ActivityHandler, - utils::verify_domains_match, -}; -use async_trait::async_trait; -use background_jobs::Manager; -use derive_builder::Builder; -use dyn_clone::{clone_trait_object, DynClone}; -use reqwest_middleware::ClientWithMiddleware; -use serde::de::DeserializeOwned; -use std::time::Duration; -use url::Url; +//#![deny(missing_docs)] +pub use activitystreams_kinds as kinds; +pub mod config; pub mod core; -pub mod deser; +pub mod protocol; pub mod request_data; pub mod traits; pub mod utils; @@ -21,157 +11,10 @@ pub mod utils; /// Mime type for Activitypub, used for `Accept` and `Content-Type` HTTP headers pub static APUB_JSON_CONTENT_TYPE: &str = "application/activity+json"; -/// Represents configuration for a single, federated instance. There should usually be only one of -/// this per application. -pub struct InstanceConfig { - hostname: String, - client: ClientWithMiddleware, - activity_queue: Manager, - settings: FederationSettings, -} - -impl InstanceConfig { - async fn verify_url_and_domain( - &self, - activity: &Activity, - ) -> Result<(), Error> - where - Activity: ActivityHandler + DeserializeOwned + Send + 'static, - { - verify_domains_match(activity.id(), activity.actor())?; - self.verify_url_valid(activity.id()).await?; - if self.is_local_url(activity.id()) { - return Err(Error::UrlVerificationError( - "Activity was sent from local instance", - )); - } - - Ok(()) - } - - /// Perform some security checks on URLs as mentioned in activitypub spec, and call user-supplied - /// [`InstanceSettings.verify_url_function`]. - /// - /// https://www.w3.org/TR/activitypub/#security-considerations - async fn verify_url_valid(&self, url: &Url) -> Result<(), Error> { - match url.scheme() { - "https" => {} - "http" => { - if !self.settings.debug { - return Err(Error::UrlVerificationError( - "Http urls are only allowed in debug mode", - )); - } - } - _ => return Err(Error::UrlVerificationError("Invalid url scheme")), - }; - - if url.domain().is_none() { - return Err(Error::UrlVerificationError("Url must have a domain")); - } - - if url.domain() == Some("localhost") && !self.settings.debug { - return Err(Error::UrlVerificationError( - "Localhost is only allowed in debug mode", - )); - } - - self.settings - .url_verifier - .verify(url) - .await - .map_err(Error::UrlVerificationError)?; - - Ok(()) - } -} - -#[async_trait] -pub trait UrlVerifier: DynClone + Send { - async fn verify(&self, url: &Url) -> Result<(), &'static str>; -} -clone_trait_object!(UrlVerifier); - -/// Various settings related to Activitypub federation. -/// -/// Use [FederationSettings.builder()] to initialize this. -#[derive(Builder)] -pub struct FederationSettings { - /// Maximum number of outgoing HTTP requests per incoming activity - #[builder(default = "20")] - http_fetch_limit: i32, - /// Number of worker threads for sending outgoing activities - #[builder(default = "64")] - worker_count: u64, - /// Run library in debug mode. This allows usage of http and localhost urls. It also sends - /// outgoing activities synchronously, not in background thread. This helps to make tests - /// more consistent. - /// Do not use for production. - #[builder(default = "false")] - debug: bool, - /// Timeout for all HTTP requests. HTTP signatures are valid for 10s, so it makes sense to - /// use the same as timeout when sending - #[builder(default = "Duration::from_secs(10)")] - request_timeout: Duration, - /// Function used to verify that urls are valid, used when receiving activities or fetching remote - /// objects. Use this to implement functionality like federation blocklists. In case verification - /// fails, it should return an error message. - #[builder(default = "Box::new(DefaultUrlVerifier())")] - url_verifier: Box, - /// Enable to sign HTTP signatures according to draft 10, which does not include (created) and - /// (expires) fields. This is required for compatibility with some software like Pleroma. - /// https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-10 - /// https://git.pleroma.social/pleroma/pleroma/-/issues/2939 - #[builder(default = "false")] - http_signature_compat: bool, -} - -impl FederationSettings { - /// Returns a new settings builder. - pub fn builder() -> FederationSettingsBuilder { - <_>::default() - } -} - -#[derive(Clone)] -struct DefaultUrlVerifier(); - -#[async_trait] -impl UrlVerifier for DefaultUrlVerifier { - async fn verify(&self, _url: &Url) -> Result<(), &'static str> { - Ok(()) - } -} - -impl InstanceConfig { - pub fn new(domain: String, client: ClientWithMiddleware, settings: FederationSettings) -> Self { - let activity_queue = create_activity_queue(client.clone(), &settings); - InstanceConfig { - hostname: domain, - client, - activity_queue, - settings, - } - } - /// Returns true if the url refers to this instance. Handles hostnames like `localhost:8540` for - /// local debugging. - fn is_local_url(&self, url: &Url) -> bool { - let mut domain = url.domain().expect("id has domain").to_string(); - if let Some(port) = url.port() { - domain = format!("{}:{}", domain, port); - } - domain == self.hostname - } - - /// Returns the local hostname - pub fn hostname(&self) -> &str { - &self.hostname - } -} - +/// Error messages returned by this library. #[derive(thiserror::Error, Debug)] pub enum Error { - #[error("Object was not found in database")] + #[error("Object was not found in local database")] NotFound, #[error("Request limit was reached during fetch")] RequestLimit, @@ -181,6 +24,10 @@ pub enum Error { ObjectDeleted, #[error("{0}")] UrlVerificationError(&'static str), + #[error("Incoming activity has invalid digest for body")] + ActivityBodyDigestInvalid, + #[error("incoming activity has invalid signature")] + ActivitySignatureInvalid, #[error(transparent)] Other(#[from] anyhow::Error), } @@ -193,3 +40,9 @@ impl Error { Error::Other(error.into()) } } + +impl PartialEq for Error { + fn eq(&self, other: &Self) -> bool { + std::mem::discriminant(self) == std::mem::discriminant(other) + } +} diff --git a/src/deser/context.rs b/src/protocol/context.rs similarity index 54% rename from src/deser/context.rs rename to src/protocol/context.rs index 8981361..fedccb9 100644 --- a/src/deser/context.rs +++ b/src/protocol/context.rs @@ -1,5 +1,5 @@ use crate::{ - deser::helpers::deserialize_one_or_many, + protocol::helpers::deserialize_one_or_many, request_data::RequestData, traits::ActivityHandler, }; @@ -8,10 +8,29 @@ use serde_json::Value; use std::str::FromStr; use url::Url; +/// Default context used in Activitypub const DEFAULT_CONTEXT: &str = "[\"https://www.w3.org/ns/activitystreams\"]"; -/// Simple wrapper which adds json-ld context to an object or activity. Doing it this way ensures -/// that nested objects dont have any context, but only the outermost one. +/// Wrapper for federated structs which handles `@context` field. +/// +/// This wrapper can be used when sending Activitypub data, to automatically add `@context`. It +/// avoids having to repeat the `@context` property on every struct, and getting multiple contexts +/// in nested structs. +/// +/// ``` +/// # use activitypub_federation::protocol::context::WithContext; +/// #[derive(serde::Serialize)] +/// struct Note { +/// content: String +/// } +/// let note = Note { +/// content: "Hello world".to_string() +/// }; +/// let note_with_context = WithContext::new_default(note); +/// let serialized = serde_json::to_string(¬e_with_context)?; +/// assert_eq!(serialized, r#"{"@context":[["https://www.w3.org/ns/activitystreams"]],"content":"Hello world"}"#); +/// Ok::<(), serde_json::error::Error>(()) +/// ``` #[derive(Serialize, Deserialize, Debug)] pub struct WithContext { #[serde(rename = "@context")] @@ -22,11 +41,13 @@ pub struct WithContext { } impl WithContext { + /// Create a new wrapper with the default Activitypub context. pub fn new_default(inner: T) -> WithContext { let context = vec![Value::from_str(DEFAULT_CONTEXT).expect("valid context")]; WithContext::new(inner, context) } + /// Create new wrapper with custom context. Use this in case you are implementing extensions. pub fn new(inner: T, context: Vec) -> WithContext { WithContext { context, inner } } @@ -39,7 +60,7 @@ impl WithContext { #[async_trait::async_trait] impl ActivityHandler for WithContext where - T: ActivityHandler + Send + Sync, + T: ActivityHandler + Send, { type DataType = ::DataType; type Error = ::Error; diff --git a/src/protocol/helpers.rs b/src/protocol/helpers.rs new file mode 100644 index 0000000..1e62e30 --- /dev/null +++ b/src/protocol/helpers.rs @@ -0,0 +1,115 @@ +use serde::{Deserialize, Deserializer}; + +/// Deserialize JSON single value or array into Vec. +/// +/// Useful if your application can handle multiple values for a field, but another federated +/// platform only sends a single one. +/// +/// ``` +/// # use activitypub_federation::protocol::helpers::deserialize_one_or_many; +/// # use url::Url; +/// #[derive(serde::Deserialize)] +/// struct Note { +/// #[serde(deserialize_with = "deserialize_one_or_many")] +/// to: Vec +/// } +/// +/// let single: Note = serde_json::from_str(r#"{"to": "https://example.com/u/alice" }"#)?; +/// assert_eq!(single.to.len(), 1); +/// +/// let multiple: Note = serde_json::from_str( +/// r#"{"to": [ +/// "https://example.com/u/alice", +/// "https://lemmy.ml/u/bob" +/// ]}"#)?; +/// assert_eq!(multiple.to.len(), 2); +/// Ok::<(), anyhow::Error>(()) +pub fn deserialize_one_or_many<'de, T, D>(deserializer: D) -> Result, D::Error> +where + T: Deserialize<'de>, + D: Deserializer<'de>, +{ + #[derive(Deserialize)] + #[serde(untagged)] + enum OneOrMany { + One(T), + Many(Vec), + } + + let result: OneOrMany = Deserialize::deserialize(deserializer)?; + Ok(match result { + OneOrMany::Many(list) => list, + OneOrMany::One(value) => vec![value], + }) +} + +/// Deserialize JSON single value or single element array into single value. +/// +/// Useful if your application can only handle a single value for a field, but another federated +/// platform sends single value wrapped in array. Fails if array contains multiple items. +/// +/// ``` +/// # use activitypub_federation::protocol::helpers::deserialize_one; +/// # use url::Url; +/// #[derive(serde::Deserialize)] +/// struct Note { +/// #[serde(deserialize_with = "deserialize_one")] +/// to: Url +/// } +/// +/// let note = serde_json::from_str::(r#"{"to": ["https://example.com/u/alice"] }"#); +/// assert!(note.is_ok()); +pub fn deserialize_one<'de, T, D>(deserializer: D) -> Result +where + T: Deserialize<'de>, + D: Deserializer<'de>, +{ + #[derive(Deserialize)] + #[serde(untagged)] + enum MaybeArray { + Simple(T), + Array([T; 1]), + } + + let result: MaybeArray = Deserialize::deserialize(deserializer)?; + Ok(match result { + MaybeArray::Simple(value) => value, + MaybeArray::Array([value]) => value, + }) +} + +/// Attempts to deserialize item, in case of error falls back to the type's default value. +/// +/// Useful for optional fields which are sent with a different type from another platform, +/// eg object instead of array. Should always be used together with `#[serde(default)]`, so +/// that a mssing value doesn't cause an error. +/// +/// ``` +/// # use activitypub_federation::protocol::helpers::deserialize_skip_error; +/// # use url::Url; +/// #[derive(serde::Deserialize)] +/// struct Note { +/// content: String, +/// #[serde(deserialize_with = "deserialize_skip_error", default)] +/// source: Option +/// } +/// +/// let note = serde_json::from_str::( +/// r#"{ +/// "content": "How are you?", +/// "source": { +/// "content": "How are you?", +/// "mediaType": "text/markdown" +/// } +/// }"#); +/// assert_eq!(note.unwrap().source, None); +/// # Ok::<(), anyhow::Error>(()) +pub fn deserialize_skip_error<'de, T, D>(deserializer: D) -> Result +where + T: Deserialize<'de> + Default, + D: Deserializer<'de>, +{ + let value = serde_json::Value::deserialize(deserializer)?; + let inner = T::deserialize(value).unwrap_or_default(); + Ok(inner) +} diff --git a/src/deser/mod.rs b/src/protocol/mod.rs similarity index 100% rename from src/deser/mod.rs rename to src/protocol/mod.rs diff --git a/src/deser/values.rs b/src/protocol/values.rs similarity index 96% rename from src/deser/values.rs rename to src/protocol/values.rs index 06bc7b6..fb83b85 100644 --- a/src/deser/values.rs +++ b/src/protocol/values.rs @@ -14,7 +14,7 @@ //! ``` //! use serde_json::from_str; //! use serde::{Deserialize, Serialize}; -//! use activitypub_federation::deser::values::MediaTypeMarkdown; +//! use activitypub_federation::protocol::values::MediaTypeMarkdown; //! //! #[derive(Deserialize, Serialize)] //! struct MyObject { diff --git a/src/request_data.rs b/src/request_data.rs index 57656c9..8f0099c 100644 --- a/src/request_data.rs +++ b/src/request_data.rs @@ -1,60 +1,23 @@ -use crate::InstanceConfig; -use std::{ - ops::Deref, - sync::{atomic::AtomicI32, Arc}, -}; +use crate::config::FederationConfig; +use std::{ops::Deref, sync::atomic::AtomicI32}; -/// Stores context data which is necessary for the library to work. -#[derive(Clone)] -pub struct ApubContext { - /// Data which the application requires in handlers, such as database connection - /// or configuration. - application_data: Arc, - /// Configuration of this library. - pub(crate) local_instance: Arc, -} - -impl ApubContext { - pub fn new(state: T, local_instance: InstanceConfig) -> ApubContext { - ApubContext { - application_data: Arc::new(state), - local_instance: Arc::new(local_instance), - } - } - pub fn local_instance(&self) -> &InstanceConfig { - self.local_instance.deref() - } - - /// Create new [RequestData] from this. You should prefer to use a middleware if possible. - pub fn to_request_data(&self) -> RequestData { - RequestData { - apub_context: self.clone(), - request_counter: AtomicI32::default(), - } - } -} - -/// Stores data for handling one specific HTTP request. Most importantly this contains a -/// counter for outgoing HTTP requests. This is necessary to prevent denial of service attacks, -/// where an attacker triggers fetching of recursive objects. +/// Stores data for handling one specific HTTP request. /// -/// https://www.w3.org/TR/activitypub/#security-recursive-objects -pub struct RequestData { - pub(crate) apub_context: ApubContext, +/// Most importantly this contains a counter for outgoing HTTP requests. This is necessary to +/// prevent denial of service attacks, where an attacker triggers fetching of recursive objects. +/// +/// +pub struct RequestData { + pub(crate) config: FederationConfig, pub(crate) request_counter: AtomicI32, } -impl RequestData { - pub fn local_instance(&self) -> &InstanceConfig { - self.apub_context.local_instance.deref() +impl RequestData { + pub fn app_data(&self) -> &T { + &self.config.app_data } -} - -impl Deref for ApubContext { - type Target = T; - - fn deref(&self) -> &T { - &self.application_data + pub fn hostname(&self) -> &str { + &self.config.hostname } } @@ -62,15 +25,15 @@ impl Deref for RequestData { type Target = T; fn deref(&self) -> &T { - &self.apub_context.application_data + &self.config.app_data } } #[derive(Clone)] -pub struct ApubMiddleware(pub(crate) ApubContext); +pub struct ApubMiddleware(pub(crate) FederationConfig); impl ApubMiddleware { - pub fn new(apub_context: ApubContext) -> Self { - ApubMiddleware(apub_context) + pub fn new(config: FederationConfig) -> Self { + ApubMiddleware(config) } } diff --git a/src/traits.rs b/src/traits.rs index 4cb48f0..e5f7ff7 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -1,13 +1,169 @@ use crate::request_data::RequestData; +use async_trait::async_trait; use chrono::NaiveDateTime; use std::ops::Deref; use url::Url; -/// Trait which allows verification and reception of incoming activities. -#[async_trait::async_trait] +/// Helper for converting between database structs and federated protocol structs. +/// +/// ``` +/// # use url::Url; +/// # use activitypub_federation::core::signatures::PublicKey; +/// # use activitypub_federation::request_data::RequestData; +/// # use activitypub_federation::traits::ApubObject; +/// # use activitypub_federation::traits::tests::{DbConnection, Person}; +/// # pub struct DbUser { +/// # pub name: String, +/// # pub ap_id: Url, +/// # pub inbox: Url, +/// # pub public_key: String, +/// # pub local: bool, +/// # } +/// +/// #[async_trait::async_trait] +/// impl ApubObject for DbUser { +/// type DataType = DbConnection; +/// type ApubType = Person; +/// type Error = anyhow::Error; +/// +/// async fn read_from_apub_id(object_id: Url, data: &RequestData) -> Result, Self::Error> { +/// // Attempt to read object from local database. Return Ok(None) if not found. +/// let user: Option = data.read_user_from_apub_id(object_id).await?; +/// Ok(user) +/// } +/// +/// async fn into_apub(self, data: &RequestData) -> Result { +/// // Called when a local object gets sent out over Activitypub. Simply convert it to the +/// // protocol struct +/// Ok(Person { +/// kind: Default::default(), +/// preferred_username: self.name, +/// id: self.ap_id.clone().into(), +/// inbox: self.inbox, +/// public_key: PublicKey::new_main_key(self.ap_id, self.public_key), +/// }) +/// } +/// +/// async fn from_apub(apub: Self::ApubType, data: &RequestData) -> Result { +/// // Called when a remote object gets received over Activitypub. Validate and insert it +/// // into the database. +/// +/// let user = DbUser { +/// name: apub.preferred_username, +/// ap_id: apub.id.into_inner(), +/// inbox: apub.inbox, +/// public_key: apub.public_key.public_key_pem, +/// local: false, +/// }; +/// +/// // Make sure not to overwrite any local object +/// // TODO: this should be handled by library so the method doesnt get called for local object +/// if data.hostname() == user.ap_id.domain().unwrap() { +/// // Activitypub doesnt distinguish between creating and updating an object. Thats why we +/// // need to use upsert functionality here +/// data.upsert(&user).await?; +/// } +/// Ok(user) +/// } +/// +/// } +#[async_trait] +pub trait ApubObject: Sized { + /// App data type passed to handlers. Must be identical to [crate::config::FederationConfig::app_data]. + type DataType: Clone + Send + Sync; + /// The type of protocol struct which gets sent over network to federate this database struct. + type ApubType; + /// Error type returned by handler methods + type Error; + + /// Returns the last time this object was updated. + /// + /// Used to avoid refetching an object over HTTP every time it is dereferenced. Only called + /// for remote objects. + fn last_refreshed_at(&self) -> Option { + None + } + + /// Try to read the object with given `id` from local database. + /// + /// Should return `Ok(None)` if not found. + async fn read_from_apub_id( + object_id: Url, + data: &RequestData, + ) -> Result, Self::Error>; + + /// Mark remote object as deleted in local database. + /// + /// Called when a `Delete` activity is received, or if fetch returns a `Tombstone` object. + async fn delete(self, _data: &RequestData) -> Result<(), Self::Error> { + Ok(()) + } + + /// Convert database type to Activitypub type. + /// + /// Called when a local object gets fetched by another instance over HTTP, or when an object + /// gets sent in an activity. + async fn into_apub( + self, + data: &RequestData, + ) -> Result; + + /// Convert object from ActivityPub type to database type. + /// + /// Called when an object is received from HTTP fetch or as part of an activity. This method + /// should do verification and write the received object to database. Note that there is no + /// distinction between create and update, so an `upsert` operation should be used. + async fn from_apub( + apub: Self::ApubType, + data: &RequestData, + ) -> Result; +} + +/// Handler for receiving incoming activities. +/// +/// ``` +/// # use activitystreams_kinds::activity::FollowType; +/// # use url::Url; +/// # use activitypub_federation::core::object_id::ObjectId; +/// # use activitypub_federation::request_data::RequestData; +/// # use activitypub_federation::traits::ActivityHandler; +/// # use activitypub_federation::traits::tests::{DbConnection, DbUser}; +/// #[derive(serde::Deserialize)] +/// struct Follow { +/// actor: ObjectId, +/// object: ObjectId, +/// #[serde(rename = "type")] +/// kind: FollowType, +/// id: Url, +/// } +/// +/// #[async_trait::async_trait] +/// impl ActivityHandler for Follow { +/// type DataType = DbConnection; +/// type Error = anyhow::Error; +/// +/// fn id(&self) -> &Url { +/// &self.id +/// } +/// +/// fn actor(&self) -> &Url { +/// self.actor.inner() +/// } +/// +/// async fn receive(self, data: &RequestData) -> Result<(), Self::Error> { +/// let local_user = self.object.dereference(data).await?; +/// let follower = self.actor.dereference(data).await?; +/// data.add_follower(local_user, follower).await?; +/// Ok(()) +/// } +/// } +/// ``` +#[async_trait] #[enum_delegate::register] pub trait ActivityHandler { - type DataType: Send + Sync; + /// App data type passed to handlers. Must be identical to [crate::config::FederationConfig::app_data]. + type DataType: Clone + Send + Sync; + /// Error type returned by handler methods type Error; /// `id` field of the activity @@ -16,12 +172,34 @@ pub trait ActivityHandler { /// `actor` field of activity fn actor(&self) -> &Url; - /// Receives the activity and stores its action in database. + /// Called when an activity is received. + /// + /// Should perform validation and possibly write action to the database. In case the activity + /// has a nested `object` field, must call `object.from_apub` handler. async fn receive(self, data: &RequestData) -> Result<(), Self::Error>; } +/// Trait to allow retrieving common Actor data. +pub trait Actor: ApubObject { + /// The actor's public key for verification of HTTP signatures + fn public_key(&self) -> &str; + + /// The inbox where activities for this user should be sent to + fn inbox(&self) -> Url; + + /// The actor's shared inbox, if any + fn shared_inbox(&self) -> Option { + None + } + + /// Returns shared inbox if it exists, normal inbox otherwise. + fn shared_inbox_or_inbox(&self) -> Url { + self.shared_inbox().unwrap_or_else(|| self.inbox()) + } +} + /// Allow for boxing of enum variants -#[async_trait::async_trait] +#[async_trait] impl ActivityHandler for Box where T: ActivityHandler + Send, @@ -42,69 +220,150 @@ where } } -#[async_trait::async_trait] -pub trait ApubObject { - type DataType: Send + Sync; - type ApubType; - type DbType; - type Error; +/// Some impls of these traits for use in tests. Dont use this from external crates. +/// +/// TODO: Should be using `cfg[doctest]` but blocked by +#[doc(hidden)] +pub mod tests { + use super::*; + use crate::core::{ + object_id::ObjectId, + signatures::{generate_actor_keypair, Keypair, PublicKey}, + }; + use activitystreams_kinds::{activity::FollowType, actor::PersonType}; + use anyhow::Error; + use once_cell::sync::Lazy; + use serde::{Deserialize, Serialize}; - /// If the object is stored in the database, this method should return the fetch time. Used to - /// update actors after certain interval. - fn last_refreshed_at(&self) -> Option { - None + #[derive(Clone)] + pub struct DbConnection; + + impl DbConnection { + pub async fn read_user_from_apub_id(&self, _: Url) -> Result, Error> { + Ok(None) + } + pub async fn read_local_user(&self, _: String) -> Result { + todo!() + } + pub async fn upsert(&self, _: &T) -> Result<(), Error> { + Ok(()) + } + pub async fn add_follower(&self, _: DbUser, _: DbUser) -> Result<(), Error> { + Ok(()) + } } - /// Try to read the object with given ID from local database. Returns Ok(None) if it doesn't exist. - async fn read_from_apub_id( - object_id: Url, - data: &RequestData, - ) -> Result, Self::Error> - where - Self: Sized; - - /// Marks the object as deleted in local db. Called when a delete activity is received, or if - /// fetch returns a tombstone. - async fn delete(self, _data: &RequestData) -> Result<(), Self::Error> - where - Self: Sized, - { - Ok(()) + #[derive(Clone, Debug, Deserialize, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct Person { + #[serde(rename = "type")] + pub kind: PersonType, + pub preferred_username: String, + pub id: ObjectId, + pub inbox: Url, + pub public_key: PublicKey, + } + #[derive(Debug, Clone)] + pub struct DbUser { + pub name: String, + pub ap_id: Url, + pub inbox: Url, + // exists for all users (necessary to verify http signatures) + pub public_key: String, + // exists only for local users + private_key: Option, + pub followers: Vec, + pub local: bool, } - /// Trait for converting an object or actor into the respective ActivityPub type. - async fn into_apub( - self, - data: &RequestData, - ) -> Result; + pub static DB_USER_KEYPAIR: Lazy = Lazy::new(|| generate_actor_keypair().unwrap()); - /// Converts an object from ActivityPub type to Lemmy internal type. - /// - /// * `apub` The object to read from - /// * `context` LemmyContext which holds DB pool, HTTP client etc - /// * `expected_domain` Domain where the object was received from. None in case of mod action. - /// * `mod_action_allowed` True if the object can be a mod activity, ignore `expected_domain` in this case - async fn from_apub( - apub: Self::ApubType, - data: &RequestData, - ) -> Result - where - Self: Sized; -} + #[async_trait] + impl ApubObject for DbUser { + type DataType = DbConnection; + type ApubType = Person; + type Error = Error; -pub trait Actor: ApubObject { - /// Returns the actor's public key for verification of HTTP signatures - fn public_key(&self) -> &str; + async fn read_from_apub_id( + object_id: Url, + _data: &RequestData, + ) -> Result, Self::Error> { + Ok(Some(DbUser { + name: "".to_string(), + ap_id: object_id.clone().into(), + inbox: object_id.into(), + public_key: DB_USER_KEYPAIR.public_key.clone(), + private_key: None, + followers: vec![], + local: false, + })) + } - /// The inbox where activities for this user should be sent to - fn inbox(&self) -> Url; + async fn into_apub( + self, + _data: &RequestData, + ) -> Result { + let public_key = PublicKey::new_main_key(self.ap_id.clone(), self.public_key.clone()); + Ok(Person { + preferred_username: self.name.clone(), + kind: Default::default(), + id: self.ap_id.into(), + inbox: self.inbox, + public_key, + }) + } - /// The actor's shared inbox, if any - fn shared_inbox(&self) -> Option { - None + async fn from_apub( + apub: Self::ApubType, + _data: &RequestData, + ) -> Result { + Ok(DbUser { + name: apub.preferred_username, + ap_id: apub.id.into(), + inbox: apub.inbox, + public_key: apub.public_key.public_key_pem, + private_key: None, + followers: vec![], + local: false, + }) + } } - fn shared_inbox_or_inbox(&self) -> Url { - self.shared_inbox().unwrap_or_else(|| self.inbox()) + impl Actor for DbUser { + fn public_key(&self) -> &str { + &self.public_key + } + + fn inbox(&self) -> Url { + todo!() + } + } + + #[derive(Deserialize, Serialize, Clone, Debug)] + #[serde(rename_all = "camelCase")] + pub struct Follow { + pub actor: ObjectId, + pub object: ObjectId, + #[serde(rename = "type")] + pub kind: FollowType, + pub id: Url, + } + + #[async_trait] + impl ActivityHandler for Follow { + type DataType = DbConnection; + type Error = Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn receive(self, data: &RequestData) -> Result<(), Self::Error> { + Ok(()) + } } } diff --git a/src/utils.rs b/src/utils/mod.rs similarity index 55% rename from src/utils.rs rename to src/utils/mod.rs index b910b93..86dc0e4 100644 --- a/src/utils.rs +++ b/src/utils/mod.rs @@ -12,26 +12,36 @@ use url::Url; pub(crate) mod reqwest_shim; -pub async fn fetch_object_http( +/// Fetch a remote object over HTTP and convert to `Kind`. +/// +/// [crate::core::object_id::ObjectId::dereference] wraps this function to add caching and +/// conversion to database type. Only use this function directly in exceptional cases where that +/// behaviour is undesired. +/// +/// Every time an object is fetched via HTTP, [RequestData.request_counter] is incremented by one. +/// If the value exceeds [FederationSettings.http_fetch_limit], the request is aborted with +/// [Error::RequestLimit]. This prevents denial of service attacks where an attack triggers +/// infinite, recursive fetching of data. +pub async fn fetch_object_http( url: &Url, data: &RequestData, ) -> Result { - let instance = &data.local_instance(); + let config = &data.config; // dont fetch local objects this way - debug_assert!(url.domain() != Some(&instance.hostname)); - instance.verify_url_valid(url).await?; + debug_assert!(url.domain() != Some(&config.hostname)); + config.verify_url_valid(url).await?; info!("Fetching remote object {}", url.to_string()); let counter = data.request_counter.fetch_add(1, Ordering::SeqCst); - if counter > instance.settings.http_fetch_limit { + if counter > config.http_fetch_limit { return Err(Error::RequestLimit); } - let res = instance + let res = config .client .get(url.as_str()) .header("Accept", APUB_JSON_CONTENT_TYPE) - .timeout(instance.settings.request_timeout) + .timeout(config.request_timeout) .send() .await .map_err(Error::conv)?; @@ -44,6 +54,15 @@ pub async fn fetch_object_http( } /// Check that both urls have the same domain. If not, return UrlVerificationError. +/// +/// ``` +/// # use url::Url; +/// # use activitypub_federation::utils::verify_domains_match; +/// let a = Url::parse("https://example.com/abc")?; +/// let b = Url::parse("https://sample.net/abc")?; +/// assert!(verify_domains_match(&a, &b).is_err()); +/// # Ok::<(), url::ParseError>(()) +/// ``` pub fn verify_domains_match(a: &Url, b: &Url) -> Result<(), Error> { if a.domain() != b.domain() { return Err(Error::UrlVerificationError("Domains do not match")); @@ -52,6 +71,15 @@ pub fn verify_domains_match(a: &Url, b: &Url) -> Result<(), Error> { } /// Check that both urls are identical. If not, return UrlVerificationError. +/// +/// ``` +/// # use url::Url; +/// # use activitypub_federation::utils::verify_urls_match; +/// let a = Url::parse("https://example.com/abc")?; +/// let b = Url::parse("https://example.com/123")?; +/// assert!(verify_urls_match(&a, &b).is_err()); +/// # Ok::<(), url::ParseError>(()) +/// ``` pub fn verify_urls_match(a: &Url, b: &Url) -> Result<(), Error> { if a != b { return Err(Error::UrlVerificationError("Urls do not match"));