mirror of
https://github.com/LemmyNet/activitypub-federation-rust.git
synced 2024-06-02 13:29:36 +00:00
WIP: axum veridy digest + example
Note: this does not compile yet
This commit is contained in:
parent
e51cd50113
commit
52971b64b3
26
Cargo.lock
generated
26
Cargo.lock
generated
|
@ -12,6 +12,7 @@ dependencies = [
|
|||
"anyhow",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"axum-macros",
|
||||
"background-jobs",
|
||||
"base64",
|
||||
"chrono",
|
||||
|
@ -20,9 +21,11 @@ dependencies = [
|
|||
"enum_delegate",
|
||||
"env_logger",
|
||||
"http",
|
||||
"http-signature-normalization",
|
||||
"http-signature-normalization-actix",
|
||||
"http-signature-normalization-reqwest",
|
||||
"httpdate",
|
||||
"hyper",
|
||||
"itertools",
|
||||
"once_cell",
|
||||
"openssl",
|
||||
|
@ -34,6 +37,8 @@ dependencies = [
|
|||
"sha2",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower-http",
|
||||
"tracing",
|
||||
"url",
|
||||
]
|
||||
|
@ -273,9 +278,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
|||
|
||||
[[package]]
|
||||
name = "axum"
|
||||
version = "0.6.0-rc.5"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "21b6b56b1bdef08d4d50e63683d79c797dbf2ae467586c7502b283241b5e2b8a"
|
||||
checksum = "744864363a200a5e724a7e61bc8c11b6628cf2e3ec519c8a1a48e609a8156b40"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum-core",
|
||||
|
@ -293,6 +298,7 @@ dependencies = [
|
|||
"mime",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustversion",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_path_to_error",
|
||||
|
@ -307,9 +313,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "axum-core"
|
||||
version = "0.3.0-rc.3"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d1aa274f0599e5100cbc24e1f184d437d8086ea0bba0b5f68326e2ad5a48567"
|
||||
checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
|
@ -317,15 +323,16 @@ dependencies = [
|
|||
"http",
|
||||
"http-body",
|
||||
"mime",
|
||||
"rustversion",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-macros"
|
||||
version = "0.3.0-rc.3"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f2185fff4d6f14de84dcc01b0ff8eee2ac5331a962cf85e60e080ce7db724cc9"
|
||||
checksum = "e4df0fc33ada14a338b799002f7e8657711422b25d4e16afb032708d6b185621"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
|
@ -1493,6 +1500,12 @@ dependencies = [
|
|||
"semver",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustversion"
|
||||
version = "1.0.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8"
|
||||
|
||||
[[package]]
|
||||
name = "ryu"
|
||||
version = "1.0.11"
|
||||
|
@ -1805,6 +1818,7 @@ dependencies = [
|
|||
"libc",
|
||||
"memchr",
|
||||
"mio",
|
||||
"num_cpus",
|
||||
"parking_lot",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
|
|
26
Cargo.toml
26
Cargo.toml
|
@ -30,23 +30,43 @@ dyn-clone = "1.0.9"
|
|||
enum_delegate = "0.2.0"
|
||||
httpdate = "1.0.2"
|
||||
http-signature-normalization-reqwest = { version = "0.7.1", default-features = false, features = ["sha-2", "middleware"] }
|
||||
http-signature-normalization = "0.6.0"
|
||||
|
||||
actix-web = { version = "4.2.1", default-features = false, optional = true }
|
||||
http-signature-normalization-actix = { version = "0.6.1", default-features = false, features = ["server", "sha-2"], optional = true }
|
||||
axum = { version = "0.6.0-rc.5", features = ["json", "headers", "macros"], optional = true }
|
||||
|
||||
# Axum
|
||||
tower-http = { version = "0.3", features = ["map-request-body", "util"], optional = true }
|
||||
tower = { version = "0.4.13", optional = true }
|
||||
hyper = { version = "0.14", optional = true }
|
||||
|
||||
## FIXME: this is for debugging purpose
|
||||
axum-macros = "0.3.0"
|
||||
|
||||
[features]
|
||||
actix = ["dep:actix-web", "dep:http-signature-normalization-actix"]
|
||||
axum = ["dep:axum"]
|
||||
axum = [
|
||||
"dep:axum",
|
||||
"dep:tower-http",
|
||||
"dep:tower",
|
||||
"dep:hyper",
|
||||
]
|
||||
|
||||
[dev-dependencies]
|
||||
activitystreams-kinds = "0.2.1"
|
||||
rand = "0.8.5"
|
||||
actix-rt = "2.7.0"
|
||||
tokio = "1.21.2"
|
||||
tokio = { version = "1.21.2", features = ["full"] }
|
||||
env_logger = { version = "0.9.3", default-features = false }
|
||||
|
||||
[[example]]
|
||||
name = "simple_federation_actix"
|
||||
path = "examples/federation/main.rs"
|
||||
path = "examples/federation-actix/main.rs"
|
||||
required-features = ["actix"]
|
||||
|
||||
[[example]]
|
||||
name = "simple_federation_axum"
|
||||
path = "examples/federation-axum/main.rs"
|
||||
required-features = ["axum"]
|
||||
|
||||
|
|
56
examples/federation-axum/activities/accept.rs
Normal file
56
examples/federation-axum/activities/accept.rs
Normal file
|
@ -0,0 +1,56 @@
|
|||
use crate::{activities::follow::Follow, instance::InstanceHandle, objects::person::MyUser};
|
||||
use activitypub_federation::{core::object_id::ObjectId, data::Data, traits::ActivityHandler};
|
||||
use activitystreams_kinds::activity::AcceptType;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Accept {
|
||||
actor: ObjectId<MyUser>,
|
||||
object: Follow,
|
||||
#[serde(rename = "type")]
|
||||
kind: AcceptType,
|
||||
id: Url,
|
||||
}
|
||||
|
||||
impl Accept {
|
||||
pub fn new(actor: ObjectId<MyUser>, object: Follow, id: Url) -> Accept {
|
||||
Accept {
|
||||
actor,
|
||||
object,
|
||||
kind: Default::default(),
|
||||
id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
impl ActivityHandler for Accept {
|
||||
type DataType = InstanceHandle;
|
||||
type Error = crate::error::Error;
|
||||
|
||||
fn id(&self) -> &Url {
|
||||
&self.id
|
||||
}
|
||||
|
||||
fn actor(&self) -> &Url {
|
||||
self.actor.inner()
|
||||
}
|
||||
|
||||
async fn verify(
|
||||
&self,
|
||||
_data: &Data<Self::DataType>,
|
||||
_request_counter: &mut i32,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn receive(
|
||||
self,
|
||||
_data: &Data<Self::DataType>,
|
||||
_request_counter: &mut i32,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
70
examples/federation-axum/activities/create_note.rs
Normal file
70
examples/federation-axum/activities/create_note.rs
Normal file
|
@ -0,0 +1,70 @@
|
|||
use crate::{
|
||||
instance::InstanceHandle,
|
||||
objects::{note::Note, person::MyUser},
|
||||
MyPost,
|
||||
};
|
||||
use activitypub_federation::{
|
||||
core::object_id::ObjectId,
|
||||
data::Data,
|
||||
deser::helpers::deserialize_one_or_many,
|
||||
traits::{ActivityHandler, ApubObject},
|
||||
};
|
||||
use activitystreams_kinds::activity::CreateType;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct CreateNote {
|
||||
pub(crate) actor: ObjectId<MyUser>,
|
||||
#[serde(deserialize_with = "deserialize_one_or_many")]
|
||||
pub(crate) to: Vec<Url>,
|
||||
pub(crate) object: Note,
|
||||
#[serde(rename = "type")]
|
||||
pub(crate) kind: CreateType,
|
||||
pub(crate) id: Url,
|
||||
}
|
||||
|
||||
impl CreateNote {
|
||||
pub fn new(note: Note, id: Url) -> CreateNote {
|
||||
CreateNote {
|
||||
actor: note.attributed_to.clone(),
|
||||
to: note.to.clone(),
|
||||
object: note,
|
||||
kind: CreateType::Create,
|
||||
id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
impl ActivityHandler for CreateNote {
|
||||
type DataType = InstanceHandle;
|
||||
type Error = crate::error::Error;
|
||||
|
||||
fn id(&self) -> &Url {
|
||||
&self.id
|
||||
}
|
||||
|
||||
fn actor(&self) -> &Url {
|
||||
self.actor.inner()
|
||||
}
|
||||
|
||||
async fn verify(
|
||||
&self,
|
||||
data: &Data<Self::DataType>,
|
||||
request_counter: &mut i32,
|
||||
) -> Result<(), Self::Error> {
|
||||
MyPost::verify(&self.object, self.id(), data, request_counter).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn receive(
|
||||
self,
|
||||
data: &Data<Self::DataType>,
|
||||
request_counter: &mut i32,
|
||||
) -> Result<(), Self::Error> {
|
||||
MyPost::from_apub(self.object, data, request_counter).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
88
examples/federation-axum/activities/follow.rs
Normal file
88
examples/federation-axum/activities/follow.rs
Normal file
|
@ -0,0 +1,88 @@
|
|||
use crate::{
|
||||
activities::accept::Accept,
|
||||
generate_object_id,
|
||||
instance::InstanceHandle,
|
||||
objects::person::MyUser,
|
||||
};
|
||||
use activitypub_federation::{
|
||||
core::object_id::ObjectId,
|
||||
data::Data,
|
||||
traits::{ActivityHandler, Actor},
|
||||
};
|
||||
use activitystreams_kinds::activity::FollowType;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Follow {
|
||||
pub(crate) actor: ObjectId<MyUser>,
|
||||
pub(crate) object: ObjectId<MyUser>,
|
||||
#[serde(rename = "type")]
|
||||
kind: FollowType,
|
||||
id: Url,
|
||||
}
|
||||
|
||||
impl Follow {
|
||||
pub fn new(actor: ObjectId<MyUser>, object: ObjectId<MyUser>, id: Url) -> Follow {
|
||||
Follow {
|
||||
actor,
|
||||
object,
|
||||
kind: Default::default(),
|
||||
id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
impl ActivityHandler for Follow {
|
||||
type DataType = InstanceHandle;
|
||||
type Error = crate::error::Error;
|
||||
|
||||
fn id(&self) -> &Url {
|
||||
&self.id
|
||||
}
|
||||
|
||||
fn actor(&self) -> &Url {
|
||||
self.actor.inner()
|
||||
}
|
||||
|
||||
async fn verify(
|
||||
&self,
|
||||
_data: &Data<Self::DataType>,
|
||||
_request_counter: &mut i32,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Ignore clippy false positive: https://github.com/rust-lang/rust-clippy/issues/6446
|
||||
#[allow(clippy::await_holding_lock)]
|
||||
async fn receive(
|
||||
self,
|
||||
data: &Data<Self::DataType>,
|
||||
request_counter: &mut i32,
|
||||
) -> Result<(), Self::Error> {
|
||||
// add to followers
|
||||
let mut users = data.users.lock().unwrap();
|
||||
let local_user = users.first_mut().unwrap();
|
||||
local_user.followers.push(self.actor.inner().clone());
|
||||
let local_user = local_user.clone();
|
||||
drop(users);
|
||||
|
||||
// send back an accept
|
||||
let follower = self
|
||||
.actor
|
||||
.dereference(data, data.local_instance(), request_counter)
|
||||
.await?;
|
||||
let id = generate_object_id(data.local_instance().hostname())?;
|
||||
let accept = Accept::new(local_user.ap_id.clone(), self, id.clone());
|
||||
local_user
|
||||
.send(
|
||||
accept,
|
||||
vec![follower.shared_inbox_or_inbox()],
|
||||
data.local_instance(),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
3
examples/federation-axum/activities/mod.rs
Normal file
3
examples/federation-axum/activities/mod.rs
Normal file
|
@ -0,0 +1,3 @@
|
|||
pub mod accept;
|
||||
pub mod create_note;
|
||||
pub mod follow;
|
40
examples/federation-axum/error.rs
Normal file
40
examples/federation-axum/error.rs
Normal file
|
@ -0,0 +1,40 @@
|
|||
/// Necessary because of this issue: https://github.com/actix/actix-web/issues/1711
|
||||
#[derive(Debug)]
|
||||
pub struct Error(anyhow::Error);
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
impl Display for Error {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
std::fmt::Display::fmt(&self.0, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<T> for Error
|
||||
where
|
||||
T: Into<anyhow::Error>,
|
||||
{
|
||||
fn from(t: T) -> Self {
|
||||
Error(t.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "actix")]
|
||||
mod actix {
|
||||
use crate::error::Error;
|
||||
use actix_web::ResponseError;
|
||||
|
||||
impl ResponseError for Error {}
|
||||
}
|
||||
|
||||
#[cfg(feature = "axum")]
|
||||
mod axum {
|
||||
use super::Error;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use http::StatusCode;
|
||||
|
||||
impl IntoResponse for Error {
|
||||
fn into_response(self) -> Response {
|
||||
(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", self.0)).into_response()
|
||||
}
|
||||
}
|
||||
}
|
163
examples/federation-axum/instance.rs
Normal file
163
examples/federation-axum/instance.rs
Normal file
|
@ -0,0 +1,163 @@
|
|||
use crate::{
|
||||
error::Error,
|
||||
generate_object_id,
|
||||
objects::{
|
||||
note::MyPost,
|
||||
person::{MyUser, PersonAcceptedActivities},
|
||||
},
|
||||
};
|
||||
|
||||
use activitypub_federation::{
|
||||
core::{inbox::receive_activity, object_id::ObjectId, signatures::generate_actor_keypair},
|
||||
data::Data,
|
||||
deser::context::WithContext,
|
||||
traits::ApubObject,
|
||||
InstanceSettings,
|
||||
LocalInstance,
|
||||
UrlVerifier,
|
||||
APUB_JSON_CONTENT_TYPE,
|
||||
};
|
||||
|
||||
use activitypub_federation::core::axum::{verify_request_payload, DigestVerified};
|
||||
use async_trait::async_trait;
|
||||
use axum::{
|
||||
body,
|
||||
body::Body,
|
||||
extract::State,
|
||||
middleware,
|
||||
response::IntoResponse,
|
||||
routing::get,
|
||||
Extension,
|
||||
Json,
|
||||
Router,
|
||||
};
|
||||
use http::{header::CONTENT_TYPE, Request, Response};
|
||||
use reqwest::Client;
|
||||
use std::{
|
||||
net::SocketAddr,
|
||||
str::FromStr,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use tokio::task;
|
||||
use tower::ServiceBuilder;
|
||||
use tower_http::ServiceBuilderExt;
|
||||
use url::Url;
|
||||
|
||||
pub type InstanceHandle = Arc<Instance>;
|
||||
|
||||
pub struct Instance {
|
||||
/// This holds all library data
|
||||
local_instance: LocalInstance,
|
||||
/// Our "database" which contains all known users (local and federated)
|
||||
pub users: Mutex<Vec<MyUser>>,
|
||||
/// Same, but for posts
|
||||
pub posts: Mutex<Vec<MyPost>>,
|
||||
}
|
||||
|
||||
/// Use this to store your federation blocklist, or a database connection needed to retrieve it.
|
||||
#[derive(Clone)]
|
||||
struct MyUrlVerifier();
|
||||
|
||||
#[async_trait]
|
||||
impl UrlVerifier for MyUrlVerifier {
|
||||
async fn verify(&self, url: &Url) -> Result<(), &'static str> {
|
||||
if url.domain() == Some("malicious.com") {
|
||||
Err("malicious domain")
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
pub fn new(hostname: String) -> Result<InstanceHandle, Error> {
|
||||
let settings = InstanceSettings::builder()
|
||||
.debug(true)
|
||||
.url_verifier(Box::new(MyUrlVerifier()))
|
||||
.build()?;
|
||||
let local_instance =
|
||||
LocalInstance::new(hostname.clone(), Client::default().into(), settings);
|
||||
let local_user = MyUser::new(generate_object_id(&hostname)?, generate_actor_keypair()?);
|
||||
let instance = Arc::new(Instance {
|
||||
local_instance,
|
||||
users: Mutex::new(vec![local_user]),
|
||||
posts: Mutex::new(vec![]),
|
||||
});
|
||||
Ok(instance)
|
||||
}
|
||||
|
||||
pub fn local_user(&self) -> MyUser {
|
||||
self.users.lock().unwrap().first().cloned().unwrap()
|
||||
}
|
||||
|
||||
pub fn local_instance(&self) -> &LocalInstance {
|
||||
&self.local_instance
|
||||
}
|
||||
|
||||
pub fn listen(instance: &InstanceHandle) -> Result<(), Error> {
|
||||
let hostname = instance.local_instance.hostname();
|
||||
let instance = instance.clone();
|
||||
let app = Router::new()
|
||||
.route("/objects/:user_name", get(http_get_user))
|
||||
.layer(
|
||||
ServiceBuilder::new()
|
||||
.map_request_body(body::boxed)
|
||||
.layer(middleware::from_fn(verify_request_payload)),
|
||||
)
|
||||
.with_state(instance);
|
||||
|
||||
// run it
|
||||
let addr = SocketAddr::from_str(hostname)?;
|
||||
let server = axum::Server::bind(&addr).serve(app.into_make_service());
|
||||
|
||||
task::spawn(server);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// FIXME
|
||||
use axum_macros::debug_handler;
|
||||
#[debug_handler(body = Body)]
|
||||
/// Handles requests to fetch user json over HTTP
|
||||
async fn http_get_user(
|
||||
State(data): State<InstanceHandle>,
|
||||
request: Request<Body>,
|
||||
) -> impl IntoResponse {
|
||||
let hostname: String = data.local_instance.hostname().to_string();
|
||||
let request_url = format!("http://{}{}", hostname, &request.uri());
|
||||
|
||||
let url = Url::parse(&request_url).expect("Failed to parse url");
|
||||
|
||||
let user = ObjectId::<MyUser>::new(url)
|
||||
.dereference_local(&data)
|
||||
.await
|
||||
.expect("Failed to dereference user")
|
||||
.into_apub(&data)
|
||||
.await
|
||||
.expect("Failed to convert to apub user");
|
||||
|
||||
let json =
|
||||
serde_json::to_string(&WithContext::new_default(user)).expect("failed to parse json");
|
||||
|
||||
Response::builder()
|
||||
.header(CONTENT_TYPE, APUB_JSON_CONTENT_TYPE)
|
||||
.body(json)
|
||||
.expect("failed to build response")
|
||||
}
|
||||
|
||||
/// Handles messages received in user inbox
|
||||
async fn http_post_user_inbox(
|
||||
request: Request<Body>,
|
||||
activity: Json<WithContext<PersonAcceptedActivities>>,
|
||||
Extension(data): Extension<InstanceHandle>,
|
||||
Extension(digest_verified): Extension<DigestVerified>,
|
||||
) -> impl IntoResponse {
|
||||
receive_activity::<WithContext<PersonAcceptedActivities>, MyUser, InstanceHandle>(
|
||||
digest_verified,
|
||||
request,
|
||||
activity,
|
||||
&data.clone().local_instance,
|
||||
&Data::new(data),
|
||||
)
|
||||
.await
|
||||
}
|
42
examples/federation-axum/main.rs
Normal file
42
examples/federation-axum/main.rs
Normal file
|
@ -0,0 +1,42 @@
|
|||
use crate::{error::Error, instance::Instance, objects::note::MyPost, utils::generate_object_id};
|
||||
use tracing::log::LevelFilter;
|
||||
|
||||
mod activities;
|
||||
mod error;
|
||||
mod instance;
|
||||
mod objects;
|
||||
mod utils;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Error> {
|
||||
env_logger::builder()
|
||||
.filter_level(LevelFilter::Debug)
|
||||
.init();
|
||||
|
||||
let alpha = Instance::new("localhost:8001".to_string())?;
|
||||
let beta = Instance::new("localhost:8002".to_string())?;
|
||||
Instance::listen(&alpha)?;
|
||||
Instance::listen(&beta)?;
|
||||
|
||||
// alpha user follows beta user
|
||||
alpha
|
||||
.local_user()
|
||||
.follow(&beta.local_user(), &alpha)
|
||||
.await?;
|
||||
// assert that follow worked correctly
|
||||
assert_eq!(
|
||||
beta.local_user().followers(),
|
||||
&vec![alpha.local_user().ap_id.inner().clone()]
|
||||
);
|
||||
|
||||
// beta sends a post to its followers
|
||||
let sent_post = MyPost::new("hello world!".to_string(), beta.local_user().ap_id);
|
||||
beta.local_user().post(sent_post.clone(), &beta).await?;
|
||||
let received_post = alpha.posts.lock().unwrap().first().cloned().unwrap();
|
||||
|
||||
// assert that alpha received the post
|
||||
assert_eq!(received_post.text, sent_post.text);
|
||||
assert_eq!(received_post.ap_id.inner(), sent_post.ap_id.inner());
|
||||
assert_eq!(received_post.creator.inner(), sent_post.creator.inner());
|
||||
Ok(())
|
||||
}
|
2
examples/federation-axum/objects/mod.rs
Normal file
2
examples/federation-axum/objects/mod.rs
Normal file
|
@ -0,0 +1,2 @@
|
|||
pub mod note;
|
||||
pub mod person;
|
92
examples/federation-axum/objects/note.rs
Normal file
92
examples/federation-axum/objects/note.rs
Normal file
|
@ -0,0 +1,92 @@
|
|||
use crate::{generate_object_id, instance::InstanceHandle, objects::person::MyUser};
|
||||
use activitypub_federation::{
|
||||
core::object_id::ObjectId,
|
||||
deser::helpers::deserialize_one_or_many,
|
||||
traits::ApubObject,
|
||||
};
|
||||
use activitystreams_kinds::{object::NoteType, public};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MyPost {
|
||||
pub text: String,
|
||||
pub ap_id: ObjectId<MyPost>,
|
||||
pub creator: ObjectId<MyUser>,
|
||||
pub local: bool,
|
||||
}
|
||||
|
||||
impl MyPost {
|
||||
pub fn new(text: String, creator: ObjectId<MyUser>) -> MyPost {
|
||||
MyPost {
|
||||
text,
|
||||
ap_id: ObjectId::new(generate_object_id(creator.inner().domain().unwrap()).unwrap()),
|
||||
creator,
|
||||
local: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Note {
|
||||
#[serde(rename = "type")]
|
||||
kind: NoteType,
|
||||
id: ObjectId<MyPost>,
|
||||
pub(crate) attributed_to: ObjectId<MyUser>,
|
||||
#[serde(deserialize_with = "deserialize_one_or_many")]
|
||||
pub(crate) to: Vec<Url>,
|
||||
content: String,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
impl ApubObject for MyPost {
|
||||
type DataType = InstanceHandle;
|
||||
type ApubType = Note;
|
||||
type DbType = ();
|
||||
type Error = crate::error::Error;
|
||||
|
||||
async fn read_from_apub_id(
|
||||
_object_id: Url,
|
||||
_data: &Self::DataType,
|
||||
) -> Result<Option<Self>, Self::Error> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn into_apub(self, data: &Self::DataType) -> Result<Self::ApubType, Self::Error> {
|
||||
let creator = self.creator.dereference_local(data).await?;
|
||||
Ok(Note {
|
||||
kind: Default::default(),
|
||||
id: self.ap_id,
|
||||
attributed_to: self.creator,
|
||||
to: vec![public(), creator.followers_url()?],
|
||||
content: self.text,
|
||||
})
|
||||
}
|
||||
|
||||
async fn verify(
|
||||
_apub: &Self::ApubType,
|
||||
_expected_domain: &Url,
|
||||
_data: &Self::DataType,
|
||||
_request_counter: &mut i32,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn from_apub(
|
||||
apub: Self::ApubType,
|
||||
data: &Self::DataType,
|
||||
_request_counter: &mut i32,
|
||||
) -> Result<Self, Self::Error> {
|
||||
let post = MyPost {
|
||||
text: apub.content,
|
||||
ap_id: apub.id,
|
||||
creator: apub.attributed_to,
|
||||
local: false,
|
||||
};
|
||||
|
||||
let mut lock = data.posts.lock().unwrap();
|
||||
lock.push(post.clone());
|
||||
Ok(post)
|
||||
}
|
||||
}
|
195
examples/federation-axum/objects/person.rs
Normal file
195
examples/federation-axum/objects/person.rs
Normal file
|
@ -0,0 +1,195 @@
|
|||
use crate::{
|
||||
activities::{accept::Accept, create_note::CreateNote, follow::Follow},
|
||||
error::Error,
|
||||
instance::InstanceHandle,
|
||||
objects::note::MyPost,
|
||||
utils::generate_object_id,
|
||||
};
|
||||
use activitypub_federation::{
|
||||
core::{
|
||||
activity_queue::send_activity,
|
||||
object_id::ObjectId,
|
||||
signatures::{Keypair, PublicKey},
|
||||
},
|
||||
data::Data,
|
||||
deser::context::WithContext,
|
||||
traits::{ActivityHandler, Actor, ApubObject},
|
||||
LocalInstance,
|
||||
};
|
||||
use activitystreams_kinds::actor::PersonType;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MyUser {
|
||||
pub ap_id: ObjectId<MyUser>,
|
||||
pub inbox: Url,
|
||||
// exists for all users (necessary to verify http signatures)
|
||||
public_key: String,
|
||||
// exists only for local users
|
||||
private_key: Option<String>,
|
||||
pub followers: Vec<Url>,
|
||||
pub local: bool,
|
||||
}
|
||||
|
||||
/// List of all activities which this actor can receive.
|
||||
#[derive(Deserialize, Serialize, Debug)]
|
||||
#[serde(untagged)]
|
||||
#[enum_delegate::implement(ActivityHandler)]
|
||||
pub enum PersonAcceptedActivities {
|
||||
Follow(Follow),
|
||||
Accept(Accept),
|
||||
CreateNote(CreateNote),
|
||||
}
|
||||
|
||||
impl MyUser {
|
||||
pub fn new(ap_id: Url, keypair: Keypair) -> MyUser {
|
||||
let mut inbox = ap_id.clone();
|
||||
inbox.set_path("/inbox");
|
||||
let ap_id = ObjectId::new(ap_id);
|
||||
MyUser {
|
||||
ap_id,
|
||||
inbox,
|
||||
public_key: keypair.public_key,
|
||||
private_key: Some(keypair.private_key),
|
||||
followers: vec![],
|
||||
local: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Person {
|
||||
#[serde(rename = "type")]
|
||||
kind: PersonType,
|
||||
id: ObjectId<MyUser>,
|
||||
inbox: Url,
|
||||
public_key: PublicKey,
|
||||
}
|
||||
|
||||
impl MyUser {
|
||||
pub fn followers(&self) -> &Vec<Url> {
|
||||
&self.followers
|
||||
}
|
||||
|
||||
pub fn followers_url(&self) -> Result<Url, Error> {
|
||||
Ok(Url::parse(&format!("{}/followers", self.ap_id.inner()))?)
|
||||
}
|
||||
|
||||
fn public_key(&self) -> PublicKey {
|
||||
PublicKey::new_main_key(self.ap_id.clone().into_inner(), self.public_key.clone())
|
||||
}
|
||||
|
||||
pub async fn follow(&self, other: &MyUser, instance: &InstanceHandle) -> Result<(), Error> {
|
||||
let id = generate_object_id(instance.local_instance().hostname())?;
|
||||
let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone());
|
||||
self.send(
|
||||
follow,
|
||||
vec![other.shared_inbox_or_inbox()],
|
||||
instance.local_instance(),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn post(&self, post: MyPost, instance: &InstanceHandle) -> Result<(), Error> {
|
||||
let id = generate_object_id(instance.local_instance().hostname())?;
|
||||
let create = CreateNote::new(post.into_apub(instance).await?, id.clone());
|
||||
let mut inboxes = vec![];
|
||||
for f in self.followers.clone() {
|
||||
let user: MyUser = ObjectId::new(f)
|
||||
.dereference(instance, instance.local_instance(), &mut 0)
|
||||
.await?;
|
||||
inboxes.push(user.shared_inbox_or_inbox());
|
||||
}
|
||||
self.send(create, inboxes, instance.local_instance())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn send<Activity>(
|
||||
&self,
|
||||
activity: Activity,
|
||||
recipients: Vec<Url>,
|
||||
local_instance: &LocalInstance,
|
||||
) -> Result<(), <Activity as ActivityHandler>::Error>
|
||||
where
|
||||
Activity: ActivityHandler + Serialize,
|
||||
<Activity as ActivityHandler>::Error: From<anyhow::Error> + From<serde_json::Error>,
|
||||
{
|
||||
let activity = WithContext::new_default(activity);
|
||||
send_activity(
|
||||
activity,
|
||||
self.public_key(),
|
||||
self.private_key.clone().expect("has private key"),
|
||||
recipients,
|
||||
local_instance,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
impl ApubObject for MyUser {
|
||||
type DataType = InstanceHandle;
|
||||
type ApubType = Person;
|
||||
type DbType = MyUser;
|
||||
type Error = crate::error::Error;
|
||||
|
||||
async fn read_from_apub_id(
|
||||
object_id: Url,
|
||||
data: &Self::DataType,
|
||||
) -> Result<Option<Self>, Self::Error> {
|
||||
let users = data.users.lock().unwrap();
|
||||
let res = users
|
||||
.clone()
|
||||
.into_iter()
|
||||
.find(|u| u.ap_id.inner() == &object_id);
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
async fn into_apub(self, _data: &Self::DataType) -> Result<Self::ApubType, Self::Error> {
|
||||
Ok(Person {
|
||||
kind: Default::default(),
|
||||
id: self.ap_id.clone(),
|
||||
inbox: self.inbox.clone(),
|
||||
public_key: self.public_key(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn verify(
|
||||
_apub: &Self::ApubType,
|
||||
_expected_domain: &Url,
|
||||
_data: &Self::DataType,
|
||||
_request_counter: &mut i32,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn from_apub(
|
||||
apub: Self::ApubType,
|
||||
_data: &Self::DataType,
|
||||
_request_counter: &mut i32,
|
||||
) -> Result<Self, Self::Error> {
|
||||
Ok(MyUser {
|
||||
ap_id: apub.id,
|
||||
inbox: apub.inbox,
|
||||
public_key: apub.public_key.public_key_pem,
|
||||
private_key: None,
|
||||
followers: vec![],
|
||||
local: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for MyUser {
|
||||
fn public_key(&self) -> &str {
|
||||
&self.public_key
|
||||
}
|
||||
|
||||
fn inbox(&self) -> Url {
|
||||
self.inbox.clone()
|
||||
}
|
||||
}
|
13
examples/federation-axum/utils.rs
Normal file
13
examples/federation-axum/utils.rs
Normal file
|
@ -0,0 +1,13 @@
|
|||
use rand::{distributions::Alphanumeric, thread_rng, Rng};
|
||||
use url::{ParseError, Url};
|
||||
|
||||
/// Just generate random url as object id. In a real project, you probably want to use
|
||||
/// an url which contains the database id for easy retrieval (or store the random id in db).
|
||||
pub fn generate_object_id(hostname: &str) -> Result<Url, ParseError> {
|
||||
let id: String = thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(7)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
Url::parse(&format!("http://{}/objects/{}", hostname, id))
|
||||
}
|
45
src/core/axum/digest.rs
Normal file
45
src/core/axum/digest.rs
Normal file
|
@ -0,0 +1,45 @@
|
|||
use axum::http::HeaderValue;
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DigestPart {
|
||||
pub algorithm: String,
|
||||
pub digest: String,
|
||||
}
|
||||
|
||||
impl DigestPart {
|
||||
pub fn try_from_header(h: &HeaderValue) -> Option<Vec<DigestPart>> {
|
||||
let h = h.to_str().ok()?.split(';').next()?;
|
||||
let v: Vec<_> = h
|
||||
.split(',')
|
||||
.filter_map(|p| {
|
||||
let mut iter = p.splitn(2, '=');
|
||||
iter.next()
|
||||
.and_then(|alg| iter.next().map(|value| (alg, value)))
|
||||
})
|
||||
.map(|(alg, value)| DigestPart {
|
||||
algorithm: alg.to_owned(),
|
||||
digest: value.to_owned(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
if v.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn verify_sha256(digests: &[DigestPart], payload: &[u8]) -> bool {
|
||||
let mut hasher = Sha256::new();
|
||||
|
||||
for part in digests {
|
||||
hasher.update(payload);
|
||||
if base64::encode(hasher.finalize_reset()) != part.digest {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
}
|
69
src/core/axum/mod.rs
Normal file
69
src/core/axum/mod.rs
Normal file
|
@ -0,0 +1,69 @@
|
|||
use axum::{
|
||||
async_trait,
|
||||
body::{self, BoxBody, Bytes, Full},
|
||||
extract::FromRequest,
|
||||
http::{Request, StatusCode},
|
||||
middleware::Next,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use digest::{verify_sha256, DigestPart};
|
||||
|
||||
mod digest;
|
||||
|
||||
/// A request guard to ensure digest has been verified request has been
|
||||
/// see [`receive_activity`]
|
||||
pub struct DigestVerified;
|
||||
|
||||
pub async fn verify_request_payload(
|
||||
request: Request<BoxBody>,
|
||||
next: Next<BoxBody>,
|
||||
) -> Result<impl IntoResponse, Response> {
|
||||
let mut request = verify_payload(request).await?;
|
||||
request.extensions_mut().insert(DigestVerified);
|
||||
Ok(next.run(request).await)
|
||||
}
|
||||
|
||||
async fn verify_payload(request: Request<BoxBody>) -> Result<Request<BoxBody>, Response> {
|
||||
let (parts, body) = request.into_parts();
|
||||
|
||||
// this wont work if the body is an long running stream
|
||||
let bytes = hyper::body::to_bytes(body)
|
||||
.await
|
||||
.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?;
|
||||
|
||||
let Some(digest) = parts.headers.get("Digest") else {
|
||||
return Err((StatusCode::UNAUTHORIZED, "Missing digest header".to_string()).into_response());
|
||||
};
|
||||
|
||||
let Some(digests) = DigestPart::try_from_header(&digest) else {
|
||||
return Err((StatusCode::UNAUTHORIZED, "Malformed digest header".to_string()).into_response());
|
||||
};
|
||||
|
||||
if !verify_sha256(&digests, bytes.as_ref()) {
|
||||
Err((
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"Digest does not match payload".to_string(),
|
||||
)
|
||||
.into_response())
|
||||
} else {
|
||||
Ok(Request::from_parts(parts, body::boxed(Full::from(bytes))))
|
||||
}
|
||||
}
|
||||
|
||||
struct BufferRequestBody(Bytes);
|
||||
|
||||
#[async_trait]
|
||||
impl<S> FromRequest<S, BoxBody> for BufferRequestBody
|
||||
where
|
||||
S: Send + Sync,
|
||||
{
|
||||
type Rejection = Response;
|
||||
|
||||
async fn from_request(req: Request<BoxBody>, state: &S) -> Result<Self, Self::Rejection> {
|
||||
let body = Bytes::from_request(req, state)
|
||||
.await
|
||||
.map_err(|err| err.into_response())?;
|
||||
|
||||
Ok(Self(body))
|
||||
}
|
||||
}
|
|
@ -67,23 +67,25 @@ mod actix_imp {
|
|||
#[cfg(feature = "axum")]
|
||||
mod axum_imp {
|
||||
use crate::{
|
||||
core::{object_id::ObjectId, signatures::axum::verify_signature},
|
||||
core::{axum::DigestVerified, object_id::ObjectId, signatures::axum::verify_signature},
|
||||
data::Data,
|
||||
traits::{ActivityHandler, Actor, ApubObject},
|
||||
utils::{verify_domains_match, verify_url_valid},
|
||||
Error,
|
||||
LocalInstance,
|
||||
};
|
||||
use axum::{http::Request, Extension, Json};
|
||||
use axum::{http::Request, Json};
|
||||
use hyper::Body;
|
||||
use serde::de::DeserializeOwned;
|
||||
use tracing::debug;
|
||||
|
||||
/// Receive an activity and perform some basic checks, including HTTP signature verification.
|
||||
pub async fn receive_activity<Activity, ActorT, Datatype>(
|
||||
request: Request<Activity>,
|
||||
_digest_verified: DigestVerified,
|
||||
request: Request<Body>,
|
||||
Json(activity): Json<Activity>,
|
||||
Extension(local_instance): Extension<&LocalInstance>,
|
||||
Extension(data): Extension<&Data<Datatype>>,
|
||||
local_instance: &LocalInstance,
|
||||
data: &Data<Datatype>,
|
||||
) -> Result<(), <Activity as ActivityHandler>::Error>
|
||||
where
|
||||
Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static,
|
||||
|
@ -93,13 +95,9 @@ mod axum_imp {
|
|||
+ From<Error>
|
||||
+ From<<ActorT as ApubObject>::Error>
|
||||
+ From<serde_json::Error>,
|
||||
// FIXME
|
||||
// + From<http_signature_normalization_actix::digest::middleware::VerifyError>,
|
||||
<ActorT as ApubObject>::Error: From<Error> + From<anyhow::Error>,
|
||||
{
|
||||
// ensure that payload hash was checked against digest header by middleware
|
||||
/// FIXME
|
||||
// DigestVerified::from_request(&request, &mut Payload::None).await?;
|
||||
verify_domains_match(activity.id(), activity.actor())?;
|
||||
verify_url_valid(activity.id(), &local_instance.settings).await?;
|
||||
if local_instance.is_local_url(activity.id()) {
|
||||
|
@ -112,6 +110,7 @@ mod axum_imp {
|
|||
let actor = ObjectId::<ActorT>::new(activity.actor().clone())
|
||||
.dereference(data, local_instance, request_counter)
|
||||
.await?;
|
||||
|
||||
verify_signature(&request, actor.public_key())?;
|
||||
|
||||
debug!("Verifying activity {}", activity.id().to_string());
|
||||
|
|
|
@ -2,3 +2,6 @@ pub mod activity_queue;
|
|||
pub mod inbox;
|
||||
pub mod object_id;
|
||||
pub mod signatures;
|
||||
|
||||
#[cfg(feature = "axum")]
|
||||
pub mod axum;
|
||||
|
|
|
@ -114,7 +114,7 @@ pub mod actix {
|
|||
request.uri().path_and_query(),
|
||||
request.headers().clone(),
|
||||
)?
|
||||
.verify(|signature, signing_string| -> Result<bool, anyhow::Error> {
|
||||
.verify(|signature, signing_string| -> anyhow::Result<bool> {
|
||||
debug!(
|
||||
"Verifying with key {}, message {}",
|
||||
&public_key, &signing_string
|
||||
|
@ -136,13 +136,44 @@ pub mod actix {
|
|||
|
||||
#[cfg(feature = "axum")]
|
||||
pub mod axum {
|
||||
use anyhow::anyhow;
|
||||
use axum::http::Request;
|
||||
use http_signature_normalization::Config;
|
||||
use openssl::{hash::MessageDigest, pkey::PKey, sign::Verifier};
|
||||
use std::collections::BTreeMap;
|
||||
use tracing::debug;
|
||||
|
||||
/// Verifies the HTTP signature on an incoming inbox request.
|
||||
pub fn verify_signature<B>(
|
||||
request: &Request<B>,
|
||||
public_key: &str,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
todo!("Tower Http does not have this feature yet")
|
||||
let config = Config::default();
|
||||
let mut header_map = BTreeMap::new();
|
||||
for (name, value) in request.headers() {
|
||||
if let Ok(value) = value.to_str() {
|
||||
header_map.insert(name.to_string(), value.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
let verified = config
|
||||
.begin_verify("GET", "/foo?bar=baz", header_map)?
|
||||
.verify(|signature, signing_string| -> anyhow::Result<bool> {
|
||||
debug!(
|
||||
"Verifying with key {}, message {}",
|
||||
&public_key, &signing_string
|
||||
);
|
||||
let public_key = PKey::public_key_from_pem(public_key.as_bytes())?;
|
||||
let mut verifier = Verifier::new(MessageDigest::sha256(), &public_key)?;
|
||||
verifier.update(signing_string.as_bytes())?;
|
||||
Ok(verifier.verify(&base64::decode(signature)?)?)
|
||||
})?;
|
||||
|
||||
if verified {
|
||||
debug!("verified signature for {}", &request.uri());
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow!("Invalid signature on request: {}", &request.uri()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue