From 83ad4bfdc16dc74f8df11ea12deacf4f132a34fa Mon Sep 17 00:00:00 2001 From: Nutomic Date: Sat, 11 Feb 2023 21:32:35 +0900 Subject: [PATCH] Merge handler params into single struct (#25) --- .../federation-actix/activities/accept.rs | 16 ++-- .../activities/create_note.rs | 14 ++-- .../federation-actix/activities/follow.rs | 23 ++---- examples/federation-actix/instance.rs | 62 ++++++--------- examples/federation-actix/main.rs | 16 ++-- examples/federation-actix/objects/note.rs | 15 ++-- examples/federation-actix/objects/person.rs | 48 ++++++------ examples/federation-axum/activities/accept.rs | 16 ++-- .../federation-axum/activities/create_note.rs | 14 ++-- examples/federation-axum/activities/follow.rs | 23 ++---- examples/federation-axum/instance.rs | 70 +++++++---------- examples/federation-axum/main.rs | 16 ++-- examples/federation-axum/objects/note.rs | 15 ++-- examples/federation-axum/objects/person.rs | 54 ++++++------- src/core/activity_queue.rs | 8 +- src/core/actix/inbox.rs | 19 ++--- src/core/actix/middleware.rs | 74 ++++++++++++++++++ src/core/actix/mod.rs | 1 + src/core/axum/inbox.rs | 15 ++-- src/core/axum/middleware.rs | 62 +++++++++++++++ src/core/axum/mod.rs | 1 + src/core/inbox.rs | 0 src/core/object_id.rs | 59 +++++++------- src/data.rs | 37 --------- src/deser/context.rs | 14 ++-- src/lib.rs | 28 +++---- src/request_data.rs | 76 +++++++++++++++++++ src/signature.rs | 0 src/traits.rs | 30 +++----- src/utils.rs | 19 +++-- 30 files changed, 489 insertions(+), 356 deletions(-) create mode 100644 src/core/actix/middleware.rs create mode 100644 src/core/axum/middleware.rs delete mode 100644 src/core/inbox.rs delete mode 100644 src/data.rs create mode 100644 src/request_data.rs delete mode 100644 src/signature.rs diff --git a/examples/federation-actix/activities/accept.rs b/examples/federation-actix/activities/accept.rs index a08048f..f3e1243 100644 --- a/examples/federation-actix/activities/accept.rs +++ b/examples/federation-actix/activities/accept.rs @@ -1,5 +1,9 @@ -use crate::{activities::follow::Follow, instance::InstanceHandle, objects::person::MyUser}; -use activitypub_federation::{core::object_id::ObjectId, data::Data, traits::ActivityHandler}; +use crate::{activities::follow::Follow, instance::DatabaseHandle, objects::person::MyUser}; +use activitypub_federation::{ + core::object_id::ObjectId, + request_data::RequestData, + traits::ActivityHandler, +}; use activitystreams_kinds::activity::AcceptType; use serde::{Deserialize, Serialize}; use url::Url; @@ -27,7 +31,7 @@ impl Accept { #[async_trait::async_trait] impl ActivityHandler for Accept { - type DataType = InstanceHandle; + type DataType = DatabaseHandle; type Error = crate::error::Error; fn id(&self) -> &Url { @@ -38,11 +42,7 @@ impl ActivityHandler for Accept { self.actor.inner() } - async fn receive( - self, - _data: &Data, - _request_counter: &mut i32, - ) -> Result<(), Self::Error> { + async fn receive(self, _data: &RequestData) -> Result<(), Self::Error> { Ok(()) } } diff --git a/examples/federation-actix/activities/create_note.rs b/examples/federation-actix/activities/create_note.rs index 7ec2060..ac35979 100644 --- a/examples/federation-actix/activities/create_note.rs +++ b/examples/federation-actix/activities/create_note.rs @@ -1,12 +1,12 @@ use crate::{ - instance::InstanceHandle, + instance::DatabaseHandle, objects::{note::Note, person::MyUser}, MyPost, }; use activitypub_federation::{ core::object_id::ObjectId, - data::Data, deser::helpers::deserialize_one_or_many, + request_data::RequestData, traits::{ActivityHandler, ApubObject}, }; use activitystreams_kinds::activity::CreateType; @@ -39,7 +39,7 @@ impl CreateNote { #[async_trait::async_trait] impl ActivityHandler for CreateNote { - type DataType = InstanceHandle; + type DataType = DatabaseHandle; type Error = crate::error::Error; fn id(&self) -> &Url { @@ -50,12 +50,8 @@ impl ActivityHandler for CreateNote { self.actor.inner() } - async fn receive( - self, - data: &Data, - request_counter: &mut i32, - ) -> Result<(), Self::Error> { - MyPost::from_apub(self.object, data, request_counter).await?; + async fn receive(self, data: &RequestData) -> Result<(), Self::Error> { + MyPost::from_apub(self.object, data).await?; Ok(()) } } diff --git a/examples/federation-actix/activities/follow.rs b/examples/federation-actix/activities/follow.rs index 73a7f92..b430299 100644 --- a/examples/federation-actix/activities/follow.rs +++ b/examples/federation-actix/activities/follow.rs @@ -1,12 +1,12 @@ use crate::{ activities::accept::Accept, generate_object_id, - instance::InstanceHandle, + instance::DatabaseHandle, objects::person::MyUser, }; use activitypub_federation::{ core::object_id::ObjectId, - data::Data, + request_data::RequestData, traits::{ActivityHandler, Actor}, }; use activitystreams_kinds::activity::FollowType; @@ -36,7 +36,7 @@ impl Follow { #[async_trait::async_trait] impl ActivityHandler for Follow { - type DataType = InstanceHandle; + type DataType = DatabaseHandle; type Error = crate::error::Error; fn id(&self) -> &Url { @@ -49,11 +49,7 @@ impl ActivityHandler for Follow { // Ignore clippy false positive: https://github.com/rust-lang/rust-clippy/issues/6446 #[allow(clippy::await_holding_lock)] - async fn receive( - self, - data: &Data, - request_counter: &mut i32, - ) -> Result<(), Self::Error> { + async fn receive(self, data: &RequestData) -> Result<(), Self::Error> { // add to followers let local_user = { let mut users = data.users.lock().unwrap(); @@ -63,18 +59,11 @@ impl ActivityHandler for Follow { }; // send back an accept - let follower = self - .actor - .dereference(data, data.local_instance(), request_counter) - .await?; + let follower = self.actor.dereference(data).await?; let id = generate_object_id(data.local_instance().hostname())?; let accept = Accept::new(local_user.ap_id.clone(), self, id.clone()); local_user - .send( - accept, - vec![follower.shared_inbox_or_inbox()], - data.local_instance(), - ) + .send(accept, vec![follower.shared_inbox_or_inbox()], data) .await?; Ok(()) } diff --git a/examples/federation-actix/instance.rs b/examples/federation-actix/instance.rs index 786771b..7ee2e47 100644 --- a/examples/federation-actix/instance.rs +++ b/examples/federation-actix/instance.rs @@ -6,40 +6,32 @@ use crate::{ person::{MyUser, PersonAcceptedActivities}, }, }; - use activitypub_federation::{ core::{ actix::inbox::receive_activity, object_id::ObjectId, signatures::generate_actor_keypair, }, - data::Data, deser::context::WithContext, + request_data::{ApubContext, ApubMiddleware, RequestData}, traits::ApubObject, - InstanceSettings, - LocalInstance, + FederationSettings, + InstanceConfig, UrlVerifier, APUB_JSON_CONTENT_TYPE, }; - use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer}; use async_trait::async_trait; use reqwest::Client; -use std::{ - ops::Deref, - sync::{Arc, Mutex}, -}; +use std::sync::{Arc, Mutex}; use tokio::task; use url::Url; -pub type InstanceHandle = Arc; +pub type DatabaseHandle = Arc; -pub struct Instance { - /// This holds all library data - local_instance: LocalInstance, - /// Our "database" which contains all known users (local and federated) +/// Our "database" which contains all known posts users (local and federated) +pub struct Database { pub users: Mutex>, - /// Same, but for posts pub posts: Mutex>, } @@ -58,37 +50,32 @@ impl UrlVerifier for MyUrlVerifier { } } -impl Instance { - pub fn new(hostname: String) -> Result { - let settings = InstanceSettings::builder() +impl Database { + pub fn new(hostname: String) -> Result, Error> { + let settings = FederationSettings::builder() .debug(true) .url_verifier(Box::new(MyUrlVerifier())) .build()?; let local_instance = - LocalInstance::new(hostname.clone(), Client::default().into(), settings); + InstanceConfig::new(hostname.clone(), Client::default().into(), settings); let local_user = MyUser::new(generate_object_id(&hostname)?, generate_actor_keypair()?); - let instance = Arc::new(Instance { - local_instance, + let instance = Arc::new(Database { users: Mutex::new(vec![local_user]), posts: Mutex::new(vec![]), }); - Ok(instance) + Ok(ApubContext::new(instance, local_instance)) } pub fn local_user(&self) -> MyUser { self.users.lock().unwrap().first().cloned().unwrap() } - pub fn local_instance(&self) -> &LocalInstance { - &self.local_instance - } - - pub fn listen(instance: &InstanceHandle) -> Result<(), Error> { - let hostname = instance.local_instance.hostname(); - let instance = instance.clone(); + pub fn listen(data: &ApubContext) -> Result<(), Error> { + let hostname = data.local_instance().hostname(); + let data = data.clone(); let server = HttpServer::new(move || { App::new() - .app_data(web::Data::new(instance.clone())) + .wrap(ApubMiddleware::new(data.clone())) .route("/objects/{user_name}", web::get().to(http_get_user)) .service( web::scope("") @@ -106,10 +93,9 @@ impl Instance { /// Handles requests to fetch user json over HTTP async fn http_get_user( request: HttpRequest, - data: web::Data, + data: RequestData, ) -> Result { - let data: InstanceHandle = data.into_inner().deref().clone(); - let hostname: String = data.local_instance.hostname().to_string(); + let hostname: String = data.local_instance().hostname().to_string(); let request_url = format!("http://{}{}", hostname, &request.uri().to_string()); let url = Url::parse(&request_url)?; let user = ObjectId::::new(url) @@ -127,15 +113,11 @@ async fn http_get_user( async fn http_post_user_inbox( request: HttpRequest, payload: String, - data: web::Data, + data: RequestData, ) -> Result { - let data: InstanceHandle = data.into_inner().deref().clone(); let activity = serde_json::from_str(&payload)?; - receive_activity::, MyUser, InstanceHandle>( - request, - activity, - &data.clone().local_instance, - &Data::new(data), + receive_activity::, MyUser, DatabaseHandle>( + request, activity, &data, ) .await } diff --git a/examples/federation-actix/main.rs b/examples/federation-actix/main.rs index afdacf7..dc95653 100644 --- a/examples/federation-actix/main.rs +++ b/examples/federation-actix/main.rs @@ -1,4 +1,4 @@ -use crate::{error::Error, instance::Instance, objects::note::MyPost, utils::generate_object_id}; +use crate::{error::Error, instance::Database, objects::note::MyPost, utils::generate_object_id}; use tracing::log::LevelFilter; mod activities; @@ -13,15 +13,15 @@ async fn main() -> Result<(), Error> { .filter_level(LevelFilter::Debug) .init(); - let alpha = Instance::new("localhost:8001".to_string())?; - let beta = Instance::new("localhost:8002".to_string())?; - Instance::listen(&alpha)?; - Instance::listen(&beta)?; + let alpha = Database::new("localhost:8001".to_string())?; + let beta = Database::new("localhost:8002".to_string())?; + Database::listen(&alpha)?; + Database::listen(&beta)?; // alpha user follows beta user alpha .local_user() - .follow(&beta.local_user(), &alpha) + .follow(&beta.local_user(), &alpha.to_request_data()) .await?; // assert that follow worked correctly assert_eq!( @@ -31,7 +31,9 @@ async fn main() -> Result<(), Error> { // beta sends a post to its followers let sent_post = MyPost::new("hello world!".to_string(), beta.local_user().ap_id); - beta.local_user().post(sent_post.clone(), &beta).await?; + beta.local_user() + .post(sent_post.clone(), &beta.to_request_data()) + .await?; let received_post = alpha.posts.lock().unwrap().first().cloned().unwrap(); // assert that alpha received the post diff --git a/examples/federation-actix/objects/note.rs b/examples/federation-actix/objects/note.rs index d6b58d6..4d7db9d 100644 --- a/examples/federation-actix/objects/note.rs +++ b/examples/federation-actix/objects/note.rs @@ -1,7 +1,8 @@ -use crate::{generate_object_id, instance::InstanceHandle, objects::person::MyUser}; +use crate::{generate_object_id, instance::DatabaseHandle, objects::person::MyUser}; use activitypub_federation::{ core::object_id::ObjectId, deser::helpers::deserialize_one_or_many, + request_data::RequestData, traits::ApubObject, }; use activitystreams_kinds::{object::NoteType, public}; @@ -41,19 +42,22 @@ pub struct Note { #[async_trait::async_trait] impl ApubObject for MyPost { - type DataType = InstanceHandle; + type DataType = DatabaseHandle; type ApubType = Note; type DbType = (); type Error = crate::error::Error; async fn read_from_apub_id( _object_id: Url, - _data: &Self::DataType, + _data: &RequestData, ) -> Result, Self::Error> { todo!() } - async fn into_apub(self, data: &Self::DataType) -> Result { + async fn into_apub( + self, + data: &RequestData, + ) -> Result { let creator = self.creator.dereference_local(data).await?; Ok(Note { kind: Default::default(), @@ -66,8 +70,7 @@ impl ApubObject for MyPost { async fn from_apub( apub: Self::ApubType, - data: &Self::DataType, - _request_counter: &mut i32, + data: &RequestData, ) -> Result { let post = MyPost { text: apub.content, diff --git a/examples/federation-actix/objects/person.rs b/examples/federation-actix/objects/person.rs index 311d4fa..fdf4442 100644 --- a/examples/federation-actix/objects/person.rs +++ b/examples/federation-actix/objects/person.rs @@ -1,7 +1,7 @@ use crate::{ activities::{accept::Accept, create_note::CreateNote, follow::Follow}, error::Error, - instance::InstanceHandle, + instance::DatabaseHandle, objects::note::MyPost, utils::generate_object_id, }; @@ -11,10 +11,9 @@ use activitypub_federation::{ object_id::ObjectId, signatures::{Keypair, PublicKey}, }, - data::Data, deser::context::WithContext, + request_data::RequestData, traits::{ActivityHandler, Actor, ApubObject}, - LocalInstance, }; use activitystreams_kinds::actor::PersonType; use serde::{Deserialize, Serialize}; @@ -81,30 +80,31 @@ impl MyUser { PublicKey::new_main_key(self.ap_id.clone().into_inner(), self.public_key.clone()) } - pub async fn follow(&self, other: &MyUser, instance: &InstanceHandle) -> Result<(), Error> { + pub async fn follow( + &self, + other: &MyUser, + instance: &RequestData, + ) -> Result<(), Error> { let id = generate_object_id(instance.local_instance().hostname())?; let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone()); - self.send( - follow, - vec![other.shared_inbox_or_inbox()], - instance.local_instance(), - ) - .await?; + self.send(follow, vec![other.shared_inbox_or_inbox()], instance) + .await?; Ok(()) } - pub async fn post(&self, post: MyPost, instance: &InstanceHandle) -> Result<(), Error> { + pub async fn post( + &self, + post: MyPost, + instance: &RequestData, + ) -> Result<(), Error> { let id = generate_object_id(instance.local_instance().hostname())?; let create = CreateNote::new(post.into_apub(instance).await?, id.clone()); let mut inboxes = vec![]; for f in self.followers.clone() { - let user: MyUser = ObjectId::new(f) - .dereference(instance, instance.local_instance(), &mut 0) - .await?; + let user: MyUser = ObjectId::new(f).dereference(instance).await?; inboxes.push(user.shared_inbox_or_inbox()); } - self.send(create, inboxes, instance.local_instance()) - .await?; + self.send(create, inboxes, instance).await?; Ok(()) } @@ -112,7 +112,7 @@ impl MyUser { &self, activity: Activity, recipients: Vec, - local_instance: &LocalInstance, + data: &RequestData, ) -> Result<(), ::Error> where Activity: ActivityHandler + Serialize + Send + Sync, @@ -124,7 +124,7 @@ impl MyUser { self.public_key(), self.private_key.clone().expect("has private key"), recipients, - local_instance, + data.local_instance(), ) .await?; Ok(()) @@ -133,14 +133,14 @@ impl MyUser { #[async_trait::async_trait] impl ApubObject for MyUser { - type DataType = InstanceHandle; + type DataType = DatabaseHandle; type ApubType = Person; type DbType = MyUser; type Error = crate::error::Error; async fn read_from_apub_id( object_id: Url, - data: &Self::DataType, + data: &RequestData, ) -> Result, Self::Error> { let users = data.users.lock().unwrap(); let res = users @@ -150,7 +150,10 @@ impl ApubObject for MyUser { Ok(res) } - async fn into_apub(self, _data: &Self::DataType) -> Result { + async fn into_apub( + self, + _data: &RequestData, + ) -> Result { Ok(Person { kind: Default::default(), id: self.ap_id.clone(), @@ -161,8 +164,7 @@ impl ApubObject for MyUser { async fn from_apub( apub: Self::ApubType, - _data: &Self::DataType, - _request_counter: &mut i32, + _data: &RequestData, ) -> Result { Ok(MyUser { ap_id: apub.id, diff --git a/examples/federation-axum/activities/accept.rs b/examples/federation-axum/activities/accept.rs index a08048f..f3e1243 100644 --- a/examples/federation-axum/activities/accept.rs +++ b/examples/federation-axum/activities/accept.rs @@ -1,5 +1,9 @@ -use crate::{activities::follow::Follow, instance::InstanceHandle, objects::person::MyUser}; -use activitypub_federation::{core::object_id::ObjectId, data::Data, traits::ActivityHandler}; +use crate::{activities::follow::Follow, instance::DatabaseHandle, objects::person::MyUser}; +use activitypub_federation::{ + core::object_id::ObjectId, + request_data::RequestData, + traits::ActivityHandler, +}; use activitystreams_kinds::activity::AcceptType; use serde::{Deserialize, Serialize}; use url::Url; @@ -27,7 +31,7 @@ impl Accept { #[async_trait::async_trait] impl ActivityHandler for Accept { - type DataType = InstanceHandle; + type DataType = DatabaseHandle; type Error = crate::error::Error; fn id(&self) -> &Url { @@ -38,11 +42,7 @@ impl ActivityHandler for Accept { self.actor.inner() } - async fn receive( - self, - _data: &Data, - _request_counter: &mut i32, - ) -> Result<(), Self::Error> { + async fn receive(self, _data: &RequestData) -> Result<(), Self::Error> { Ok(()) } } diff --git a/examples/federation-axum/activities/create_note.rs b/examples/federation-axum/activities/create_note.rs index 7ec2060..ac35979 100644 --- a/examples/federation-axum/activities/create_note.rs +++ b/examples/federation-axum/activities/create_note.rs @@ -1,12 +1,12 @@ use crate::{ - instance::InstanceHandle, + instance::DatabaseHandle, objects::{note::Note, person::MyUser}, MyPost, }; use activitypub_federation::{ core::object_id::ObjectId, - data::Data, deser::helpers::deserialize_one_or_many, + request_data::RequestData, traits::{ActivityHandler, ApubObject}, }; use activitystreams_kinds::activity::CreateType; @@ -39,7 +39,7 @@ impl CreateNote { #[async_trait::async_trait] impl ActivityHandler for CreateNote { - type DataType = InstanceHandle; + type DataType = DatabaseHandle; type Error = crate::error::Error; fn id(&self) -> &Url { @@ -50,12 +50,8 @@ impl ActivityHandler for CreateNote { self.actor.inner() } - async fn receive( - self, - data: &Data, - request_counter: &mut i32, - ) -> Result<(), Self::Error> { - MyPost::from_apub(self.object, data, request_counter).await?; + async fn receive(self, data: &RequestData) -> Result<(), Self::Error> { + MyPost::from_apub(self.object, data).await?; Ok(()) } } diff --git a/examples/federation-axum/activities/follow.rs b/examples/federation-axum/activities/follow.rs index 73a7f92..b430299 100644 --- a/examples/federation-axum/activities/follow.rs +++ b/examples/federation-axum/activities/follow.rs @@ -1,12 +1,12 @@ use crate::{ activities::accept::Accept, generate_object_id, - instance::InstanceHandle, + instance::DatabaseHandle, objects::person::MyUser, }; use activitypub_federation::{ core::object_id::ObjectId, - data::Data, + request_data::RequestData, traits::{ActivityHandler, Actor}, }; use activitystreams_kinds::activity::FollowType; @@ -36,7 +36,7 @@ impl Follow { #[async_trait::async_trait] impl ActivityHandler for Follow { - type DataType = InstanceHandle; + type DataType = DatabaseHandle; type Error = crate::error::Error; fn id(&self) -> &Url { @@ -49,11 +49,7 @@ impl ActivityHandler for Follow { // Ignore clippy false positive: https://github.com/rust-lang/rust-clippy/issues/6446 #[allow(clippy::await_holding_lock)] - async fn receive( - self, - data: &Data, - request_counter: &mut i32, - ) -> Result<(), Self::Error> { + async fn receive(self, data: &RequestData) -> Result<(), Self::Error> { // add to followers let local_user = { let mut users = data.users.lock().unwrap(); @@ -63,18 +59,11 @@ impl ActivityHandler for Follow { }; // send back an accept - let follower = self - .actor - .dereference(data, data.local_instance(), request_counter) - .await?; + let follower = self.actor.dereference(data).await?; let id = generate_object_id(data.local_instance().hostname())?; let accept = Accept::new(local_user.ap_id.clone(), self, id.clone()); local_user - .send( - accept, - vec![follower.shared_inbox_or_inbox()], - data.local_instance(), - ) + .send(accept, vec![follower.shared_inbox_or_inbox()], data) .await?; Ok(()) } diff --git a/examples/federation-axum/instance.rs b/examples/federation-axum/instance.rs index 1b7fc6f..c425111 100644 --- a/examples/federation-axum/instance.rs +++ b/examples/federation-axum/instance.rs @@ -3,26 +3,27 @@ use crate::{ generate_object_id, objects::{ note::MyPost, - person::{MyUser, PersonAcceptedActivities}, + person::{MyUser, Person, PersonAcceptedActivities}, }, }; - use activitypub_federation::{ - core::{object_id::ObjectId, signatures::generate_actor_keypair}, - data::Data, + core::{ + axum::{inbox::receive_activity, json::ApubJson, verify_request_payload, DigestVerified}, + object_id::ObjectId, + signatures::generate_actor_keypair, + }, deser::context::WithContext, + request_data::{ApubContext, ApubMiddleware, RequestData}, traits::ApubObject, - InstanceSettings, - LocalInstance, + FederationSettings, + InstanceConfig, UrlVerifier, }; - -use activitypub_federation::core::axum::{verify_request_payload, DigestVerified}; use async_trait::async_trait; use axum::{ body, body::Body, - extract::{Json, OriginalUri, State}, + extract::{Json, OriginalUri}, middleware, response::IntoResponse, routing::{get, post}, @@ -37,17 +38,14 @@ use std::{ }; use tokio::task; use tower::ServiceBuilder; -use tower_http::ServiceBuilderExt; +use tower_http::{trace::TraceLayer, ServiceBuilderExt}; use url::Url; -pub type InstanceHandle = Arc; +pub type DatabaseHandle = Arc; -pub struct Instance { - /// This holds all library data - local_instance: LocalInstance, - /// Our "database" which contains all known users (local and federated) +/// Our "database" which contains all known posts and users (local and federated) +pub struct Database { pub users: Mutex>, - /// Same, but for posts pub posts: Mutex>, } @@ -66,34 +64,29 @@ impl UrlVerifier for MyUrlVerifier { } } -impl Instance { - pub fn new(hostname: String) -> Result { - let settings = InstanceSettings::builder() +impl Database { + pub fn new(hostname: String) -> Result, Error> { + let settings = FederationSettings::builder() .debug(true) .url_verifier(Box::new(MyUrlVerifier())) .build()?; let local_instance = - LocalInstance::new(hostname.clone(), Client::default().into(), settings); + InstanceConfig::new(hostname.clone(), Client::default().into(), settings); let local_user = MyUser::new(generate_object_id(&hostname)?, generate_actor_keypair()?); - let instance = Arc::new(Instance { - local_instance, + let instance = Arc::new(Database { users: Mutex::new(vec![local_user]), posts: Mutex::new(vec![]), }); - Ok(instance) + Ok(ApubContext::new(instance, local_instance)) } pub fn local_user(&self) -> MyUser { self.users.lock().unwrap().first().cloned().unwrap() } - pub fn local_instance(&self) -> &LocalInstance { - &self.local_instance - } - - pub fn listen(instance: &InstanceHandle) -> Result<(), Error> { - let hostname = instance.local_instance.hostname(); - let instance = instance.clone(); + pub fn listen(data: &ApubContext) -> Result<(), Error> { + let hostname = data.local_instance().hostname(); + let data = data.clone(); let app = Router::new() .route("/inbox", post(http_post_user_inbox)) .layer( @@ -102,7 +95,7 @@ impl Instance { .layer(middleware::from_fn(verify_request_payload)), ) .route("/objects/:user_name", get(http_get_user)) - .with_state(instance) + .layer(ApubMiddleware::new(data)) .layer(TraceLayer::new_for_http()); // run it @@ -117,15 +110,11 @@ impl Instance { } } -use crate::objects::person::Person; -use activitypub_federation::core::axum::{inbox::receive_activity, json::ApubJson}; -use tower_http::trace::TraceLayer; - async fn http_get_user( - State(data): State, + data: RequestData, request: Request, ) -> Result>, Error> { - let hostname: String = data.local_instance.hostname().to_string(); + let hostname: String = data.local_instance().hostname().to_string(); let request_url = format!("http://{}{}", hostname, &request.uri()); let url = Url::parse(&request_url).expect("Failed to parse url"); @@ -143,15 +132,14 @@ async fn http_post_user_inbox( headers: HeaderMap, method: Method, OriginalUri(uri): OriginalUri, - State(data): State, + data: RequestData, Extension(digest_verified): Extension, Json(activity): Json>, ) -> impl IntoResponse { - receive_activity::, MyUser, InstanceHandle>( + receive_activity::, MyUser, DatabaseHandle>( digest_verified, activity, - &data.clone().local_instance, - &Data::new(data), + &data, headers, method, uri, diff --git a/examples/federation-axum/main.rs b/examples/federation-axum/main.rs index 3222bd4..8b01f9c 100644 --- a/examples/federation-axum/main.rs +++ b/examples/federation-axum/main.rs @@ -1,4 +1,4 @@ -use crate::{error::Error, instance::Instance, objects::note::MyPost, utils::generate_object_id}; +use crate::{error::Error, instance::Database, objects::note::MyPost, utils::generate_object_id}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; mod activities; @@ -18,15 +18,15 @@ async fn main() -> Result<(), Error> { .with(tracing_subscriber::fmt::layer()) .init(); - let alpha = Instance::new("localhost:8001".to_string())?; - let beta = Instance::new("localhost:8002".to_string())?; - Instance::listen(&alpha)?; - Instance::listen(&beta)?; + let alpha = Database::new("localhost:8001".to_string())?; + let beta = Database::new("localhost:8002".to_string())?; + Database::listen(&alpha)?; + Database::listen(&beta)?; // alpha user follows beta user alpha .local_user() - .follow(&beta.local_user(), &alpha) + .follow(&beta.local_user(), &alpha.to_request_data()) .await?; // assert that follow worked correctly @@ -37,7 +37,9 @@ async fn main() -> Result<(), Error> { // beta sends a post to its followers let sent_post = MyPost::new("hello world!".to_string(), beta.local_user().ap_id); - beta.local_user().post(sent_post.clone(), &beta).await?; + beta.local_user() + .post(sent_post.clone(), &beta.to_request_data()) + .await?; let received_post = alpha.posts.lock().unwrap().first().cloned().unwrap(); // assert that alpha received the post diff --git a/examples/federation-axum/objects/note.rs b/examples/federation-axum/objects/note.rs index d6b58d6..4d7db9d 100644 --- a/examples/federation-axum/objects/note.rs +++ b/examples/federation-axum/objects/note.rs @@ -1,7 +1,8 @@ -use crate::{generate_object_id, instance::InstanceHandle, objects::person::MyUser}; +use crate::{generate_object_id, instance::DatabaseHandle, objects::person::MyUser}; use activitypub_federation::{ core::object_id::ObjectId, deser::helpers::deserialize_one_or_many, + request_data::RequestData, traits::ApubObject, }; use activitystreams_kinds::{object::NoteType, public}; @@ -41,19 +42,22 @@ pub struct Note { #[async_trait::async_trait] impl ApubObject for MyPost { - type DataType = InstanceHandle; + type DataType = DatabaseHandle; type ApubType = Note; type DbType = (); type Error = crate::error::Error; async fn read_from_apub_id( _object_id: Url, - _data: &Self::DataType, + _data: &RequestData, ) -> Result, Self::Error> { todo!() } - async fn into_apub(self, data: &Self::DataType) -> Result { + async fn into_apub( + self, + data: &RequestData, + ) -> Result { let creator = self.creator.dereference_local(data).await?; Ok(Note { kind: Default::default(), @@ -66,8 +70,7 @@ impl ApubObject for MyPost { async fn from_apub( apub: Self::ApubType, - data: &Self::DataType, - _request_counter: &mut i32, + data: &RequestData, ) -> Result { let post = MyPost { text: apub.content, diff --git a/examples/federation-axum/objects/person.rs b/examples/federation-axum/objects/person.rs index 311d4fa..3476d93 100644 --- a/examples/federation-axum/objects/person.rs +++ b/examples/federation-axum/objects/person.rs @@ -1,7 +1,7 @@ use crate::{ activities::{accept::Accept, create_note::CreateNote, follow::Follow}, error::Error, - instance::InstanceHandle, + instance::DatabaseHandle, objects::note::MyPost, utils::generate_object_id, }; @@ -11,10 +11,9 @@ use activitypub_federation::{ object_id::ObjectId, signatures::{Keypair, PublicKey}, }, - data::Data, deser::context::WithContext, + request_data::RequestData, traits::{ActivityHandler, Actor, ApubObject}, - LocalInstance, }; use activitystreams_kinds::actor::PersonType; use serde::{Deserialize, Serialize}; @@ -81,30 +80,31 @@ impl MyUser { PublicKey::new_main_key(self.ap_id.clone().into_inner(), self.public_key.clone()) } - pub async fn follow(&self, other: &MyUser, instance: &InstanceHandle) -> Result<(), Error> { - let id = generate_object_id(instance.local_instance().hostname())?; + pub async fn follow( + &self, + other: &MyUser, + data: &RequestData, + ) -> Result<(), Error> { + let id = generate_object_id(data.local_instance().hostname())?; let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone()); - self.send( - follow, - vec![other.shared_inbox_or_inbox()], - instance.local_instance(), - ) - .await?; + self.send(follow, vec![other.shared_inbox_or_inbox()], data) + .await?; Ok(()) } - pub async fn post(&self, post: MyPost, instance: &InstanceHandle) -> Result<(), Error> { - let id = generate_object_id(instance.local_instance().hostname())?; - let create = CreateNote::new(post.into_apub(instance).await?, id.clone()); + pub async fn post( + &self, + post: MyPost, + data: &RequestData, + ) -> Result<(), Error> { + let id = generate_object_id(data.local_instance().hostname())?; + let create = CreateNote::new(post.into_apub(data).await?, id.clone()); let mut inboxes = vec![]; for f in self.followers.clone() { - let user: MyUser = ObjectId::new(f) - .dereference(instance, instance.local_instance(), &mut 0) - .await?; + let user: MyUser = ObjectId::new(f).dereference(data).await?; inboxes.push(user.shared_inbox_or_inbox()); } - self.send(create, inboxes, instance.local_instance()) - .await?; + self.send(create, inboxes, data).await?; Ok(()) } @@ -112,7 +112,7 @@ impl MyUser { &self, activity: Activity, recipients: Vec, - local_instance: &LocalInstance, + data: &RequestData, ) -> Result<(), ::Error> where Activity: ActivityHandler + Serialize + Send + Sync, @@ -124,7 +124,7 @@ impl MyUser { self.public_key(), self.private_key.clone().expect("has private key"), recipients, - local_instance, + data.local_instance(), ) .await?; Ok(()) @@ -133,14 +133,14 @@ impl MyUser { #[async_trait::async_trait] impl ApubObject for MyUser { - type DataType = InstanceHandle; + type DataType = DatabaseHandle; type ApubType = Person; type DbType = MyUser; type Error = crate::error::Error; async fn read_from_apub_id( object_id: Url, - data: &Self::DataType, + data: &RequestData, ) -> Result, Self::Error> { let users = data.users.lock().unwrap(); let res = users @@ -150,7 +150,10 @@ impl ApubObject for MyUser { Ok(res) } - async fn into_apub(self, _data: &Self::DataType) -> Result { + async fn into_apub( + self, + _data: &RequestData, + ) -> Result { Ok(Person { kind: Default::default(), id: self.ap_id.clone(), @@ -161,8 +164,7 @@ impl ApubObject for MyUser { async fn from_apub( apub: Self::ApubType, - _data: &Self::DataType, - _request_counter: &mut i32, + _data: &RequestData, ) -> Result { Ok(MyUser { ap_id: apub.id, diff --git a/src/core/activity_queue.rs b/src/core/activity_queue.rs index cac04c7..32ce6bd 100644 --- a/src/core/activity_queue.rs +++ b/src/core/activity_queue.rs @@ -3,8 +3,8 @@ use crate::{ traits::ActivityHandler, utils::reqwest_shim::ResponseExt, Error, - InstanceSettings, - LocalInstance, + FederationSettings, + InstanceConfig, APUB_JSON_CONTENT_TYPE, }; use anyhow::anyhow; @@ -44,7 +44,7 @@ pub async fn send_activity( public_key: PublicKey, private_key: String, recipients: Vec, - instance: &LocalInstance, + instance: &InstanceConfig, ) -> Result<(), ::Error> where Activity: ActivityHandler + Serialize, @@ -211,7 +211,7 @@ fn generate_request_headers(inbox_url: &Url) -> HeaderMap { pub(crate) fn create_activity_queue( client: ClientWithMiddleware, - settings: &InstanceSettings, + settings: &FederationSettings, ) -> Manager { // queue is not used in debug mod, so dont create any workers to avoid log spam let worker_count = if settings.debug { diff --git a/src/core/actix/inbox.rs b/src/core/actix/inbox.rs index 185fbe1..c47031f 100644 --- a/src/core/actix/inbox.rs +++ b/src/core/actix/inbox.rs @@ -1,12 +1,9 @@ use crate::{ - core::object_id::ObjectId, - data::Data, + core::{object_id::ObjectId, signatures::verify_signature}, + request_data::RequestData, traits::{ActivityHandler, Actor, ApubObject}, Error, - LocalInstance, }; - -use crate::core::signatures::verify_signature; use actix_web::{HttpRequest, HttpResponse}; use serde::de::DeserializeOwned; use tracing::debug; @@ -15,8 +12,7 @@ use tracing::debug; pub async fn receive_activity( request: HttpRequest, activity: Activity, - local_instance: &LocalInstance, - data: &Data, + data: &RequestData, ) -> Result::Error> where Activity: ActivityHandler + DeserializeOwned + Send + 'static, @@ -28,11 +24,12 @@ where + From, ::Error: From + From, { - local_instance.verify_url_and_domain(&activity).await?; + data.local_instance() + .verify_url_and_domain(&activity) + .await?; - let request_counter = &mut 0; let actor = ObjectId::::new(activity.actor().clone()) - .dereference(data, local_instance, request_counter) + .dereference(data) .await?; verify_signature( @@ -43,6 +40,6 @@ where )?; debug!("Receiving activity {}", activity.id().to_string()); - activity.receive(data, request_counter).await?; + activity.receive(data).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/src/core/actix/middleware.rs b/src/core/actix/middleware.rs new file mode 100644 index 0000000..58e4af9 --- /dev/null +++ b/src/core/actix/middleware.rs @@ -0,0 +1,74 @@ +use crate::request_data::{ApubContext, ApubMiddleware, RequestData}; +use actix_web::{ + dev::{forward_ready, Payload, Service, ServiceRequest, ServiceResponse, Transform}, + Error, + FromRequest, + HttpMessage, + HttpRequest, +}; +use std::future::{ready, Ready}; + +impl Transform for ApubMiddleware +where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, + T: Clone + Sync + 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type Transform = ApubService; + type InitError = (); + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(ApubService { + service, + context: self.0.clone(), + })) + } +} + +pub struct ApubService +where + S: Service, + S::Future: 'static, + T: Sync, +{ + service: S, + context: ApubContext, +} + +impl Service for ApubService +where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, + T: Clone + Sync + 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type Future = S::Future; + + forward_ready!(service); + + fn call(&self, req: ServiceRequest) -> Self::Future { + req.extensions_mut().insert(self.context.clone()); + + self.service.call(req) + } +} + +impl FromRequest for RequestData { + type Error = Error; + type Future = Ready>; + + fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future { + ready(match req.extensions().get::>() { + Some(c) => Ok(c.to_request_data()), + None => Err(actix_web::error::ErrorBadRequest( + "Missing extension, did you register ApubMiddleware?", + )), + }) + } +} diff --git a/src/core/actix/mod.rs b/src/core/actix/mod.rs index 730098e..c3d734a 100644 --- a/src/core/actix/mod.rs +++ b/src/core/actix/mod.rs @@ -1 +1,2 @@ pub mod inbox; +pub mod middleware; diff --git a/src/core/axum/inbox.rs b/src/core/axum/inbox.rs index da08cf8..ecf4f9c 100644 --- a/src/core/axum/inbox.rs +++ b/src/core/axum/inbox.rs @@ -1,9 +1,8 @@ use crate::{ core::{axum::DigestVerified, object_id::ObjectId, signatures::verify_signature}, - data::Data, + request_data::RequestData, traits::{ActivityHandler, Actor, ApubObject}, Error, - LocalInstance, }; use http::{HeaderMap, Method, Uri}; use serde::de::DeserializeOwned; @@ -13,8 +12,7 @@ use tracing::debug; pub async fn receive_activity( _digest_verified: DigestVerified, activity: Activity, - local_instance: &LocalInstance, - data: &Data, + data: &RequestData, headers: HeaderMap, method: Method, uri: Uri, @@ -29,16 +27,17 @@ where + From, ::Error: From + From, { - local_instance.verify_url_and_domain(&activity).await?; + data.local_instance() + .verify_url_and_domain(&activity) + .await?; - let request_counter = &mut 0; let actor = ObjectId::::new(activity.actor().clone()) - .dereference(data, local_instance, request_counter) + .dereference(data) .await?; verify_signature(&headers, &method, &uri, actor.public_key())?; debug!("Receiving activity {}", activity.id().to_string()); - activity.receive(data, request_counter).await?; + activity.receive(data).await?; Ok(()) } diff --git a/src/core/axum/middleware.rs b/src/core/axum/middleware.rs new file mode 100644 index 0000000..2a2d9e6 --- /dev/null +++ b/src/core/axum/middleware.rs @@ -0,0 +1,62 @@ +use crate::request_data::{ApubContext, ApubMiddleware, RequestData}; +use axum::{async_trait, body::Body, extract::FromRequestParts, http::Request, response::Response}; +use http::{request::Parts, StatusCode}; +use std::task::{Context, Poll}; +use tower::{Layer, Service}; + +impl Layer for ApubMiddleware { + type Service = ApubService; + + fn layer(&self, inner: S) -> Self::Service { + ApubService { + inner, + context: self.0.clone(), + } + } +} + +#[derive(Clone)] +pub struct ApubService { + inner: S, + context: ApubContext, +} + +impl Service> for ApubService +where + S: Service, Response = Response> + Send + 'static, + S::Future: Send + 'static, + T: Clone + Send + Sync + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut request: Request) -> Self::Future { + request.extensions_mut().insert(self.context.clone()); + self.inner.call(request) + } +} + +#[async_trait] +impl FromRequestParts for RequestData +where + S: Send + Sync, + T: Send + Sync, +{ + type Rejection = (StatusCode, &'static str); + + async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { + // TODO: need to set this extension from middleware + match parts.extensions.get::>() { + Some(c) => Ok(c.to_request_data()), + None => Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "Missing extension, did you register ApubMiddleware?", + )), + } + } +} diff --git a/src/core/axum/mod.rs b/src/core/axum/mod.rs index ad6c81b..90c205c 100644 --- a/src/core/axum/mod.rs +++ b/src/core/axum/mod.rs @@ -11,6 +11,7 @@ use digest::{verify_sha256, DigestPart}; mod digest; pub mod inbox; pub mod json; +pub mod middleware; /// A request guard to ensure digest has been verified request has been /// see [`receive_activity`] diff --git a/src/core/inbox.rs b/src/core/inbox.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/core/object_id.rs b/src/core/object_id.rs index 5b74e31..ec3010f 100644 --- a/src/core/object_id.rs +++ b/src/core/object_id.rs @@ -1,4 +1,4 @@ -use crate::{traits::ApubObject, utils::fetch_object_http, Error, LocalInstance}; +use crate::{request_data::RequestData, traits::ApubObject, utils::fetch_object_http, Error}; use anyhow::anyhow; use chrono::{Duration as ChronoDuration, NaiveDateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -13,7 +13,7 @@ use url::Url; #[serde(transparent)] pub struct ObjectId(Box, PhantomData) where - Kind: ApubObject + Send + 'static, + Kind: ApubObject, for<'de2> ::ApubType: serde::Deserialize<'de2>; impl ObjectId @@ -39,9 +39,7 @@ where /// Fetches an activitypub object, either from local database (if possible), or over http. pub async fn dereference( &self, - data: &::DataType, - instance: &LocalInstance, - request_counter: &mut i32, + data: &RequestData<::DataType>, ) -> Result::Error> where ::Error: From + From, @@ -49,7 +47,7 @@ where let db_object = self.dereference_from_db(data).await?; // if its a local object, only fetch it from the database and not over http - if instance.is_local_url(&self.0) { + if data.local_instance().is_local_url(&self.0) { return match db_object { None => Err(Error::NotFound.into()), Some(o) => Ok(o), @@ -61,17 +59,14 @@ where // object is old and should be refetched if let Some(last_refreshed_at) = object.last_refreshed_at() { if should_refetch_object(last_refreshed_at) { - return self - .dereference_from_http(data, instance, request_counter, Some(object)) - .await; + return self.dereference_from_http(data, Some(object)).await; } } Ok(object) } // object not found, need to fetch over http else { - self.dereference_from_http(data, instance, request_counter, None) - .await + self.dereference_from_http(data, None).await } } @@ -79,7 +74,7 @@ where /// the object is not found in the database. pub async fn dereference_local( &self, - data: &::DataType, + data: &RequestData<::DataType>, ) -> Result::Error> where ::Error: From, @@ -91,7 +86,7 @@ where /// returning none means the object was not found in local db async fn dereference_from_db( &self, - data: &::DataType, + data: &RequestData<::DataType>, ) -> Result, ::Error> { let id = self.0.clone(); ApubObject::read_from_apub_id(*id, data).await @@ -99,15 +94,13 @@ where async fn dereference_from_http( &self, - data: &::DataType, - instance: &LocalInstance, - request_counter: &mut i32, + data: &RequestData<::DataType>, db_object: Option, ) -> Result::Error> where ::Error: From + From, { - let res = fetch_object_http(&self.0, instance, request_counter).await; + let res = fetch_object_http(&self.0, data).await; if let Err(Error::ObjectDeleted) = &res { if let Some(db_object) = db_object { @@ -118,14 +111,14 @@ where let res2 = res?; - Kind::from_apub(res2, data, request_counter).await + Kind::from_apub(res2, data).await } } /// Need to implement clone manually, to avoid requiring Kind to be Clone impl Clone for ObjectId where - Kind: ApubObject + Send + 'static, + Kind: ApubObject, for<'de2> ::ApubType: serde::Deserialize<'de2>, { fn clone(&self) -> Self { @@ -155,7 +148,7 @@ fn should_refetch_object(last_refreshed: NaiveDateTime) -> bool { impl Display for ObjectId where - Kind: ApubObject + Send + 'static, + Kind: ApubObject, for<'de2> ::ApubType: serde::Deserialize<'de2>, { #[allow(clippy::recursive_format_impl)] @@ -167,7 +160,7 @@ where impl From> for Url where - Kind: ApubObject + Send + 'static, + Kind: ApubObject, for<'de2> ::ApubType: serde::Deserialize<'de2>, { fn from(id: ObjectId) -> Self { @@ -175,10 +168,20 @@ where } } -impl PartialEq for ObjectId +impl From for ObjectId where Kind: ApubObject + Send + 'static, for<'de2> ::ApubType: serde::Deserialize<'de2>, +{ + fn from(url: Url) -> Self { + ObjectId::new(url) + } +} + +impl PartialEq for ObjectId +where + Kind: ApubObject, + for<'de2> ::ApubType: serde::Deserialize<'de2>, { fn eq(&self, other: &Self) -> bool { self.0.eq(&other.0) && self.1 == other.1 @@ -191,7 +194,7 @@ mod tests { use crate::core::object_id::should_refetch_object; use anyhow::Error; - #[derive(Debug)] + #[derive(Debug, Clone)] struct TestObject {} #[async_trait::async_trait] @@ -203,7 +206,7 @@ mod tests { async fn read_from_apub_id( _object_id: Url, - _data: &Self::DataType, + _data: &RequestData, ) -> Result, Self::Error> where Self: Sized, @@ -211,14 +214,16 @@ mod tests { todo!() } - async fn into_apub(self, _data: &Self::DataType) -> Result { + async fn into_apub( + self, + _data: &RequestData, + ) -> Result { todo!() } async fn from_apub( _apub: Self::ApubType, - _data: &Self::DataType, - _request_counter: &mut i32, + _data: &RequestData, ) -> Result where Self: Sized, diff --git a/src/data.rs b/src/data.rs deleted file mode 100644 index 0b21330..0000000 --- a/src/data.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::{ops::Deref, sync::Arc}; - -/// This type can be used to pass your own data into library functions and traits. It can be useful -/// to pass around database connections or other context. -#[derive(Debug)] -pub struct Data(Arc); - -impl Data { - /// Create new `Data` instance. - pub fn new(state: T) -> Data { - Data(Arc::new(state)) - } - - /// Get reference to inner app data. - pub fn get_ref(&self) -> &T { - self.0.as_ref() - } - - /// Convert to the internal Arc - pub fn into_inner(self) -> Arc { - self.0 - } -} - -impl Deref for Data { - type Target = Arc; - - fn deref(&self) -> &Arc { - &self.0 - } -} - -impl Clone for Data { - fn clone(&self) -> Data { - Data(self.0.clone()) - } -} diff --git a/src/deser/context.rs b/src/deser/context.rs index 73e44c0..8981361 100644 --- a/src/deser/context.rs +++ b/src/deser/context.rs @@ -1,4 +1,8 @@ -use crate::{data::Data, deser::helpers::deserialize_one_or_many, traits::ActivityHandler}; +use crate::{ + deser::helpers::deserialize_one_or_many, + request_data::RequestData, + traits::ActivityHandler, +}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::str::FromStr; @@ -48,11 +52,7 @@ where self.inner.actor() } - async fn receive( - self, - data: &Data, - request_counter: &mut i32, - ) -> Result<(), Self::Error> { - self.inner.receive(data, request_counter).await + async fn receive(self, data: &RequestData) -> Result<(), Self::Error> { + self.inner.receive(data).await } } diff --git a/src/lib.rs b/src/lib.rs index a1f0148..378f904 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,24 +13,24 @@ use std::time::Duration; use url::Url; pub mod core; -pub mod data; pub mod deser; +pub mod request_data; pub mod traits; pub mod utils; /// Mime type for Activitypub, used for `Accept` and `Content-Type` HTTP headers pub static APUB_JSON_CONTENT_TYPE: &str = "application/activity+json"; -/// Represents a single, federated instance (for example lemmy.ml). There should only be one of -/// this in your application (except for testing). -pub struct LocalInstance { +/// Represents configuration for a single, federated instance. There should usually be only one of +/// this per application. +pub struct InstanceConfig { hostname: String, client: ClientWithMiddleware, activity_queue: Manager, - settings: InstanceSettings, + settings: FederationSettings, } -impl LocalInstance { +impl InstanceConfig { async fn verify_url_and_domain( &self, activity: &Activity, @@ -92,9 +92,11 @@ pub trait UrlVerifier: DynClone + Send { } clone_trait_object!(UrlVerifier); -// Use InstanceSettingsBuilder to initialize this +/// Various settings related to Activitypub federation. +/// +/// Use [FederationSettings.builder()] to initialize this. #[derive(Builder)] -pub struct InstanceSettings { +pub struct FederationSettings { /// Maximum number of outgoing HTTP requests per incoming activity #[builder(default = "20")] http_fetch_limit: i32, @@ -124,9 +126,9 @@ pub struct InstanceSettings { http_signature_compat: bool, } -impl InstanceSettings { +impl FederationSettings { /// Returns a new settings builder. - pub fn builder() -> InstanceSettingsBuilder { + pub fn builder() -> FederationSettingsBuilder { <_>::default() } } @@ -141,10 +143,10 @@ impl UrlVerifier for DefaultUrlVerifier { } } -impl LocalInstance { - pub fn new(domain: String, client: ClientWithMiddleware, settings: InstanceSettings) -> Self { +impl InstanceConfig { + pub fn new(domain: String, client: ClientWithMiddleware, settings: FederationSettings) -> Self { let activity_queue = create_activity_queue(client.clone(), &settings); - LocalInstance { + InstanceConfig { hostname: domain, client, activity_queue, diff --git a/src/request_data.rs b/src/request_data.rs new file mode 100644 index 0000000..57656c9 --- /dev/null +++ b/src/request_data.rs @@ -0,0 +1,76 @@ +use crate::InstanceConfig; +use std::{ + ops::Deref, + sync::{atomic::AtomicI32, Arc}, +}; + +/// Stores context data which is necessary for the library to work. +#[derive(Clone)] +pub struct ApubContext { + /// Data which the application requires in handlers, such as database connection + /// or configuration. + application_data: Arc, + /// Configuration of this library. + pub(crate) local_instance: Arc, +} + +impl ApubContext { + pub fn new(state: T, local_instance: InstanceConfig) -> ApubContext { + ApubContext { + application_data: Arc::new(state), + local_instance: Arc::new(local_instance), + } + } + pub fn local_instance(&self) -> &InstanceConfig { + self.local_instance.deref() + } + + /// Create new [RequestData] from this. You should prefer to use a middleware if possible. + pub fn to_request_data(&self) -> RequestData { + RequestData { + apub_context: self.clone(), + request_counter: AtomicI32::default(), + } + } +} + +/// Stores data for handling one specific HTTP request. Most importantly this contains a +/// counter for outgoing HTTP requests. This is necessary to prevent denial of service attacks, +/// where an attacker triggers fetching of recursive objects. +/// +/// https://www.w3.org/TR/activitypub/#security-recursive-objects +pub struct RequestData { + pub(crate) apub_context: ApubContext, + pub(crate) request_counter: AtomicI32, +} + +impl RequestData { + pub fn local_instance(&self) -> &InstanceConfig { + self.apub_context.local_instance.deref() + } +} + +impl Deref for ApubContext { + type Target = T; + + fn deref(&self) -> &T { + &self.application_data + } +} + +impl Deref for RequestData { + type Target = T; + + fn deref(&self) -> &T { + &self.apub_context.application_data + } +} + +#[derive(Clone)] +pub struct ApubMiddleware(pub(crate) ApubContext); + +impl ApubMiddleware { + pub fn new(apub_context: ApubContext) -> Self { + ApubMiddleware(apub_context) + } +} diff --git a/src/signature.rs b/src/signature.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/traits.rs b/src/traits.rs index 7a21bba..4cb48f0 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -1,4 +1,4 @@ -use crate::data::Data; +use crate::request_data::RequestData; use chrono::NaiveDateTime; use std::ops::Deref; use url::Url; @@ -17,18 +17,14 @@ pub trait ActivityHandler { fn actor(&self) -> &Url; /// Receives the activity and stores its action in database. - async fn receive( - self, - data: &Data, - request_counter: &mut i32, - ) -> Result<(), Self::Error>; + async fn receive(self, data: &RequestData) -> Result<(), Self::Error>; } /// Allow for boxing of enum variants #[async_trait::async_trait] impl ActivityHandler for Box where - T: ActivityHandler + Send + Sync, + T: ActivityHandler + Send, { type DataType = T::DataType; type Error = T::Error; @@ -41,12 +37,8 @@ where self.deref().actor() } - async fn receive( - self, - data: &Data, - request_counter: &mut i32, - ) -> Result<(), Self::Error> { - (*self).receive(data, request_counter).await + async fn receive(self, data: &RequestData) -> Result<(), Self::Error> { + (*self).receive(data).await } } @@ -66,14 +58,14 @@ pub trait ApubObject { /// Try to read the object with given ID from local database. Returns Ok(None) if it doesn't exist. async fn read_from_apub_id( object_id: Url, - data: &Self::DataType, + data: &RequestData, ) -> Result, Self::Error> where Self: Sized; /// Marks the object as deleted in local db. Called when a delete activity is received, or if /// fetch returns a tombstone. - async fn delete(self, _data: &Self::DataType) -> Result<(), Self::Error> + async fn delete(self, _data: &RequestData) -> Result<(), Self::Error> where Self: Sized, { @@ -81,7 +73,10 @@ pub trait ApubObject { } /// Trait for converting an object or actor into the respective ActivityPub type. - async fn into_apub(self, data: &Self::DataType) -> Result; + async fn into_apub( + self, + data: &RequestData, + ) -> Result; /// Converts an object from ActivityPub type to Lemmy internal type. /// @@ -91,8 +86,7 @@ pub trait ApubObject { /// * `mod_action_allowed` True if the object can be a mod activity, ignore `expected_domain` in this case async fn from_apub( apub: Self::ApubType, - data: &Self::DataType, - request_counter: &mut i32, + data: &RequestData, ) -> Result where Self: Sized; diff --git a/src/utils.rs b/src/utils.rs index 296904f..b910b93 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,24 +1,29 @@ -use crate::{utils::reqwest_shim::ResponseExt, Error, LocalInstance, APUB_JSON_CONTENT_TYPE}; +use crate::{ + request_data::RequestData, + utils::reqwest_shim::ResponseExt, + Error, + APUB_JSON_CONTENT_TYPE, +}; use http::{header::HeaderName, HeaderValue, StatusCode}; use serde::de::DeserializeOwned; -use std::collections::BTreeMap; +use std::{collections::BTreeMap, sync::atomic::Ordering}; use tracing::info; use url::Url; pub(crate) mod reqwest_shim; -pub async fn fetch_object_http( +pub async fn fetch_object_http( url: &Url, - instance: &LocalInstance, - request_counter: &mut i32, + data: &RequestData, ) -> Result { + let instance = &data.local_instance(); // dont fetch local objects this way debug_assert!(url.domain() != Some(&instance.hostname)); instance.verify_url_valid(url).await?; info!("Fetching remote object {}", url.to_string()); - *request_counter += 1; - if *request_counter > instance.settings.http_fetch_limit { + let counter = data.request_counter.fetch_add(1, Ordering::SeqCst); + if counter > instance.settings.http_fetch_limit { return Err(Error::RequestLimit); }