Extract activitypub library into separate git repository

This commit is contained in:
Felix Ableitner 2022-06-02 13:34:45 +02:00
parent e5b1bbe38b
commit 807d75c4df
34 changed files with 11 additions and 2110 deletions

69
Cargo.lock generated
View file

@ -4,11 +4,11 @@ version = 3
[[package]]
name = "activitypub_federation"
version = "0.16.4-rc.11"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b34a144dc98c419543690aa8f182d8675ebe0610775982b8fdee84a00f70fe"
dependencies = [
"activitypub_federation_derive",
"activitystreams-kinds",
"actix-rt",
"actix-web",
"anyhow",
"async-trait",
@ -16,32 +16,30 @@ dependencies = [
"base64",
"chrono",
"derive_builder 0.11.2",
"env_logger",
"http",
"http-signature-normalization-actix",
"http-signature-normalization-reqwest",
"once_cell",
"openssl",
"rand 0.8.5",
"reqwest",
"reqwest-middleware",
"serde",
"serde_json",
"sha2",
"thiserror",
"tokio",
"tracing",
"url",
]
[[package]]
name = "activitypub_federation_derive"
version = "0.16.4-rc.11"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a2aaf58676b669d3b0dedf6bbb44fa518b5a6657b2959561d77899c668dec2a"
dependencies = [
"proc-macro2 1.0.39",
"quote 1.0.18",
"syn 1.0.95",
"trybuild",
]
[[package]]
@ -1117,12 +1115,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "dissimilar"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31ad93652f40969dead8d4bf897a41e9462095152eb21c56e5830537e41179dd"
[[package]]
name = "doku"
version = "0.11.0"
@ -1239,15 +1231,6 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5320ae4c3782150d900b79807611a59a99fc9a1d61d686faafc24b93fc8d7ca"
[[package]]
name = "env_logger"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3"
dependencies = [
"log",
]
[[package]]
name = "event-listener"
version = "2.5.2"
@ -1479,12 +1462,6 @@ dependencies = [
"wasi 0.10.0+wasi-snapshot-preview1",
]
[[package]]
name = "glob"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]]
name = "h2"
version = "0.3.12"
@ -3788,15 +3765,6 @@ dependencies = [
"utf-8",
]
[[package]]
name = "termcolor"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
dependencies = [
"winapi-util",
]
[[package]]
name = "thiserror"
version = "1.0.31"
@ -3980,15 +3948,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "toml"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa"
dependencies = [
"serde",
]
[[package]]
name = "tonic"
version = "0.6.2"
@ -4192,22 +4151,6 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "trybuild"
version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ae8c4cee9b97b861a6e3be1d5acb6f50a86bbb68b1f3a896db8342fb6d0f94c"
dependencies = [
"dissimilar",
"glob",
"once_cell",
"serde",
"serde_derive",
"serde_json",
"termcolor",
"toml",
]
[[package]]
name = "twoway"
version = "0.2.2"

View file

