From d94a2ed0fc9754c1901b9857000287a1ee47d99b Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Tue, 7 Mar 2023 23:01:36 +0100 Subject: [PATCH] live federation example --- Cargo.toml | 4 + docs/03_federating_users.md | 6 +- examples/README.md | 25 ++++ .../live_federation/activities/create_post.rs | 72 ++++++++++ examples/live_federation/activities/mod.rs | 1 + examples/live_federation/database.rs | 26 ++++ examples/live_federation/error.rs | 20 +++ examples/live_federation/http.rs | 69 ++++++++++ examples/live_federation/main.rs | 69 ++++++++++ examples/live_federation/objects/mod.rs | 2 + examples/live_federation/objects/person.rs | 127 ++++++++++++++++++ examples/live_federation/objects/post.rs | 101 ++++++++++++++ examples/live_federation/utils.rs | 13 ++ .../activities/create_post.rs | 10 +- examples/local_federation/actix_web/http.rs | 2 +- examples/local_federation/axum/http.rs | 2 +- examples/local_federation/instance.rs | 2 +- examples/local_federation/main.rs | 1 + examples/local_federation/objects/person.rs | 23 +++- examples/local_federation/objects/post.rs | 11 +- examples/local_federation/utils.rs | 4 +- src/activity_queue.rs | 5 +- src/axum/inbox.rs | 1 + src/config.rs | 4 +- src/fetch/webfinger.rs | 6 +- src/protocol/context.rs | 7 +- src/traits.rs | 18 ++- 27 files changed, 597 insertions(+), 34 deletions(-) create mode 100644 examples/live_federation/activities/create_post.rs create mode 100644 examples/live_federation/activities/mod.rs create mode 100644 examples/live_federation/database.rs create mode 100644 examples/live_federation/error.rs create mode 100644 examples/live_federation/http.rs create mode 100644 examples/live_federation/main.rs create mode 100644 examples/live_federation/objects/mod.rs create mode 100644 examples/live_federation/objects/person.rs create mode 100644 examples/live_federation/objects/post.rs create mode 100644 examples/live_federation/utils.rs diff --git a/Cargo.toml b/Cargo.toml index 2a1a494..94b64b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,3 +67,7 @@ debug = 0 [[example]] name = "local_federation" path = "examples/local_federation/main.rs" + +[[example]] +name = "live_federation" +path = "examples/live_federation/main.rs" diff --git a/docs/03_federating_users.md b/docs/03_federating_users.md index af762fb..1eb2027 100644 --- a/docs/03_federating_users.md +++ b/docs/03_federating_users.md @@ -65,6 +65,7 @@ Besides we also need a second struct to represent the data which gets stored in ```rust # use url::Url; +# use chrono::NaiveDateTime; pub struct DbUser { pub id: i32, @@ -76,8 +77,9 @@ pub struct DbUser { pub inbox: Url, pub outbox: Url, pub local: bool, - public_key: String, - private_key: Option, + pub public_key: String, + pub private_key: Option, + pub last_refreshed_at: NaiveDateTime, } ``` diff --git a/examples/README.md b/examples/README.md index 9af7587..77c0070 100644 --- a/examples/README.md +++ b/examples/README.md @@ -11,3 +11,28 @@ Use one of the following commands to run the example with the specified web fram `cargo run --example local_federation axum` `cargo run --example local_federation actix-web` + +## Live Federation + +A minimal application which can be deployed on a server and federate with other platforms such as Mastodon. For this it needs run at the root of a (sub)domain which is available over HTTPS. Edit `main.rs` to configure the server domain and your Fediverse handle. + +Setup instructions: + +- Deploy the project to a server. For this you can clone the git repository on the server and execute `cargo run --example live_federation`. Alternatively run `cargo build --example live_federation` and copy the binary at `target/debug/examples/live_federation` to the server. +- Create a TLS certificate. With Let's Encrypt certbot you can use a command like `certbot certonly --nginx -d 'example.com' -m '*your-email@domain.com*'` (replace with your actual domain and email). +- Setup a reverse proxy which handles TLS and passes requests to the example project. With nginx you can use the following basic config, again using your actual domain: +``` +server { + listen 443 ssl http2; + listen [::]:443 ssl http2; + server_name example.com; + ssl_certificate /etc/letsencrypt/live/example.com/fullchain.pem; + ssl_certificate_key /etc/letsencrypt/live/example.com/privkey.pem; + location / { + proxy_pass "http://localhost:8003"; + proxy_set_header Host $host; + } +} +``` +- Test with `curl -H 'Accept: application/activity+json' https://example.com/alison | jq` and `curl -H 'Accept: application/activity+json' "https://example.com/.well-known/webfinger?resource=acct:alison@example.com" | jq` that the server is setup correctly and serving correct responses. +- Login to a Fediverse platform like Mastodon, and search for `@alison@example.com`, with the actual domain and username from your `main.rs`. If you send a message, it will automatically send a response. \ No newline at end of file diff --git a/examples/live_federation/activities/create_post.rs b/examples/live_federation/activities/create_post.rs new file mode 100644 index 0000000..e87f7f7 --- /dev/null +++ b/examples/live_federation/activities/create_post.rs @@ -0,0 +1,72 @@ +use crate::{ + database::DatabaseHandle, + error::Error, + objects::{person::DbUser, post::Note}, + utils::generate_object_id, + DbPost, +}; +use activitypub_federation::{ + activity_queue::send_activity, + config::RequestData, + fetch::object_id::ObjectId, + kinds::activity::CreateType, + protocol::{context::WithContext, helpers::deserialize_one_or_many}, + traits::{ActivityHandler, ApubObject}, +}; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Deserialize, Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct CreatePost { + pub(crate) actor: ObjectId, + #[serde(deserialize_with = "deserialize_one_or_many")] + pub(crate) to: Vec, + pub(crate) object: Note, + #[serde(rename = "type")] + pub(crate) kind: CreateType, + pub(crate) id: Url, +} + +impl CreatePost { + pub async fn send( + note: Note, + inbox: Url, + data: &RequestData, + ) -> Result<(), Error> { + print!("Sending reply to {}", ¬e.attributed_to); + let create = CreatePost { + actor: note.attributed_to.clone(), + to: note.to.clone(), + object: note, + kind: CreateType::Create, + id: generate_object_id(data.domain())?, + }; + let create_with_context = WithContext::new_default(create); + let private_key = data + .local_user() + .private_key + .expect("local user always has private key"); + send_activity(create_with_context, private_key, vec![inbox], data).await?; + Ok(()) + } +} + +#[async_trait::async_trait] +impl ActivityHandler for CreatePost { + type DataType = DatabaseHandle; + type Error = crate::error::Error; + + fn id(&self) -> &Url { + &self.id + } + + fn actor(&self) -> &Url { + self.actor.inner() + } + + async fn receive(self, data: &RequestData) -> Result<(), Self::Error> { + DbPost::from_apub(self.object, data).await?; + Ok(()) + } +} diff --git a/examples/live_federation/activities/mod.rs b/examples/live_federation/activities/mod.rs new file mode 100644 index 0000000..7e15ee0 --- /dev/null +++ b/examples/live_federation/activities/mod.rs @@ -0,0 +1 @@ +pub mod create_post; diff --git a/examples/live_federation/database.rs b/examples/live_federation/database.rs new file mode 100644 index 0000000..967c534 --- /dev/null +++ b/examples/live_federation/database.rs @@ -0,0 +1,26 @@ +use crate::{objects::person::DbUser, Error}; +use anyhow::anyhow; +use std::sync::{Arc, Mutex}; + +pub type DatabaseHandle = Arc; + +/// Our "database" which contains all known users (local and federated) +pub struct Database { + pub users: Mutex>, +} + +impl Database { + pub fn local_user(&self) -> DbUser { + let lock = self.users.lock().unwrap(); + lock.first().unwrap().clone() + } + + pub fn read_user(&self, name: &str) -> Result { + let db_user = self.local_user(); + if name == db_user.name { + Ok(db_user) + } else { + Err(anyhow!("Invalid user {name}").into()) + } + } +} diff --git a/examples/live_federation/error.rs b/examples/live_federation/error.rs new file mode 100644 index 0000000..3ef1819 --- /dev/null +++ b/examples/live_federation/error.rs @@ -0,0 +1,20 @@ +use std::fmt::{Display, Formatter}; + +/// Necessary because of this issue: https://github.com/actix/actix-web/issues/1711 +#[derive(Debug)] +pub struct Error(pub(crate) anyhow::Error); + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} + +impl From for Error +where + T: Into, +{ + fn from(t: T) -> Self { + Error(t.into()) + } +} diff --git a/examples/live_federation/http.rs b/examples/live_federation/http.rs new file mode 100644 index 0000000..ba1ea68 --- /dev/null +++ b/examples/live_federation/http.rs @@ -0,0 +1,69 @@ +use crate::{ + database::DatabaseHandle, + error::Error, + objects::person::{DbUser, Person, PersonAcceptedActivities}, +}; +use activitypub_federation::{ + axum::{ + inbox::{receive_activity, ActivityData}, + json::ApubJson, + }, + config::RequestData, + fetch::webfinger::{build_webfinger_response, extract_webfinger_name, Webfinger}, + protocol::context::WithContext, + traits::ApubObject, +}; +use axum::{ + extract::{Path, Query}, + response::{IntoResponse, Response}, + Json, +}; +use axum_macros::debug_handler; +use http::StatusCode; +use serde::Deserialize; + +impl IntoResponse for Error { + fn into_response(self) -> Response { + (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", self.0)).into_response() + } +} + +#[debug_handler] +pub async fn http_get_user( + Path(name): Path, + data: RequestData, +) -> Result>, Error> { + let db_user = data.read_user(&name)?; + let apub_user = db_user.into_apub(&data).await?; + Ok(ApubJson(WithContext::new_default(apub_user))) +} + +#[debug_handler] +pub async fn http_post_user_inbox( + data: RequestData, + activity_data: ActivityData, +) -> impl IntoResponse { + receive_activity::, DbUser, DatabaseHandle>( + activity_data, + &data, + ) + .await +} + +#[derive(Deserialize)] +pub struct WebfingerQuery { + resource: String, +} + +#[debug_handler] +pub async fn webfinger( + Query(query): Query, + data: RequestData, +) -> Result, Error> { + let name = extract_webfinger_name(&query.resource, &data)?; + let db_user = data.read_user(&name)?; + Ok(Json(build_webfinger_response( + query.resource, + db_user.ap_id.into_inner(), + ))) +} diff --git a/examples/live_federation/main.rs b/examples/live_federation/main.rs new file mode 100644 index 0000000..1734f28 --- /dev/null +++ b/examples/live_federation/main.rs @@ -0,0 +1,69 @@ +use crate::{ + database::Database, + http::{http_get_user, http_post_user_inbox, webfinger}, + objects::{person::DbUser, post::DbPost}, + utils::generate_object_id, +}; +use activitypub_federation::config::{ApubMiddleware, FederationConfig}; +use axum::{ + routing::{get, post}, + Router, +}; +use error::Error; +use std::{ + net::ToSocketAddrs, + sync::{Arc, Mutex}, +}; +use tracing::log::{info, LevelFilter}; + +mod activities; +mod database; +mod error; +#[allow(clippy::diverging_sub_expression, clippy::items_after_statements)] +mod http; +mod objects; +mod utils; + +const DOMAIN: &str = "example.com"; +const LOCAL_USER_NAME: &str = "alison"; +const BIND_ADDRESS: &str = "localhost:8003"; + +#[actix_rt::main] +async fn main() -> Result<(), Error> { + env_logger::builder() + .filter_level(LevelFilter::Warn) + .filter_module("activitypub_federation", LevelFilter::Info) + .filter_module("live_federation", LevelFilter::Info) + .format_timestamp(None) + .init(); + + info!("Setup local user and database"); + let local_user = DbUser::new(DOMAIN, LOCAL_USER_NAME)?; + let database = Arc::new(Database { + users: Mutex::new(vec![local_user]), + }); + + info!("Setup configuration"); + let config = FederationConfig::builder() + .domain(DOMAIN) + .app_data(database) + .build()?; + + info!("Listen with HTTP server on {BIND_ADDRESS}"); + let config = config.clone(); + let app = Router::new() + .route("/:user", get(http_get_user)) + .route("/:user/inbox", post(http_post_user_inbox)) + .route("/.well-known/webfinger", get(webfinger)) + .layer(ApubMiddleware::new(config)); + + let addr = BIND_ADDRESS + .to_socket_addrs()? + .next() + .expect("Failed to lookup domain name"); + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .await?; + + Ok(()) +} diff --git a/examples/live_federation/objects/mod.rs b/examples/live_federation/objects/mod.rs new file mode 100644 index 0000000..b5239ab --- /dev/null +++ b/examples/live_federation/objects/mod.rs @@ -0,0 +1,2 @@ +pub mod person; +pub mod post; diff --git a/examples/live_federation/objects/person.rs b/examples/live_federation/objects/person.rs new file mode 100644 index 0000000..545a504 --- /dev/null +++ b/examples/live_federation/objects/person.rs @@ -0,0 +1,127 @@ +use crate::{activities::create_post::CreatePost, database::DatabaseHandle, error::Error}; +use activitypub_federation::{ + config::RequestData, + fetch::object_id::ObjectId, + http_signatures::generate_actor_keypair, + kinds::actor::PersonType, + protocol::public_key::PublicKey, + traits::{ActivityHandler, Actor, ApubObject}, +}; +use chrono::{Local, NaiveDateTime}; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use url::Url; + +#[derive(Debug, Clone)] +pub struct DbUser { + pub name: String, + pub ap_id: ObjectId, + pub inbox: Url, + // exists for all users (necessary to verify http signatures) + pub public_key: String, + // exists only for local users + pub private_key: Option, + last_refreshed_at: NaiveDateTime, + pub followers: Vec, + pub local: bool, +} + +/// List of all activities which this actor can receive. +#[derive(Deserialize, Serialize, Debug)] +#[serde(untagged)] +#[enum_delegate::implement(ActivityHandler)] +pub enum PersonAcceptedActivities { + CreateNote(CreatePost), +} + +impl DbUser { + pub fn new(hostname: &str, name: &str) -> Result { + let ap_id = Url::parse(&format!("https://{}/{}", hostname, &name))?.into(); + let inbox = Url::parse(&format!("https://{}/{}/inbox", hostname, &name))?; + let keypair = generate_actor_keypair()?; + Ok(DbUser { + name: name.to_string(), + ap_id, + inbox, + public_key: keypair.public_key, + private_key: Some(keypair.private_key), + last_refreshed_at: Local::now().naive_local(), + followers: vec![], + local: true, + }) + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Person { + #[serde(rename = "type")] + kind: PersonType, + preferred_username: String, + id: ObjectId, + inbox: Url, + public_key: PublicKey, +} + +#[async_trait::async_trait] +impl ApubObject for DbUser { + type DataType = DatabaseHandle; + type ApubType = Person; + type Error = Error; + + fn last_refreshed_at(&self) -> Option { + Some(self.last_refreshed_at) + } + + async fn read_from_apub_id( + object_id: Url, + data: &RequestData, + ) -> Result, Self::Error> { + let users = data.users.lock().unwrap(); + let res = users + .clone() + .into_iter() + .find(|u| u.ap_id.inner() == &object_id); + Ok(res) + } + + async fn into_apub( + self, + _data: &RequestData, + ) -> Result { + let public_key = PublicKey::new(self.ap_id.clone().into_inner(), self.public_key.clone()); + Ok(Person { + preferred_username: self.name.clone(), + kind: Default::default(), + id: self.ap_id.clone(), + inbox: self.inbox, + public_key, + }) + } + + async fn from_apub( + apub: Self::ApubType, + _data: &RequestData, + ) -> Result { + Ok(DbUser { + name: apub.preferred_username, + ap_id: apub.id, + inbox: apub.inbox, + public_key: apub.public_key.public_key_pem, + private_key: None, + last_refreshed_at: Local::now().naive_local(), + followers: vec![], + local: false, + }) + } +} + +impl Actor for DbUser { + fn public_key(&self) -> &str { + &self.public_key + } + + fn inbox(&self) -> Url { + self.inbox.clone() + } +} diff --git a/examples/live_federation/objects/post.rs b/examples/live_federation/objects/post.rs new file mode 100644 index 0000000..783b725 --- /dev/null +++ b/examples/live_federation/objects/post.rs @@ -0,0 +1,101 @@ +use crate::{ + activities::create_post::CreatePost, + database::DatabaseHandle, + error::Error, + generate_object_id, + objects::person::DbUser, +}; +use activitypub_federation::{ + config::RequestData, + fetch::object_id::ObjectId, + kinds::{object::NoteType, public}, + protocol::helpers::deserialize_one_or_many, + traits::{Actor, ApubObject}, +}; +use activitystreams_kinds::link::MentionType; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Clone, Debug)] +pub struct DbPost { + pub text: String, + pub ap_id: ObjectId, + pub creator: ObjectId, + pub local: bool, +} + +#[derive(Deserialize, Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Note { + #[serde(rename = "type")] + kind: NoteType, + id: ObjectId, + pub(crate) attributed_to: ObjectId, + #[serde(deserialize_with = "deserialize_one_or_many")] + pub(crate) to: Vec, + content: String, + in_reply_to: Option>, + tag: Vec, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Mention { + pub href: Url, + #[serde(rename = "type")] + pub kind: MentionType, +} + +#[async_trait::async_trait] +impl ApubObject for DbPost { + type DataType = DatabaseHandle; + type ApubType = Note; + type Error = Error; + + async fn read_from_apub_id( + _object_id: Url, + _data: &RequestData, + ) -> Result, Self::Error> { + Ok(None) + } + + async fn into_apub( + self, + _data: &RequestData, + ) -> Result { + unimplemented!() + } + + async fn from_apub( + apub: Self::ApubType, + data: &RequestData, + ) -> Result { + println!( + "Received post with content {} and id {}", + &apub.content, &apub.id + ); + let creator = apub.attributed_to.dereference(data).await?; + let post = DbPost { + text: apub.content, + ap_id: apub.id.clone(), + creator: apub.attributed_to.clone(), + local: false, + }; + + let mention = Mention { + href: creator.ap_id.clone().into_inner(), + kind: Default::default(), + }; + let note = Note { + kind: Default::default(), + id: generate_object_id(data.domain())?.into(), + attributed_to: data.local_user().ap_id, + to: vec![public()], + content: format!("Hello {}", creator.name), + in_reply_to: Some(apub.id.clone()), + tag: vec![mention], + }; + CreatePost::send(note, creator.shared_inbox_or_inbox(), data).await?; + + Ok(post) + } +} diff --git a/examples/live_federation/utils.rs b/examples/live_federation/utils.rs new file mode 100644 index 0000000..0b2b098 --- /dev/null +++ b/examples/live_federation/utils.rs @@ -0,0 +1,13 @@ +use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use url::{ParseError, Url}; + +/// Just generate random url as object id. In a real project, you probably want to use +/// an url which contains the database id for easy retrieval (or store the random id in db). +pub fn generate_object_id(domain: &str) -> Result { + let id: String = thread_rng() + .sample_iter(&Alphanumeric) + .take(7) + .map(char::from) + .collect(); + Url::parse(&format!("https://{}/objects/{}", domain, id)) +} diff --git a/examples/local_federation/activities/create_post.rs b/examples/local_federation/activities/create_post.rs index 875b8e5..547c723 100644 --- a/examples/local_federation/activities/create_post.rs +++ b/examples/local_federation/activities/create_post.rs @@ -15,7 +15,7 @@ use url::Url; #[derive(Deserialize, Serialize, Debug)] #[serde(rename_all = "camelCase")] -pub struct CreateNote { +pub struct CreatePost { pub(crate) actor: ObjectId, #[serde(deserialize_with = "deserialize_one_or_many")] pub(crate) to: Vec, @@ -25,9 +25,9 @@ pub struct CreateNote { pub(crate) id: Url, } -impl CreateNote { - pub fn new(note: Note, id: Url) -> CreateNote { - CreateNote { +impl CreatePost { + pub fn new(note: Note, id: Url) -> CreatePost { + CreatePost { actor: note.attributed_to.clone(), to: note.to.clone(), object: note, @@ -38,7 +38,7 @@ impl CreateNote { } #[async_trait::async_trait] -impl ActivityHandler for CreateNote { +impl ActivityHandler for CreatePost { type DataType = DatabaseHandle; type Error = crate::error::Error; diff --git a/examples/local_federation/actix_web/http.rs b/examples/local_federation/actix_web/http.rs index 8bcc2c0..0fd5037 100644 --- a/examples/local_federation/actix_web/http.rs +++ b/examples/local_federation/actix_web/http.rs @@ -17,7 +17,7 @@ use serde::Deserialize; use tracing::info; pub fn listen(config: &FederationConfig) -> Result<(), Error> { - let hostname = config.hostname(); + let hostname = config.domain(); info!("Listening with actix-web on {hostname}"); let config = config.clone(); let server = HttpServer::new(move || { diff --git a/examples/local_federation/axum/http.rs b/examples/local_federation/axum/http.rs index 2a1151f..dfa8e7e 100644 --- a/examples/local_federation/axum/http.rs +++ b/examples/local_federation/axum/http.rs @@ -26,7 +26,7 @@ use std::net::ToSocketAddrs; use tracing::info; pub fn listen(config: &FederationConfig) -> Result<(), Error> { - let hostname = config.hostname(); + let hostname = config.domain(); info!("Listening with axum on {hostname}"); let config = config.clone(); let app = Router::new() diff --git a/examples/local_federation/instance.rs b/examples/local_federation/instance.rs index 1c2ecfa..1dca4a9 100644 --- a/examples/local_federation/instance.rs +++ b/examples/local_federation/instance.rs @@ -30,7 +30,7 @@ pub fn new_instance( pub type DatabaseHandle = Arc; -/// Our "database" which contains all known posts users (local and federated) +/// Our "database" which contains all known posts and users (local and federated) pub struct Database { pub users: Mutex>, pub posts: Mutex>, diff --git a/examples/local_federation/main.rs b/examples/local_federation/main.rs index 6f65ea7..68f7324 100644 --- a/examples/local_federation/main.rs +++ b/examples/local_federation/main.rs @@ -21,6 +21,7 @@ mod utils; async fn main() -> Result<(), Error> { env_logger::builder() .filter_level(LevelFilter::Warn) + .filter_module("activitypub_federation", LevelFilter::Info) .filter_module("local_federation", LevelFilter::Info) .format_timestamp(None) .init(); diff --git a/examples/local_federation/objects/person.rs b/examples/local_federation/objects/person.rs index cbbe9fa..8e1c7a1 100644 --- a/examples/local_federation/objects/person.rs +++ b/examples/local_federation/objects/person.rs @@ -1,5 +1,5 @@ use crate::{ - activities::{accept::Accept, create_post::CreateNote, follow::Follow}, + activities::{accept::Accept, create_post::CreatePost, follow::Follow}, error::Error, instance::DatabaseHandle, objects::post::DbPost, @@ -14,6 +14,7 @@ use activitypub_federation::{ protocol::{context::WithContext, public_key::PublicKey}, traits::{ActivityHandler, Actor, ApubObject}, }; +use chrono::{Local, NaiveDateTime}; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use url::Url; @@ -27,6 +28,7 @@ pub struct DbUser { public_key: String, // exists only for local users private_key: Option, + last_refreshed_at: NaiveDateTime, pub followers: Vec, pub local: bool, } @@ -38,7 +40,7 @@ pub struct DbUser { pub enum PersonAcceptedActivities { Follow(Follow), Accept(Accept), - CreateNote(CreateNote), + CreateNote(CreatePost), } impl DbUser { @@ -52,6 +54,7 @@ impl DbUser { inbox, public_key: keypair.public_key, private_key: Some(keypair.private_key), + last_refreshed_at: Local::now().naive_local(), followers: vec![], local: true, }) @@ -101,7 +104,7 @@ impl DbUser { data: &RequestData, ) -> Result<(), Error> { let id = generate_object_id(data.domain())?; - let create = CreateNote::new(post.into_apub(data).await?, id.clone()); + let create = CreatePost::new(post.into_apub(data).await?, id.clone()); let mut inboxes = vec![]; for f in self.followers.clone() { let user: DbUser = ObjectId::from(f).dereference(data).await?; @@ -139,6 +142,10 @@ impl ApubObject for DbUser { type ApubType = Person; type Error = Error; + fn last_refreshed_at(&self) -> Option { + Some(self.last_refreshed_at) + } + async fn read_from_apub_id( object_id: Url, data: &RequestData, @@ -166,17 +173,21 @@ impl ApubObject for DbUser { async fn from_apub( apub: Self::ApubType, - _data: &RequestData, + data: &RequestData, ) -> Result { - Ok(DbUser { + let user = DbUser { name: apub.preferred_username, ap_id: apub.id, inbox: apub.inbox, public_key: apub.public_key.public_key_pem, private_key: None, + last_refreshed_at: Local::now().naive_local(), followers: vec![], local: false, - }) + }; + let mut mutex = data.users.lock().unwrap(); + mutex.push(user.clone()); + Ok(user) } } diff --git a/examples/local_federation/objects/post.rs b/examples/local_federation/objects/post.rs index ebe7cec..601fe8b 100644 --- a/examples/local_federation/objects/post.rs +++ b/examples/local_federation/objects/post.rs @@ -48,10 +48,15 @@ impl ApubObject for DbPost { type Error = Error; async fn read_from_apub_id( - _object_id: Url, - _data: &RequestData, + object_id: Url, + data: &RequestData, ) -> Result, Self::Error> { - todo!() + let posts = data.posts.lock().unwrap(); + let res = posts + .clone() + .into_iter() + .find(|u| u.ap_id.inner() == &object_id); + Ok(res) } async fn into_apub( diff --git a/examples/local_federation/utils.rs b/examples/local_federation/utils.rs index 87c421e..6b5f9d0 100644 --- a/examples/local_federation/utils.rs +++ b/examples/local_federation/utils.rs @@ -3,11 +3,11 @@ use url::{ParseError, Url}; /// Just generate random url as object id. In a real project, you probably want to use /// an url which contains the database id for easy retrieval (or store the random id in db). -pub fn generate_object_id(hostname: &str) -> Result { +pub fn generate_object_id(domain: &str) -> Result { let id: String = thread_rng() .sample_iter(&Alphanumeric) .take(7) .map(char::from) .collect(); - Url::parse(&format!("http://{}/objects/{}", hostname, id)) + Url::parse(&format!("http://{}/objects/{}", domain, id)) } diff --git a/src/activity_queue.rs b/src/activity_queue.rs index 77cda44..c2e8cbc 100644 --- a/src/activity_queue.rs +++ b/src/activity_queue.rs @@ -162,9 +162,10 @@ async fn do_send( Ok(()) } Ok(o) if o.status().is_client_error() => { + let text = o.text_limited().await.map_err(Error::other)?; info!( - "Target server {} rejected {}, aborting", - task.inbox, task.activity_id, + "Activity {} was rejected by {}, aborting: {}", + task.activity_id, task.inbox, text, ); Ok(()) } diff --git a/src/axum/inbox.rs b/src/axum/inbox.rs index 79442ce..92493c0 100644 --- a/src/axum/inbox.rs +++ b/src/axum/inbox.rs @@ -44,6 +44,7 @@ where .dereference(data) .await?; + // TODO: why do errors here not get returned over http? verify_signature( &activity_data.headers, &activity_data.method, diff --git a/src/config.rs b/src/config.rs index 9f3dc46..ad0ed4e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -160,8 +160,8 @@ impl FederationConfig { domain == self.domain } - /// Returns the local hostname - pub fn hostname(&self) -> &str { + /// Returns the local domain + pub fn domain(&self) -> &str { &self.domain } } diff --git a/src/fetch/webfinger.rs b/src/fetch/webfinger.rs index 29ce1c9..79a6d17 100644 --- a/src/fetch/webfinger.rs +++ b/src/fetch/webfinger.rs @@ -125,10 +125,10 @@ pub struct Webfinger { /// Links where further data about `subject` can be retrieved pub links: Vec, /// Other Urls which identify the same actor as the `subject` - #[serde(default)] + #[serde(default, skip_serializing_if = "Vec::is_empty")] pub aliases: Vec, /// Additional data about the subject - #[serde(default)] + #[serde(default, skip_serializing_if = "HashMap::is_empty")] pub properties: HashMap, } @@ -143,7 +143,7 @@ pub struct WebfingerLink { /// Url pointing to the target resource pub href: Option, /// Additional data about the link - #[serde(default)] + #[serde(default, skip_serializing_if = "HashMap::is_empty")] pub properties: HashMap, } diff --git a/src/protocol/context.rs b/src/protocol/context.rs index 5c5340b..d71cb7b 100644 --- a/src/protocol/context.rs +++ b/src/protocol/context.rs @@ -15,7 +15,7 @@ //! }; //! let note_with_context = WithContext::new_default(note); //! let serialized = serde_json::to_string(¬e_with_context)?; -//! assert_eq!(serialized, r#"{"@context":[["https://www.w3.org/ns/activitystreams"]],"content":"Hello world"}"#); +//! assert_eq!(serialized, r#"{"@context":["https://www.w3.org/ns/activitystreams"],"content":"Hello world"}"#); //! Ok::<(), serde_json::error::Error>(()) //! ``` @@ -26,11 +26,10 @@ use crate::{ }; use serde::{Deserialize, Serialize}; use serde_json::Value; -use std::str::FromStr; use url::Url; /// Default context used in Activitypub -const DEFAULT_CONTEXT: &str = "[\"https://www.w3.org/ns/activitystreams\"]"; +const DEFAULT_CONTEXT: &str = "https://www.w3.org/ns/activitystreams"; /// Wrapper for federated structs which handles `@context` field. #[derive(Serialize, Deserialize, Debug)] @@ -45,7 +44,7 @@ pub struct WithContext { impl WithContext { /// Create a new wrapper with the default Activitypub context. pub fn new_default(inner: T) -> WithContext { - let context = vec![Value::from_str(DEFAULT_CONTEXT).expect("valid context")]; + let context = vec![Value::String(DEFAULT_CONTEXT.to_string())]; WithContext::new(inner, context) } diff --git a/src/traits.rs b/src/traits.rs index 66c4f32..4473925 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -9,6 +9,7 @@ use url::Url; /// Helper for converting between database structs and federated protocol structs. /// /// ``` +/// # use chrono::{Local, NaiveDateTime}; /// # use url::Url; /// # use activitypub_federation::protocol::public_key::PublicKey; /// # use activitypub_federation::config::RequestData; @@ -19,7 +20,9 @@ use url::Url; /// # pub ap_id: Url, /// # pub inbox: Url, /// # pub public_key: String, +/// # pub private_key: Option, /// # pub local: bool, +/// # pub last_refreshed_at: NaiveDateTime, /// # } /// /// #[async_trait::async_trait] @@ -28,6 +31,10 @@ use url::Url; /// type ApubType = Person; /// type Error = anyhow::Error; /// +/// fn last_refreshed_at(&self) -> Option { +/// Some(self.last_refreshed_at) +/// } +/// /// async fn read_from_apub_id(object_id: Url, data: &RequestData) -> Result, Self::Error> { /// // Attempt to read object from local database. Return Ok(None) if not found. /// let user: Option = data.read_user_from_apub_id(object_id).await?; @@ -55,7 +62,9 @@ use url::Url; /// ap_id: apub.id.into_inner(), /// inbox: apub.inbox, /// public_key: apub.public_key.public_key_pem, +/// private_key: None, /// local: false, +/// last_refreshed_at: Local::now().naive_local(), /// }; /// /// // Make sure not to overwrite any local object @@ -80,8 +89,13 @@ pub trait ApubObject: Sized { /// Returns the last time this object was updated. /// - /// Used to avoid refetching an object over HTTP every time it is dereferenced. Only called - /// for remote objects. + /// If this returns `Some` and the value is too long ago, the object is refetched from the + /// original instance. This should always be implemented for actors, because there is no active + /// update mechanism prescribed. It is possible to send `Update/Person` activities for profile + /// changes, but not all implementations do this, so `last_refreshed_at` is still necessary. + /// + /// The object is refetched if `last_refreshed_at` value is more than 24 hours ago. In debug + /// mode this is reduced to 20 seconds. fn last_refreshed_at(&self) -> Option { None }