From 9332c8145850d1c37595b2f2aaf88d19bce99b6b Mon Sep 17 00:00:00 2001 From: Paul Delafosse Date: Mon, 28 Nov 2022 22:19:56 +0100 Subject: [PATCH] feat: add axum compat (#12) * feat: add actix feature flag * (WIP)feat: add axum feature * WIP: axum veridy digest + example Note: this does not compile yet * WIP * chore: clippy lints * Use actix rt for axum example * ci: run example in CI for both actix and axum * feat: add json wrapper type for axum * docs: update readme with actix and axum feature flags * fix: fix ci * chore: more clippy lints * refactor: update according to PR comment and factorize 'verify_digest' --- .drone.yml | 11 +- Cargo.lock | 267 ++++++++++++++++-- Cargo.toml | 39 ++- README.md | 20 +- .../activities/accept.rs | 2 +- .../activities/create_note.rs | 2 +- .../activities/follow.rs | 13 +- .../activities/mod.rs | 0 .../{federation => federation-actix}/error.rs | 4 +- .../instance.rs | 14 +- .../{federation => federation-actix}/main.rs | 0 .../objects/mod.rs | 0 .../objects/note.rs | 2 +- .../objects/person.rs | 4 +- .../{federation => federation-actix}/utils.rs | 0 examples/federation-axum/activities/accept.rs | 56 ++++ .../federation-axum/activities/create_note.rs | 70 +++++ examples/federation-axum/activities/follow.rs | 89 ++++++ examples/federation-axum/activities/mod.rs | 3 + examples/federation-axum/error.rs | 24 ++ examples/federation-axum/instance.rs | 160 +++++++++++ examples/federation-axum/main.rs | 48 ++++ examples/federation-axum/objects/mod.rs | 2 + examples/federation-axum/objects/note.rs | 92 ++++++ examples/federation-axum/objects/person.rs | 195 +++++++++++++ examples/federation-axum/utils.rs | 13 + src/core/activity_queue.rs | 8 +- src/core/actix/inbox.rs | 51 ++++ src/core/actix/mod.rs | 1 + src/core/axum/digest.rs | 45 +++ src/core/axum/inbox.rs | 47 +++ src/core/axum/json.rs | 32 +++ src/core/axum/mod.rs | 80 ++++++ src/core/inbox.rs | 53 ---- src/core/mod.rs | 7 +- src/core/object_id.rs | 2 +- src/core/signatures.rs | 69 +++-- src/deser/context.rs | 4 +- src/lib.rs | 63 ++++- src/signature.rs | 0 src/traits.rs | 12 +- src/utils.rs | 51 ++-- 42 files changed, 1465 insertions(+), 190 deletions(-) rename examples/{federation => federation-actix}/activities/accept.rs (97%) rename examples/{federation => federation-actix}/activities/create_note.rs (97%) rename examples/{federation => federation-actix}/activities/follow.rs (88%) rename examples/{federation => federation-actix}/activities/mod.rs (100%) rename examples/{federation => federation-actix}/error.rs (100%) rename examples/{federation => federation-actix}/instance.rs (91%) rename examples/{federation => federation-actix}/main.rs (100%) rename examples/{federation => federation-actix}/objects/mod.rs (100%) rename examples/{federation => federation-actix}/objects/note.rs (98%) rename examples/{federation => federation-actix}/objects/person.rs (98%) rename examples/{federation => federation-actix}/utils.rs (100%) create mode 100644 examples/federation-axum/activities/accept.rs create mode 100644 examples/federation-axum/activities/create_note.rs create mode 100644 examples/federation-axum/activities/follow.rs create mode 100644 examples/federation-axum/activities/mod.rs create mode 100644 examples/federation-axum/error.rs create mode 100644 examples/federation-axum/instance.rs create mode 100644 examples/federation-axum/main.rs create mode 100644 examples/federation-axum/objects/mod.rs create mode 100644 examples/federation-axum/objects/note.rs create mode 100644 examples/federation-axum/objects/person.rs create mode 100644 examples/federation-axum/utils.rs create mode 100644 src/core/actix/inbox.rs create mode 100644 src/core/actix/mod.rs create mode 100644 src/core/axum/digest.rs create mode 100644 src/core/axum/inbox.rs create mode 100644 src/core/axum/json.rs create mode 100644 src/core/axum/mod.rs create mode 100644 src/signature.rs diff --git a/.drone.yml b/.drone.yml index 830185a..af9793f 100644 --- a/.drone.yml +++ b/.drone.yml @@ -42,10 +42,17 @@ steps: commands: - cargo test --workspace --no-fail-fast - - name: cargo run + - name: cargo run actix image: rust:1.61-bullseye environment: CARGO_HOME: .cargo RUST_BACKTRACE: 1 commands: - - cargo run -p activitypub_federation --example federation + - cargo run --example simple_federation_actix + - name: cargo run axum + image: rust:1.61-bullseye + environment: + CARGO_HOME: .cargo + RUST_BACKTRACE: 1 + commands: + - cargo run --example simple_federation_axum --features axum diff --git a/Cargo.lock b/Cargo.lock index 668616f..c4ff2c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,7 @@ dependencies = [ "actix-web", "anyhow", "async-trait", + "axum", "background-jobs", "base64", "chrono", @@ -19,9 +20,10 @@ dependencies = [ "enum_delegate", "env_logger", "http", - "http-signature-normalization-actix", + "http-signature-normalization", "http-signature-normalization-reqwest", "httpdate", + "hyper", "itertools", "once_cell", "openssl", @@ -33,7 +35,10 @@ dependencies = [ "sha2", "thiserror", "tokio", + "tower", + "tower-http", "tracing", + "tracing-subscriber", "url", ] @@ -270,6 +275,70 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744864363a200a5e724a7e61bc8c11b6628cf2e3ec519c8a1a48e609a8156b40" +dependencies = [ + "async-trait", + "axum-core", + "axum-macros", + "bitflags", + "bytes", + "futures-util", + "headers", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4df0fc33ada14a338b799002f7e8657711422b25d4e16afb032708d6b185621" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "background-jobs" version = "0.13.0" @@ -753,6 +822,37 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "headers" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" +dependencies = [ + "base64", + "bitflags", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -784,6 +884,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "http-signature-normalization" version = "0.6.0" @@ -793,26 +899,6 @@ dependencies = [ "httpdate", ] -[[package]] -name = "http-signature-normalization-actix" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86dfd54a1764ad79376b8dbf29e5bf918a463eb5ec66c90cd0388508289af6f0" -dependencies = [ - "actix-http", - "actix-rt", - "actix-web", - "base64", - "futures-util", - "http-signature-normalization", - "sha2", - "thiserror", - "tokio", - "tracing", - "tracing-error", - "tracing-futures", -] - [[package]] name = "http-signature-normalization-reqwest" version = "0.7.1" @@ -1031,6 +1117,21 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + +[[package]] +name = "matchit" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dfc802da7b1cf80aefffa0c7b2f77247c8b32206cc83c270b61264f5b360a80" + [[package]] name = "memchr" version = "2.5.0" @@ -1083,6 +1184,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -1163,6 +1274,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1310,6 +1427,15 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] + [[package]] name = "regex-syntax" version = "0.6.28" @@ -1387,6 +1513,12 @@ dependencies = [ "semver", ] +[[package]] +name = "rustversion" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8" + [[package]] name = "ryu" version = "1.0.11" @@ -1476,6 +1608,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "184c643044780f7ceb59104cef98a5a6f12cb2288a7bc701ab93a362b49fd47d" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1570,6 +1711,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "task-local-extensions" version = "0.1.3" @@ -1684,6 +1831,7 @@ dependencies = [ "libc", "memchr", "mio", + "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", @@ -1727,6 +1875,48 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" +dependencies = [ + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -1764,16 +1954,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ "once_cell", -] - -[[package]] -name = "tracing-error" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e" -dependencies = [ - "tracing", - "tracing-subscriber", + "valuable", ] [[package]] @@ -1786,15 +1967,33 @@ dependencies = [ "tracing", ] +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", + "smallvec", "thread_local", + "tracing", "tracing-core", + "tracing-log", ] [[package]] @@ -1867,6 +2066,12 @@ dependencies = [ "serde", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index b9494f3..1938cf6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,9 +22,6 @@ openssl = "0.10.42" once_cell = "1.16.0" http = "0.2.8" sha2 = "0.10.6" -actix-web = { version = "4.2.1", default-features = false } -http-signature-normalization-actix = { version = "0.6.1", default-features = false, features = ["server", "sha-2"] } -http-signature-normalization-reqwest = { version = "0.7.1", default-features = false, features = ["sha-2", "middleware"] } background-jobs = "0.13.0" thiserror = "1.0.37" derive_builder = "0.11.2" @@ -32,10 +29,44 @@ itertools = "0.10.5" dyn-clone = "1.0.9" enum_delegate = "0.2.0" httpdate = "1.0.2" +http-signature-normalization-reqwest = { version = "0.7.1", default-features = false, features = ["sha-2", "middleware"] } +http-signature-normalization = "0.6.0" +actix-rt = { version = "2.7.0" } + +actix-web = { version = "4.2.1", default-features = false, optional = true } +axum = { version = "0.6.0", features = ["json", "headers", "macros", "original-uri"], optional = true } + +# Axum +tower-http = { version = "0.3", features = ["map-request-body", "util", "trace"], optional = true } +tower = { version = "0.4.13", optional = true } +hyper = { version = "0.14", optional = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = true } + +[features] +default = ["actix"] +actix = ["dep:actix-web"] +axum = [ + "dep:axum", + "dep:tower-http", + "dep:tower", + "dep:hyper", + "dep:tracing-subscriber", +] [dev-dependencies] activitystreams-kinds = "0.2.1" rand = "0.8.5" actix-rt = "2.7.0" -tokio = "1.21.2" +tokio = { version = "1.21.2", features = ["full"] } env_logger = { version = "0.9.3", default-features = false } + +[[example]] +name = "simple_federation_actix" +path = "examples/federation-actix/main.rs" +required-features = ["actix"] + +[[example]] +name = "simple_federation_axum" +path = "examples/federation-axum/main.rs" +required-features = ["axum"] + diff --git a/README.md b/README.md index 59c6088..7f516ae 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,25 @@ You can join the Matrix channel [#activitystreams:asonix.dog](https://matrix.to/ ## How to use -To get started, have a look at the [API documentation](https://docs.rs/activitypub_federation/0.2.0/activitypub_federation/) and example code. You can also find some [ActivityPub resources in the Lemmy documentation](https://join-lemmy.org/docs/en/contributing/resources.html#activitypub-resources). If anything is unclear, please open an issue for clarification. For a more advanced implementation, take a look at the [Lemmy federation code](https://github.com/LemmyNet/lemmy/tree/main/crates/apub). + +To get started, have a look at the [API documentation](https://docs.rs/activitypub_federation/0.2.0/activitypub_federation/) +and [example code](https://github.com/LemmyNet/lemmy/tree/main/example/). You can also find some [ActivityPub resources in the Lemmy documentation](https://join-lemmy.org/docs/en/contributing/resources.html#activitypub-resources). +If anything is unclear, please open an issue for clarification. For a more advanced implementation, +take a look at the [Lemmy federation code](https://github.com/LemmyNet/lemmy/tree/main/crates/apub). + +Currently supported frameworks include [actix](https://actix.rs/) and [axum](https://github.com/tokio-rs/axum): + +**actix:** + +```toml +activitypub_federation = { version = "*", features = ["actix"] } +``` + +**axum:** + +```toml +activitypub_federation = { version = "*", features = ["axum"] } +``` ## Roadmap diff --git a/examples/federation/activities/accept.rs b/examples/federation-actix/activities/accept.rs similarity index 97% rename from examples/federation/activities/accept.rs rename to examples/federation-actix/activities/accept.rs index 1b306b2..1022685 100644 --- a/examples/federation/activities/accept.rs +++ b/examples/federation-actix/activities/accept.rs @@ -25,7 +25,7 @@ impl Accept { } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl ActivityHandler for Accept { type DataType = InstanceHandle; type Error = crate::error::Error; diff --git a/examples/federation/activities/create_note.rs b/examples/federation-actix/activities/create_note.rs similarity index 97% rename from examples/federation/activities/create_note.rs rename to examples/federation-actix/activities/create_note.rs index b03327c..3d28878 100644 --- a/examples/federation/activities/create_note.rs +++ b/examples/federation-actix/activities/create_note.rs @@ -37,7 +37,7 @@ impl CreateNote { } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl ActivityHandler for CreateNote { type DataType = InstanceHandle; type Error = crate::error::Error; diff --git a/examples/federation/activities/follow.rs b/examples/federation-actix/activities/follow.rs similarity index 88% rename from examples/federation/activities/follow.rs rename to examples/federation-actix/activities/follow.rs index da9f04c..0cc58f2 100644 --- a/examples/federation/activities/follow.rs +++ b/examples/federation-actix/activities/follow.rs @@ -34,7 +34,7 @@ impl Follow { } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl ActivityHandler for Follow { type DataType = InstanceHandle; type Error = crate::error::Error; @@ -63,11 +63,12 @@ impl ActivityHandler for Follow { request_counter: &mut i32, ) -> Result<(), Self::Error> { // add to followers - let mut users = data.users.lock().unwrap(); - let local_user = users.first_mut().unwrap(); - local_user.followers.push(self.actor.inner().clone()); - let local_user = local_user.clone(); - drop(users); + let local_user = { + let mut users = data.users.lock().unwrap(); + let local_user = users.first_mut().unwrap(); + local_user.followers.push(self.actor.inner().clone()); + local_user.clone() + }; // send back an accept let follower = self diff --git a/examples/federation/activities/mod.rs b/examples/federation-actix/activities/mod.rs similarity index 100% rename from examples/federation/activities/mod.rs rename to examples/federation-actix/activities/mod.rs diff --git a/examples/federation/error.rs b/examples/federation-actix/error.rs similarity index 100% rename from examples/federation/error.rs rename to examples/federation-actix/error.rs index 8e87a25..b460545 100644 --- a/examples/federation/error.rs +++ b/examples/federation-actix/error.rs @@ -5,8 +5,6 @@ use std::fmt::{Display, Formatter}; #[derive(Debug)] pub struct Error(anyhow::Error); -impl ResponseError for Error {} - impl Display for Error { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { std::fmt::Display::fmt(&self.0, f) @@ -21,3 +19,5 @@ where Error(t.into()) } } + +impl ResponseError for Error {} diff --git a/examples/federation/instance.rs b/examples/federation-actix/instance.rs similarity index 91% rename from examples/federation/instance.rs rename to examples/federation-actix/instance.rs index 1e12ea2..786771b 100644 --- a/examples/federation/instance.rs +++ b/examples/federation-actix/instance.rs @@ -6,8 +6,13 @@ use crate::{ person::{MyUser, PersonAcceptedActivities}, }, }; + use activitypub_federation::{ - core::{inbox::receive_activity, object_id::ObjectId, signatures::generate_actor_keypair}, + core::{ + actix::inbox::receive_activity, + object_id::ObjectId, + signatures::generate_actor_keypair, + }, data::Data, deser::context::WithContext, traits::ApubObject, @@ -16,11 +21,10 @@ use activitypub_federation::{ UrlVerifier, APUB_JSON_CONTENT_TYPE, }; + use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer}; use async_trait::async_trait; -use http_signature_normalization_actix::prelude::VerifyDigest; use reqwest::Client; -use sha2::{Digest, Sha256}; use std::{ ops::Deref, sync::{Arc, Mutex}, @@ -88,9 +92,6 @@ impl Instance { .route("/objects/{user_name}", web::get().to(http_get_user)) .service( web::scope("") - // Important: this ensures that the activity json matches the hashsum in signed - // HTTP header - .wrap(VerifyDigest::new(Sha256::new())) // Just a single, global inbox for simplicity .route("/inbox", web::post().to(http_post_user_inbox)), ) @@ -116,6 +117,7 @@ async fn http_get_user( .await? .into_apub(&data) .await?; + Ok(HttpResponse::Ok() .content_type(APUB_JSON_CONTENT_TYPE) .json(WithContext::new_default(user))) diff --git a/examples/federation/main.rs b/examples/federation-actix/main.rs similarity index 100% rename from examples/federation/main.rs rename to examples/federation-actix/main.rs diff --git a/examples/federation/objects/mod.rs b/examples/federation-actix/objects/mod.rs similarity index 100% rename from examples/federation/objects/mod.rs rename to examples/federation-actix/objects/mod.rs diff --git a/examples/federation/objects/note.rs b/examples/federation-actix/objects/note.rs similarity index 98% rename from examples/federation/objects/note.rs rename to examples/federation-actix/objects/note.rs index 993dc08..d59b1c4 100644 --- a/examples/federation/objects/note.rs +++ b/examples/federation-actix/objects/note.rs @@ -39,7 +39,7 @@ pub struct Note { content: String, } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl ApubObject for MyPost { type DataType = InstanceHandle; type ApubType = Note; diff --git a/examples/federation/objects/person.rs b/examples/federation-actix/objects/person.rs similarity index 98% rename from examples/federation/objects/person.rs rename to examples/federation-actix/objects/person.rs index 8df7907..9b7d19c 100644 --- a/examples/federation/objects/person.rs +++ b/examples/federation-actix/objects/person.rs @@ -115,7 +115,7 @@ impl MyUser { local_instance: &LocalInstance, ) -> Result<(), ::Error> where - Activity: ActivityHandler + Serialize, + Activity: ActivityHandler + Serialize + Send + Sync, ::Error: From + From, { let activity = WithContext::new_default(activity); @@ -131,7 +131,7 @@ impl MyUser { } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl ApubObject for MyUser { type DataType = InstanceHandle; type ApubType = Person; diff --git a/examples/federation/utils.rs b/examples/federation-actix/utils.rs similarity index 100% rename from examples/federation/utils.rs rename to examples/federation-actix/utils.rs diff --git a/examples/federation-axum/activities/accept.rs b/examples/federation-axum/activities/accept.rs new file mode 100644 index 0000000..1022685 --- /dev/null +++ b/examples/federation-axum/activities/accept.rs @@ -0,0 +1,56 @@ +use crate::{activities::follow::Follow, instance::InstanceHandle, objects::person::MyUser}; +use activitypub_federation::{core::object_id::ObjectId, data::Data, traits::ActivityHandler}; +use activitystreams_kinds::activity::AcceptType; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Deserialize, Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Accept { + actor: ObjectId, + object: Follow, + #[serde(rename = "type")] + kind: AcceptType, + id: Url, +} + +impl Accept { + pub fn new(actor: ObjectId, object: Follow, id: Url) -> Accept { + Accept { + actor, + object, + kind: Default::default(), + id, + } + } +} + +#[async_trait::async_trait] +impl ActivityHandler for Accept { + type DataType = InstanceHandle; + type Error = crate::error::Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify( + &self, + _data: &Data, + _request_counter: &mut i32, + ) -> Result<(), Self::Error> { + Ok(()) + } + + async fn receive( + self, + _data: &Data, + _request_counter: &mut i32, + ) -> Result<(), Self::Error> { + Ok(()) + } +} diff --git a/examples/federation-axum/activities/create_note.rs b/examples/federation-axum/activities/create_note.rs new file mode 100644 index 0000000..3d28878 --- /dev/null +++ b/examples/federation-axum/activities/create_note.rs @@ -0,0 +1,70 @@ +use crate::{ + instance::InstanceHandle, + objects::{note::Note, person::MyUser}, + MyPost, +}; +use activitypub_federation::{ + core::object_id::ObjectId, + data::Data, + deser::helpers::deserialize_one_or_many, + traits::{ActivityHandler, ApubObject}, +}; +use activitystreams_kinds::activity::CreateType; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Deserialize, Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct CreateNote { + pub(crate) actor: ObjectId, + #[serde(deserialize_with = "deserialize_one_or_many")] + pub(crate) to: Vec, + pub(crate) object: Note, + #[serde(rename = "type")] + pub(crate) kind: CreateType, + pub(crate) id: Url, +} + +impl CreateNote { + pub fn new(note: Note, id: Url) -> CreateNote { + CreateNote { + actor: note.attributed_to.clone(), + to: note.to.clone(), + object: note, + kind: CreateType::Create, + id, + } + } +} + +#[async_trait::async_trait] +impl ActivityHandler for CreateNote { + type DataType = InstanceHandle; + type Error = crate::error::Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify( + &self, + data: &Data, + request_counter: &mut i32, + ) -> Result<(), Self::Error> { + MyPost::verify(&self.object, self.id(), data, request_counter).await?; + Ok(()) + } + + async fn receive( + self, + data: &Data, + request_counter: &mut i32, + ) -> Result<(), Self::Error> { + MyPost::from_apub(self.object, data, request_counter).await?; + Ok(()) + } +} diff --git a/examples/federation-axum/activities/follow.rs b/examples/federation-axum/activities/follow.rs new file mode 100644 index 0000000..0cc58f2 --- /dev/null +++ b/examples/federation-axum/activities/follow.rs @@ -0,0 +1,89 @@ +use crate::{ + activities::accept::Accept, + generate_object_id, + instance::InstanceHandle, + objects::person::MyUser, +}; +use activitypub_federation::{ + core::object_id::ObjectId, + data::Data, + traits::{ActivityHandler, Actor}, +}; +use activitystreams_kinds::activity::FollowType; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Follow { + pub(crate) actor: ObjectId, + pub(crate) object: ObjectId, + #[serde(rename = "type")] + kind: FollowType, + id: Url, +} + +impl Follow { + pub fn new(actor: ObjectId, object: ObjectId, id: Url) -> Follow { + Follow { + actor, + object, + kind: Default::default(), + id, + } + } +} + +#[async_trait::async_trait] +impl ActivityHandler for Follow { + type DataType = InstanceHandle; + type Error = crate::error::Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn verify( + &self, + _data: &Data, + _request_counter: &mut i32, + ) -> Result<(), Self::Error> { + Ok(()) + } + + // Ignore clippy false positive: https://github.com/rust-lang/rust-clippy/issues/6446 + #[allow(clippy::await_holding_lock)] + async fn receive( + self, + data: &Data, + request_counter: &mut i32, + ) -> Result<(), Self::Error> { + // add to followers + let local_user = { + let mut users = data.users.lock().unwrap(); + let local_user = users.first_mut().unwrap(); + local_user.followers.push(self.actor.inner().clone()); + local_user.clone() + }; + + // send back an accept + let follower = self + .actor + .dereference(data, data.local_instance(), request_counter) + .await?; + let id = generate_object_id(data.local_instance().hostname())?; + let accept = Accept::new(local_user.ap_id.clone(), self, id.clone()); + local_user + .send( + accept, + vec![follower.shared_inbox_or_inbox()], + data.local_instance(), + ) + .await?; + Ok(()) + } +} diff --git a/examples/federation-axum/activities/mod.rs b/examples/federation-axum/activities/mod.rs new file mode 100644 index 0000000..59d2fb0 --- /dev/null +++ b/examples/federation-axum/activities/mod.rs @@ -0,0 +1,3 @@ +pub mod accept; +pub mod create_note; +pub mod follow; diff --git a/examples/federation-axum/error.rs b/examples/federation-axum/error.rs new file mode 100644 index 0000000..0b4302d --- /dev/null +++ b/examples/federation-axum/error.rs @@ -0,0 +1,24 @@ +/// Necessary because of this issue: https://github.com/actix/actix-web/issues/1711 +#[derive(Debug)] +pub struct Error(anyhow::Error); + +impl From for Error +where + T: Into, +{ + fn from(t: T) -> Self { + Error(t.into()) + } +} + +mod axum { + use super::Error; + use axum::response::{IntoResponse, Response}; + use http::StatusCode; + + impl IntoResponse for Error { + fn into_response(self) -> Response { + (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", self.0)).into_response() + } + } +} diff --git a/examples/federation-axum/instance.rs b/examples/federation-axum/instance.rs new file mode 100644 index 0000000..1b7fc6f --- /dev/null +++ b/examples/federation-axum/instance.rs @@ -0,0 +1,160 @@ +use crate::{ + error::Error, + generate_object_id, + objects::{ + note::MyPost, + person::{MyUser, PersonAcceptedActivities}, + }, +}; + +use activitypub_federation::{ + core::{object_id::ObjectId, signatures::generate_actor_keypair}, + data::Data, + deser::context::WithContext, + traits::ApubObject, + InstanceSettings, + LocalInstance, + UrlVerifier, +}; + +use activitypub_federation::core::axum::{verify_request_payload, DigestVerified}; +use async_trait::async_trait; +use axum::{ + body, + body::Body, + extract::{Json, OriginalUri, State}, + middleware, + response::IntoResponse, + routing::{get, post}, + Extension, + Router, +}; +use http::{HeaderMap, Method, Request}; +use reqwest::Client; +use std::{ + net::ToSocketAddrs, + sync::{Arc, Mutex}, +}; +use tokio::task; +use tower::ServiceBuilder; +use tower_http::ServiceBuilderExt; +use url::Url; + +pub type InstanceHandle = Arc; + +pub struct Instance { + /// This holds all library data + local_instance: LocalInstance, + /// Our "database" which contains all known users (local and federated) + pub users: Mutex>, + /// Same, but for posts + pub posts: Mutex>, +} + +/// Use this to store your federation blocklist, or a database connection needed to retrieve it. +#[derive(Clone)] +struct MyUrlVerifier(); + +#[async_trait] +impl UrlVerifier for MyUrlVerifier { + async fn verify(&self, url: &Url) -> Result<(), &'static str> { + if url.domain() == Some("malicious.com") { + Err("malicious domain") + } else { + Ok(()) + } + } +} + +impl Instance { + pub fn new(hostname: String) -> Result { + let settings = InstanceSettings::builder() + .debug(true) + .url_verifier(Box::new(MyUrlVerifier())) + .build()?; + let local_instance = + LocalInstance::new(hostname.clone(), Client::default().into(), settings); + let local_user = MyUser::new(generate_object_id(&hostname)?, generate_actor_keypair()?); + let instance = Arc::new(Instance { + local_instance, + users: Mutex::new(vec![local_user]), + posts: Mutex::new(vec![]), + }); + Ok(instance) + } + + pub fn local_user(&self) -> MyUser { + self.users.lock().unwrap().first().cloned().unwrap() + } + + pub fn local_instance(&self) -> &LocalInstance { + &self.local_instance + } + + pub fn listen(instance: &InstanceHandle) -> Result<(), Error> { + let hostname = instance.local_instance.hostname(); + let instance = instance.clone(); + let app = Router::new() + .route("/inbox", post(http_post_user_inbox)) + .layer( + ServiceBuilder::new() + .map_request_body(body::boxed) + .layer(middleware::from_fn(verify_request_payload)), + ) + .route("/objects/:user_name", get(http_get_user)) + .with_state(instance) + .layer(TraceLayer::new_for_http()); + + // run it + let addr = hostname + .to_socket_addrs()? + .next() + .expect("Failed to lookup domain name"); + let server = axum::Server::bind(&addr).serve(app.into_make_service()); + + task::spawn(server); + Ok(()) + } +} + +use crate::objects::person::Person; +use activitypub_federation::core::axum::{inbox::receive_activity, json::ApubJson}; +use tower_http::trace::TraceLayer; + +async fn http_get_user( + State(data): State, + request: Request, +) -> Result>, Error> { + let hostname: String = data.local_instance.hostname().to_string(); + let request_url = format!("http://{}{}", hostname, &request.uri()); + + let url = Url::parse(&request_url).expect("Failed to parse url"); + + let user = ObjectId::::new(url) + .dereference_local(&data) + .await? + .into_apub(&data) + .await?; + + Ok(ApubJson(WithContext::new_default(user))) +} + +async fn http_post_user_inbox( + headers: HeaderMap, + method: Method, + OriginalUri(uri): OriginalUri, + State(data): State, + Extension(digest_verified): Extension, + Json(activity): Json>, +) -> impl IntoResponse { + receive_activity::, MyUser, InstanceHandle>( + digest_verified, + activity, + &data.clone().local_instance, + &Data::new(data), + headers, + method, + uri, + ) + .await +} diff --git a/examples/federation-axum/main.rs b/examples/federation-axum/main.rs new file mode 100644 index 0000000..3222bd4 --- /dev/null +++ b/examples/federation-axum/main.rs @@ -0,0 +1,48 @@ +use crate::{error::Error, instance::Instance, objects::note::MyPost, utils::generate_object_id}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +mod activities; +mod error; +mod instance; +mod objects; +mod utils; + +#[actix_rt::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new( + std::env::var("RUST_LOG").unwrap_or_else(|_| { + "activitypub_federation=debug,federation-axum=debug,tower_http=debug".into() + }), + )) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let alpha = Instance::new("localhost:8001".to_string())?; + let beta = Instance::new("localhost:8002".to_string())?; + Instance::listen(&alpha)?; + Instance::listen(&beta)?; + + // alpha user follows beta user + alpha + .local_user() + .follow(&beta.local_user(), &alpha) + .await?; + + // assert that follow worked correctly + assert_eq!( + beta.local_user().followers(), + &vec![alpha.local_user().ap_id.inner().clone()] + ); + + // beta sends a post to its followers + let sent_post = MyPost::new("hello world!".to_string(), beta.local_user().ap_id); + beta.local_user().post(sent_post.clone(), &beta).await?; + let received_post = alpha.posts.lock().unwrap().first().cloned().unwrap(); + + // assert that alpha received the post + assert_eq!(received_post.text, sent_post.text); + assert_eq!(received_post.ap_id.inner(), sent_post.ap_id.inner()); + assert_eq!(received_post.creator.inner(), sent_post.creator.inner()); + Ok(()) +} diff --git a/examples/federation-axum/objects/mod.rs b/examples/federation-axum/objects/mod.rs new file mode 100644 index 0000000..20b1a49 --- /dev/null +++ b/examples/federation-axum/objects/mod.rs @@ -0,0 +1,2 @@ +pub mod note; +pub mod person; diff --git a/examples/federation-axum/objects/note.rs b/examples/federation-axum/objects/note.rs new file mode 100644 index 0000000..d59b1c4 --- /dev/null +++ b/examples/federation-axum/objects/note.rs @@ -0,0 +1,92 @@ +use crate::{generate_object_id, instance::InstanceHandle, objects::person::MyUser}; +use activitypub_federation::{ + core::object_id::ObjectId, + deser::helpers::deserialize_one_or_many, + traits::ApubObject, +}; +use activitystreams_kinds::{object::NoteType, public}; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Clone, Debug)] +pub struct MyPost { + pub text: String, + pub ap_id: ObjectId, + pub creator: ObjectId, + pub local: bool, +} + +impl MyPost { + pub fn new(text: String, creator: ObjectId) -> MyPost { + MyPost { + text, + ap_id: ObjectId::new(generate_object_id(creator.inner().domain().unwrap()).unwrap()), + creator, + local: true, + } + } +} + +#[derive(Deserialize, Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Note { + #[serde(rename = "type")] + kind: NoteType, + id: ObjectId, + pub(crate) attributed_to: ObjectId, + #[serde(deserialize_with = "deserialize_one_or_many")] + pub(crate) to: Vec, + content: String, +} + +#[async_trait::async_trait] +impl ApubObject for MyPost { + type DataType = InstanceHandle; + type ApubType = Note; + type DbType = (); + type Error = crate::error::Error; + + async fn read_from_apub_id( + _object_id: Url, + _data: &Self::DataType, + ) -> Result, Self::Error> { + todo!() + } + + async fn into_apub(self, data: &Self::DataType) -> Result { + let creator = self.creator.dereference_local(data).await?; + Ok(Note { + kind: Default::default(), + id: self.ap_id, + attributed_to: self.creator, + to: vec![public(), creator.followers_url()?], + content: self.text, + }) + } + + async fn verify( + _apub: &Self::ApubType, + _expected_domain: &Url, + _data: &Self::DataType, + _request_counter: &mut i32, + ) -> Result<(), Self::Error> { + Ok(()) + } + + async fn from_apub( + apub: Self::ApubType, + data: &Self::DataType, + _request_counter: &mut i32, + ) -> Result { + let post = MyPost { + text: apub.content, + ap_id: apub.id, + creator: apub.attributed_to, + local: false, + }; + + let mut lock = data.posts.lock().unwrap(); + lock.push(post.clone()); + Ok(post) + } +} diff --git a/examples/federation-axum/objects/person.rs b/examples/federation-axum/objects/person.rs new file mode 100644 index 0000000..9b7d19c --- /dev/null +++ b/examples/federation-axum/objects/person.rs @@ -0,0 +1,195 @@ +use crate::{ + activities::{accept::Accept, create_note::CreateNote, follow::Follow}, + error::Error, + instance::InstanceHandle, + objects::note::MyPost, + utils::generate_object_id, +}; +use activitypub_federation::{ + core::{ + activity_queue::send_activity, + object_id::ObjectId, + signatures::{Keypair, PublicKey}, + }, + data::Data, + deser::context::WithContext, + traits::{ActivityHandler, Actor, ApubObject}, + LocalInstance, +}; +use activitystreams_kinds::actor::PersonType; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Debug, Clone)] +pub struct MyUser { + pub ap_id: ObjectId, + pub inbox: Url, + // exists for all users (necessary to verify http signatures) + public_key: String, + // exists only for local users + private_key: Option, + pub followers: Vec, + pub local: bool, +} + +/// List of all activities which this actor can receive. +#[derive(Deserialize, Serialize, Debug)] +#[serde(untagged)] +#[enum_delegate::implement(ActivityHandler)] +pub enum PersonAcceptedActivities { + Follow(Follow), + Accept(Accept), + CreateNote(CreateNote), +} + +impl MyUser { + pub fn new(ap_id: Url, keypair: Keypair) -> MyUser { + let mut inbox = ap_id.clone(); + inbox.set_path("/inbox"); + let ap_id = ObjectId::new(ap_id); + MyUser { + ap_id, + inbox, + public_key: keypair.public_key, + private_key: Some(keypair.private_key), + followers: vec![], + local: true, + } + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Person { + #[serde(rename = "type")] + kind: PersonType, + id: ObjectId, + inbox: Url, + public_key: PublicKey, +} + +impl MyUser { + pub fn followers(&self) -> &Vec { + &self.followers + } + + pub fn followers_url(&self) -> Result { + Ok(Url::parse(&format!("{}/followers", self.ap_id.inner()))?) + } + + fn public_key(&self) -> PublicKey { + PublicKey::new_main_key(self.ap_id.clone().into_inner(), self.public_key.clone()) + } + + pub async fn follow(&self, other: &MyUser, instance: &InstanceHandle) -> Result<(), Error> { + let id = generate_object_id(instance.local_instance().hostname())?; + let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone()); + self.send( + follow, + vec![other.shared_inbox_or_inbox()], + instance.local_instance(), + ) + .await?; + Ok(()) + } + + pub async fn post(&self, post: MyPost, instance: &InstanceHandle) -> Result<(), Error> { + let id = generate_object_id(instance.local_instance().hostname())?; + let create = CreateNote::new(post.into_apub(instance).await?, id.clone()); + let mut inboxes = vec![]; + for f in self.followers.clone() { + let user: MyUser = ObjectId::new(f) + .dereference(instance, instance.local_instance(), &mut 0) + .await?; + inboxes.push(user.shared_inbox_or_inbox()); + } + self.send(create, inboxes, instance.local_instance()) + .await?; + Ok(()) + } + + pub(crate) async fn send( + &self, + activity: Activity, + recipients: Vec, + local_instance: &LocalInstance, + ) -> Result<(), ::Error> + where + Activity: ActivityHandler + Serialize + Send + Sync, + ::Error: From + From, + { + let activity = WithContext::new_default(activity); + send_activity( + activity, + self.public_key(), + self.private_key.clone().expect("has private key"), + recipients, + local_instance, + ) + .await?; + Ok(()) + } +} + +#[async_trait::async_trait] +impl ApubObject for MyUser { + type DataType = InstanceHandle; + type ApubType = Person; + type DbType = MyUser; + type Error = crate::error::Error; + + async fn read_from_apub_id( + object_id: Url, + data: &Self::DataType, + ) -> Result, Self::Error> { + let users = data.users.lock().unwrap(); + let res = users + .clone() + .into_iter() + .find(|u| u.ap_id.inner() == &object_id); + Ok(res) + } + + async fn into_apub(self, _data: &Self::DataType) -> Result { + Ok(Person { + kind: Default::default(), + id: self.ap_id.clone(), + inbox: self.inbox.clone(), + public_key: self.public_key(), + }) + } + + async fn verify( + _apub: &Self::ApubType, + _expected_domain: &Url, + _data: &Self::DataType, + _request_counter: &mut i32, + ) -> Result<(), Self::Error> { + Ok(()) + } + + async fn from_apub( + apub: Self::ApubType, + _data: &Self::DataType, + _request_counter: &mut i32, + ) -> Result { + Ok(MyUser { + ap_id: apub.id, + inbox: apub.inbox, + public_key: apub.public_key.public_key_pem, + private_key: None, + followers: vec![], + local: false, + }) + } +} + +impl Actor for MyUser { + fn public_key(&self) -> &str { + &self.public_key + } + + fn inbox(&self) -> Url { + self.inbox.clone() + } +} diff --git a/examples/federation-axum/utils.rs b/examples/federation-axum/utils.rs new file mode 100644 index 0000000..87c421e --- /dev/null +++ b/examples/federation-axum/utils.rs @@ -0,0 +1,13 @@ +use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use url::{ParseError, Url}; + +/// Just generate random url as object id. In a real project, you probably want to use +/// an url which contains the database id for easy retrieval (or store the random id in db). +pub fn generate_object_id(hostname: &str) -> Result { + let id: String = thread_rng() + .sample_iter(&Alphanumeric) + .take(7) + .map(char::from) + .collect(); + Url::parse(&format!("http://{}/objects/{}", hostname, id)) +} diff --git a/src/core/activity_queue.rs b/src/core/activity_queue.rs index 9a9e292..be37cfb 100644 --- a/src/core/activity_queue.rs +++ b/src/core/activity_queue.rs @@ -1,7 +1,6 @@ use crate::{ core::signatures::{sign_request, PublicKey}, traits::ActivityHandler, - utils::verify_url_valid, Error, InstanceSettings, LocalInstance, @@ -27,7 +26,7 @@ use std::{ pin::Pin, time::{Duration, SystemTime}, }; -use tracing::{info, log::debug, warn}; +use tracing::{debug, info, warn}; use url::Url; /// Send out the given activity to all inboxes, automatically generating the HTTP signatures. By @@ -60,9 +59,10 @@ where let activity_queue = &instance.activity_queue; for inbox in inboxes { - if verify_url_valid(&inbox, &instance.settings).await.is_err() { + if instance.verify_url_valid(&inbox).await.is_err() { continue; } + let message = SendActivityTask { activity_id: activity_id.clone(), inbox, @@ -137,7 +137,7 @@ async fn do_send( ) -> Result<(), anyhow::Error> { debug!("Sending {} to {}", task.activity_id, task.inbox); let request_builder = client - .post(&task.inbox.to_string()) + .post(task.inbox.to_string()) .timeout(timeout) .headers(generate_request_headers(&task.inbox)); let request = sign_request( diff --git a/src/core/actix/inbox.rs b/src/core/actix/inbox.rs new file mode 100644 index 0000000..6848723 --- /dev/null +++ b/src/core/actix/inbox.rs @@ -0,0 +1,51 @@ +use crate::{ + core::object_id::ObjectId, + data::Data, + traits::{ActivityHandler, Actor, ApubObject}, + Error, + LocalInstance, +}; + +use crate::core::signatures::verify_signature; +use actix_web::{HttpRequest, HttpResponse}; +use serde::de::DeserializeOwned; +use tracing::debug; + +/// Receive an activity and perform some basic checks, including HTTP signature verification. +pub async fn receive_activity( + request: HttpRequest, + activity: Activity, + local_instance: &LocalInstance, + data: &Data, +) -> Result::Error> +where + Activity: ActivityHandler + DeserializeOwned + Send + 'static, + ActorT: ApubObject + Actor + Send + 'static, + for<'de2> ::ApubType: serde::Deserialize<'de2>, + ::Error: From + + From + + From<::Error> + + From, + ::Error: From + From, +{ + local_instance.verify_url_and_domain(&activity).await?; + + let request_counter = &mut 0; + let actor = ObjectId::::new(activity.actor().clone()) + .dereference(data, local_instance, request_counter) + .await?; + + verify_signature( + request.headers(), + request.method(), + request.uri(), + actor.public_key(), + )?; + + debug!("Verifying activity {}", activity.id().to_string()); + activity.verify(data, request_counter).await?; + + debug!("Receiving activity {}", activity.id().to_string()); + activity.receive(data, request_counter).await?; + Ok(HttpResponse::Ok().finish()) +} diff --git a/src/core/actix/mod.rs b/src/core/actix/mod.rs new file mode 100644 index 0000000..730098e --- /dev/null +++ b/src/core/actix/mod.rs @@ -0,0 +1 @@ +pub mod inbox; diff --git a/src/core/axum/digest.rs b/src/core/axum/digest.rs new file mode 100644 index 0000000..2046cbd --- /dev/null +++ b/src/core/axum/digest.rs @@ -0,0 +1,45 @@ +use axum::http::HeaderValue; +use sha2::{Digest, Sha256}; + +#[derive(Clone, Debug)] +pub struct DigestPart { + pub algorithm: String, + pub digest: String, +} + +impl DigestPart { + pub fn try_from_header(h: &HeaderValue) -> Option> { + let h = h.to_str().ok()?.split(';').next()?; + let v: Vec<_> = h + .split(',') + .filter_map(|p| { + let mut iter = p.splitn(2, '='); + iter.next() + .and_then(|alg| iter.next().map(|value| (alg, value))) + }) + .map(|(alg, value)| DigestPart { + algorithm: alg.to_owned(), + digest: value.to_owned(), + }) + .collect(); + + if v.is_empty() { + None + } else { + Some(v) + } + } +} + +pub fn verify_sha256(digests: &[DigestPart], payload: &[u8]) -> bool { + let mut hasher = Sha256::new(); + + for part in digests { + hasher.update(payload); + if base64::encode(hasher.finalize_reset()) != part.digest { + return false; + } + } + + true +} diff --git a/src/core/axum/inbox.rs b/src/core/axum/inbox.rs new file mode 100644 index 0000000..70b0b6c --- /dev/null +++ b/src/core/axum/inbox.rs @@ -0,0 +1,47 @@ +use crate::{ + core::{axum::DigestVerified, object_id::ObjectId, signatures::verify_signature}, + data::Data, + traits::{ActivityHandler, Actor, ApubObject}, + Error, + LocalInstance, +}; +use http::{HeaderMap, Method, Uri}; +use serde::de::DeserializeOwned; +use tracing::debug; + +/// Receive an activity and perform some basic checks, including HTTP signature verification. +pub async fn receive_activity( + _digest_verified: DigestVerified, + activity: Activity, + local_instance: &LocalInstance, + data: &Data, + headers: HeaderMap, + method: Method, + uri: Uri, +) -> Result<(), ::Error> +where + Activity: ActivityHandler + DeserializeOwned + Send + 'static, + ActorT: ApubObject + Actor + Send + 'static, + for<'de2> ::ApubType: serde::Deserialize<'de2>, + ::Error: From + + From + + From<::Error> + + From, + ::Error: From + From, +{ + local_instance.verify_url_and_domain(&activity).await?; + + let request_counter = &mut 0; + let actor = ObjectId::::new(activity.actor().clone()) + .dereference(data, local_instance, request_counter) + .await?; + + verify_signature(&headers, &method, &uri, actor.public_key())?; + + debug!("Verifying activity {}", activity.id().to_string()); + activity.verify(data, request_counter).await?; + + debug!("Receiving activity {}", activity.id().to_string()); + activity.receive(data, request_counter).await?; + Ok(()) +} diff --git a/src/core/axum/json.rs b/src/core/axum/json.rs new file mode 100644 index 0000000..6962942 --- /dev/null +++ b/src/core/axum/json.rs @@ -0,0 +1,32 @@ +use crate::APUB_JSON_CONTENT_TYPE; +use axum::response::IntoResponse; +use http::header; +use serde::Serialize; + +/// A wrapper struct to respond with [`APUB_JSON_CONTENT_TYPE`] +/// in axum handlers +/// +/// ## Example: +/// ```rust, no_run +/// use activitypub_federation::deser::context::WithContext; +/// async fn http_get_user() -> Result>, Error> { +/// let user = WithContext::new_default(M); +/// +/// Ok(ApubJson(WithContext::new_default(MyUser::default()))) +/// } +/// ``` +#[derive(Debug, Clone, Copy, Default)] +pub struct ApubJson(pub Json); + +impl IntoResponse for ApubJson { + fn into_response(self) -> axum::response::Response { + let mut response = axum::response::Json(self.0).into_response(); + response.headers_mut().insert( + header::CONTENT_TYPE, + APUB_JSON_CONTENT_TYPE + .parse() + .expect("Parsing 'application/activity+json' should never fail"), + ); + response + } +} diff --git a/src/core/axum/mod.rs b/src/core/axum/mod.rs new file mode 100644 index 0000000..ad6c81b --- /dev/null +++ b/src/core/axum/mod.rs @@ -0,0 +1,80 @@ +use axum::{ + async_trait, + body::{self, BoxBody, Bytes, Full}, + extract::FromRequest, + http::{Request, StatusCode}, + middleware::Next, + response::{IntoResponse, Response}, +}; +use digest::{verify_sha256, DigestPart}; + +mod digest; +pub mod inbox; +pub mod json; + +/// A request guard to ensure digest has been verified request has been +/// see [`receive_activity`] +#[derive(Clone)] +pub struct DigestVerified; + +pub struct BufferRequestBody(pub Bytes); + +pub async fn verify_request_payload( + request: Request, + next: Next, +) -> Result { + let mut request = verify_payload(request).await?; + request.extensions_mut().insert(DigestVerified); + Ok(next.run(request).await) +} + +async fn verify_payload(request: Request) -> Result, Response> { + let (parts, body) = request.into_parts(); + + // this wont work if the body is an long running stream + let bytes = hyper::body::to_bytes(body) + .await + .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?; + + match parts.headers.get("Digest") { + None => Err(( + StatusCode::UNAUTHORIZED, + "Missing digest header".to_string(), + ) + .into_response()), + Some(digest) => match DigestPart::try_from_header(digest) { + None => Err(( + StatusCode::UNAUTHORIZED, + "Malformed digest header".to_string(), + ) + .into_response()), + Some(digests) => { + if !verify_sha256(&digests, bytes.as_ref()) { + Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "Digest does not match payload".to_string(), + ) + .into_response()) + } else { + Ok(Request::from_parts(parts, body::boxed(Full::from(bytes)))) + } + } + }, + } +} + +#[async_trait] +impl FromRequest for BufferRequestBody +where + S: Send + Sync, +{ + type Rejection = Response; + + async fn from_request(req: Request, state: &S) -> Result { + let body = Bytes::from_request(req, state) + .await + .map_err(IntoResponse::into_response)?; + + Ok(Self(body)) + } +} diff --git a/src/core/inbox.rs b/src/core/inbox.rs index 5241f51..e69de29 100644 --- a/src/core/inbox.rs +++ b/src/core/inbox.rs @@ -1,53 +0,0 @@ -use crate::{ - core::{object_id::ObjectId, signatures::verify_signature}, - data::Data, - traits::{ActivityHandler, Actor, ApubObject}, - utils::{verify_domains_match, verify_url_valid}, - Error, - LocalInstance, -}; -use actix_web::{dev::Payload, FromRequest, HttpRequest, HttpResponse}; -use http_signature_normalization_actix::prelude::DigestVerified; -use serde::de::DeserializeOwned; -use tracing::log::debug; - -/// Receive an activity and perform some basic checks, including HTTP signature verification. -pub async fn receive_activity( - request: HttpRequest, - activity: Activity, - local_instance: &LocalInstance, - data: &Data, -) -> Result::Error> -where - Activity: ActivityHandler + DeserializeOwned + Send + 'static, - ActorT: ApubObject + Actor + Send + 'static, - for<'de2> ::ApubType: serde::Deserialize<'de2>, - ::Error: From - + From - + From<::Error> - + From - + From, - ::Error: From + From, -{ - // ensure that payload hash was checked against digest header by middleware - DigestVerified::from_request(&request, &mut Payload::None).await?; - - verify_domains_match(activity.id(), activity.actor())?; - verify_url_valid(activity.id(), &local_instance.settings).await?; - if local_instance.is_local_url(activity.id()) { - return Err(Error::UrlVerificationError("Activity was sent from local instance").into()); - } - - let request_counter = &mut 0; - let actor = ObjectId::::new(activity.actor().clone()) - .dereference(data, local_instance, request_counter) - .await?; - verify_signature(&request, actor.public_key())?; - - debug!("Verifying activity {}", activity.id().to_string()); - activity.verify(data, request_counter).await?; - - debug!("Receiving activity {}", activity.id().to_string()); - activity.receive(data, request_counter).await?; - Ok(HttpResponse::Ok().finish()) -} diff --git a/src/core/mod.rs b/src/core/mod.rs index fa173b2..0f5c1dd 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,4 +1,9 @@ pub mod activity_queue; -pub mod inbox; pub mod object_id; pub mod signatures; + +#[cfg(feature = "axum")] +pub mod axum; + +#[cfg(feature = "actix")] +pub mod actix; diff --git a/src/core/object_id.rs b/src/core/object_id.rs index 5f9c48e..7bd9c61 100644 --- a/src/core/object_id.rs +++ b/src/core/object_id.rs @@ -195,7 +195,7 @@ mod tests { #[derive(Debug)] struct TestObject {} - #[async_trait::async_trait(?Send)] + #[async_trait::async_trait] impl ApubObject for TestObject { type DataType = TestObject; type ApubType = (); diff --git a/src/core/signatures.rs b/src/core/signatures.rs index bf53050..4d22c4e 100644 --- a/src/core/signatures.rs +++ b/src/core/signatures.rs @@ -1,6 +1,6 @@ -use actix_web::HttpRequest; +use crate::utils::header_to_map; use anyhow::anyhow; -use http_signature_normalization_actix::Config as ConfigActix; +use http::{header::HeaderName, uri::PathAndQuery, HeaderValue, Method, Uri}; use http_signature_normalization_reqwest::prelude::{Config, SignExt}; use once_cell::sync::{Lazy, OnceCell}; use openssl::{ @@ -17,7 +17,6 @@ use std::io::{Error, ErrorKind}; use tracing::debug; use url::Url; -static CONFIG2: Lazy = Lazy::new(ConfigActix::new); static HTTP_SIG_CONFIG: OnceCell = OnceCell::new(); /// A private/public key pair used for HTTP signatures @@ -80,33 +79,6 @@ pub(crate) async fn sign_request( .await } -/// Verifies the HTTP signature on an incoming inbox request. -pub fn verify_signature(request: &HttpRequest, public_key: &str) -> Result<(), anyhow::Error> { - let verified = CONFIG2 - .begin_verify( - request.method(), - request.uri().path_and_query(), - request.headers().clone(), - )? - .verify(|signature, signing_string| -> Result { - debug!( - "Verifying with key {}, message {}", - &public_key, &signing_string - ); - let public_key = PKey::public_key_from_pem(public_key.as_bytes())?; - let mut verifier = Verifier::new(MessageDigest::sha256(), &public_key)?; - verifier.update(signing_string.as_bytes())?; - Ok(verifier.verify(&base64::decode(signature)?)?) - })?; - - if verified { - debug!("verified signature for {}", &request.uri()); - Ok(()) - } else { - Err(anyhow!("Invalid signature on request: {}", &request.uri())) - } -} - #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct PublicKey { @@ -131,3 +103,40 @@ impl PublicKey { } } } + +static CONFIG2: Lazy = + Lazy::new(http_signature_normalization::Config::new); + +/// Verifies the HTTP signature on an incoming inbox request. +pub fn verify_signature<'a, H>( + headers: H, + method: &Method, + uri: &Uri, + public_key: &str, +) -> Result<(), anyhow::Error> +where + H: IntoIterator, +{ + let headers = header_to_map(headers); + let path_and_query = uri.path_and_query().map(PathAndQuery::as_str).unwrap_or(""); + + let verified = CONFIG2 + .begin_verify(method.as_str(), path_and_query, headers)? + .verify(|signature, signing_string| -> anyhow::Result { + debug!( + "Verifying with key {}, message {}", + &public_key, &signing_string + ); + let public_key = PKey::public_key_from_pem(public_key.as_bytes())?; + let mut verifier = Verifier::new(MessageDigest::sha256(), &public_key)?; + verifier.update(signing_string.as_bytes())?; + Ok(verifier.verify(&base64::decode(signature)?)?) + })?; + + if verified { + debug!("verified signature for {}", uri); + Ok(()) + } else { + Err(anyhow!("Invalid signature on request: {}", uri)) + } +} diff --git a/src/deser/context.rs b/src/deser/context.rs index 670890b..d5ca763 100644 --- a/src/deser/context.rs +++ b/src/deser/context.rs @@ -28,10 +28,10 @@ impl WithContext { } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl ActivityHandler for WithContext where - T: ActivityHandler, + T: ActivityHandler + Send + Sync, { type DataType = ::DataType; type Error = ::Error; diff --git a/src/lib.rs b/src/lib.rs index 5d641ab..4063628 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,14 @@ -use crate::core::activity_queue::create_activity_queue; +use crate::{ + core::activity_queue::create_activity_queue, + traits::ActivityHandler, + utils::verify_domains_match, +}; use async_trait::async_trait; use background_jobs::Manager; use derive_builder::Builder; use dyn_clone::{clone_trait_object, DynClone}; use reqwest_middleware::ClientWithMiddleware; +use serde::de::DeserializeOwned; use std::time::Duration; use url::Url; @@ -25,6 +30,62 @@ pub struct LocalInstance { settings: InstanceSettings, } +impl LocalInstance { + async fn verify_url_and_domain( + &self, + activity: &Activity, + ) -> Result<(), Error> + where + Activity: ActivityHandler + DeserializeOwned + Send + 'static, + { + verify_domains_match(activity.id(), activity.actor())?; + self.verify_url_valid(activity.id()).await?; + if self.is_local_url(activity.id()) { + return Err(Error::UrlVerificationError( + "Activity was sent from local instance", + )); + } + + Ok(()) + } + + /// Perform some security checks on URLs as mentioned in activitypub spec, and call user-supplied + /// [`InstanceSettings.verify_url_function`]. + /// + /// https://www.w3.org/TR/activitypub/#security-considerations + async fn verify_url_valid(&self, url: &Url) -> Result<(), Error> { + match url.scheme() { + "https" => {} + "http" => { + if !self.settings.debug { + return Err(Error::UrlVerificationError( + "Http urls are only allowed in debug mode", + )); + } + } + _ => return Err(Error::UrlVerificationError("Invalid url scheme")), + }; + + if url.domain().is_none() { + return Err(Error::UrlVerificationError("Url must have a domain")); + } + + if url.domain() == Some("localhost") && !self.settings.debug { + return Err(Error::UrlVerificationError( + "Localhost is only allowed in debug mode", + )); + } + + self.settings + .url_verifier + .verify(url) + .await + .map_err(Error::UrlVerificationError)?; + + Ok(()) + } +} + #[async_trait] pub trait UrlVerifier: DynClone + Send { async fn verify(&self, url: &Url) -> Result<(), &'static str>; diff --git a/src/signature.rs b/src/signature.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/traits.rs b/src/traits.rs index 1ff9b67..ad7c85b 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -4,10 +4,10 @@ use std::ops::Deref; use url::Url; /// Trait which allows verification and reception of incoming activities. -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] #[enum_delegate::register] pub trait ActivityHandler { - type DataType; + type DataType: Send + Sync; type Error; /// `id` field of the activity @@ -34,10 +34,10 @@ pub trait ActivityHandler { } /// Allow for boxing of enum variants -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] impl ActivityHandler for Box where - T: ActivityHandler, + T: ActivityHandler + Send + Sync, { type DataType = T::DataType; type Error = T::Error; @@ -67,9 +67,9 @@ where } } -#[async_trait::async_trait(?Send)] +#[async_trait::async_trait] pub trait ApubObject { - type DataType; + type DataType: Send + Sync; type ApubType; type DbType; type Error; diff --git a/src/utils.rs b/src/utils.rs index 45e4037..62a1da4 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,7 +1,8 @@ -use crate::{Error, InstanceSettings, LocalInstance, APUB_JSON_CONTENT_TYPE}; -use http::StatusCode; +use crate::{Error, LocalInstance, APUB_JSON_CONTENT_TYPE}; +use http::{header::HeaderName, HeaderValue, StatusCode}; use serde::de::DeserializeOwned; -use tracing::log::info; +use std::collections::BTreeMap; +use tracing::info; use url::Url; pub async fn fetch_object_http( @@ -11,7 +12,7 @@ pub async fn fetch_object_http( ) -> Result { // dont fetch local objects this way debug_assert!(url.domain() != Some(&instance.hostname)); - verify_url_valid(url, &instance.settings).await?; + instance.verify_url_valid(url).await?; info!("Fetching remote object {}", url.to_string()); *request_counter += 1; @@ -51,38 +52,18 @@ pub fn verify_urls_match(a: &Url, b: &Url) -> Result<(), Error> { Ok(()) } -/// Perform some security checks on URLs as mentioned in activitypub spec, and call user-supplied -/// [`InstanceSettings.verify_url_function`]. -/// -/// https://www.w3.org/TR/activitypub/#security-considerations -pub async fn verify_url_valid(url: &Url, settings: &InstanceSettings) -> Result<(), Error> { - match url.scheme() { - "https" => {} - "http" => { - if !settings.debug { - return Err(Error::UrlVerificationError( - "Http urls are only allowed in debug mode", - )); - } +/// Utility to converts either actix or axum headermap to a BTreeMap +pub fn header_to_map<'a, H>(headers: H) -> BTreeMap +where + H: IntoIterator, +{ + let mut header_map = BTreeMap::new(); + + for (name, value) in headers { + if let Ok(value) = value.to_str() { + header_map.insert(name.to_string(), value.to_string()); } - _ => return Err(Error::UrlVerificationError("Invalid url scheme")), - }; - - if url.domain().is_none() { - return Err(Error::UrlVerificationError("Url must have a domain")); } - if url.domain() == Some("localhost") && !settings.debug { - return Err(Error::UrlVerificationError( - "Localhost is only allowed in debug mode", - )); - } - - settings - .url_verifier - .verify(url) - .await - .map_err(Error::UrlVerificationError)?; - - Ok(()) + header_map }