@ -23,8 +23,6 @@ members = [
"crates/api",
"crates/api_crud",
"crates/api_common",
"crates/activitypub_federation",
"crates/activitypub_federation_derive",
"crates/apub",
"crates/utils",
"crates/db_schema",
@ -39,12 +37,12 @@ members = [
lemmy_api = { version = "=0.16.4-rc.11", path = "./crates/api" }
lemmy_api_crud = { version = "=0.16.4-rc.11", path = "./crates/api_crud" }
lemmy_apub = { version = "=0.16.4-rc.11", path = "./crates/apub" }
activitypub_federation = { version = "=0.16.4-rc.11", path = "./crates/activitypub_federation" }
lemmy_utils = { version = "=0.16.4-rc.11", path = "./crates/utils" }
lemmy_db_schema = { version = "=0.16.4-rc.11", path = "./crates/db_schema" }
lemmy_api_common = { version = "=0.16.4-rc.11", path = "crates/api_common" }
lemmy_websocket = { version = "=0.16.4-rc.11", path = "./crates/websocket" }
lemmy_routes = { version = "=0.16.4-rc.11", path = "./crates/routes" }
activitypub_federation = "0.1.0"
diesel = "1.4.8"
diesel_migrations = "1.4.0"
serde = { version = "1.0.136", features = ["derive"] }

View file

@ -1,38 +0,0 @@
[package]
name = "activitypub_federation"
version = "0.16.4-rc.11"
edition = "2021"
description = "A link aggregator for the fediverse"
license = "AGPL-3.0"
homepage = "https://join-lemmy.org/"
documentation = "https://join-lemmy.org/docs/en/index.html"
[dependencies]
activitypub_federation_derive = { version = "=0.16.4-rc.11", path = "../activitypub_federation_derive" }
chrono = { version = "0.4.19", features = ["clock"], default-features = false }
serde = { version = "1.0.136", features = ["derive"] }
async-trait = "0.1.53"
url = { version = "2.2.2", features = ["serde"] }
serde_json = { version = "1.0.79", features = ["preserve_order"] }
anyhow = "1.0.56"
reqwest = { version = "0.11.10", features = ["json"] }
reqwest-middleware = "0.1.5"
tracing = "0.1.32"
base64 = "0.13.0"
openssl = "0.10.38"
once_cell = "1.10.0"
http = "0.2.6"
sha2 = "0.10.2"
actix-web = { version = "4.0.1", default-features = false }
http-signature-normalization-actix = { version = "0.6.1", default-features = false, features = ["server", "sha-2"] }
http-signature-normalization-reqwest = { version = "0.5.0", default-features = false, features = ["sha-2", "middleware"] }
background-jobs = "0.12.0"
thiserror = "1.0.31"
derive_builder = "0.11.2"
[dev-dependencies]
activitystreams-kinds = "0.2.1"
rand = "0.8.5"
actix-rt = "2.7.0"
tokio = "1.18.2"
env_logger = { version = "0.9.0", default-features = false }

View file

@ -1,32 +0,0 @@
Activitypub-Federation
===
A high-level framework for [ActivityPub](https://www.w3.org/TR/activitypub/) federation in Rust, extracted from [Lemmy](https://join-lemmy.org/). The goal is that this library can take care of almost everything related to federation for different projects, but for now it is still far away from that goal.
## Features
- ObjectId type, wraps the `id` url and allows for type safe fetching of objects, both from database and HTTP
- Queue for activity sending, handles HTTP signatures, retry with exponential backoff, all in background workers
- Inbox for receiving activities, verifies HTTP signatures, performs other basic checks and helps with routing
- Generic error type (unfortunately this was necessary)
- various helpers for verification, (de)serialization, context etc
## Roadmap
Things to work on in the future:
- **Simplify generics**: The library uses a lot of generic parameters, where clauses and associated types. It should be possible to simplify them.
- **Improve macro**: The macro is implemented very badly and doesn't have any error handling.
- **Generate HTTP endpoints**: It would be possible to generate HTTP endpoints automatically for each actor.
- **Support for other web frameworks**: Can be implemented using feature flags if other projects require it.
- **Signed fetch**: JSON can only be fetched by authenticated actors, which means that fetches from blocked instances can also be blocked. In combination with the previous point, this could be handled entirely in the library.
- **Helpers for testing**: Lemmy has a pretty useful test suite which (de)serializes json from other projects, to ensure that federation remains compatible. Helpers for this could be added to the library.
- **[Webfinger](https://datatracker.ietf.org/doc/html/rfc7033) support**: Not part of the Activitypub standard, but often used together for user discovery.
- **Remove request_counter from API**: It should be handled internally and not exposed. Maybe as part of `Data` struct.
## How to use
To get started, have a look at the example. If anything is unclear, please open an issue for clarification. You can also look at [Lemmy code](https://github.com/LemmyNet/lemmy/tree/main/crates/apub) for a more advanced implementation.
## License
[AGPLv3](../../LICENSE)

View file

@ -1,56 +0,0 @@
use crate::{activities::follow::Follow, instance::InstanceHandle, objects::person::MyUser};
use activitypub_federation::{core::object_id::ObjectId, data::Data, traits::ActivityHandler};
use activitystreams_kinds::activity::AcceptType;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Accept {
actor: ObjectId<MyUser>,
object: Follow,
#[serde(rename = "type")]
kind: AcceptType,
id: Url,
}
impl Accept {
pub fn new(actor: ObjectId<MyUser>, object: Follow, id: Url) -> Accept {
Accept {
actor,
object,
kind: Default::default(),
id,
}
}
}
#[async_trait::async_trait(?Send)]
impl ActivityHandler for Accept {
type DataType = InstanceHandle;
type Error = crate::error::Error;
fn id(&self) -> &Url {
&self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(
&self,
_data: &Data<Self::DataType>,
_request_counter: &mut i32,
) -> Result<(), Self::Error> {
Ok(())
}
async fn receive(
self,
_data: &Data<Self::DataType>,
_request_counter: &mut i32,
) -> Result<(), Self::Error> {
Ok(())
}
}

View file

@ -1,70 +0,0 @@
use crate::{
instance::InstanceHandle,
objects::{note::Note, person::MyUser},
MyPost,
};
use activitypub_federation::{
core::object_id::ObjectId,
data::Data,
deser::helpers::deserialize_one_or_many,
traits::{ActivityHandler, ApubObject},
};
use activitystreams_kinds::activity::CreateType;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct CreateNote {
pub(crate) actor: ObjectId<MyUser>,
#[serde(deserialize_with = "deserialize_one_or_many")]
pub(crate) to: Vec<Url>,
pub(crate) object: Note,
#[serde(rename = "type")]
pub(crate) kind: CreateType,
pub(crate) id: Url,
}
impl CreateNote {
pub fn new(note: Note, id: Url) -> CreateNote {
CreateNote {
actor: note.attributed_to.clone(),
to: note.to.clone(),
object: note,
kind: CreateType::Create,
id,
}
}
}
#[async_trait::async_trait(?Send)]
impl ActivityHandler for CreateNote {
type DataType = InstanceHandle;
type Error = crate::error::Error;
fn id(&self) -> &Url {
&self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(
&self,
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
MyPost::verify(&self.object, self.id(), data, request_counter).await?;
Ok(())
}
async fn receive(
self,
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
MyPost::from_apub(self.object, data, request_counter).await?;
Ok(())
}
}

View file

@ -1,84 +0,0 @@
use crate::{
activities::accept::Accept,
generate_object_id,
instance::InstanceHandle,
objects::person::MyUser,
};
use activitypub_federation::{core::object_id::ObjectId, data::Data, traits::ActivityHandler};
use activitystreams_kinds::activity::FollowType;
use anyhow::Error;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Follow {
pub(crate) actor: ObjectId<MyUser>,
pub(crate) object: ObjectId<MyUser>,
#[serde(rename = "type")]
kind: FollowType,
id: Url,
}
impl Follow {
pub fn new(actor: ObjectId<MyUser>, object: ObjectId<MyUser>, id: Url) -> Follow {
Follow {
actor,
object,
kind: Default::default(),
id,
}
}
}
#[async_trait::async_trait(?Send)]
impl ActivityHandler for Follow {
type DataType = InstanceHandle;
type Error = crate::error::Error;
fn id(&self) -> &Url {
&self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(
&self,
_data: &Data<Self::DataType>,
_request_counter: &mut i32,
) -> Result<(), Self::Error> {
Ok(())
}
async fn receive(
self,
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
// add to followers
let mut users = data.users.lock().unwrap();
let local_user = users.first_mut().unwrap();
local_user.followers.push(self.actor.inner().clone());
let local_user = local_user.clone();
drop(users);
// send back an accept
let follower = self
.actor
.dereference::<Error>(data, data.local_instance(), request_counter)
.await?;
let id = generate_object_id(data.local_instance().hostname())?;
let accept = Accept::new(local_user.ap_id.clone(), self, id.clone());
local_user
.send(
id,
accept,
vec![follower.inbox.clone()],
data.local_instance(),
)
.await?;
Ok(())
}
}

View file

@ -1,3 +0,0 @@
pub mod accept;
pub mod create_note;
pub mod follow;

View file

@ -1,23 +0,0 @@
use actix_web::ResponseError;
use std::fmt::{Display, Formatter};
/// Necessary because of this issue: https://github.com/actix/actix-web/issues/1711
#[derive(Debug)]
pub struct Error(anyhow::Error);
impl ResponseError for Error {}
impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
}
}
impl<T> From<T> for Error
where
T: Into<anyhow::Error>,
{
fn from(t: T) -> Self {
Error(t.into())
}
}

View file

@ -1,125 +0,0 @@
use crate::{
error::Error,
generate_object_id,
objects::{
note::MyPost,
person::{MyUser, PersonAcceptedActivities},
},
};
use activitypub_federation::{
core::{inbox::receive_activity, object_id::ObjectId, signatures::generate_actor_keypair},
data::Data,
deser::context::WithContext,
traits::ApubObject,
InstanceSettingsBuilder,
LocalInstance,
APUB_JSON_CONTENT_TYPE,
};
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
use http_signature_normalization_actix::prelude::VerifyDigest;
use reqwest::Client;
use sha2::{Digest, Sha256};
use std::{
ops::Deref,
sync::{Arc, Mutex},
};
use tokio::task;
use url::Url;
pub type InstanceHandle = Arc<Instance>;
pub struct Instance {
/// This holds all library data
local_instance: LocalInstance,
/// Our "database" which contains all known users (local and federated)
pub users: Mutex<Vec<MyUser>>,
/// Same, but for posts
pub posts: Mutex<Vec<MyPost>>,
}
impl Instance {
pub fn new(hostname: String) -> Result<InstanceHandle, Error> {
let settings = InstanceSettingsBuilder::default()
.testing_send_sync(true)
.worker_count(1)
.build()?;
let local_instance = LocalInstance::new(hostname.clone(), Client::default().into(), settings);
let local_user = MyUser::new(generate_object_id(&hostname)?, generate_actor_keypair()?);
let instance = Arc::new(Instance {
local_instance,
users: Mutex::new(vec![local_user]),
posts: Mutex::new(vec![]),
});
Ok(instance)
}
pub fn local_user(&self) -> MyUser {
self.users.lock().unwrap().first().cloned().unwrap()
}
pub fn local_instance(&self) -> &LocalInstance {
&self.local_instance
}
pub fn listen(instance: &InstanceHandle) -> Result<(), Error> {
let hostname = instance.local_instance.hostname();
let instance = instance.clone();
let server = HttpServer::new(move || {
App::new()
.app_data(web::Data::new(instance.clone()))
.route("/objects/{user_name}", web::get().to(http_get_user))
.service(
web::scope("")
// Important: this ensures that the activity json matches the hashsum in signed
// HTTP header
// TODO: it would be possible to get rid of this by verifying hash in
// receive_activity()
.wrap(VerifyDigest::new(Sha256::new()))
// Just a single, global inbox for simplicity
.route("/inbox", web::post().to(http_post_user_inbox)),
)
})
.bind(hostname)?
.run();
task::spawn(server);
Ok(())
}
}
/// Handles requests to fetch user json over HTTP
async fn http_get_user(
request: HttpRequest,
data: web::Data<InstanceHandle>,
) -> Result<HttpResponse, Error> {
let data: InstanceHandle = data.into_inner().deref().clone();
let hostname: String = data.local_instance.hostname().to_string();
let request_url = format!("http://{}{}", hostname, &request.uri().to_string());
let url = Url::parse(&request_url)?;
let user = ObjectId::<MyUser>::new(url)
.dereference_local::<Error>(&data)
.await?
.into_apub(&data)
.await?;
Ok(
HttpResponse::Ok()
.content_type(APUB_JSON_CONTENT_TYPE)
.json(WithContext::new_default(user)),
)
}
/// Handles messages received in user inbox
async fn http_post_user_inbox(
request: HttpRequest,
payload: String,
data: web::Data<InstanceHandle>,
) -> Result<HttpResponse, Error> {
let data: InstanceHandle = data.into_inner().deref().clone();
let activity = serde_json::from_str(&payload)?;
receive_activity::<WithContext<PersonAcceptedActivities>, MyUser, InstanceHandle, Error>(
request,
activity,
&data.clone().local_instance,
&Data::new(data),
)
.await
}

View file

@ -1,13 +0,0 @@
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use url::{ParseError, Url};
/// Just generate random url as object id. In a real project, you probably want to use
/// an url which contains the database id for easy retrieval (or store the random id in db).
pub fn generate_object_id(hostname: &str) -> Result<Url, ParseError> {
let id: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(7)
.map(char::from)
.collect();
Url::parse(&format!("http://{}/objects/{}", hostname, id))
}

View file

@ -1,42 +0,0 @@
use crate::{error::Error, instance::Instance, lib::generate_object_id, objects::note::MyPost};
use tracing::log::LevelFilter;
mod activities;
mod error;
mod instance;
mod lib;
mod objects;
#[actix_rt::main]
async fn main() -> Result<(), Error> {
env_logger::builder()
.filter_level(LevelFilter::Debug)
.init();
let alpha = Instance::new("localhost:8001".to_string())?;
let beta = Instance::new("localhost:8002".to_string())?;
Instance::listen(&alpha)?;
Instance::listen(&beta)?;
// alpha user follows beta user
alpha
.local_user()
.follow(&beta.local_user(), &alpha)
.await?;
// assert that follow worked correctly
assert_eq!(
beta.local_user().followers(),
&vec![alpha.local_user().ap_id.inner().clone()]
);
// beta sends a post to its followers
let sent_post = MyPost::new("hello world!".to_string(), beta.local_user().ap_id);
beta.local_user().post(sent_post.clone(), &beta).await?;
let received_post = alpha.posts.lock().unwrap().first().cloned().unwrap();
// assert that alpha received the post
assert_eq!(received_post.text, sent_post.text);
assert_eq!(received_post.ap_id.inner(), sent_post.ap_id.inner());
assert_eq!(received_post.creator.inner(), sent_post.creator.inner());
Ok(())
}

View file

@ -1,2 +0,0 @@
pub mod note;
pub mod person;

View file

@ -1,93 +0,0 @@
use crate::{generate_object_id, instance::InstanceHandle, objects::person::MyUser};
use activitypub_federation::{
core::object_id::ObjectId,
deser::helpers::deserialize_one_or_many,
traits::ApubObject,
};
use activitystreams_kinds::{object::NoteType, public};
use anyhow::Error;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Clone, Debug)]
pub struct MyPost {
pub text: String,
pub ap_id: ObjectId<MyPost>,
pub creator: ObjectId<MyUser>,
pub local: bool,
}
impl MyPost {
pub fn new(text: String, creator: ObjectId<MyUser>) -> MyPost {
MyPost {
text,
ap_id: ObjectId::new(generate_object_id(creator.inner().domain().unwrap()).unwrap()),
creator,
local: true,
}
}
}
#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Note {
#[serde(rename = "type")]
kind: NoteType,
id: ObjectId<MyPost>,
pub(crate) attributed_to: ObjectId<MyUser>,
#[serde(deserialize_with = "deserialize_one_or_many")]
pub(crate) to: Vec<Url>,
content: String,
}
#[async_trait::async_trait(?Send)]
impl ApubObject for MyPost {
type DataType = InstanceHandle;
type ApubType = Note;
type DbType = ();
type Error = crate::error::Error;
async fn read_from_apub_id(
_object_id: Url,
_data: &Self::DataType,
) -> Result<Option<Self>, Self::Error> {
todo!()
}
async fn into_apub(self, data: &Self::DataType) -> Result<Self::ApubType, Self::Error> {
let creator = self.creator.dereference_local::<Error>(data).await?;
Ok(Note {
kind: Default::default(),
id: self.ap_id,
attributed_to: self.creator,
to: vec![public(), creator.followers_url()?],
content: self.text,
})
}
async fn verify(
_apub: &Self::ApubType,
_expected_domain: &Url,
_data: &Self::DataType,
_request_counter: &mut i32,
) -> Result<(), Self::Error> {
Ok(())
}
async fn from_apub(
apub: Self::ApubType,
data: &Self::DataType,
_request_counter: &mut i32,
) -> Result<Self, Self::Error> {
let post = MyPost {
text: apub.content,
ap_id: apub.id,
creator: apub.attributed_to,
local: false,
};
let mut lock = data.posts.lock().unwrap();
lock.push(post.clone());
Ok(post)
}
}

View file

@ -1,195 +0,0 @@
use crate::{
activities::{accept::Accept, create_note::CreateNote, follow::Follow},
error::Error,
instance::InstanceHandle,
lib::generate_object_id,
objects::note::MyPost,
};
use activitypub_federation::{
core::{
activity_queue::SendActivity,
inbox::ActorPublicKey,
object_id::ObjectId,
signatures::{Keypair, PublicKey},
},
deser::context::WithContext,
traits::ApubObject,
LocalInstance,
};
use activitypub_federation_derive::activity_handler;
use activitystreams_kinds::actor::PersonType;
use serde::{Deserialize, Serialize};
use tracing::log::debug;
use url::Url;
#[derive(Debug, Clone)]
pub struct MyUser {
pub ap_id: ObjectId<MyUser>,
pub inbox: Url,
// exists for all users (necessary to verify http signatures)
public_key: String,
// exists only for local users
private_key: Option<String>,
pub followers: Vec<Url>,
pub local: bool,
}
/// List of all activities which this actor can receive.
#[activity_handler(InstanceHandle, Error)]
#[derive(Deserialize, Serialize, Debug)]
#[serde(untagged)]
pub enum PersonAcceptedActivities {
Follow(Follow),
Accept(Accept),
CreateNote(CreateNote),
}
impl MyUser {
pub fn new(ap_id: Url, keypair: Keypair) -> MyUser {
let mut inbox = ap_id.clone();
inbox.set_path("/inbox");
let ap_id = ObjectId::new(ap_id);
MyUser {
ap_id,
inbox,
public_key: keypair.public_key,
private_key: Some(keypair.private_key),
followers: vec![],
local: true,
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Person {
#[serde(rename = "type")]
kind: PersonType,
id: ObjectId<MyUser>,
inbox: Url,
public_key: PublicKey,
}
impl MyUser {
pub fn followers(&self) -> &Vec<Url> {
&self.followers
}
pub fn followers_url(&self) -> Result<Url, Error> {
Ok(Url::parse(&format!("{}/followers", self.ap_id.inner()))?)
}
fn public_key(&self) -> PublicKey {
PublicKey::new_main_key(self.ap_id.clone().into_inner(), self.public_key.clone())
}
pub async fn follow(&self, other: &MyUser, instance: &InstanceHandle) -> Result<(), Error> {
let id = generate_object_id(instance.local_instance().hostname())?;
let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone());
self
.send(
id,
follow,
vec![other.inbox.clone()],
instance.local_instance(),
)
.await?;
Ok(())
}
pub async fn post(&self, post: MyPost, instance: &InstanceHandle) -> Result<(), Error> {
let id = generate_object_id(instance.local_instance().hostname())?;
let create = CreateNote::new(post.into_apub(instance).await?, id.clone());
let mut inboxes = vec![];
for f in self.followers.clone() {
let user: MyUser = ObjectId::new(f)
.dereference::<Error>(instance, instance.local_instance(), &mut 0)
.await?;
inboxes.push(user.inbox);
}
self
.send(id, &create, inboxes, instance.local_instance())
.await?;
Ok(())
}
pub(crate) async fn send<Activity: Serialize>(
&self,
activity_id: Url,
activity: Activity,
inboxes: Vec<Url>,
local_instance: &LocalInstance,
) -> Result<(), Error> {
let serialized = serde_json::to_string_pretty(&WithContext::new_default(activity))?;
debug!("Sending activity: {}", &serialized);
SendActivity {
activity_id,
actor_public_key: self.public_key(),
actor_private_key: self.private_key.clone().expect("has private key"),
inboxes,
activity: serialized,
}
.send(local_instance)
.await?;
Ok(())
}
}
#[async_trait::async_trait(?Send)]
impl ApubObject for MyUser {
type DataType = InstanceHandle;
type ApubType = Person;
type DbType = MyUser;
type Error = crate::error::Error;
async fn read_from_apub_id(
object_id: Url,
data: &Self::DataType,
) -> Result<Option<Self>, Self::Error> {
let users = data.users.lock().unwrap();
let res = users
.clone()
.into_iter()
.find(|u| u.ap_id.inner() == &object_id);
Ok(res)
}
async fn into_apub(self, _data: &Self::DataType) -> Result<Self::ApubType, Self::Error> {
Ok(Person {
kind: Default::default(),
id: self.ap_id.clone(),
inbox: self.inbox.clone(),
public_key: self.public_key(),
})
}
async fn verify(
_apub: &Self::ApubType,
_expected_domain: &Url,
_data: &Self::DataType,
_request_counter: &mut i32,
) -> Result<(), Self::Error> {
Ok(())
}
async fn from_apub(
apub: Self::ApubType,
_data: &Self::DataType,
_request_counter: &mut i32,
) -> Result<Self, Self::Error> {
Ok(MyUser {
ap_id: apub.id,
inbox: apub.inbox,
public_key: apub.public_key.public_key_pem,
private_key: None,
followers: vec![],
local: false,
})
}
}
impl ActorPublicKey for MyUser {
fn public_key(&self) -> &str {
&self.public_key
}
}

View file

@ -1,196 +0,0 @@
use crate::{
core::signatures::{sign_request, PublicKey},
Error,
LocalInstance,
APUB_JSON_CONTENT_TYPE,
};
use anyhow::anyhow;
use background_jobs::{
memory_storage::Storage,
ActixJob,
Backoff,
Manager,
MaxRetries,
WorkerConfig,
};
use http::{header::HeaderName, HeaderMap, HeaderValue};
use reqwest_middleware::ClientWithMiddleware;
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, future::Future, pin::Pin, time::Duration};
use tracing::{info, warn};
use url::Url;
/// Necessary data for sending out an activity
#[derive(Debug)]
pub struct SendActivity {
/// Id of the sent activity, used for logging
pub activity_id: Url,
/// Public key and actor id of the sender
pub actor_public_key: PublicKey,
/// Signing key of sender for HTTP signatures
pub actor_private_key: String,
/// List of Activitypub inboxes that the activity gets delivered to
pub inboxes: Vec<Url>,
/// Activity json
pub activity: String,
}
impl SendActivity {
/// Send out the given activity to all inboxes, automatically generating the HTTP signatures. By
/// default, sending is done on a background thread, and automatically retried on failure with
/// exponential backoff.
///
/// For debugging or testing, you might want to set [[InstanceSettings.testing_send_sync]].
pub async fn send(self, instance: &LocalInstance) -> Result<(), Error> {
let activity_queue = &instance.activity_queue;
for inbox in self.inboxes {
let message = SendActivityTask {
activity_id: self.activity_id.clone(),
inbox,
activity: self.activity.clone(),
public_key: self.actor_public_key.clone(),
private_key: self.actor_private_key.clone(),
};
if instance.settings.testing_send_sync {
let res = do_send(message, &instance.client, instance.settings.request_timeout).await;
// Don't fail on error, as we intentionally do some invalid actions in tests, to verify that
// they are rejected on the receiving side. These errors shouldn't bubble up to make the API
// call fail. This matches the behaviour in production.
if let Err(e) = res {
warn!("{}", e);
}
} else {
activity_queue.queue::<SendActivityTask>(message).await?;
let stats = activity_queue.get_stats().await?;
info!(
"Activity queue stats: pending: {}, running: {}, dead (this hour): {}, complete (this hour): {}",
stats.pending,
stats.running,
stats.dead.this_hour(),
stats.complete.this_hour()
);
}
}
Ok(())
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
struct SendActivityTask {
activity_id: Url,
inbox: Url,
activity: String,
public_key: PublicKey,
private_key: String,
}
/// Signs the activity with the sending actor's key, and delivers to the given inbox. Also retries
/// if the delivery failed.
impl ActixJob for SendActivityTask {
type State = MyState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "SendActivityTask";
/// With these params, retries are made at the following intervals:
/// 3s
/// 9s
/// 27s
/// 1m 21s
/// 4m 3s
/// 12m 9s
/// 36m 27s
/// 1h 49m 21s
/// 5h 28m 3s
/// 16h 24m 9s
const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
const BACKOFF: Backoff = Backoff::Exponential(3);
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move { do_send(self, &state.client, state.timeout).await })
}
}
async fn do_send(
task: SendActivityTask,
client: &ClientWithMiddleware,
timeout: Duration,
) -> Result<(), anyhow::Error> {
info!("Sending {} to {}", task.activity_id, task.inbox);
let request_builder = client
.post(&task.inbox.to_string())
.timeout(timeout)
.headers(generate_request_headers(&task.inbox));
let request = sign_request(
request_builder,
task.activity.clone(),
task.public_key.clone(),
task.private_key.to_owned(),
)
.await?;
let response = client.execute(request).await;
match response {
Ok(o) => {
if o.status().is_success() {
Ok(())
} else {
let status = o.status();
let text = o.text().await.map_err(Error::conv)?;
Err(anyhow!(
"Send {} to {} failed with status {}: {}",
task.activity_id,
task.inbox,
status,
text,
))
}
}
Err(e) => Err(anyhow!(
"Failed to send activity {} to {}: {}",
&task.activity_id,
task.inbox,
e
)),
}
}
fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
let mut host = inbox_url.domain().expect("read inbox domain").to_string();
if let Some(port) = inbox_url.port() {
host = format!("{}:{}", host, port);
}
let mut headers = HeaderMap::new();
headers.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static(APUB_JSON_CONTENT_TYPE),
);
headers.insert(
HeaderName::from_static("host"),
HeaderValue::from_str(&host).expect("Hostname is valid"),
);
headers
}
pub(crate) fn create_activity_queue(
client: ClientWithMiddleware,
worker_count: u64,
timeout: Duration,
) -> Manager {
// Configure and start our workers
WorkerConfig::new_managed(Storage::new(), move |_| MyState {
client: client.clone(),
timeout,
})
.register::<SendActivityTask>()
.set_worker_count("default", worker_count)
.start()
}
#[derive(Clone)]
struct MyState {
client: ClientWithMiddleware,
timeout: Duration,
}

View file

@ -1,51 +0,0 @@
use crate::{
core::{object_id::ObjectId, signatures::verify_signature},
data::Data,
traits::{ActivityHandler, ApubObject},
utils::verify_domains_match,
Error,
LocalInstance,
};
use actix_web::{HttpRequest, HttpResponse};
use serde::de::DeserializeOwned;
use tracing::log::debug;
pub trait ActorPublicKey {
/// Returns the actor's public key for verification of HTTP signatures
fn public_key(&self) -> &str;
}
/// Receive an activity and perform some basic checks, including HTTP signature verification.
pub async fn receive_activity<Activity, Actor, Datatype, E>(
request: HttpRequest,
activity: Activity,
local_instance: &LocalInstance,
data: &Data<Datatype>,
) -> Result<HttpResponse, E>
where
Activity: ActivityHandler<DataType = Datatype, Error = E> + DeserializeOwned + Send + 'static,
Actor: ApubObject<DataType = Datatype, Error = E> + ActorPublicKey + Send + 'static,
for<'de2> <Actor as ApubObject>::ApubType: serde::Deserialize<'de2>,
E: From<anyhow::Error> + From<Error>,
{
verify_domains_match(activity.id(), activity.actor())?;
if local_instance.is_local_url(activity.id()) {
return Err(Error::UrlVerificationError("Activity was sent from local instance").into());
}
(local_instance.settings.verify_url_function)(activity.id())
.map_err(Error::UrlVerificationError)?;
let request_counter = &mut 0;
let actor = ObjectId::<Actor>::new(activity.actor().clone())
.dereference::<E>(data, local_instance, request_counter)
.await?;
verify_signature(&request, actor.public_key())?;
debug!("Verifying activity {}", activity.id().to_string());
activity.verify(data, request_counter).await?;
debug!("Receiving activity {}", activity.id().to_string());
activity.receive(data, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}

View file

@ -1,4 +0,0 @@
pub mod activity_queue;
pub mod inbox;
pub mod object_id;
pub mod signatures;

View file

@ -1,261 +0,0 @@
use crate::{traits::ApubObject, utils::fetch_object_http, Error, LocalInstance};
use anyhow::anyhow;
use chrono::{Duration as ChronoDuration, NaiveDateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{
fmt::{Debug, Display, Formatter},
marker::PhantomData,
};
use url::Url;
/// We store Url on the heap because it is quite large (88 bytes).
#[derive(Serialize, Deserialize, Debug)]
#[serde(transparent)]
pub struct ObjectId<Kind>(Box<Url>, PhantomData<Kind>)
where
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>;
impl<Kind> ObjectId<Kind>
where
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
pub fn new<T>(url: T) -> Self
where
T: Into<Url>,
{
ObjectId(Box::new(url.into()), PhantomData::<Kind>)
}
pub fn inner(&self) -> &Url {
&self.0
}
pub fn into_inner(self) -> Url {
*self.0
}
/// Fetches an activitypub object, either from local database (if possible), or over http.
pub async fn dereference<E>(
&self,
data: &<Kind as ApubObject>::DataType,
instance: &LocalInstance,
request_counter: &mut i32,
) -> Result<Kind, <Kind as ApubObject>::Error>
where
<Kind as ApubObject>::Error: From<Error> + From<anyhow::Error>,
{
let db_object = self.dereference_from_db(data).await?;
// if its a local object, only fetch it from the database and not over http
if instance.is_local_url(&self.0) {
return match db_object {
None => Err(Error::NotFound.into()),
Some(o) => Ok(o),
};
}
// object found in database
if let Some(object) = db_object {
// object is old and should be refetched
if let Some(last_refreshed_at) = object.last_refreshed_at() {
if should_refetch_object(last_refreshed_at) {
return self
.dereference_from_http(data, instance, request_counter, Some(object))
.await;
}
}
Ok(object)
}
// object not found, need to fetch over http
else {
self
.dereference_from_http(data, instance, request_counter, None)
.await
}
}
/// Fetch an object from the local db. Instead of falling back to http, this throws an error if
/// the object is not found in the database.
pub async fn dereference_local<E>(
&self,
data: &<Kind as ApubObject>::DataType,
) -> Result<Kind, <Kind as ApubObject>::Error>
where
<Kind as ApubObject>::Error: From<Error>,
{
let object = self.dereference_from_db(data).await?;
object.ok_or_else(|| Error::NotFound.into())
}
/// returning none means the object was not found in local db
async fn dereference_from_db(
&self,
data: &<Kind as ApubObject>::DataType,
) -> Result<Option<Kind>, <Kind as ApubObject>::Error> {
let id = self.0.clone();
ApubObject::read_from_apub_id(*id, data).await
}
async fn dereference_from_http(
&self,
data: &<Kind as ApubObject>::DataType,
instance: &LocalInstance,
request_counter: &mut i32,
db_object: Option<Kind>,
) -> Result<Kind, <Kind as ApubObject>::Error>
where
<Kind as ApubObject>::Error: From<Error> + From<anyhow::Error>,
{
let res = fetch_object_http(&self.0, instance, request_counter).await;
if let Err(Error::ObjectDeleted) = &res {
if let Some(db_object) = db_object {
db_object.delete(data).await?;
}
return Err(anyhow!("Fetched remote object {} which was deleted", self).into());
}
let res2 = res?;
Kind::verify(&res2, self.inner(), data, request_counter).await?;
Kind::from_apub(res2, data, request_counter).await
}
}
/// Need to implement clone manually, to avoid requiring Kind to be Clone
impl<Kind> Clone for ObjectId<Kind>
where
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
fn clone(&self) -> Self {
ObjectId(self.0.clone(), self.1)
}
}
static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 20;
/// Determines when a remote actor should be refetched from its instance. In release builds, this is
/// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
/// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
///
/// TODO it won't pick up new avatars, summaries etc until a day after.
/// Actors need an "update" activity pushed to other servers to fix this.
fn should_refetch_object(last_refreshed: NaiveDateTime) -> bool {
let update_interval = if cfg!(debug_assertions) {
// avoid infinite loop when fetching community outbox
ChronoDuration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
} else {
ChronoDuration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
};
let refresh_limit = Utc::now().naive_utc() - update_interval;
last_refreshed.lt(&refresh_limit)
}
impl<Kind> Display for ObjectId<Kind>
where
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
#[allow(clippy::to_string_in_display)]
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
// Use to_string here because Url.display is not useful for us
write!(f, "{}", self.0)
}
}
impl<Kind> From<ObjectId<Kind>> for Url
where
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
fn from(id: ObjectId<Kind>) -> Self {
*id.0
}
}
impl<Kind> PartialEq for ObjectId<Kind>
where
Kind: ApubObject + Send + 'static,
for<'de2> <Kind as ApubObject>::ApubType: serde::Deserialize<'de2>,
{
fn eq(&self, other: &Self) -> bool {
self.0.eq(&other.0) && self.1 == other.1
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::object_id::should_refetch_object;
use anyhow::Error;
#[derive(Debug)]
struct TestObject {}
#[async_trait::async_trait(?Send)]
impl ApubObject for TestObject {
type DataType = TestObject;
type ApubType = ();
type DbType = ();
type Error = Error;
async fn read_from_apub_id(
_object_id: Url,
_data: &Self::DataType,
) -> Result<Option<Self>, Self::Error>
where
Self: Sized,
{
todo!()
}
async fn into_apub(self, _data: &Self::DataType) -> Result<Self::ApubType, Self::Error> {
todo!()
}
async fn verify(
_apub: &Self::ApubType,
_expected_domain: &Url,
_data: &Self::DataType,
_request_counter: &mut i32,
) -> Result<(), Self::Error> {
todo!()
}
async fn from_apub(
_apub: Self::ApubType,
_data: &Self::DataType,
_request_counter: &mut i32,
) -> Result<Self, Self::Error>
where
Self: Sized,
{
todo!()
}
}
#[test]
fn test_deserialize() {
let url = Url::parse("http://test.com/").unwrap();
let id = ObjectId::<TestObject>::new(url);
let string = serde_json::to_string(&id).unwrap();
assert_eq!("\"http://test.com/\"", string);
let parsed: ObjectId<TestObject> = serde_json::from_str(&string).unwrap();
assert_eq!(parsed, id);
}
#[test]
fn test_should_refetch_object() {
let one_second_ago = Utc::now().naive_utc() - ChronoDuration::seconds(1);
assert!(!should_refetch_object(one_second_ago));
let two_days_ago = Utc::now().naive_utc() - ChronoDuration::days(2);
assert!(should_refetch_object(two_days_ago));
}
}

View file

@ -1,124 +0,0 @@
use actix_web::HttpRequest;
use anyhow::anyhow;
use http_signature_normalization_actix::Config as ConfigActix;
use http_signature_normalization_reqwest::prelude::{Config, SignExt};
use once_cell::sync::Lazy;
use openssl::{
hash::MessageDigest,
pkey::PKey,
rsa::Rsa,
sign::{Signer, Verifier},
};
use reqwest::Request;
use reqwest_middleware::RequestBuilder;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::io::{Error, ErrorKind};
use tracing::debug;
use url::Url;
static CONFIG2: Lazy<ConfigActix> = Lazy::new(ConfigActix::new);
static HTTP_SIG_CONFIG: Lazy<Config> = Lazy::new(Config::new);
/// A private/public key pair used for HTTP signatures
#[derive(Debug, Clone)]
pub struct Keypair {
pub private_key: String,
pub public_key: String,
}
/// Generate the asymmetric keypair for ActivityPub HTTP signatures.
pub fn generate_actor_keypair() -> Result<Keypair, Error> {
let rsa = Rsa::generate(2048)?;
let pkey = PKey::from_rsa(rsa)?;
let public_key = pkey.public_key_to_pem()?;
let private_key = pkey.private_key_to_pem_pkcs8()?;
let key_to_string = |key| match String::from_utf8(key) {
Ok(s) => Ok(s),
Err(e) => Err(Error::new(
ErrorKind::Other,
format!("Failed converting key to string: {}", e),
)),
};
Ok(Keypair {
private_key: key_to_string(private_key)?,
public_key: key_to_string(public_key)?,
})
}
/// Creates an HTTP post request to `inbox_url`, with the given `client` and `headers`, and
/// `activity` as request body. The request is signed with `private_key` and then sent.
pub(crate) async fn sign_request(
request_builder: RequestBuilder,
activity: String,
public_key: PublicKey,
private_key: String,
) -> Result<Request, anyhow::Error> {
request_builder
.signature_with_digest(
HTTP_SIG_CONFIG.clone(),
public_key.id,
Sha256::new(),
activity,
move |signing_string| {
let private_key = PKey::private_key_from_pem(private_key.as_bytes())?;
let mut signer = Signer::new(MessageDigest::sha256(), &private_key)?;
signer.update(signing_string.as_bytes())?;
Ok(base64::encode(signer.sign_to_vec()?)) as Result<_, anyhow::Error>
},
)
.await
}
/// Verifies the HTTP signature on an incoming inbox request.
pub fn verify_signature(request: &HttpRequest, public_key: &str) -> Result<(), anyhow::Error> {
let verified = CONFIG2
.begin_verify(
request.method(),
request.uri().path_and_query(),
request.headers().clone(),
)?
.verify(|signature, signing_string| -> Result<bool, anyhow::Error> {
debug!(
"Verifying with key {}, message {}",
&public_key, &signing_string
);
let public_key = PKey::public_key_from_pem(public_key.as_bytes())?;
let mut verifier = Verifier::new(MessageDigest::sha256(), &public_key)?;
verifier.update(signing_string.as_bytes())?;
Ok(verifier.verify(&base64::decode(signature)?)?)
})?;
if verified {
debug!("verified signature for {}", &request.uri());
Ok(())
} else {
Err(anyhow!("Invalid signature on request: {}", &request.uri()))
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PublicKey {
pub(crate) id: String,
pub(crate) owner: Url,
pub public_key_pem: String,
}
impl PublicKey {
/// Create public key with default id, for actors that only have a single keypair
pub fn new_main_key(owner: Url, public_key_pem: String) -> Self {
let key_id = format!("{}#main-key", &owner);
PublicKey::new(key_id, owner, public_key_pem)
}
/// Create public key with custom key id. Use this method if there are multiple keypairs per actor
pub fn new(id: String, owner: Url, public_key_pem: String) -> Self {
PublicKey {
id,
owner,
public_key_pem,
}
}
}

View file

@ -1,37 +0,0 @@
use std::{ops::Deref, sync::Arc};
/// This type can be used to pass your own data into library functions and traits. It can be useful
/// to pass around database connections or other context.
#[derive(Debug)]
pub struct Data<T: ?Sized>(Arc<T>);
impl<T> Data<T> {
/// Create new `Data` instance.
pub fn new(state: T) -> Data<T> {
Data(Arc::new(state))
}
/// Get reference to inner app data.
pub fn get_ref(&self) -> &T {
self.0.as_ref()
}
/// Convert to the internal Arc<T>
pub fn into_inner(self) -> Arc<T> {
self.0
}
}
impl<T: ?Sized> Deref for Data<T> {
type Target = Arc<T>;
fn deref(&self) -> &Arc<T> {
&self.0
}
}
impl<T: ?Sized> Clone for Data<T> {
fn clone(&self) -> Data<T> {
Data(self.0.clone())
}
}

View file

@ -1,62 +0,0 @@
use crate::{data::Data, deser::helpers::deserialize_one_or_many, traits::ActivityHandler};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::str::FromStr;
use url::Url;
const DEFAULT_CONTEXT: &str = "[\"https://www.w3.org/ns/activitystreams\"]";
/// Simple wrapper which adds json-ld context to an object or activity. Doing it this way ensures
/// that nested objects dont have any context, but only the outermost one.
#[derive(Serialize, Deserialize, Debug)]
pub struct WithContext<T> {
#[serde(rename = "@context")]
#[serde(deserialize_with = "deserialize_one_or_many")]
context: Vec<Value>,
#[serde(flatten)]
inner: T,
}
impl<T> WithContext<T> {
pub fn new_default(inner: T) -> WithContext<T> {
let context = vec![Value::from_str(DEFAULT_CONTEXT).expect("valid context")];
WithContext::new(inner, context)
}
pub fn new(inner: T, context: Vec<Value>) -> WithContext<T> {
WithContext { context, inner }
}
}
#[async_trait::async_trait(?Send)]
impl<T> ActivityHandler for WithContext<T>
where
T: ActivityHandler,
{
type DataType = <T as ActivityHandler>::DataType;
type Error = <T as ActivityHandler>::Error;
fn id(&self) -> &Url {
self.inner.id()
}
fn actor(&self) -> &Url {
self.inner.actor()
}
async fn verify(
&self,
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
self.inner.verify(data, request_counter).await
}
async fn receive(
self,
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
self.inner.receive(data, request_counter).await
}
}

View file

@ -1,66 +0,0 @@
use serde::{Deserialize, Deserializer};
/// Deserialize either a single json value, or a json array. In either case, the items are returned
/// as an array.
///
/// Usage:
/// `#[serde(deserialize_with = "deserialize_one_or_many")]`
pub fn deserialize_one_or_many<'de, T, D>(deserializer: D) -> Result<Vec<T>, D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum OneOrMany<T> {
One(T),
Many(Vec<T>),
}
let result: OneOrMany<T> = Deserialize::deserialize(deserializer)?;
Ok(match result {
OneOrMany::Many(list) => list,
OneOrMany::One(value) => vec![value],
})
}
/// Deserialize either a single json value, or a json array with one element. In both cases it
/// returns a single value.
///
/// Usage:
/// `#[serde(deserialize_with = "deserialize_one")]`
pub fn deserialize_one<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum MaybeArray<T> {
Simple(T),
Array([T; 1]),
}
let result: MaybeArray<T> = Deserialize::deserialize(deserializer)?;
Ok(match result {
MaybeArray::Simple(value) => value,
MaybeArray::Array([value]) => value,
})
}
/// Attempts to deserialize the item. If any error happens, its ignored and the type's default
/// value is returned.
///
/// Usage:
/// `#[serde(deserialize_with = "deserialize_skip_error")]`
pub fn deserialize_skip_error<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: Deserialize<'de> + Default,
D: Deserializer<'de>,
{
let result = Deserialize::deserialize(deserializer);
Ok(match result {
Ok(o) => o,
Err(_) => Default::default(),
})
}

