Various improvements for usability, examples and docs

This commit is contained in:
Felix Ableitner 2023-02-19 21:26:01 +09:00
parent 5a5c015bfc
commit 69e77dfa74
40 changed files with 1409 additions and 1019 deletions

142
Cargo.lock generated
View file

@ -12,6 +12,7 @@ dependencies = [
"anyhow",
"async-trait",
"axum",
"axum-macros",
"background-jobs",
"base64",
"bytes",
@ -37,11 +38,9 @@ dependencies = [
"serde_json",
"sha2",
"thiserror",
"tokio",
"tower",
"tower-http",
"tracing",
"tracing-subscriber",
"url",
]
@ -272,6 +271,17 @@ dependencies = [
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -332,9 +342,9 @@ dependencies = [
[[package]]
name = "axum-macros"
version = "0.3.0"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4df0fc33ada14a338b799002f7e8657711422b25d4e16afb032708d6b185621"
checksum = "5fbf955307ff8addb48d2399393c9e2740dd491537ec562b66ab364fc4a38841"
dependencies = [
"heck",
"proc-macro2",
@ -554,9 +564,9 @@ dependencies = [
[[package]]
name = "darling"
version = "0.14.2"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0dd3cd20dc6b5a876612a6e5accfe7f3dd883db6d07acfbf14c128f61550dfa"
checksum = "c0808e1bd8671fb44a113a14e13497557533369847788fa2ae912b6ebfce9fa8"
dependencies = [
"darling_core",
"darling_macro",
@ -564,9 +574,9 @@ dependencies = [
[[package]]
name = "darling_core"
version = "0.14.2"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a784d2ccaf7c98501746bf0be29b2022ba41fd62a2e622af997a03e9f972859f"
checksum = "001d80444f28e193f30c2f293455da62dcf9a6b29918a4253152ae2b1de592cb"
dependencies = [
"fnv",
"ident_case",
@ -578,9 +588,9 @@ dependencies = [
[[package]]
name = "darling_macro"
version = "0.14.2"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7618812407e9402654622dd402b0a89dff9ba93badd6540781526117b92aab7e"
checksum = "b36230598a2d5de7ec1c6f51f72d8a99a9208daff41de2084d06e3fd3ea56685"
dependencies = [
"darling_core",
"quote",
@ -589,18 +599,18 @@ dependencies = [
[[package]]
name = "derive_builder"
version = "0.11.2"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07adf7be193b71cc36b193d0f5fe60b918a3a9db4dad0449f57bcfd519704a3"
checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8"
dependencies = [
"derive_builder_macro",
]
[[package]]
name = "derive_builder_core"
version = "0.11.2"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f91d4cfa921f1c05904dc3c57b4a32c38aed3340cce209f3a6fd1478babafc4"
checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f"
dependencies = [
"darling",
"proc-macro2",
@ -610,9 +620,9 @@ dependencies = [
[[package]]
name = "derive_builder_macro"
version = "0.11.2"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f0314b72bed045f3a68671b3c86328386762c93f82d98c65c3cb5e5f573dd68"
checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e"
dependencies = [
"derive_builder_core",
"syn",
@ -692,7 +702,11 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]]
@ -930,6 +944,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.23"
@ -1120,15 +1140,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
[[package]]
name = "matchit"
version = "0.6.0"
@ -1187,16 +1198,6 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-integer"
version = "0.1.45"
@ -1277,12 +1278,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -1430,15 +1425,6 @@ dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.28"
@ -1655,15 +1641,6 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@ -1773,15 +1750,6 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180"
dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.3.17"
@ -1835,7 +1803,6 @@ dependencies = [
"libc",
"memchr",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
@ -1912,7 +1879,6 @@ dependencies = [
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@ -1958,7 +1924,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
@ -1971,35 +1936,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
name = "try-lock"
version = "0.2.3"
@ -2070,12 +2006,6 @@ dependencies = [
"serde",
]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"

View file

@ -25,17 +25,18 @@ http = "0.2.8"
sha2 = "0.10.6"
background-jobs = "0.13.0"
thiserror = "1.0.37"
derive_builder = "0.11.2"
derive_builder = "0.12.0"
itertools = "0.10.5"
dyn-clone = "1.0.9"
enum_delegate = "0.2.0"
httpdate = "1.0.2"
http-signature-normalization-reqwest = { version = "0.7.1", default-features = false, features = ["sha-2", "middleware"] }
http-signature-normalization = "0.6.0"
actix-rt = { version = "2.7.0" }
actix-rt = "2.7.0"
bytes = "1.3.0"
futures-core = { version = "0.3.25", default-features = false }
pin-project-lite = "0.2.9"
activitystreams-kinds = "0.2.1"
# Actix-web
actix-web = { version = "4.2.1", default-features = false, optional = true }
@ -48,25 +49,18 @@ hyper = { version = "0.14", optional = true }
[features]
default = []
actix-web = ["dep:actix-web"]
axum = [
"dep:axum",
"dep:tower",
"dep:hyper",
]
axum = ["dep:axum", "dep:tower", "dep:hyper"]
[dev-dependencies]
activitystreams-kinds = "0.2.1"
rand = "0.8.5"
actix-rt = "2.7.0"
tokio = { version = "1.21.2", features = ["full"] }
env_logger = { version = "0.9.3", default-features = false }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tower-http = { version = "0.3", features = ["map-request-body", "util", "trace"] }
env_logger = "0.9.3"
tower-http = { version = "0.3", features = ["map-request-body", "util"] }
axum-macros = "0.3.4"
[profile.dev]
strip = "symbols"
debug = 0
[[example]]
name = "simple_federation"
path = "examples/simple_federation/main.rs"
name = "local_federation"
path = "examples/local_federation/main.rs"

View file

@ -1,17 +1,17 @@
use crate::{activities::follow::Follow, instance::DatabaseHandle, objects::person::MyUser};
use crate::{activities::follow::Follow, instance::DatabaseHandle, objects::person::DbUser};
use activitypub_federation::{
core::object_id::ObjectId,
kinds::activity::AcceptType,
request_data::RequestData,
traits::ActivityHandler,
};
use activitystreams_kinds::activity::AcceptType;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Accept {
actor: ObjectId<MyUser>,
actor: ObjectId<DbUser>,
object: Follow,
#[serde(rename = "type")]
kind: AcceptType,
@ -19,7 +19,7 @@ pub struct Accept {
}
impl Accept {
pub fn new(actor: ObjectId<MyUser>, object: Follow, id: Url) -> Accept {
pub fn new(actor: ObjectId<DbUser>, object: Follow, id: Url) -> Accept {
Accept {
actor,
object,

View file

@ -1,22 +1,22 @@
use crate::{
instance::DatabaseHandle,
objects::{note::Note, person::MyUser},
MyPost,
objects::{person::DbUser, post::Note},
DbPost,
};
use activitypub_federation::{
core::object_id::ObjectId,
deser::helpers::deserialize_one_or_many,
kinds::activity::CreateType,
protocol::helpers::deserialize_one_or_many,
request_data::RequestData,
traits::{ActivityHandler, ApubObject},
};
use activitystreams_kinds::activity::CreateType;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct CreateNote {
pub(crate) actor: ObjectId<MyUser>,
pub(crate) actor: ObjectId<DbUser>,
#[serde(deserialize_with = "deserialize_one_or_many")]
pub(crate) to: Vec<Url>,
pub(crate) object: Note,
@ -51,7 +51,7 @@ impl ActivityHandler for CreateNote {
}
async fn receive(self, data: &RequestData<Self::DataType>) -> Result<(), Self::Error> {
MyPost::from_apub(self.object, data).await?;
DbPost::from_apub(self.object, data).await?;
Ok(())
}
}

View file

@ -2,29 +2,29 @@ use crate::{
activities::accept::Accept,
generate_object_id,
instance::DatabaseHandle,
objects::person::MyUser,
objects::person::DbUser,
};
use activitypub_federation::{
core::object_id::ObjectId,
kinds::activity::FollowType,
request_data::RequestData,
traits::{ActivityHandler, Actor},
};
use activitystreams_kinds::activity::FollowType;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Follow {
pub(crate) actor: ObjectId<MyUser>,
pub(crate) object: ObjectId<MyUser>,
pub(crate) actor: ObjectId<DbUser>,
pub(crate) object: ObjectId<DbUser>,
#[serde(rename = "type")]
kind: FollowType,
id: Url,
}
impl Follow {
pub fn new(actor: ObjectId<MyUser>, object: ObjectId<MyUser>, id: Url) -> Follow {
pub fn new(actor: ObjectId<DbUser>, object: ObjectId<DbUser>, id: Url) -> Follow {
Follow {
actor,
object,
@ -60,7 +60,7 @@ impl ActivityHandler for Follow {
// send back an accept
let follower = self.actor.dereference(data).await?;
let id = generate_object_id(data.local_instance().hostname())?;
let id = generate_object_id(data.hostname())?;
let accept = Accept::new(local_user.ap_id.clone(), self, id.clone());
local_user
.send(accept, vec![follower.shared_inbox_or_inbox()], data)

View file

@ -1,3 +1,3 @@
pub mod accept;
pub mod create_note;
pub mod create_post;
pub mod follow;

View file

@ -0,0 +1,58 @@
use crate::{
error::Error,
instance::DatabaseHandle,
objects::person::{DbUser, PersonAcceptedActivities},
};
use activitypub_federation::{
config::FederationConfig,
core::actix_web::inbox::receive_activity,
protocol::context::WithContext,
request_data::{ApubMiddleware, RequestData},
traits::ApubObject,
APUB_JSON_CONTENT_TYPE,
};
use actix_web::{web, web::Bytes, App, HttpRequest, HttpResponse, HttpServer};
use anyhow::anyhow;
pub fn listen(data: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
let hostname = data.hostname();
let data = data.clone();
let server = HttpServer::new(move || {
App::new()
.wrap(ApubMiddleware::new(data.clone()))
.route("/{user}", web::get().to(http_get_user))
.route("/{user}/inbox", web::post().to(http_post_user_inbox))
})
.bind(hostname)?
.run();
actix_rt::spawn(server);
Ok(())
}
/// Handles requests to fetch user json over HTTP
pub async fn http_get_user(
user_name: web::Path<String>,
data: RequestData<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
let db_user = data.local_user();
if user_name.into_inner() == db_user.name {
let apub_user = db_user.into_apub(&data).await?;
Ok(HttpResponse::Ok()
.content_type(APUB_JSON_CONTENT_TYPE)
.json(WithContext::new_default(apub_user)))
} else {
Err(anyhow!("Invalid user").into())
}
}
/// Handles messages received in user inbox
pub async fn http_post_user_inbox(
request: HttpRequest,
body: Bytes,
data: RequestData<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle>(
request, body, &data,
)
.await
}

View file

@ -0,0 +1,64 @@
use crate::{
error::Error,
instance::DatabaseHandle,
objects::person::{DbUser, Person, PersonAcceptedActivities},
};
use activitypub_federation::{
config::FederationConfig,
core::axum::{inbox::receive_activity, json::ApubJson, ActivityData},
protocol::context::WithContext,
request_data::{ApubMiddleware, RequestData},
traits::ApubObject,
};
use anyhow::anyhow;
use axum::{
extract::Path,
response::IntoResponse,
routing::{get, post},
Router,
};
use std::net::ToSocketAddrs;
pub fn listen(data: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
let hostname = data.hostname();
let data = data.clone();
let app = Router::new()
.route("/:user/inbox", post(http_post_user_inbox))
.route("/:user", get(http_get_user))
.layer(ApubMiddleware::new(data));
let addr = hostname
.to_socket_addrs()?
.next()
.expect("Failed to lookup domain name");
let server = axum::Server::bind(&addr).serve(app.into_make_service());
actix_rt::spawn(server);
Ok(())
}
#[axum_macros::debug_handler]
async fn http_get_user(
Path(user): Path<String>,
data: RequestData<DatabaseHandle>,
) -> Result<ApubJson<WithContext<Person>>, Error> {
let db_user = data.local_user();
if user == db_user.name {
let apub_user = db_user.into_apub(&data).await?;
Ok(ApubJson(WithContext::new_default(apub_user)))
} else {
Err(anyhow!("Invalid user {user}").into())
}
}
#[axum_macros::debug_handler]
async fn http_post_user_inbox(
data: RequestData<DatabaseHandle>,
activity_data: ActivityData,
) -> impl IntoResponse {
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle>(
activity_data,
&data,
)
.await
}

View file

@ -0,0 +1,63 @@
use crate::{
objects::{person::DbUser, post::DbPost},
Error,
};
use activitypub_federation::config::{FederationConfig, UrlVerifier};
use async_trait::async_trait;
use std::sync::{Arc, Mutex};
use url::Url;
pub type DatabaseHandle = Arc<Database>;
/// Our "database" which contains all known posts users (local and federated)
pub struct Database {
pub users: Mutex<Vec<DbUser>>,
pub posts: Mutex<Vec<DbPost>>,
}
/// Use this to store your federation blocklist, or a database connection needed to retrieve it.
#[derive(Clone)]
struct MyUrlVerifier();
#[async_trait]
impl UrlVerifier for MyUrlVerifier {
async fn verify(&self, url: &Url) -> Result<(), &'static str> {
if url.domain() == Some("malicious.com") {
Err("malicious domain")
} else {
Ok(())
}
}
}
pub fn listen(data: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
if cfg!(feature = "actix-web") == cfg!(feature = "axum") {
panic!("Exactly one of features \"actix-web\" and \"axum\" must be enabled");
}
#[cfg(feature = "actix-web")]
crate::actix_web::http::listen(data)?;
#[cfg(feature = "axum")]
crate::axum::http::listen(data)?;
Ok(())
}
impl Database {
pub fn new(hostname: &str, name: String) -> Result<FederationConfig<DatabaseHandle>, Error> {
let local_user = DbUser::new(hostname, name)?;
let database = Arc::new(Database {
users: Mutex::new(vec![local_user]),
posts: Mutex::new(vec![]),
});
let config = FederationConfig::builder()
.hostname(hostname)
.app_data(database)
.debug(true)
.build()?;
Ok(config)
}
pub fn local_user(&self) -> DbUser {
let lock = self.users.lock().unwrap();
lock.first().unwrap().clone()
}
}

View file

@ -1,10 +1,10 @@
use crate::{
instance::{listen, Database},
objects::note::MyPost,
objects::post::DbPost,
utils::generate_object_id,
};
use error::Error;
use tracing::log::LevelFilter;
use tracing::log::{info, LevelFilter};
mod activities;
#[cfg(feature = "actix-web")]
@ -19,35 +19,41 @@ mod utils;
#[actix_rt::main]
async fn main() -> Result<(), Error> {
env_logger::builder()
.filter_level(LevelFilter::Debug)
.filter_level(LevelFilter::Warn)
.filter_module("local_federation", LevelFilter::Info)
.format_timestamp(None)
.init();
let alpha = Database::new("localhost:8001".to_string())?;
let beta = Database::new("localhost:8002".to_string())?;
info!("Starting local instances alpha and beta on localhost:8001, localhost:8002");
let alpha = Database::new("localhost:8001", "alpha".to_string())?;
let beta = Database::new("localhost:8002", "beta".to_string())?;
listen(&alpha)?;
listen(&beta)?;
info!("Local instances started");
// alpha user follows beta user
info!("Alpha user follows beta user");
alpha
.local_user()
.follow(&beta.local_user(), &alpha.to_request_data())
.await?;
// assert that follow worked correctly
assert_eq!(
beta.local_user().followers(),
&vec![alpha.local_user().ap_id.inner().clone()]
);
info!("Follow was successful");
// beta sends a post to its followers
let sent_post = MyPost::new("hello world!".to_string(), beta.local_user().ap_id);
info!("Beta sends a post to its followers");
let sent_post = DbPost::new("Hello world!".to_string(), beta.local_user().ap_id)?;
beta.local_user()
.post(sent_post.clone(), &beta.to_request_data())
.await?;
let received_post = alpha.posts.lock().unwrap().first().cloned().unwrap();
info!("Alpha received post: {}", received_post.text);
// assert that alpha received the post
assert_eq!(received_post.text, sent_post.text);
assert_eq!(received_post.ap_id.inner(), sent_post.ap_id.inner());
assert_eq!(received_post.creator.inner(), sent_post.creator.inner());
info!("Test completed");
Ok(())
}

View file

@ -1,2 +1,2 @@
pub mod note;
pub mod person;
pub mod post;

View file

@ -1,27 +1,29 @@
use crate::{
activities::{accept::Accept, create_note::CreateNote, follow::Follow},
activities::{accept::Accept, create_post::CreateNote, follow::Follow},
error::Error,
instance::DatabaseHandle,
objects::note::MyPost,
objects::post::DbPost,
utils::generate_object_id,
};
use activitypub_federation::{
core::{
activity_queue::send_activity,
object_id::ObjectId,
signatures::{Keypair, PublicKey},
signatures::{generate_actor_keypair, PublicKey},
},
deser::context::WithContext,
kinds::actor::PersonType,
protocol::context::WithContext,
request_data::RequestData,
traits::{ActivityHandler, Actor, ApubObject},
};
use activitystreams_kinds::actor::PersonType;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use url::Url;
#[derive(Debug, Clone)]
pub struct MyUser {
pub ap_id: ObjectId<MyUser>,
pub struct DbUser {
pub name: String,
pub ap_id: ObjectId<DbUser>,
pub inbox: Url,
// exists for all users (necessary to verify http signatures)
public_key: String,
@ -41,19 +43,20 @@ pub enum PersonAcceptedActivities {
CreateNote(CreateNote),
}
impl MyUser {
pub fn new(ap_id: Url, keypair: Keypair) -> MyUser {
let mut inbox = ap_id.clone();
inbox.set_path("/inbox");
let ap_id = ObjectId::new(ap_id);
MyUser {
impl DbUser {
pub fn new(hostname: &str, name: String) -> Result<DbUser, Error> {
let ap_id = Url::parse(&format!("http://{}/{}", hostname, &name))?.into();
let inbox = Url::parse(&format!("http://{}/{}/inbox", hostname, &name))?;
let keypair = generate_actor_keypair()?;
Ok(DbUser {
name,
ap_id,
inbox,
public_key: keypair.public_key,
private_key: Some(keypair.private_key),
followers: vec![],
local: true,
}
})
}
}
@ -62,12 +65,13 @@ impl MyUser {
pub struct Person {
#[serde(rename = "type")]
kind: PersonType,
id: ObjectId<MyUser>,
preferred_username: String,
id: ObjectId<DbUser>,
inbox: Url,
public_key: PublicKey,
}
impl MyUser {
impl DbUser {
pub fn followers(&self) -> &Vec<Url> {
&self.followers
}
@ -82,29 +86,29 @@ impl MyUser {
pub async fn follow(
&self,
other: &MyUser,
instance: &RequestData<DatabaseHandle>,
other: &DbUser,
data: &RequestData<DatabaseHandle>,
) -> Result<(), Error> {
let id = generate_object_id(instance.local_instance().hostname())?;
let id = generate_object_id(data.hostname())?;
let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone());
self.send(follow, vec![other.shared_inbox_or_inbox()], instance)
self.send(follow, vec![other.shared_inbox_or_inbox()], data)
.await?;
Ok(())
}
pub async fn post(
&self,
post: MyPost,
instance: &RequestData<DatabaseHandle>,
post: DbPost,
data: &RequestData<DatabaseHandle>,
) -> Result<(), Error> {
let id = generate_object_id(instance.local_instance().hostname())?;
let create = CreateNote::new(post.into_apub(instance).await?, id.clone());
let id = generate_object_id(data.hostname())?;
let create = CreateNote::new(post.into_apub(data).await?, id.clone());
let mut inboxes = vec![];
for f in self.followers.clone() {
let user: MyUser = ObjectId::new(f).dereference(instance).await?;
let user: DbUser = ObjectId::new(f).dereference(data).await?;
inboxes.push(user.shared_inbox_or_inbox());
}
self.send(create, inboxes, instance).await?;
self.send(create, inboxes, data).await?;
Ok(())
}
@ -115,16 +119,16 @@ impl MyUser {
data: &RequestData<DatabaseHandle>,
) -> Result<(), <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler + Serialize + Send + Sync,
Activity: ActivityHandler + Serialize + Debug + Send + Sync,
<Activity as ActivityHandler>::Error: From<anyhow::Error> + From<serde_json::Error>,
{
let activity = WithContext::new_default(activity);
send_activity(
activity,
self.public_key(),
self.private_key.clone().expect("has private key"),
self.private_key.clone().unwrap(),
recipients,
data.local_instance(),
data,
)
.await?;
Ok(())
@ -132,10 +136,9 @@ impl MyUser {
}
#[async_trait::async_trait]
impl ApubObject for MyUser {
impl ApubObject for DbUser {
type DataType = DatabaseHandle;
type ApubType = Person;
type DbType = MyUser;
type Error = Error;
async fn read_from_apub_id(
@ -155,6 +158,7 @@ impl ApubObject for MyUser {
_data: &RequestData<Self::DataType>,
) -> Result<Self::ApubType, Self::Error> {
Ok(Person {
preferred_username: self.name.clone(),
kind: Default::default(),
id: self.ap_id.clone(),
inbox: self.inbox.clone(),
@ -166,7 +170,8 @@ impl ApubObject for MyUser {
apub: Self::ApubType,
_data: &RequestData<Self::DataType>,
) -> Result<Self, Self::Error> {
Ok(MyUser {
Ok(DbUser {
name: apub.preferred_username,
ap_id: apub.id,
inbox: apub.inbox,
public_key: apub.public_key.public_key_pem,
@ -177,7 +182,7 @@ impl ApubObject for MyUser {
}
}
impl Actor for MyUser {
impl Actor for DbUser {
fn public_key(&self) -> &str {
&self.public_key
}

View file

@ -1,30 +1,31 @@
use crate::{error::Error, generate_object_id, instance::DatabaseHandle, objects::person::MyUser};
use crate::{error::Error, generate_object_id, instance::DatabaseHandle, objects::person::DbUser};
use activitypub_federation::{
core::object_id::ObjectId,
deser::helpers::deserialize_one_or_many,
kinds::{object::NoteType, public},
protocol::helpers::deserialize_one_or_many,
request_data::RequestData,
traits::ApubObject,
};
use activitystreams_kinds::{object::NoteType, public};
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Clone, Debug)]
pub struct MyPost {
pub struct DbPost {
pub text: String,
pub ap_id: ObjectId<MyPost>,
pub creator: ObjectId<MyUser>,
pub ap_id: ObjectId<DbPost>,
pub creator: ObjectId<DbUser>,
pub local: bool,
}
impl MyPost {
pub fn new(text: String, creator: ObjectId<MyUser>) -> MyPost {
MyPost {
impl DbPost {
pub fn new(text: String, creator: ObjectId<DbUser>) -> Result<DbPost, Error> {
let ap_id = generate_object_id(creator.inner().domain().unwrap())?.into();
Ok(DbPost {
text,
ap_id: ObjectId::new(generate_object_id(creator.inner().domain().unwrap()).unwrap()),
ap_id,
creator,
local: true,
}
})
}
}
@ -33,18 +34,17 @@ impl MyPost {
pub struct Note {
#[serde(rename = "type")]
kind: NoteType,
id: ObjectId<MyPost>,
pub(crate) attributed_to: ObjectId<MyUser>,
id: ObjectId<DbPost>,
pub(crate) attributed_to: ObjectId<DbUser>,
#[serde(deserialize_with = "deserialize_one_or_many")]
pub(crate) to: Vec<Url>,
content: String,
}
#[async_trait::async_trait]
impl ApubObject for MyPost {
impl ApubObject for DbPost {
type DataType = DatabaseHandle;
type ApubType = Note;
type DbType = ();
type Error = Error;
async fn read_from_apub_id(
@ -72,7 +72,7 @@ impl ApubObject for MyPost {
apub: Self::ApubType,
data: &RequestData<Self::DataType>,
) -> Result<Self, Self::Error> {
let post = MyPost {
let post = DbPost {
text: apub.content,
ap_id: apub.id,
creator: apub.attributed_to,

View file

@ -1,66 +0,0 @@
use crate::{
error::Error,
instance::DatabaseHandle,
objects::person::{MyUser, PersonAcceptedActivities},
};
use activitypub_federation::{
core::{actix_web::inbox::receive_activity, object_id::ObjectId},
deser::context::WithContext,
request_data::{ApubContext, ApubMiddleware, RequestData},
traits::ApubObject,
APUB_JSON_CONTENT_TYPE,
};
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
use tokio::task;
use url::Url;
pub fn listen(data: &ApubContext<DatabaseHandle>) -> Result<(), Error> {
let hostname = data.local_instance().hostname();
let data = data.clone();
let server = HttpServer::new(move || {
App::new()
.wrap(ApubMiddleware::new(data.clone()))
.route("/objects/{user_name}", web::get().to(http_get_user))
.service(
web::scope("")
// Just a single, global inbox for simplicity
.route("/inbox", web::post().to(http_post_user_inbox)),
)
})
.bind(hostname)?
.run();
task::spawn(server);
Ok(())
}
/// Handles requests to fetch user json over HTTP
pub async fn http_get_user(
request: HttpRequest,
data: RequestData<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
let hostname: String = data.local_instance().hostname().to_string();
let request_url = format!("http://{}{}", hostname, &request.uri().to_string());
let url = Url::parse(&request_url)?;
let user = ObjectId::<MyUser>::new(url)
.dereference_local(&data)
.await?
.into_apub(&data)
.await?;
Ok(HttpResponse::Ok()
.content_type(APUB_JSON_CONTENT_TYPE)
.json(WithContext::new_default(user)))
}
/// Handles messages received in user inbox
pub async fn http_post_user_inbox(
request: HttpRequest,
payload: String,
data: RequestData<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
let activity = serde_json::from_str(&payload)?;
receive_activity::<WithContext<PersonAcceptedActivities>, MyUser, DatabaseHandle>(
request, activity, &data,
)
.await
}

View file

@ -1,93 +0,0 @@
use crate::{
error::Error,
instance::DatabaseHandle,
objects::person::{MyUser, Person, PersonAcceptedActivities},
};
use activitypub_federation::{
core::{
axum::{inbox::receive_activity, json::ApubJson, verify_request_payload, DigestVerified},
object_id::ObjectId,
},
deser::context::WithContext,
request_data::{ApubContext, ApubMiddleware, RequestData},
traits::ApubObject,
};
use axum::{
body,
extract::OriginalUri,
middleware,
response::IntoResponse,
routing::{get, post},
Extension,
Json,
Router,
};
use http::{HeaderMap, Method, Request};
use hyper::Body;
use std::net::ToSocketAddrs;
use tokio::task;
use tower::ServiceBuilder;
use tower_http::{trace::TraceLayer, ServiceBuilderExt};
use url::Url;
pub fn listen(data: &ApubContext<DatabaseHandle>) -> Result<(), Error> {
let hostname = data.local_instance().hostname();
let data = data.clone();
let app = Router::new()
.route("/inbox", post(http_post_user_inbox))
.layer(
ServiceBuilder::new()
.map_request_body(body::boxed)
.layer(middleware::from_fn(verify_request_payload)),
)
.route("/objects/:user_name", get(http_get_user))
.layer(ApubMiddleware::new(data))
.layer(TraceLayer::new_for_http());
// run it
let addr = hostname
.to_socket_addrs()?
.next()
.expect("Failed to lookup domain name");
let server = axum::Server::bind(&addr).serve(app.into_make_service());
task::spawn(server);
Ok(())
}
async fn http_get_user(
data: RequestData<DatabaseHandle>,
request: Request<Body>,
) -> Result<ApubJson<WithContext<Person>>, Error> {
let hostname: String = data.local_instance().hostname().to_string();
let request_url = format!("http://{}{}", hostname, &request.uri());
let url = Url::parse(&request_url).expect("Failed to parse url");
let user = ObjectId::<MyUser>::new(url)
.dereference_local(&data)
.await?
.into_apub(&data)
.await?;
Ok(ApubJson(WithContext::new_default(user)))
}
async fn http_post_user_inbox(
headers: HeaderMap,
method: Method,
OriginalUri(uri): OriginalUri,
data: RequestData<DatabaseHandle>,
Extension(digest_verified): Extension<DigestVerified>,
Json(activity): Json<WithContext<PersonAcceptedActivities>>,
) -> impl IntoResponse {
receive_activity::<WithContext<PersonAcceptedActivities>, MyUser, DatabaseHandle>(
digest_verified,
activity,
&data,
headers,
method,
uri,
)
.await
}

View file

@ -1,68 +0,0 @@
use crate::{
generate_object_id,
objects::{note::MyPost, person::MyUser},
Error,
};
use activitypub_federation::{
core::signatures::generate_actor_keypair,
request_data::ApubContext,
FederationSettings,
InstanceConfig,
UrlVerifier,
};
use async_trait::async_trait;
use reqwest::Client;
use std::sync::{Arc, Mutex};
use url::Url;
pub type DatabaseHandle = Arc<Database>;
/// Our "database" which contains all known posts users (local and federated)
pub struct Database {
pub users: Mutex<Vec<MyUser>>,
pub posts: Mutex<Vec<MyPost>>,
}
/// Use this to store your federation blocklist, or a database connection needed to retrieve it.
#[derive(Clone)]
struct MyUrlVerifier();
#[async_trait]
impl UrlVerifier for MyUrlVerifier {
async fn verify(&self, url: &Url) -> Result<(), &'static str> {
if url.domain() == Some("malicious.com") {
Err("malicious domain")
} else {
Ok(())
}
}
}
pub fn listen(data: &ApubContext<DatabaseHandle>) -> Result<(), Error> {
#[cfg(feature = "actix-web")]
crate::actix_web::http::listen(data)?;
#[cfg(feature = "axum")]
crate::axum::http::listen(data)?;
Ok(())
}
impl Database {
pub fn new(hostname: String) -> Result<ApubContext<DatabaseHandle>, Error> {
let settings = FederationSettings::builder()
.debug(true)
.url_verifier(Box::new(MyUrlVerifier()))
.build()?;
let local_instance =
InstanceConfig::new(hostname.clone(), Client::default().into(), settings);
let local_user = MyUser::new(generate_object_id(&hostname)?, generate_actor_keypair()?);
let instance = Arc::new(Database {
users: Mutex::new(vec![local_user]),
posts: Mutex::new(vec![]),
});
Ok(ApubContext::new(instance, local_instance))
}
pub fn local_user(&self) -> MyUser {
self.users.lock().unwrap().first().cloned().unwrap()
}
}

230
src/config.rs Normal file
View file

@ -0,0 +1,230 @@
use crate::{
core::activity_queue::create_activity_queue,
request_data::RequestData,
traits::ActivityHandler,
utils::verify_domains_match,
Error,
};
use async_trait::async_trait;
use background_jobs::Manager;
use derive_builder::Builder;
use dyn_clone::{clone_trait_object, DynClone};
use reqwest_middleware::ClientWithMiddleware;
use serde::de::DeserializeOwned;
use std::{ops::Deref, sync::Arc, time::Duration};
use url::Url;
/// Various settings related to Activitypub federation.
///
/// Use [FederationSettings.builder()] to initialize this.
///
/// ```
/// # use activitypub_federation::config::FederationConfig;
/// # let _ = actix_rt::System::new();
/// let settings = FederationConfig::builder()
/// .hostname("example.com")
/// .app_data(())
/// .http_fetch_limit(50)
/// .worker_count(16)
/// .build()?;
/// # Ok::<(), anyhow::Error>(())
/// ```
#[derive(Builder, Clone)]
#[builder(build_fn(private, name = "partial_build"))]
pub struct FederationConfig<T: Clone> {
/// The domain where this federated instance is running
#[builder(setter(into))]
pub(crate) hostname: String,
/// Data which the application requires in handlers, such as database connection
/// or configuration.
pub(crate) app_data: T,
/// Maximum number of outgoing HTTP requests per incoming HTTP request. See
/// [crate::utils::fetch_object_http] for more details.
#[builder(default = "20")]
pub(crate) http_fetch_limit: i32,
#[builder(default = "reqwest::Client::default().into()")]
/// HTTP client used for all outgoing requests. Middleware can be used to add functionality
/// like log tracing or retry of failed requests.
pub(crate) client: ClientWithMiddleware,
/// Number of worker threads for sending outgoing activities
#[builder(default = "64")]
pub(crate) worker_count: u64,
/// Run library in debug mode. This allows usage of http and localhost urls. It also sends
/// outgoing activities synchronously, not in background thread. This helps to make tests
/// more consistent.
/// Do not use for production.
#[builder(default = "false")]
pub(crate) debug: bool,
/// Timeout for all HTTP requests. HTTP signatures are valid for 10s, so it makes sense to
/// use the same as timeout when sending
#[builder(default = "Duration::from_secs(10)")]
pub(crate) request_timeout: Duration,
/// Function used to verify that urls are valid, See [UrlVerifier] for details.
#[builder(default = "Box::new(DefaultUrlVerifier())")]
pub(crate) url_verifier: Box<dyn UrlVerifier + Sync>,
/// Enable to sign HTTP signatures according to draft 10, which does not include (created) and
/// (expires) fields. This is required for compatibility with some software like Pleroma.
/// <https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-10>
/// <https://git.pleroma.social/pleroma/pleroma/-/issues/2939>
#[builder(default = "false")]
pub(crate) http_signature_compat: bool,
/// Queue for sending outgoing activities. Only optional to make builder work, its always
/// present once constructed.
#[builder(setter(skip))]
pub(crate) activity_queue: Option<Arc<Manager>>,
}
impl<T: Clone> FederationConfig<T> {
pub fn builder() -> FederationConfigBuilder<T> {
FederationConfigBuilder::default()
}
pub(crate) async fn verify_url_and_domain<Activity, Datatype>(
&self,
activity: &Activity,
) -> Result<(), Error>
where
Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static,
{
verify_domains_match(activity.id(), activity.actor())?;
self.verify_url_valid(activity.id()).await?;
if self.is_local_url(activity.id()) {
return Err(Error::UrlVerificationError(
"Activity was sent from local instance",
));
}
Ok(())
}
/// Create new [RequestData] from this. You should prefer to use a middleware if possible.
pub fn to_request_data(&self) -> RequestData<T> {
RequestData {
config: self.clone(),
request_counter: Default::default(),
}
}
/// Perform some security checks on URLs as mentioned in activitypub spec, and call user-supplied
/// [`InstanceSettings.verify_url_function`].
///
/// https://www.w3.org/TR/activitypub/#security-considerations
pub(crate) async fn verify_url_valid(&self, url: &Url) -> Result<(), Error> {
match url.scheme() {
"https" => {}
"http" => {
if !self.debug {
return Err(Error::UrlVerificationError(
"Http urls are only allowed in debug mode",
));
}
}
_ => return Err(Error::UrlVerificationError("Invalid url scheme")),
};
if url.domain().is_none() {
return Err(Error::UrlVerificationError("Url must have a domain"));
}
if url.domain() == Some("localhost") && !self.debug {
return Err(Error::UrlVerificationError(
"Localhost is only allowed in debug mode",
));
}
self.url_verifier
.verify(url)
.await
.map_err(Error::UrlVerificationError)?;
Ok(())
}
/// Returns true if the url refers to this instance. Handles hostnames like `localhost:8540` for
/// local debugging.
pub(crate) fn is_local_url(&self, url: &Url) -> bool {
let mut domain = url.domain().expect("id has domain").to_string();
if let Some(port) = url.port() {
domain = format!("{}:{}", domain, port);
}
domain == self.hostname
}
/// Returns the local hostname
pub fn hostname(&self) -> &str {
&self.hostname
}
}
impl<T: Clone> FederationConfigBuilder<T> {
pub fn build(&mut self) -> Result<FederationConfig<T>, FederationConfigBuilderError> {
let mut config = self.partial_build()?;
let queue = create_activity_queue(
config.client.clone(),
config.worker_count,
config.request_timeout,
config.debug,
);
config.activity_queue = Some(Arc::new(queue));
Ok(config)
}
}
impl<T: Clone> Deref for FederationConfig<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.app_data
}
}
/// Handler for validating URLs.
///
/// This is used for implementing domain blocklists and similar functionality. It is called
/// with the ID of newly received activities, when fetching remote data from a given URL
/// and before sending an activity to a given inbox URL. If processing for this domain/URL should
/// be aborted, return an error. In case of `Ok(())`, processing continues.
///
/// ```
/// # use async_trait::async_trait;
/// use url::Url;
/// # use activitypub_federation::config::UrlVerifier;
/// # #[derive(Clone)]
/// # struct DatabaseConnection();
/// # async fn get_blocklist(_: &DatabaseConnection) -> Vec<String> {
/// # vec![]
/// # }
/// #[derive(Clone)]
/// struct Verifier {
/// db_connection: DatabaseConnection,
/// }
///
/// #[async_trait]
/// impl UrlVerifier for Verifier {
/// async fn verify(&self, url: &Url) -> Result<(), &'static str> {
/// let blocklist = get_blocklist(&self.db_connection).await;
/// let domain = url.domain().unwrap().to_string();
/// if blocklist.contains(&domain) {
/// Err("Domain is blocked")
/// } else {
/// Ok(())
/// }
/// }
/// }
/// ```
#[async_trait]
pub trait UrlVerifier: DynClone + Send {
async fn verify(&self, url: &Url) -> Result<(), &'static str>;
}
/// Default URL verifier which does nothing.
#[derive(Clone)]
struct DefaultUrlVerifier();
#[async_trait]
impl UrlVerifier for DefaultUrlVerifier {
async fn verify(&self, _url: &Url) -> Result<(), &'static str> {
Ok(())
}
}
clone_trait_object!(UrlVerifier);

View file

@ -1,10 +1,9 @@
use crate::{
core::signatures::{sign_request, PublicKey},
request_data::RequestData,
traits::ActivityHandler,
utils::reqwest_shim::ResponseExt,
Error,
FederationSettings,
InstanceConfig,
APUB_JSON_CONTENT_TYPE,
};
use anyhow::anyhow;
@ -30,8 +29,10 @@ use std::{
use tracing::{debug, info, warn};
use url::Url;
/// Send out the given activity to all inboxes, automatically generating the HTTP signatures. By
/// default, sending is done on a background thread, and automatically retried on failure with
/// Hands off an activity for delivery to remote actors.
///
/// Send out the given activity to all recipient inboxes, automatically generating the HTTP
/// signatures. In production builds, sending is done on a background thread, and automatically retried on failure with
/// exponential backoff.
///
/// - `activity`: The activity to be sent, gets converted to json
@ -39,28 +40,35 @@ use url::Url;
/// - `private_key`: The sending actor's private key for signing HTTP signature
/// - `recipients`: List of actors who should receive the activity. This gets deduplicated, and
/// local/invalid inbox urls removed
pub async fn send_activity<Activity>(
///
/// TODO: how can this only take a single pubkey? seems completely wrong, should be one per recipient
/// TODO: example
/// TODO: consider reading privkey from activity
pub async fn send_activity<Activity, T>(
activity: Activity,
public_key: PublicKey,
private_key: String,
recipients: Vec<Url>,
instance: &InstanceConfig,
data: &RequestData<T>,
) -> Result<(), <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler + Serialize,
<Activity as ActivityHandler>::Error: From<anyhow::Error> + From<serde_json::Error>,
T: Clone,
{
let config = &data.config;
let activity_id = activity.id();
let activity_serialized = serde_json::to_string_pretty(&activity)?;
let inboxes: Vec<Url> = recipients
.into_iter()
.unique()
.filter(|i| !instance.is_local_url(i))
.filter(|i| !config.is_local_url(i))
.collect();
let activity_queue = &instance.activity_queue;
// This field is only optional to make builder work, its always present at this point
let activity_queue = config.activity_queue.as_ref().unwrap();
for inbox in inboxes {
if instance.verify_url_valid(&inbox).await.is_err() {
if config.verify_url_valid(&inbox).await.is_err() {
continue;
}
@ -70,10 +78,10 @@ where
activity: activity_serialized.clone(),
public_key: public_key.clone(),
private_key: private_key.clone(),
http_signature_compat: instance.settings.http_signature_compat,
http_signature_compat: config.http_signature_compat,
};
if instance.settings.debug {
let res = do_send(message, &instance.client, instance.settings.request_timeout).await;
if config.debug {
let res = do_send(message, &config.client, config.request_timeout).await;
// Don't fail on error, as we intentionally do some invalid actions in tests, to verify that
// they are rejected on the receiving side. These errors shouldn't bubble up to make the API
// call fail. This matches the behaviour in production.
@ -90,7 +98,7 @@ where
stats.dead.this_hour(),
stats.complete.this_hour()
);
if stats.running as u64 == instance.settings.worker_count {
if stats.running as u64 == config.worker_count {
warn!("Maximum number of activitypub workers reached. Consider increasing worker count to avoid federation delays");
}
}
@ -211,20 +219,17 @@ fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
pub(crate) fn create_activity_queue(
client: ClientWithMiddleware,
settings: &FederationSettings,
worker_count: u64,
request_timeout: Duration,
debug: bool,
) -> Manager {
// queue is not used in debug mod, so dont create any workers to avoid log spam
let worker_count = if settings.debug {
0
} else {
settings.worker_count
};
let timeout = settings.request_timeout;
let worker_count = if debug { 0 } else { worker_count };
// Configure and start our workers
WorkerConfig::new_managed(Storage::new(ActixTimer), move |_| MyState {
client: client.clone(),
timeout,
timeout: request_timeout,
})
.register::<SendActivityTask>()
.set_worker_count("default", worker_count)

View file

@ -1,17 +1,20 @@
use crate::{
core::{object_id::ObjectId, signatures::verify_signature},
core::{
object_id::ObjectId,
signatures::{verify_inbox_hash, verify_signature},
},
request_data::RequestData,
traits::{ActivityHandler, Actor, ApubObject},
Error,
};
use actix_web::{HttpRequest, HttpResponse};
use actix_web::{web::Bytes, HttpRequest, HttpResponse};
use serde::de::DeserializeOwned;
use tracing::debug;
/// Receive an activity and perform some basic checks, including HTTP signature verification.
pub async fn receive_activity<Activity, ActorT, Datatype>(
request: HttpRequest,
activity: Activity,
body: Bytes,
data: &RequestData<Datatype>,
) -> Result<HttpResponse, <Activity as ActivityHandler>::Error>
where
@ -23,11 +26,12 @@ where
+ From<<ActorT as ApubObject>::Error>
+ From<serde_json::Error>,
<ActorT as ApubObject>::Error: From<Error> + From<anyhow::Error>,
Datatype: Clone,
{
data.local_instance()
.verify_url_and_domain(&activity)
.await?;
verify_inbox_hash(request.headers().get("Digest"), &body)?;
let activity: Activity = serde_json::from_slice(&body)?;
data.config.verify_url_and_domain(&activity).await?;
let actor = ObjectId::<ActorT>::new(activity.actor().clone())
.dereference(data)
.await?;
@ -43,3 +47,99 @@ where
activity.receive(data).await?;
Ok(HttpResponse::Ok().finish())
}
#[cfg(test)]
mod test {
use super::*;
use crate::{
config::FederationConfig,
core::signatures::{sign_request, PublicKey},
traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},
};
use actix_web::test::TestRequest;
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use url::Url;
#[actix_rt::test]
async fn test_receive_activity() {
let (body, incoming_request, config) = setup_receive_test().await;
receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body.into(),
&config.to_request_data(),
)
.await
.unwrap();
}
#[actix_rt::test]
async fn test_receive_activity_invalid_body_signature() {
let (_, incoming_request, config) = setup_receive_test().await;
let err = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
"invalid".into(),
&config.to_request_data(),
)
.await
.err()
.unwrap();
let e = err.root_cause().downcast_ref::<Error>().unwrap();
assert_eq!(e, &Error::ActivityBodyDigestInvalid)
}
#[actix_rt::test]
async fn test_receive_activity_invalid_path() {
let (body, incoming_request, config) = setup_receive_test().await;
let incoming_request = incoming_request.uri("/wrong");
let err = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body.into(),
&config.to_request_data(),
)
.await
.err()
.unwrap();
let e = err.root_cause().downcast_ref::<Error>().unwrap();
assert_eq!(e, &Error::ActivitySignatureInvalid)
}
async fn setup_receive_test() -> (String, TestRequest, FederationConfig<DbConnection>) {
let request_builder =
ClientWithMiddleware::from(Client::default()).post("https://example.com/inbox");
let public_key = PublicKey::new_main_key(
Url::parse("https://example.com").unwrap(),
DB_USER_KEYPAIR.public_key.clone(),
);
let activity = Follow {
actor: "http://localhost:123".try_into().unwrap(),
object: "http://localhost:124".try_into().unwrap(),
kind: Default::default(),
id: "http://localhost:123/1".try_into().unwrap(),
};
let body = serde_json::to_string(&activity).unwrap();
let outgoing_request = sign_request(
request_builder,
body.to_string(),
public_key,
DB_USER_KEYPAIR.private_key.clone(),
false,
)
.await
.unwrap();
let mut incoming_request = TestRequest::post().uri(outgoing_request.url().path());
for h in outgoing_request.headers() {
incoming_request = incoming_request.append_header(h);
}
let config = FederationConfig::builder()
.hostname("localhost:8002")
.app_data(DbConnection)
.debug(true)
.build()
.unwrap();
(body, incoming_request, config)
}
}

View file

@ -1,4 +1,7 @@
use crate::request_data::{ApubContext, ApubMiddleware, RequestData};
use crate::{
config::FederationConfig,
request_data::{ApubMiddleware, RequestData},
};
use actix_web::{
dev::{forward_ready, Payload, Service, ServiceRequest, ServiceResponse, Transform},
Error,
@ -24,19 +27,19 @@ where
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(ApubService {
service,
context: self.0.clone(),
config: self.0.clone(),
}))
}
}
pub struct ApubService<S, T>
pub struct ApubService<S, T: Clone>
where
S: Service<ServiceRequest, Error = Error>,
S::Future: 'static,
T: Sync,
{
service: S,
context: ApubContext<T>,
config: FederationConfig<T>,
}
impl<S, B, T> Service<ServiceRequest> for ApubService<S, T>
@ -53,7 +56,7 @@ where
forward_ready!(service);
fn call(&self, req: ServiceRequest) -> Self::Future {
req.extensions_mut().insert(self.context.clone());
req.extensions_mut().insert(self.config.clone());
self.service.call(req)
}
@ -64,7 +67,7 @@ impl<T: Clone + 'static> FromRequest for RequestData<T> {
type Future = Ready<Result<Self, Self::Error>>;
fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future {
ready(match req.extensions().get::<ApubContext<T>>() {
ready(match req.extensions().get::<FederationConfig<T>>() {
Some(c) => Ok(c.to_request_data()),
None => Err(actix_web::error::ErrorBadRequest(
"Missing extension, did you register ApubMiddleware?",

View file

@ -1,45 +0,0 @@
use axum::http::HeaderValue;
use sha2::{Digest, Sha256};
#[derive(Clone, Debug)]
pub struct DigestPart {
pub algorithm: String,
pub digest: String,
}
impl DigestPart {
pub fn try_from_header(h: &HeaderValue) -> Option<Vec<DigestPart>> {
let h = h.to_str().ok()?.split(';').next()?;
let v: Vec<_> = h
.split(',')
.filter_map(|p| {
let mut iter = p.splitn(2, '=');
iter.next()
.and_then(|alg| iter.next().map(|value| (alg, value)))
})
.map(|(alg, value)| DigestPart {
algorithm: alg.to_owned(),
digest: value.to_owned(),
})
.collect();
if v.is_empty() {
None
} else {
Some(v)
}
}
}
pub fn verify_sha256(digests: &[DigestPart], payload: &[u8]) -> bool {
let mut hasher = Sha256::new();
for part in digests {
hasher.update(payload);
if base64::encode(hasher.finalize_reset()) != part.digest {
return false;
}
}
true
}

View file

@ -1,21 +1,20 @@
use crate::{
core::{axum::DigestVerified, object_id::ObjectId, signatures::verify_signature},
core::{
axum::ActivityData,
object_id::ObjectId,
signatures::{verify_inbox_hash, verify_signature},
},
request_data::RequestData,
traits::{ActivityHandler, Actor, ApubObject},
Error,
};
use http::{HeaderMap, Method, Uri};
use serde::de::DeserializeOwned;
use tracing::debug;
/// Receive an activity and perform some basic checks, including HTTP signature verification.
pub async fn receive_activity<Activity, ActorT, Datatype>(
_digest_verified: DigestVerified,
activity: Activity,
activity_data: ActivityData,
data: &RequestData<Datatype>,
headers: HeaderMap,
method: Method,
uri: Uri,
) -> Result<(), <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static,
@ -26,18 +25,26 @@ where
+ From<<ActorT as ApubObject>::Error>
+ From<serde_json::Error>,
<ActorT as ApubObject>::Error: From<Error> + From<anyhow::Error>,
Datatype: Clone,
{
data.local_instance()
.verify_url_and_domain(&activity)
.await?;
verify_inbox_hash(activity_data.headers.get("Digest"), &activity_data.body)?;
let activity: Activity = serde_json::from_slice(&activity_data.body)?;
data.config.verify_url_and_domain(&activity).await?;
let actor = ObjectId::<ActorT>::new(activity.actor().clone())
.dereference(data)
.await?;
verify_signature(&headers, &method, &uri, actor.public_key())?;
verify_signature(
&activity_data.headers,
&activity_data.method,
&activity_data.uri,
actor.public_key(),
)?;
debug!("Receiving activity {}", activity.id().to_string());
activity.receive(data).await?;
Ok(())
}
// TODO: copy tests from actix-web inbox and implement for axum as well

View file

@ -6,13 +6,19 @@ use serde::Serialize;
/// A wrapper struct to respond with [`APUB_JSON_CONTENT_TYPE`]
/// in axum handlers
///
/// ## Example:
/// ```rust, no_run
/// use activitypub_federation::deser::context::WithContext;
/// async fn http_get_user() -> Result<ApubJson<WithContext<Person>>, Error> {
/// let user = WithContext::new_default(M);
/// ```
/// # use anyhow::Error;
/// # use axum::extract::Path;
/// # use activitypub_federation::core::axum::json::ApubJson;
/// # use activitypub_federation::protocol::context::WithContext;
/// # use activitypub_federation::request_data::RequestData;
/// # use activitypub_federation::traits::ApubObject;
/// # use activitypub_federation::traits::tests::{DbConnection, DbUser, Person};
/// async fn http_get_user(Path(name): Path<String>, data: RequestData<DbConnection>) -> Result<ApubJson<WithContext<Person>>, Error> {
/// let user: DbUser = data.read_local_user(name).await?;
/// let person = user.into_apub(&data).await?;
///
/// Ok(ApubJson(WithContext::new_default(MyUser::default())))
/// Ok(ApubJson(WithContext::new_default(person)))
/// }
/// ```
#[derive(Debug, Clone, Copy, Default)]

View file

@ -1,4 +1,7 @@
use crate::request_data::{ApubContext, ApubMiddleware, RequestData};
use crate::{
config::FederationConfig,
request_data::{ApubMiddleware, RequestData},
};
use axum::{async_trait, body::Body, extract::FromRequestParts, http::Request, response::Response};
use http::{request::Parts, StatusCode};
use std::task::{Context, Poll};
@ -10,15 +13,15 @@ impl<S, T: Clone> Layer<S> for ApubMiddleware<T> {
fn layer(&self, inner: S) -> Self::Service {
ApubService {
inner,
context: self.0.clone(),
config: self.0.clone(),
}
}
}
#[derive(Clone)]
pub struct ApubService<S, T> {
pub struct ApubService<S, T: Clone> {
inner: S,
context: ApubContext<T>,
config: FederationConfig<T>,
}
impl<S, T> Service<Request<Body>> for ApubService<S, T>
@ -36,7 +39,7 @@ where
}
fn call(&mut self, mut request: Request<Body>) -> Self::Future {
request.extensions_mut().insert(self.context.clone());
request.extensions_mut().insert(self.config.clone());
self.inner.call(request)
}
}
@ -50,8 +53,7 @@ where
type Rejection = (StatusCode, &'static str);
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
// TODO: need to set this extension from middleware
match parts.extensions.get::<ApubContext<T>>() {
match parts.extensions.get::<FederationConfig<T>>() {
Some(c) => Ok(c.to_request_data()),
None => Err((
StatusCode::INTERNAL_SERVER_ERROR,

View file

@ -1,81 +1,50 @@
use axum::{
async_trait,
body::{self, BoxBody, Bytes, Full},
body::{Bytes, HttpBody},
extract::FromRequest,
http::{Request, StatusCode},
middleware::Next,
response::{IntoResponse, Response},
};
use digest::{verify_sha256, DigestPart};
use http::{HeaderMap, Method, Uri};
mod digest;
pub mod inbox;
pub mod json;
pub mod middleware;
/// A request guard to ensure digest has been verified request has been
/// see [`receive_activity`]
#[derive(Clone)]
pub struct DigestVerified;
pub struct BufferRequestBody(pub Bytes);
pub async fn verify_request_payload(
request: Request<BoxBody>,
next: Next<BoxBody>,
) -> Result<impl IntoResponse, Response> {
let mut request = verify_payload(request).await?;
request.extensions_mut().insert(DigestVerified);
Ok(next.run(request).await)
}
async fn verify_payload(request: Request<BoxBody>) -> Result<Request<BoxBody>, Response> {
let (parts, body) = request.into_parts();
// this wont work if the body is an long running stream
let bytes = hyper::body::to_bytes(body)
.await
.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?;
match parts.headers.get("Digest") {
None => Err((
StatusCode::UNAUTHORIZED,
"Missing digest header".to_string(),
)
.into_response()),
Some(digest) => match DigestPart::try_from_header(digest) {
None => Err((
StatusCode::UNAUTHORIZED,
"Malformed digest header".to_string(),
)
.into_response()),
Some(digests) => {
if !verify_sha256(&digests, bytes.as_ref()) {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
"Digest does not match payload".to_string(),
)
.into_response())
} else {
Ok(Request::from_parts(parts, body::boxed(Full::from(bytes))))
}
}
},
}
/// Contains everything that is necessary to verify HTTP signatures and receive an
/// activity, including the request body.
#[derive(Debug)]
pub struct ActivityData {
headers: HeaderMap,
method: Method,
uri: Uri,
body: Vec<u8>,
}
#[async_trait]
impl<S> FromRequest<S, BoxBody> for BufferRequestBody
impl<S, B> FromRequest<S, B> for ActivityData
where
Bytes: FromRequest<S, B>,
B: HttpBody + Send + 'static,
S: Send + Sync,
<B as HttpBody>::Error: std::fmt::Display,
<B as HttpBody>::Data: Send,
{
type Rejection = Response;
async fn from_request(req: Request<BoxBody>, state: &S) -> Result<Self, Self::Rejection> {
let body = Bytes::from_request(req, state)
.await
.map_err(IntoResponse::into_response)?;
async fn from_request(req: Request<B>, _state: &S) -> Result<Self, Self::Rejection> {
let (parts, body) = req.into_parts();
Ok(Self(body))
// this wont work if the body is an long running stream
let bytes = hyper::body::to_bytes(body)
.await
.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?;
Ok(Self {
headers: parts.headers,
method: parts.method,
uri: parts.uri,
body: bytes.to_vec(),
})
}
}

View file

@ -8,8 +8,41 @@ use std::{
};
use url::Url;
/// We store Url on the heap because it is quite large (88 bytes).
#[derive(Serialize, Deserialize, Debug)]
/// Typed wrapper for Activitypub Object ID.
///
/// It provides convenient methods for fetching the object from remote server or local database.
/// Objects are automatically cached locally, so they don't have to be fetched every time. Much of
/// the crate functionality relies on this wrapper.
///
/// Every time an object is fetched via HTTP, [RequestData.request_counter] is incremented by one.
/// If the value exceeds [FederationSettings.http_fetch_limit], the request is aborted with
/// [Error::RequestLimit]. This prevents denial of service attacks where an attack triggers
/// infinite, recursive fetching of data.
///
/// ```
/// # use activitypub_federation::core::object_id::ObjectId;
/// # use activitypub_federation::config::FederationConfig;
/// # use activitypub_federation::Error::NotFound;
/// # use activitypub_federation::traits::tests::{DbConnection, DbUser};
/// # let _ = actix_rt::System::new();
/// # actix_rt::Runtime::new().unwrap().block_on(async {
/// # let db_connection = DbConnection;
/// let config = FederationConfig::builder()
/// .hostname("example.com")
/// .app_data(db_connection)
/// .build()?;
/// let request_data = config.to_request_data();
/// let object_id: ObjectId::<DbUser> = "https://lemmy.ml/u/nutomic".try_into()?;
/// // Attempt to fetch object from local database or fall back to remote server
/// let user = object_id.dereference(&request_data).await;
/// assert!(user.is_ok());
/// // Now you can also read the object from local database without network requests
/// let user = object_id.dereference_local(&request_data).await;
/// assert!(user.is_ok());
/// # Ok::<(), anyhow::Error>(())
/// # }).unwrap();
/// ```
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct ObjectId<Kind>(Box<Url>, PhantomData<Kind>)
where
@ -21,6 +54,7 @@ where
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
/// Construct a new objectid instance
pub fn new<T>(url: T) -> Self
where
T: Into<Url>,
@ -47,7 +81,7 @@ where
let db_object = self.dereference_from_db(data).await?;
// if its a local object, only fetch it from the database and not over http
if data.local_instance().is_local_url(&self.0) {
if data.config.is_local_url(&self.0) {
return match db_object {
None => Err(Error::NotFound.into()),
Some(o) => Ok(o),
@ -132,9 +166,6 @@ static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 20;
/// Determines when a remote actor should be refetched from its instance. In release builds, this is
/// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
/// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
///
/// TODO it won't pick up new avatars, summaries etc until a day after.
/// Actors need an "update" activity pushed to other servers to fix this.
fn should_refetch_object(last_refreshed: NaiveDateTime) -> bool {
let update_interval = if cfg!(debug_assertions) {
// avoid infinite loop when fetching community outbox
@ -151,10 +182,18 @@ where
Kind: ApubObject,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
#[allow(clippy::recursive_format_impl)]
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
// Use to_string here because Url.display is not useful for us
write!(f, "{}", self.0)
write!(f, "{}", self.0.as_str())
}
}
impl<Kind> Debug for ObjectId<Kind>
where
Kind: ApubObject,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.as_str())
}
}
@ -178,6 +217,18 @@ where
}
}
impl<'a, Kind> TryFrom<&'a str> for ObjectId<Kind>
where
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
type Error = url::ParseError;
fn try_from(value: &'a str) -> Result<Self, Self::Error> {
Ok(ObjectId::new(Url::parse(value)?))
}
}
impl<Kind> PartialEq for ObjectId<Kind>
where
Kind: ApubObject,
@ -189,67 +240,28 @@ where
}
#[cfg(test)]
mod tests {
pub mod tests {
use super::*;
use crate::core::object_id::should_refetch_object;
use anyhow::Error;
#[derive(Debug, Clone)]
struct TestObject {}
#[async_trait::async_trait]
impl ApubObject for TestObject {
type DataType = TestObject;
type ApubType = ();
type DbType = ();
type Error = Error;
async fn read_from_apub_id(
_object_id: Url,
_data: &RequestData<Self::DataType>,
) -> Result<Option<Self>, Self::Error>
where
Self: Sized,
{
todo!()
}
async fn into_apub(
self,
_data: &RequestData<Self::DataType>,
) -> Result<Self::ApubType, Self::Error> {
todo!()
}
async fn from_apub(
_apub: Self::ApubType,
_data: &RequestData<Self::DataType>,
) -> Result<Self, Self::Error>
where
Self: Sized,
{
todo!()
}
}
use crate::{core::object_id::should_refetch_object, traits::tests::DbUser};
#[test]
fn test_deserialize() {
let url = Url::parse("http://test.com/").unwrap();
let id = ObjectId::<TestObject>::new(url);
let id = ObjectId::<DbUser>::new(url);
let string = serde_json::to_string(&id).unwrap();
assert_eq!("\"http://test.com/\"", string);
let parsed: ObjectId<TestObject> = serde_json::from_str(&string).unwrap();
let parsed: ObjectId<DbUser> = serde_json::from_str(&string).unwrap();
assert_eq!(parsed, id);
}
#[test]
fn test_should_refetch_object() {
let one_second_ago = Utc::now().naive_utc() - ChronoDuration::seconds(1);
assert!(!should_refetch_object(one_second_ago));
assert_eq!(false, should_refetch_object(one_second_ago));
let two_days_ago = Utc::now().naive_utc() - ChronoDuration::days(2);
assert!(should_refetch_object(two_days_ago));
assert_eq!(true, should_refetch_object(two_days_ago));
}
}

View file

@ -1,4 +1,4 @@
use crate::utils::header_to_map;
use crate::{utils::header_to_map, Error, Error::ActivitySignatureInvalid};
use anyhow::anyhow;
use http::{header::HeaderName, uri::PathAndQuery, HeaderValue, Method, Uri};
use http_signature_normalization_reqwest::prelude::{Config, SignExt};
@ -13,7 +13,7 @@ use reqwest::Request;
use reqwest_middleware::RequestBuilder;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::io::{Error, ErrorKind};
use std::io::ErrorKind;
use tracing::debug;
use url::Url;
@ -26,15 +26,15 @@ pub struct Keypair {
pub public_key: String,
}
/// Generate the asymmetric keypair for ActivityPub HTTP signatures.
pub fn generate_actor_keypair() -> Result<Keypair, Error> {
/// Generate a random asymmetric keypair for ActivityPub HTTP signatures.
pub fn generate_actor_keypair() -> Result<Keypair, std::io::Error> {
let rsa = Rsa::generate(2048)?;
let pkey = PKey::from_rsa(rsa)?;
let public_key = pkey.public_key_to_pem()?;
let private_key = pkey.private_key_to_pem_pkcs8()?;
let key_to_string = |key| match String::from_utf8(key) {
Ok(s) => Ok(s),
Err(e) => Err(Error::new(
Err(e) => Err(std::io::Error::new(
ErrorKind::Other,
format!("Failed converting key to string: {}", e),
)),
@ -79,6 +79,8 @@ pub(crate) async fn sign_request(
.await
}
/// Public key of actors which is used for HTTP signatures. This needs to be federated in the
/// `public_key` field of all actors.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PublicKey {
@ -113,7 +115,7 @@ pub fn verify_signature<'a, H>(
method: &Method,
uri: &Uri,
public_key: &str,
) -> Result<(), anyhow::Error>
) -> Result<(), Error>
where
H: IntoIterator<Item = (&'a HeaderName, &'a HeaderValue)>,
{
@ -121,7 +123,8 @@ where
let path_and_query = uri.path_and_query().map(PathAndQuery::as_str).unwrap_or("");
let verified = CONFIG2
.begin_verify(method.as_str(), path_and_query, headers)?
.begin_verify(method.as_str(), path_and_query, headers)
.map_err(Error::conv)?
.verify(|signature, signing_string| -> anyhow::Result<bool> {
debug!(
"Verifying with key {}, message {}",
@ -131,12 +134,61 @@ where
let mut verifier = Verifier::new(MessageDigest::sha256(), &public_key)?;
verifier.update(signing_string.as_bytes())?;
Ok(verifier.verify(&base64::decode(signature)?)?)
})?;
})
.map_err(Error::conv)?;
if verified {
debug!("verified signature for {}", uri);
Ok(())
} else {
Err(anyhow!("Invalid signature on request: {}", uri))
Err(ActivitySignatureInvalid)
}
}
#[derive(Clone, Debug)]
struct DigestPart {
pub algorithm: String,
pub digest: String,
}
impl DigestPart {
fn try_from_header(h: &HeaderValue) -> Option<Vec<DigestPart>> {
let h = h.to_str().ok()?.split(';').next()?;
let v: Vec<_> = h
.split(',')
.filter_map(|p| {
let mut iter = p.splitn(2, '=');
iter.next()
.and_then(|alg| iter.next().map(|value| (alg, value)))
})
.map(|(alg, value)| DigestPart {
algorithm: alg.to_owned(),
digest: value.to_owned(),
})
.collect();
if v.is_empty() {
None
} else {
Some(v)
}
}
}
/// Verify body of an inbox request against the hash provided in `Digest` header.
pub(crate) fn verify_inbox_hash(
digest_header: Option<&HeaderValue>,
body: &[u8],
) -> Result<(), crate::Error> {
let digests = DigestPart::try_from_header(digest_header.unwrap()).unwrap();
let mut hasher = Sha256::new();
for part in digests {
hasher.update(body);
if base64::encode(hasher.finalize_reset()) != part.digest {
return Err(crate::Error::ActivityBodyDigestInvalid);
}
}
Ok(())
}

View file

@ -1,83 +0,0 @@
use serde::{Deserialize, Deserializer};
/// Deserialize either a single json value, or a json array. In either case, the items are returned
/// as an array.
///
/// Usage:
/// `#[serde(deserialize_with = "deserialize_one_or_many")]`
pub fn deserialize_one_or_many<'de, T, D>(deserializer: D) -> Result<Vec<T>, D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum OneOrMany<T> {
One(T),
Many(Vec<T>),
}
let result: OneOrMany<T> = Deserialize::deserialize(deserializer)?;
Ok(match result {
OneOrMany::Many(list) => list,
OneOrMany::One(value) => vec![value],
})
}
/// Deserialize either a single json value, or a json array with one element. In both cases it
/// returns an array with a single element.
///
/// Usage:
/// `#[serde(deserialize_with = "deserialize_one")]`
pub fn deserialize_one<'de, T, D>(deserializer: D) -> Result<[T; 1], D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum MaybeArray<T> {
Simple(T),
Array([T; 1]),
}
let result: MaybeArray<T> = Deserialize::deserialize(deserializer)?;
Ok(match result {
MaybeArray::Simple(value) => [value],
MaybeArray::Array(value) => value,
})
}
/// Attempts to deserialize the item. If any error happens, its ignored and the type's default
/// value is returned.
///
/// Usage:
/// `#[serde(deserialize_with = "deserialize_skip_error")]`
pub fn deserialize_skip_error<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: Deserialize<'de> + Default,
D: Deserializer<'de>,
{
let value = serde_json::Value::deserialize(deserializer)?;
let inner = T::deserialize(value).unwrap_or_default();
Ok(inner)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_deserialize_skip_error() {
#[derive(Debug, Deserialize)]
pub struct MyData {
#[serde(deserialize_with = "deserialize_skip_error")]
pub data: Option<String>,
}
// data has type object
let _: MyData = serde_json::from_str(r#"{ "data": {} }"#).unwrap();
// data has type array
let _: MyData = serde_json::from_str(r#"{"data": []}"#).unwrap();
}
}

View file

@ -1,19 +1,9 @@
use crate::{
core::activity_queue::create_activity_queue,
traits::ActivityHandler,
utils::verify_domains_match,
};
use async_trait::async_trait;
use background_jobs::Manager;
use derive_builder::Builder;
use dyn_clone::{clone_trait_object, DynClone};
use reqwest_middleware::ClientWithMiddleware;
use serde::de::DeserializeOwned;
use std::time::Duration;
use url::Url;
//#![deny(missing_docs)]
pub use activitystreams_kinds as kinds;
pub mod config;
pub mod core;
pub mod deser;
pub mod protocol;
pub mod request_data;
pub mod traits;
pub mod utils;
@ -21,157 +11,10 @@ pub mod utils;
/// Mime type for Activitypub, used for `Accept` and `Content-Type` HTTP headers
pub static APUB_JSON_CONTENT_TYPE: &str = "application/activity+json";
/// Represents configuration for a single, federated instance. There should usually be only one of
/// this per application.
pub struct InstanceConfig {
hostname: String,
client: ClientWithMiddleware,
activity_queue: Manager,
settings: FederationSettings,
}
impl InstanceConfig {
async fn verify_url_and_domain<Activity, Datatype>(
&self,
activity: &Activity,
) -> Result<(), Error>
where
Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static,
{
verify_domains_match(activity.id(), activity.actor())?;
self.verify_url_valid(activity.id()).await?;
if self.is_local_url(activity.id()) {
return Err(Error::UrlVerificationError(
"Activity was sent from local instance",
));
}
Ok(())
}
/// Perform some security checks on URLs as mentioned in activitypub spec, and call user-supplied
/// [`InstanceSettings.verify_url_function`].
///
/// https://www.w3.org/TR/activitypub/#security-considerations
async fn verify_url_valid(&self, url: &Url) -> Result<(), Error> {
match url.scheme() {
"https" => {}
"http" => {
if !self.settings.debug {
return Err(Error::UrlVerificationError(
"Http urls are only allowed in debug mode",
));
}
}
_ => return Err(Error::UrlVerificationError("Invalid url scheme")),
};
if url.domain().is_none() {
return Err(Error::UrlVerificationError("Url must have a domain"));
}
if url.domain() == Some("localhost") && !self.settings.debug {
return Err(Error::UrlVerificationError(
"Localhost is only allowed in debug mode",
));
}
self.settings
.url_verifier
.verify(url)
.await
.map_err(Error::UrlVerificationError)?;
Ok(())
}
}
#[async_trait]
pub trait UrlVerifier: DynClone + Send {
async fn verify(&self, url: &Url) -> Result<(), &'static str>;
}
clone_trait_object!(UrlVerifier);
/// Various settings related to Activitypub federation.
///
/// Use [FederationSettings.builder()] to initialize this.
#[derive(Builder)]
pub struct FederationSettings {
/// Maximum number of outgoing HTTP requests per incoming activity
#[builder(default = "20")]
http_fetch_limit: i32,
/// Number of worker threads for sending outgoing activities
#[builder(default = "64")]
worker_count: u64,
/// Run library in debug mode. This allows usage of http and localhost urls. It also sends
/// outgoing activities synchronously, not in background thread. This helps to make tests
/// more consistent.
/// Do not use for production.
#[builder(default = "false")]
debug: bool,
/// Timeout for all HTTP requests. HTTP signatures are valid for 10s, so it makes sense to
/// use the same as timeout when sending
#[builder(default = "Duration::from_secs(10)")]
request_timeout: Duration,
/// Function used to verify that urls are valid, used when receiving activities or fetching remote
/// objects. Use this to implement functionality like federation blocklists. In case verification
/// fails, it should return an error message.
#[builder(default = "Box::new(DefaultUrlVerifier())")]
url_verifier: Box<dyn UrlVerifier + Sync>,
/// Enable to sign HTTP signatures according to draft 10, which does not include (created) and
/// (expires) fields. This is required for compatibility with some software like Pleroma.
/// https://datatracker.ietf.org/doc/html/draft-cavage-http-signatures-10
/// https://git.pleroma.social/pleroma/pleroma/-/issues/2939
#[builder(default = "false")]
http_signature_compat: bool,
}
impl FederationSettings {
/// Returns a new settings builder.
pub fn builder() -> FederationSettingsBuilder {
<_>::default()
}
}
#[derive(Clone)]
struct DefaultUrlVerifier();
#[async_trait]
impl UrlVerifier for DefaultUrlVerifier {
async fn verify(&self, _url: &Url) -> Result<(), &'static str> {
Ok(())
}
}
impl InstanceConfig {
pub fn new(domain: String, client: ClientWithMiddleware, settings: FederationSettings) -> Self {
let activity_queue = create_activity_queue(client.clone(), &settings);
InstanceConfig {
hostname: domain,
client,
activity_queue,
settings,
}
}
/// Returns true if the url refers to this instance. Handles hostnames like `localhost:8540` for
/// local debugging.
fn is_local_url(&self, url: &Url) -> bool {
let mut domain = url.domain().expect("id has domain").to_string();
if let Some(port) = url.port() {
domain = format!("{}:{}", domain, port);
}
domain == self.hostname
}
/// Returns the local hostname
pub fn hostname(&self) -> &str {
&self.hostname
}
}
/// Error messages returned by this library.
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Object was not found in database")]
#[error("Object was not found in local database")]
NotFound,
#[error("Request limit was reached during fetch")]
RequestLimit,
@ -181,6 +24,10 @@ pub enum Error {
ObjectDeleted,
#[error("{0}")]
UrlVerificationError(&'static str),
#[error("Incoming activity has invalid digest for body")]
ActivityBodyDigestInvalid,
#[error("incoming activity has invalid signature")]
ActivitySignatureInvalid,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
@ -193,3 +40,9 @@ impl Error {
Error::Other(error.into())
}
}
impl PartialEq for Error {
fn eq(&self, other: &Self) -> bool {
std::mem::discriminant(self) == std::mem::discriminant(other)
}
}

View file

@ -1,5 +1,5 @@
use crate::{
deser::helpers::deserialize_one_or_many,
protocol::helpers::deserialize_one_or_many,
request_data::RequestData,
traits::ActivityHandler,
};
@ -8,10 +8,29 @@ use serde_json::Value;
use std::str::FromStr;
use url::Url;
/// Default context used in Activitypub
const DEFAULT_CONTEXT: &str = "[\"https://www.w3.org/ns/activitystreams\"]";
/// Simple wrapper which adds json-ld context to an object or activity. Doing it this way ensures
/// that nested objects dont have any context, but only the outermost one.
/// Wrapper for federated structs which handles `@context` field.
///
/// This wrapper can be used when sending Activitypub data, to automatically add `@context`. It
/// avoids having to repeat the `@context` property on every struct, and getting multiple contexts
/// in nested structs.
///
/// ```
/// # use activitypub_federation::protocol::context::WithContext;
/// #[derive(serde::Serialize)]
/// struct Note {
/// content: String
/// }
/// let note = Note {
/// content: "Hello world".to_string()
/// };
/// let note_with_context = WithContext::new_default(note);
/// let serialized = serde_json::to_string(&note_with_context)?;
/// assert_eq!(serialized, r#"{"@context":[["https://www.w3.org/ns/activitystreams"]],"content":"Hello world"}"#);
/// Ok::<(), serde_json::error::Error>(())
/// ```
#[derive(Serialize, Deserialize, Debug)]
pub struct WithContext<T> {
#[serde(rename = "@context")]
@ -22,11 +41,13 @@ pub struct WithContext<T> {
}
impl<T> WithContext<T> {
/// Create a new wrapper with the default Activitypub context.
pub fn new_default(inner: T) -> WithContext<T> {
let context = vec![Value::from_str(DEFAULT_CONTEXT).expect("valid context")];
WithContext::new(inner, context)
}
/// Create new wrapper with custom context. Use this in case you are implementing extensions.
pub fn new(inner: T, context: Vec<Value>) -> WithContext<T> {
WithContext { context, inner }
}
@ -39,7 +60,7 @@ impl<T> WithContext<T> {
#[async_trait::async_trait]
impl<T> ActivityHandler for WithContext<T>
where
T: ActivityHandler + Send + Sync,
T: ActivityHandler + Send,
{
type DataType = <T as ActivityHandler>::DataType;
type Error = <T as ActivityHandler>::Error;

115
src/protocol/helpers.rs Normal file
View file

@ -0,0 +1,115 @@
use serde::{Deserialize, Deserializer};
/// Deserialize JSON single value or array into Vec.
///
/// Useful if your application can handle multiple values for a field, but another federated
/// platform only sends a single one.
///
/// ```
/// # use activitypub_federation::protocol::helpers::deserialize_one_or_many;
/// # use url::Url;
/// #[derive(serde::Deserialize)]
/// struct Note {
/// #[serde(deserialize_with = "deserialize_one_or_many")]
/// to: Vec<Url>
/// }
///
/// let single: Note = serde_json::from_str(r#"{"to": "https://example.com/u/alice" }"#)?;
/// assert_eq!(single.to.len(), 1);
///
/// let multiple: Note = serde_json::from_str(
/// r#"{"to": [
/// "https://example.com/u/alice",
/// "https://lemmy.ml/u/bob"
/// ]}"#)?;
/// assert_eq!(multiple.to.len(), 2);
/// Ok::<(), anyhow::Error>(())
pub fn deserialize_one_or_many<'de, T, D>(deserializer: D) -> Result<Vec<T>, D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum OneOrMany<T> {
One(T),
Many(Vec<T>),
}
let result: OneOrMany<T> = Deserialize::deserialize(deserializer)?;
Ok(match result {
OneOrMany::Many(list) => list,
OneOrMany::One(value) => vec![value],
})
}
/// Deserialize JSON single value or single element array into single value.
///
/// Useful if your application can only handle a single value for a field, but another federated
/// platform sends single value wrapped in array. Fails if array contains multiple items.
///
/// ```
/// # use activitypub_federation::protocol::helpers::deserialize_one;
/// # use url::Url;
/// #[derive(serde::Deserialize)]
/// struct Note {
/// #[serde(deserialize_with = "deserialize_one")]
/// to: Url
/// }
///
/// let note = serde_json::from_str::<Note>(r#"{"to": ["https://example.com/u/alice"] }"#);
/// assert!(note.is_ok());
pub fn deserialize_one<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum MaybeArray<T> {
Simple(T),
Array([T; 1]),
}
let result: MaybeArray<T> = Deserialize::deserialize(deserializer)?;
Ok(match result {
MaybeArray::Simple(value) => value,
MaybeArray::Array([value]) => value,
})
}
/// Attempts to deserialize item, in case of error falls back to the type's default value.
///
/// Useful for optional fields which are sent with a different type from another platform,
/// eg object instead of array. Should always be used together with `#[serde(default)]`, so
/// that a mssing value doesn't cause an error.
///
/// ```
/// # use activitypub_federation::protocol::helpers::deserialize_skip_error;
/// # use url::Url;
/// #[derive(serde::Deserialize)]
/// struct Note {
/// content: String,
/// #[serde(deserialize_with = "deserialize_skip_error", default)]
/// source: Option<String>
/// }
///
/// let note = serde_json::from_str::<Note>(
/// r#"{
/// "content": "How are you?",
/// "source": {
/// "content": "How are you?",
/// "mediaType": "text/markdown"
/// }
/// }"#);
/// assert_eq!(note.unwrap().source, None);
/// # Ok::<(), anyhow::Error>(())
pub fn deserialize_skip_error<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: Deserialize<'de> + Default,
D: Deserializer<'de>,
{
let value = serde_json::Value::deserialize(deserializer)?;
let inner = T::deserialize(value).unwrap_or_default();
Ok(inner)
}

View file

@ -14,7 +14,7 @@
//! ```
//! use serde_json::from_str;
//! use serde::{Deserialize, Serialize};
//! use activitypub_federation::deser::values::MediaTypeMarkdown;
//! use activitypub_federation::protocol::values::MediaTypeMarkdown;
//!
//! #[derive(Deserialize, Serialize)]
//! struct MyObject {

View file

@ -1,60 +1,23 @@
use crate::InstanceConfig;
use std::{
ops::Deref,
sync::{atomic::AtomicI32, Arc},
};
use crate::config::FederationConfig;
use std::{ops::Deref, sync::atomic::AtomicI32};
/// Stores context data which is necessary for the library to work.
#[derive(Clone)]
pub struct ApubContext<T> {
/// Data which the application requires in handlers, such as database connection
/// or configuration.
application_data: Arc<T>,
/// Configuration of this library.
pub(crate) local_instance: Arc<InstanceConfig>,
}
impl<T: Clone> ApubContext<T> {
pub fn new(state: T, local_instance: InstanceConfig) -> ApubContext<T> {
ApubContext {
application_data: Arc::new(state),
local_instance: Arc::new(local_instance),
}
}
pub fn local_instance(&self) -> &InstanceConfig {
self.local_instance.deref()
}
/// Create new [RequestData] from this. You should prefer to use a middleware if possible.
pub fn to_request_data(&self) -> RequestData<T> {
RequestData {
apub_context: self.clone(),
request_counter: AtomicI32::default(),
}
}
}
/// Stores data for handling one specific HTTP request. Most importantly this contains a
/// counter for outgoing HTTP requests. This is necessary to prevent denial of service attacks,
/// where an attacker triggers fetching of recursive objects.
/// Stores data for handling one specific HTTP request.
///
/// https://www.w3.org/TR/activitypub/#security-recursive-objects
pub struct RequestData<T> {
pub(crate) apub_context: ApubContext<T>,
/// Most importantly this contains a counter for outgoing HTTP requests. This is necessary to
/// prevent denial of service attacks, where an attacker triggers fetching of recursive objects.
///
/// <https://www.w3.org/TR/activitypub/#security-recursive-objects>
pub struct RequestData<T: Clone> {
pub(crate) config: FederationConfig<T>,
pub(crate) request_counter: AtomicI32,
}
impl<T> RequestData<T> {
pub fn local_instance(&self) -> &InstanceConfig {
self.apub_context.local_instance.deref()
impl<T: Clone> RequestData<T> {
pub fn app_data(&self) -> &T {
&self.config.app_data
}
}
impl<T: Clone> Deref for ApubContext<T> {
type Target = T;
fn deref(&self) -> &T {
&self.application_data
pub fn hostname(&self) -> &str {
&self.config.hostname
}
}
@ -62,15 +25,15 @@ impl<T: Clone> Deref for RequestData<T> {
type Target = T;
fn deref(&self) -> &T {
&self.apub_context.application_data
&self.config.app_data
}
}
#[derive(Clone)]
pub struct ApubMiddleware<T: Clone>(pub(crate) ApubContext<T>);
pub struct ApubMiddleware<T: Clone>(pub(crate) FederationConfig<T>);
impl<T: Clone> ApubMiddleware<T> {
pub fn new(apub_context: ApubContext<T>) -> Self {
ApubMiddleware(apub_context)
pub fn new(config: FederationConfig<T>) -> Self {
ApubMiddleware(config)
}
}

View file

@ -1,13 +1,169 @@
use crate::request_data::RequestData;
use async_trait::async_trait;
use chrono::NaiveDateTime;
use std::ops::Deref;
use url::Url;
/// Trait which allows verification and reception of incoming activities.
#[async_trait::async_trait]
/// Helper for converting between database structs and federated protocol structs.
///
/// ```
/// # use url::Url;
/// # use activitypub_federation::core::signatures::PublicKey;
/// # use activitypub_federation::request_data::RequestData;
/// # use activitypub_federation::traits::ApubObject;
/// # use activitypub_federation::traits::tests::{DbConnection, Person};
/// # pub struct DbUser {
/// # pub name: String,
/// # pub ap_id: Url,
/// # pub inbox: Url,
/// # pub public_key: String,
/// # pub local: bool,
/// # }
///
/// #[async_trait::async_trait]
/// impl ApubObject for DbUser {
/// type DataType = DbConnection;
/// type ApubType = Person;
/// type Error = anyhow::Error;
///
/// async fn read_from_apub_id(object_id: Url, data: &RequestData<Self::DataType>) -> Result<Option<Self>, Self::Error> {
/// // Attempt to read object from local database. Return Ok(None) if not found.
/// let user: Option<DbUser> = data.read_user_from_apub_id(object_id).await?;
/// Ok(user)
/// }
///
/// async fn into_apub(self, data: &RequestData<Self::DataType>) -> Result<Self::ApubType, Self::Error> {
/// // Called when a local object gets sent out over Activitypub. Simply convert it to the
/// // protocol struct
/// Ok(Person {
/// kind: Default::default(),
/// preferred_username: self.name,
/// id: self.ap_id.clone().into(),
/// inbox: self.inbox,
/// public_key: PublicKey::new_main_key(self.ap_id, self.public_key),
/// })
/// }
///
/// async fn from_apub(apub: Self::ApubType, data: &RequestData<Self::DataType>) -> Result<Self, Self::Error> {
/// // Called when a remote object gets received over Activitypub. Validate and insert it
/// // into the database.
///
/// let user = DbUser {
/// name: apub.preferred_username,
/// ap_id: apub.id.into_inner(),
/// inbox: apub.inbox,
/// public_key: apub.public_key.public_key_pem,
/// local: false,
/// };
///
/// // Make sure not to overwrite any local object
/// // TODO: this should be handled by library so the method doesnt get called for local object
/// if data.hostname() == user.ap_id.domain().unwrap() {
/// // Activitypub doesnt distinguish between creating and updating an object. Thats why we
/// // need to use upsert functionality here
/// data.upsert(&user).await?;
/// }
/// Ok(user)
/// }
///
/// }
#[async_trait]
pub trait ApubObject: Sized {
/// App data type passed to handlers. Must be identical to [crate::config::FederationConfig::app_data].
type DataType: Clone + Send + Sync;
/// The type of protocol struct which gets sent over network to federate this database struct.
type ApubType;
/// Error type returned by handler methods
type Error;
/// Returns the last time this object was updated.
///
/// Used to avoid refetching an object over HTTP every time it is dereferenced. Only called
/// for remote objects.
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
None
}
/// Try to read the object with given `id` from local database.
///
/// Should return `Ok(None)` if not found.
async fn read_from_apub_id(
object_id: Url,
data: &RequestData<Self::DataType>,
) -> Result<Option<Self>, Self::Error>;
/// Mark remote object as deleted in local database.
///
/// Called when a `Delete` activity is received, or if fetch returns a `Tombstone` object.
async fn delete(self, _data: &RequestData<Self::DataType>) -> Result<(), Self::Error> {
Ok(())
}
/// Convert database type to Activitypub type.
///
/// Called when a local object gets fetched by another instance over HTTP, or when an object
/// gets sent in an activity.
async fn into_apub(
self,
data: &RequestData<Self::DataType>,
) -> Result<Self::ApubType, Self::Error>;
/// Convert object from ActivityPub type to database type.
///
/// Called when an object is received from HTTP fetch or as part of an activity. This method
/// should do verification and write the received object to database. Note that there is no
/// distinction between create and update, so an `upsert` operation should be used.
async fn from_apub(
apub: Self::ApubType,
data: &RequestData<Self::DataType>,
) -> Result<Self, Self::Error>;
}
/// Handler for receiving incoming activities.
///
/// ```
/// # use activitystreams_kinds::activity::FollowType;
/// # use url::Url;
/// # use activitypub_federation::core::object_id::ObjectId;
/// # use activitypub_federation::request_data::RequestData;
/// # use activitypub_federation::traits::ActivityHandler;
/// # use activitypub_federation::traits::tests::{DbConnection, DbUser};
/// #[derive(serde::Deserialize)]
/// struct Follow {
/// actor: ObjectId<DbUser>,
/// object: ObjectId<DbUser>,
/// #[serde(rename = "type")]
/// kind: FollowType,
/// id: Url,
/// }
///
/// #[async_trait::async_trait]
/// impl ActivityHandler for Follow {
/// type DataType = DbConnection;
/// type Error = anyhow::Error;
///
/// fn id(&self) -> &Url {
/// &self.id
/// }
///
/// fn actor(&self) -> &Url {
/// self.actor.inner()
/// }
///
/// async fn receive(self, data: &RequestData<Self::DataType>) -> Result<(), Self::Error> {
/// let local_user = self.object.dereference(data).await?;
/// let follower = self.actor.dereference(data).await?;
/// data.add_follower(local_user, follower).await?;
/// Ok(())
/// }
/// }
/// ```
#[async_trait]
#[enum_delegate::register]
pub trait ActivityHandler {
type DataType: Send + Sync;
/// App data type passed to handlers. Must be identical to [crate::config::FederationConfig::app_data].
type DataType: Clone + Send + Sync;
/// Error type returned by handler methods
type Error;
/// `id` field of the activity
@ -16,12 +172,34 @@ pub trait ActivityHandler {
/// `actor` field of activity
fn actor(&self) -> &Url;
/// Receives the activity and stores its action in database.
/// Called when an activity is received.
///
/// Should perform validation and possibly write action to the database. In case the activity
/// has a nested `object` field, must call `object.from_apub` handler.
async fn receive(self, data: &RequestData<Self::DataType>) -> Result<(), Self::Error>;
}
/// Trait to allow retrieving common Actor data.
pub trait Actor: ApubObject {
/// The actor's public key for verification of HTTP signatures
fn public_key(&self) -> &str;
/// The inbox where activities for this user should be sent to
fn inbox(&self) -> Url;
/// The actor's shared inbox, if any
fn shared_inbox(&self) -> Option<Url> {
None
}
/// Returns shared inbox if it exists, normal inbox otherwise.
fn shared_inbox_or_inbox(&self) -> Url {
self.shared_inbox().unwrap_or_else(|| self.inbox())
}
}
/// Allow for boxing of enum variants
#[async_trait::async_trait]
#[async_trait]
impl<T> ActivityHandler for Box<T>
where
T: ActivityHandler + Send,
@ -42,69 +220,150 @@ where
}
}
#[async_trait::async_trait]
pub trait ApubObject {
type DataType: Send + Sync;
type ApubType;
type DbType;
type Error;
/// Some impls of these traits for use in tests. Dont use this from external crates.
///
/// TODO: Should be using `cfg[doctest]` but blocked by <https://github.com/rust-lang/rust/issues/67295>
#[doc(hidden)]
pub mod tests {
use super::*;
use crate::core::{
object_id::ObjectId,
signatures::{generate_actor_keypair, Keypair, PublicKey},
};
use activitystreams_kinds::{activity::FollowType, actor::PersonType};
use anyhow::Error;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
/// If the object is stored in the database, this method should return the fetch time. Used to
/// update actors after certain interval.
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
None
#[derive(Clone)]
pub struct DbConnection;
impl DbConnection {
pub async fn read_user_from_apub_id<T>(&self, _: Url) -> Result<Option<T>, Error> {
Ok(None)
}
pub async fn read_local_user(&self, _: String) -> Result<DbUser, Error> {
todo!()
}
pub async fn upsert<T>(&self, _: &T) -> Result<(), Error> {
Ok(())
}
pub async fn add_follower(&self, _: DbUser, _: DbUser) -> Result<(), Error> {
Ok(())
}
}
/// Try to read the object with given ID from local database. Returns Ok(None) if it doesn't exist.
async fn read_from_apub_id(
object_id: Url,
data: &RequestData<Self::DataType>,
) -> Result<Option<Self>, Self::Error>
where
Self: Sized;
/// Marks the object as deleted in local db. Called when a delete activity is received, or if
/// fetch returns a tombstone.
async fn delete(self, _data: &RequestData<Self::DataType>) -> Result<(), Self::Error>
where
Self: Sized,
{
Ok(())
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Person {
#[serde(rename = "type")]
pub kind: PersonType,
pub preferred_username: String,
pub id: ObjectId<DbUser>,
pub inbox: Url,
pub public_key: PublicKey,
}
#[derive(Debug, Clone)]
pub struct DbUser {
pub name: String,
pub ap_id: Url,
pub inbox: Url,
// exists for all users (necessary to verify http signatures)
pub public_key: String,
// exists only for local users
private_key: Option<String>,
pub followers: Vec<Url>,
pub local: bool,
}
/// Trait for converting an object or actor into the respective ActivityPub type.
async fn into_apub(
self,
data: &RequestData<Self::DataType>,
) -> Result<Self::ApubType, Self::Error>;
pub static DB_USER_KEYPAIR: Lazy<Keypair> = Lazy::new(|| generate_actor_keypair().unwrap());
/// Converts an object from ActivityPub type to Lemmy internal type.
///
/// * `apub` The object to read from
/// * `context` LemmyContext which holds DB pool, HTTP client etc
/// * `expected_domain` Domain where the object was received from. None in case of mod action.
/// * `mod_action_allowed` True if the object can be a mod activity, ignore `expected_domain` in this case
async fn from_apub(
apub: Self::ApubType,
data: &RequestData<Self::DataType>,
) -> Result<Self, Self::Error>
where
Self: Sized;
}
#[async_trait]
impl ApubObject for DbUser {
type DataType = DbConnection;
type ApubType = Person;
type Error = Error;
pub trait Actor: ApubObject {
/// Returns the actor's public key for verification of HTTP signatures
fn public_key(&self) -> &str;
async fn read_from_apub_id(
object_id: Url,
_data: &RequestData<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
Ok(Some(DbUser {
name: "".to_string(),
ap_id: object_id.clone().into(),
inbox: object_id.into(),
public_key: DB_USER_KEYPAIR.public_key.clone(),
private_key: None,
followers: vec![],
local: false,
}))
}
/// The inbox where activities for this user should be sent to
fn inbox(&self) -> Url;
async fn into_apub(
self,
_data: &RequestData<Self::DataType>,
) -> Result<Self::ApubType, Self::Error> {
let public_key = PublicKey::new_main_key(self.ap_id.clone(), self.public_key.clone());
Ok(Person {
preferred_username: self.name.clone(),
kind: Default::default(),
id: self.ap_id.into(),
inbox: self.inbox,
public_key,
})
}
/// The actor's shared inbox, if any
fn shared_inbox(&self) -> Option<Url> {
None
async fn from_apub(
apub: Self::ApubType,
_data: &RequestData<Self::DataType>,
) -> Result<Self, Self::Error> {
Ok(DbUser {
name: apub.preferred_username,
ap_id: apub.id.into(),
inbox: apub.inbox,
public_key: apub.public_key.public_key_pem,
private_key: None,
followers: vec![],
local: false,
})
}
}
fn shared_inbox_or_inbox(&self) -> Url {
self.shared_inbox().unwrap_or_else(|| self.inbox())
impl Actor for DbUser {
fn public_key(&self) -> &str {
&self.public_key
}
fn inbox(&self) -> Url {
todo!()
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Follow {
pub actor: ObjectId<DbUser>,
pub object: ObjectId<DbUser>,
#[serde(rename = "type")]
pub kind: FollowType,
pub id: Url,
}
#[async_trait]
impl ActivityHandler for Follow {
type DataType = DbConnection;
type Error = Error;
fn id(&self) -> &Url {
&self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn receive(self, data: &RequestData<Self::DataType>) -> Result<(), Self::Error> {
Ok(())
}
}
}

View file

@ -12,26 +12,36 @@ use url::Url;
pub(crate) mod reqwest_shim;
pub async fn fetch_object_http<T, Kind: DeserializeOwned>(
/// Fetch a remote object over HTTP and convert to `Kind`.
///
/// [crate::core::object_id::ObjectId::dereference] wraps this function to add caching and
/// conversion to database type. Only use this function directly in exceptional cases where that
/// behaviour is undesired.
///
/// Every time an object is fetched via HTTP, [RequestData.request_counter] is incremented by one.
/// If the value exceeds [FederationSettings.http_fetch_limit], the request is aborted with
/// [Error::RequestLimit]. This prevents denial of service attacks where an attack triggers
/// infinite, recursive fetching of data.
pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
url: &Url,
data: &RequestData<T>,
) -> Result<Kind, Error> {
let instance = &data.local_instance();
let config = &data.config;
// dont fetch local objects this way
debug_assert!(url.domain() != Some(&instance.hostname));
instance.verify_url_valid(url).await?;
debug_assert!(url.domain() != Some(&config.hostname));
config.verify_url_valid(url).await?;
info!("Fetching remote object {}", url.to_string());
let counter = data.request_counter.fetch_add(1, Ordering::SeqCst);
if counter > instance.settings.http_fetch_limit {
if counter > config.http_fetch_limit {
return Err(Error::RequestLimit);
}
let res = instance
let res = config
.client
.get(url.as_str())
.header("Accept", APUB_JSON_CONTENT_TYPE)
.timeout(instance.settings.request_timeout)
.timeout(config.request_timeout)
.send()
.await
.map_err(Error::conv)?;
@ -44,6 +54,15 @@ pub async fn fetch_object_http<T, Kind: DeserializeOwned>(
}
/// Check that both urls have the same domain. If not, return UrlVerificationError.
///
/// ```
/// # use url::Url;
/// # use activitypub_federation::utils::verify_domains_match;
/// let a = Url::parse("https://example.com/abc")?;
/// let b = Url::parse("https://sample.net/abc")?;
/// assert!(verify_domains_match(&a, &b).is_err());
/// # Ok::<(), url::ParseError>(())
/// ```
pub fn verify_domains_match(a: &Url, b: &Url) -> Result<(), Error> {
if a.domain() != b.domain() {
return Err(Error::UrlVerificationError("Domains do not match"));
@ -52,6 +71,15 @@ pub fn verify_domains_match(a: &Url, b: &Url) -> Result<(), Error> {
}
/// Check that both urls are identical. If not, return UrlVerificationError.
///
/// ```
/// # use url::Url;
/// # use activitypub_federation::utils::verify_urls_match;
/// let a = Url::parse("https://example.com/abc")?;
/// let b = Url::parse("https://example.com/123")?;
/// assert!(verify_urls_match(&a, &b).is_err());
/// # Ok::<(), url::ParseError>(())
/// ```
pub fn verify_urls_match(a: &Url, b: &Url) -> Result<(), Error> {
if a != b {
return Err(Error::UrlVerificationError("Urls do not match"));