View file

@ -1,3 +0,0 @@
pub mod context;
pub mod helpers;
pub mod values;

View file

@ -1,61 +0,0 @@
//! The enums here serve to limit a json string value to a single, hardcoded value which can be
//! verified at compilation time. When using it as the type of a struct field, the struct can only
//! be constructed or deserialized if the field has the exact same value.
//!
//! If we used String as the field type, any value would be accepted, and we would have to check
//! manually at runtime that it contains the expected value.
//!
//! The enums in `activitystreams::activity::kind` work in the same way, and can be used to
//! distinguish different activity types.
//!
//! In the example below, `MyObject` can only be constructed or
//! deserialized if `media_type` is `text/markdown`, but not if it is `text/html`.
//!
//! ```
//! use serde_json::from_str;
//! use serde::{Deserialize, Serialize};
//! use activitypub_federation::deser::values::MediaTypeMarkdown;
//!
//! #[derive(Deserialize, Serialize)]
//! struct MyObject {
//! content: String,
//! media_type: MediaTypeMarkdown,
//! }
//!
//! let markdown_json = r#"{"content": "**test**", "media_type": "text/markdown"}"#;
//! let from_markdown = from_str::<MyObject>(markdown_json);
//! assert!(from_markdown.is_ok());
//!
//! let markdown_html = r#"{"content": "<b>test</b>", "media_type": "text/html"}"#;
//! let from_html = from_str::<MyObject>(markdown_html);
//! assert!(from_html.is_err());
//! ```
use serde::{Deserialize, Serialize};
/// Media type for markdown text.
///
/// <https://www.iana.org/assignments/media-types/media-types.xhtml>
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum MediaTypeMarkdown {
#[serde(rename = "text/markdown")]
Markdown,
}
/// Media type for HTML text.
///
/// <https://www.iana.org/assignments/media-types/media-types.xhtml>
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum MediaTypeHtml {
#[serde(rename = "text/html")]
Html,
}
/// Media type which allows both markdown and HTML.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum MediaTypeMarkdownOrHtml {
#[serde(rename = "text/markdown")]
Markdown,
#[serde(rename = "text/html")]
Html,
}

View file

@ -1,101 +0,0 @@
use crate::core::activity_queue::create_activity_queue;
use background_jobs::Manager;
use derive_builder::Builder;
use reqwest_middleware::ClientWithMiddleware;
use std::time::Duration;
use url::Url;
pub mod core;
pub mod data;
pub mod deser;
pub mod traits;
pub mod utils;
/// Mime type for Activitypub, used for `Accept` and `Content-Type` HTTP headers
pub static APUB_JSON_CONTENT_TYPE: &str = "application/activity+json";
/// Represents a single, federated instance (for example lemmy.ml). There should only be one of
/// this in your application (except for testing).
pub struct LocalInstance {
hostname: String,
client: ClientWithMiddleware,
activity_queue: Manager,
settings: InstanceSettings,
}
// Use InstanceSettingsBuilder to initialize this
#[derive(Builder)]
pub struct InstanceSettings {
/// Maximum number of outgoing HTTP requests per incoming activity
#[builder(default = "20")]
http_fetch_retry_limit: i32,
/// Number of worker threads for sending outgoing activities
#[builder(default = "64")]
worker_count: u64,
/// Send outgoing activities synchronously, not in background thread. Helps to make tests
/// more consistent, but not recommended for production.
#[builder(default = "false")]
testing_send_sync: bool,
/// Timeout for all HTTP requests. HTTP signatures are valid for 10s, so it makes sense to
/// use the same as timeout when sending
#[builder(default = "Duration::from_secs(10)")]
request_timeout: Duration,
/// Function used to verify that urls are valid, used when receiving activities or fetching remote
/// objects. Use this to implement functionality like federation blocklists. In case verification
/// fails, it should return an error message.
#[builder(default = "|_| { Ok(()) }")]
verify_url_function: fn(&Url) -> Result<(), &'static str>,
}
impl LocalInstance {
pub fn new(domain: String, client: ClientWithMiddleware, settings: InstanceSettings) -> Self {
let activity_queue = create_activity_queue(
client.clone(),
settings.worker_count,
settings.request_timeout,
);
LocalInstance {
hostname: domain,
client,
activity_queue,
settings,
}
}
/// Returns true if the url refers to this instance. Handles hostnames like `localhost:8540` for
/// local debugging.
fn is_local_url(&self, url: &Url) -> bool {
let mut domain = url.domain().expect("id has domain").to_string();
if let Some(port) = url.port() {
domain = format!("{}:{}", domain, port);
}
domain == self.hostname
}
/// Returns the local hostname
pub fn hostname(&self) -> &str {
&self.hostname
}
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Object was not found in database")]
NotFound,
#[error("Request limit was reached during fetch")]
RequestLimit,
#[error("Object to be fetched was deleted")]
ObjectDeleted,
#[error("{0}")]
UrlVerificationError(&'static str),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl Error {
pub fn conv<T>(error: T) -> Self
where
T: Into<anyhow::Error>,
{
Error::Other(error.into())
}
}

View file

@ -1,91 +0,0 @@
use crate::data::Data;
pub use activitypub_federation_derive::*;
use chrono::NaiveDateTime;
use url::Url;
/// Trait which allows verification and reception of incoming activities.
#[async_trait::async_trait(?Send)]
pub trait ActivityHandler {
type DataType;
type Error;
/// `id` field of the activity
fn id(&self) -> &Url;
/// `actor` field of activity
fn actor(&self) -> &Url;
/// Verify that the activity is valid. If this method returns an error, the activity will be
/// discarded. This is separate from receive(), so that it can be called recursively on nested
/// objects, without storing something in the database by accident.
async fn verify(
&self,
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error>;
/// Receives the activity and stores its action in database.
async fn receive(
self,
data: &Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error>;
}
#[async_trait::async_trait(?Send)]
pub trait ApubObject {
type DataType;
type ApubType;
type DbType;
type Error;
/// If the object is stored in the database, this method should return the fetch time. Used to
/// update actors after certain interval.
fn last_refreshed_at(&self) -> Option<NaiveDateTime> {
None
}
/// Try to read the object with given ID from local database. Returns Ok(None) if it doesn't exist.
async fn read_from_apub_id(
object_id: Url,
data: &Self::DataType,
) -> Result<Option<Self>, Self::Error>
where
Self: Sized;
/// Marks the object as deleted in local db. Called when a delete activity is received, or if
/// fetch returns a tombstone.
async fn delete(self, _data: &Self::DataType) -> Result<(), Self::Error>
where
Self: Sized,
{
Ok(())
}
/// Trait for converting an object or actor into the respective ActivityPub type.
async fn into_apub(self, data: &Self::DataType) -> Result<Self::ApubType, Self::Error>;
/// Verify that the object is valid. If this method returns an error, it will be
/// discarded. This is separate from from_apub(), so that it can be called recursively on nested
/// objects, without storing something in the database by accident.
async fn verify(
apub: &Self::ApubType,
expected_domain: &Url,
data: &Self::DataType,
request_counter: &mut i32,
) -> Result<(), Self::Error>;
/// Converts an object from ActivityPub type to Lemmy internal type.
///
/// * `apub` The object to read from
/// * `context` LemmyContext which holds DB pool, HTTP client etc
/// * `expected_domain` Domain where the object was received from. None in case of mod action.
/// * `mod_action_allowed` True if the object can be a mod activity, ignore `expected_domain` in this case
async fn from_apub(
apub: Self::ApubType,
data: &Self::DataType,
request_counter: &mut i32,
) -> Result<Self, Self::Error>
where
Self: Sized;
}

View file

@ -1,51 +0,0 @@
use crate::{Error, LocalInstance, APUB_JSON_CONTENT_TYPE};
use http::StatusCode;
use serde::de::DeserializeOwned;
use tracing::log::info;
use url::Url;
pub async fn fetch_object_http<Kind: DeserializeOwned>(
url: &Url,
instance: &LocalInstance,
request_counter: &mut i32,
) -> Result<Kind, Error> {
// dont fetch local objects this way
debug_assert!(url.domain() != Some(&instance.hostname));
info!("Fetching remote object {}", url.to_string());
*request_counter += 1;
if *request_counter > instance.settings.http_fetch_retry_limit {
return Err(Error::RequestLimit);
}
let res = instance
.client
.get(url.as_str())
.header("Accept", APUB_JSON_CONTENT_TYPE)
.timeout(instance.settings.request_timeout)
.send()
.await
.map_err(Error::conv)?;
if res.status() == StatusCode::GONE {
return Err(Error::ObjectDeleted);
}
res.json().await.map_err(Error::conv)
}
/// Check that both urls have the same domain. If not, return UrlVerificationError.
pub fn verify_domains_match(a: &Url, b: &Url) -> Result<(), Error> {
if a.domain() != b.domain() {
return Err(Error::UrlVerificationError("Domains do not match"));
}
Ok(())
}
/// Check that both urls are identical. If not, return UrlVerificationError.
pub fn verify_urls_match(a: &Url, b: &Url) -> Result<(), Error> {
if a != b {
return Err(Error::UrlVerificationError("Urls do not match"));
}
Ok(())
}

View file

@ -1,19 +0,0 @@
[package]
name = "activitypub_federation_derive"
version = "0.16.4-rc.11"
edition = "2021"
description = "A link aggregator for the fediverse"
license = "AGPL-3.0"
homepage = "https://join-lemmy.org/"
documentation = "https://join-lemmy.org/docs/en/index.html"
[lib]
proc-macro = true
[dev-dependencies]
trybuild = { version = "1.0.57", features = ["diff"] }
[dependencies]
proc-macro2 = "1.0.36"
syn = "1.0.90"
quote = "1.0.17"

View file

@ -1,137 +0,0 @@
use proc_macro2::TokenStream;
use quote::quote;
use syn::{parse_macro_input, Data, DeriveInput, Fields::Unnamed, Ident, Variant};
/// Generates implementation ActivityHandler for an enum, which looks like the following (handling
/// all enum variants).
///
/// Based on this code:
/// ```ignore
/// #[derive(serde::Deserialize, serde::Serialize)]
/// #[serde(untagged)]
/// #[activity_handler(LemmyContext, LemmyError)]
/// pub enum PersonInboxActivities {
/// CreateNote(CreateNote),
/// UpdateNote(UpdateNote),
/// }
/// ```
/// It will generate this:
/// ```ignore
/// impl ActivityHandler for PersonInboxActivities {
/// type DataType = LemmyContext;
/// type Error = LemmyError;
///
/// async fn verify(
/// &self,
/// data: &Self::DataType,
/// request_counter: &mut i32,
/// ) -> Result<(), Self::Error> {
/// match self {
/// PersonInboxActivities::CreateNote(a) => a.verify(data, request_counter).await,
/// PersonInboxActivities::UpdateNote(a) => a.verify(context, request_counter).await,
/// }
/// }
///
/// async fn receive(
/// &self,
/// data: &Self::DataType,
/// request_counter: &mut i32,
/// ) -> Result<(), Self::Error> {
/// match self {
/// PersonInboxActivities::CreateNote(a) => a.receive(data, request_counter).await,
/// PersonInboxActivities::UpdateNote(a) => a.receive(data, request_counter).await,
/// }
/// }
/// ```
#[proc_macro_attribute]
pub fn activity_handler(
attr: proc_macro::TokenStream,
input: proc_macro::TokenStream,
) -> proc_macro::TokenStream {
let derive_input = parse_macro_input!(input as DeriveInput);
let derive_input2 = derive_input.clone();
let attr = proc_macro2::TokenStream::from(attr);
let mut attr = attr.into_iter();
let data_type = attr.next().expect("data type input");
let _delimiter = attr.next();
let error = attr.next().expect("error type input");
let enum_name = derive_input2.ident;
let (impl_generics, ty_generics, where_clause) = derive_input2.generics.split_for_impl();
let enum_variants = if let Data::Enum(d) = derive_input2.data {
d.variants
} else {
unimplemented!()
};
let impl_id = enum_variants
.iter()
.map(|v| generate_match_arm(&enum_name, v, &quote! {a.id()}));
let impl_actor = enum_variants
.iter()
.map(|v| generate_match_arm(&enum_name, v, &quote! {a.actor()}));
let body_verify = quote! {a.verify(context, request_counter).await};
let impl_verify = enum_variants
.iter()
.map(|v| generate_match_arm(&enum_name, v, &body_verify));
let body_receive = quote! {a.receive(context, request_counter).await};
let impl_receive = enum_variants
.iter()
.map(|v| generate_match_arm(&enum_name, v, &body_receive));
let expanded = quote! {
#derive_input
#[async_trait::async_trait(?Send)]
impl #impl_generics activitypub_federation::traits::ActivityHandler for #enum_name #ty_generics #where_clause {
type DataType = #data_type;
type Error = #error;
fn id(
&self,
) -> &Url {
match self {
#(#impl_id)*
}
}
fn actor(
&self,
) -> &Url {
match self {
#(#impl_actor)*
}
}
async fn verify(
&self,
context: &activitypub_federation::data::Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
match self {
#(#impl_verify)*
}
}
async fn receive(
self,
context: &activitypub_federation::data::Data<Self::DataType>,
request_counter: &mut i32,
) -> Result<(), Self::Error> {
match self {
#(#impl_receive)*
}
}
}
};
expanded.into()
}
fn generate_match_arm(enum_name: &Ident, variant: &Variant, body: &TokenStream) -> TokenStream {
let id = &variant.ident;
match &variant.fields {
Unnamed(_) => {
quote! {
#enum_name::#id(a) => #body,
}
}
_ => unimplemented!(),
}
}

View file

@ -14,7 +14,7 @@ doctest = false
[dependencies]
lemmy_apub = { version = "=0.16.4-rc.11", path = "../apub" }
activitypub_federation = { version = "=0.16.4-rc.11", path = "../activitypub_federation" }
activitypub_federation = "0.1.0"
lemmy_utils = { version = "=0.16.4-rc.11", path = "../utils" }
lemmy_db_schema = { version = "=0.16.4-rc.11", path = "../db_schema", features = ["full"] }
lemmy_db_views = { version = "=0.16.4-rc.11", path = "../db_views", features = ["full"] }

View file

@ -9,13 +9,13 @@ documentation = "https://join-lemmy.org/docs/en/index.html"
[dependencies]
lemmy_apub = { version = "=0.16.4-rc.11", path = "../apub" }
activitypub_federation = { version = "=0.16.4-rc.11", path = "../activitypub_federation" }
lemmy_utils = { version = "=0.16.4-rc.11", path = "../utils" }
lemmy_db_schema = { version = "=0.16.4-rc.11", path = "../db_schema", features = ["full"] }
lemmy_db_views = { version = "=0.16.4-rc.11", path = "../db_views", features = ["full"] }
lemmy_db_views_actor = { version = "=0.16.4-rc.11", path = "../db_views_actor", features = ["full"] }
lemmy_api_common = { version = "=0.16.4-rc.11", path = "../api_common", features = ["full"] }
lemmy_websocket = { version = "=0.16.4-rc.11", path = "../websocket" }
activitypub_federation = "0.1.0"
bcrypt = "0.12.1"
serde_json = { version = "1.0.79", features = ["preserve_order"] }
serde = { version = "1.0.136", features = ["derive"] }

View file

@ -14,12 +14,12 @@ doctest = false
[dependencies]
lemmy_utils = { version = "=0.16.4-rc.11", path = "../utils" }
activitypub_federation = { version = "=0.16.4-rc.11", path = "../activitypub_federation" }
lemmy_db_schema = { version = "=0.16.4-rc.11", path = "../db_schema", features = ["full"] }
lemmy_db_views = { version = "=0.16.4-rc.11", path = "../db_views", features = ["full"] }
lemmy_db_views_actor = { version = "=0.16.4-rc.11", path = "../db_views_actor", features = ["full"] }
lemmy_api_common = { version = "=0.16.4-rc.11", path = "../api_common", features = ["full"] }
lemmy_websocket = { version = "=0.16.4-rc.11", path = "../websocket" }
activitypub_federation = "0.1.0"
diesel = "1.4.8"
activitystreams-kinds = "0.2.1"
chrono = { version = "0.4.19", features = ["serde"], default-features = false }

View file

@ -23,7 +23,7 @@ url = { version = "2.2.2", features = ["serde"] }
strum = "0.24.0"
strum_macros = "0.24.0"
serde_json = { version = "1.0.79", features = ["preserve_order"], optional = true }
activitypub_federation = { version = "=0.16.4-rc.11", path = "../activitypub_federation", optional = true }
activitypub_federation = { version = "0.1.0", optional = true }
lemmy_utils = { version = "=0.16.4-rc.11", path = "../utils", optional = true }
bcrypt = { version = "0.12.1", optional = true }
diesel = { version = "1.4.8", features = ["postgres","chrono","r2d2","serde_json"], optional = true }