mirror of
https://github.com/astro/buzzrelay.git
synced 2024-11-22 12:11:00 +00:00
in progress
This commit is contained in:
commit
1ea9cac671
7 changed files with 2105 additions and 0 deletions
1738
Cargo.lock
generated
Normal file
1738
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
21
Cargo.toml
Normal file
21
Cargo.toml
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
[package]
|
||||||
|
name = "buzzrelay"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
axum = "0.6"
|
||||||
|
axum-macros = "0.3"
|
||||||
|
axum-extra = { version = "0.4", features = ["spa"] }
|
||||||
|
askama = "0.11"
|
||||||
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
tracing = "*"
|
||||||
|
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||||
|
serde = "1"
|
||||||
|
serde_json = "1"
|
||||||
|
reqwest = { version = "0.11", features = ["json"] }
|
||||||
|
sigh = { path = "../rust-sigh" }
|
||||||
|
http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] }
|
||||||
|
thiserror = "1"
|
||||||
|
http = "0.2"
|
||||||
|
chrono = "0.4"
|
32
src/activitypub.rs
Normal file
32
src/activitypub.rs
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
use serde::{Deserialize, Serialize, de::DeserializeOwned};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Actor {
|
||||||
|
#[serde(rename = "@context")]
|
||||||
|
pub jsonld_context: serde_json::Value,
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub actor_type: String,
|
||||||
|
pub id: String,
|
||||||
|
pub inbox: String,
|
||||||
|
pub outbox: String,
|
||||||
|
#[serde(rename = "publicKey")]
|
||||||
|
pub public_key: ActorPublicKey,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct ActorPublicKey {
|
||||||
|
pub id: String,
|
||||||
|
pub owner: Option<String>,
|
||||||
|
#[serde(rename = "publicKeyPem")]
|
||||||
|
pub pem: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// ActivityPub "activity"
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Action<O> {
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub action_type: String,
|
||||||
|
pub actor: String,
|
||||||
|
pub to: Option<String>,
|
||||||
|
pub object: Option<O>,
|
||||||
|
}
|
124
src/endpoint.rs
Normal file
124
src/endpoint.rs
Normal file
|
@ -0,0 +1,124 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{
|
||||||
|
async_trait,
|
||||||
|
body::{Bytes, HttpBody},
|
||||||
|
extract::{FromRef, FromRequest},
|
||||||
|
http::{header::CONTENT_TYPE, Request, StatusCode},
|
||||||
|
response::{IntoResponse, Response},
|
||||||
|
routing::post,
|
||||||
|
Form, RequestExt, Router, BoxError,
|
||||||
|
};
|
||||||
|
|
||||||
|
use http_digest_headers::{DigestHeader, DigestMethod, Error as DigestError};
|
||||||
|
use serde::{Deserialize, Serialize, de::DeserializeOwned};
|
||||||
|
use sigh::{Signature, PublicKey, Key};
|
||||||
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
|
|
||||||
|
use crate::fetch::fetch;
|
||||||
|
use crate::activitypub::Actor;
|
||||||
|
|
||||||
|
const SIGNATURE_HEADERS_REQUIRED: &[&str] = &[
|
||||||
|
"(request-target)",
|
||||||
|
"host", "date",
|
||||||
|
"digest", "content-type",
|
||||||
|
];
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct Endpoint {
|
||||||
|
pub payload: serde_json::Value,
|
||||||
|
pub actor: Actor,
|
||||||
|
}
|
||||||
|
|
||||||
|
// impl Endpoint {
|
||||||
|
// pub fn parse<T: DeserializeOwned>(self) -> Result<T, serde_json::Error> {
|
||||||
|
// serde_json::from_value(self.payload)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<S, B> FromRequest<S, B> for Endpoint
|
||||||
|
where
|
||||||
|
B: HttpBody + Send + 'static,
|
||||||
|
B::Data: Send,
|
||||||
|
B::Error: Into<BoxError>,
|
||||||
|
S: Send + Sync,
|
||||||
|
Arc<reqwest::Client>: FromRef<S>,
|
||||||
|
{
|
||||||
|
type Rejection = (StatusCode, String);
|
||||||
|
|
||||||
|
async fn from_request(req: Request<B>, state: &S) -> Result<Self, Self::Rejection> {
|
||||||
|
// validate content-type
|
||||||
|
let content_type = if let Some(content_type) = req.headers()
|
||||||
|
.get(CONTENT_TYPE)
|
||||||
|
.and_then(|value| value.to_str().ok()) {
|
||||||
|
content_type
|
||||||
|
} else {
|
||||||
|
return Err((StatusCode::UNSUPPORTED_MEDIA_TYPE, "No content-type".to_string()));
|
||||||
|
};
|
||||||
|
if ! content_type.starts_with("application/json") &&
|
||||||
|
! (content_type.starts_with("application/") && content_type.ends_with("+json"))
|
||||||
|
{
|
||||||
|
return Err((StatusCode::UNSUPPORTED_MEDIA_TYPE, "Invalid content-type".to_string()));
|
||||||
|
}
|
||||||
|
// get signature before consuming req
|
||||||
|
let signature = Signature::from(&req);
|
||||||
|
// check signature fields
|
||||||
|
let signature_headers = signature.headers()
|
||||||
|
.ok_or((StatusCode::BAD_REQUEST, "No signed headers".to_string()))?;
|
||||||
|
for header in SIGNATURE_HEADERS_REQUIRED {
|
||||||
|
if signature_headers.iter().find(|h| *h == header) == None {
|
||||||
|
return Err((StatusCode::BAD_REQUEST, format!("Header {:?} not signed", header)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse digest
|
||||||
|
let mut digest_header: String = req.headers().get("digest")
|
||||||
|
.ok_or((StatusCode::BAD_REQUEST, "Missing Digest: header".to_string()))?
|
||||||
|
.to_str()
|
||||||
|
.map_err(|_| (StatusCode::BAD_REQUEST, "Digest: header contained invalid characters".to_string()))?
|
||||||
|
.to_string();
|
||||||
|
// fixup digest header
|
||||||
|
if digest_header.starts_with("SHA-") {
|
||||||
|
digest_header.replace_range(..4, "sha-");
|
||||||
|
}
|
||||||
|
// mastodon uses base64::alphabet::STANDARD, not base64::alphabet::URL_SAFE
|
||||||
|
digest_header = digest_header.replace("+", "-")
|
||||||
|
.replace("/", "_");
|
||||||
|
dbg!(&digest_header);
|
||||||
|
let digest: DigestHeader = digest_header.parse()
|
||||||
|
.map_err(|e| (StatusCode::BAD_REQUEST, format!("Cannot parse Digest: header: {}", e)))?;
|
||||||
|
// read body
|
||||||
|
let bytes = Bytes::from_request(req, state).await
|
||||||
|
.map_err(|e| (StatusCode::BAD_REQUEST, format!("Body: {}", e)))?;
|
||||||
|
// validate digest
|
||||||
|
if ! digest.verify(&bytes).unwrap_or(false) {
|
||||||
|
return Err((StatusCode::BAD_REQUEST, "Digest didn't match".to_string()));
|
||||||
|
}
|
||||||
|
// parse body
|
||||||
|
let payload: serde_json::Value = serde_json::from_slice(&bytes)
|
||||||
|
.map_err(|_| (StatusCode::BAD_REQUEST, format!("Error parsing JSON")))?;
|
||||||
|
let actor_uri = if let Some(serde_json::Value::String(actor_uri)) = payload.get("actor") {
|
||||||
|
actor_uri
|
||||||
|
} else {
|
||||||
|
return Err((StatusCode::BAD_REQUEST, "Actor missing".to_string()));
|
||||||
|
};
|
||||||
|
|
||||||
|
// validate actor
|
||||||
|
let client = Arc::from_ref(&state);
|
||||||
|
let actor: Actor =
|
||||||
|
serde_json::from_value(
|
||||||
|
fetch(&client, &actor_uri).await
|
||||||
|
.map_err(|e| (StatusCode::BAD_GATEWAY, format!("{}", e)))?
|
||||||
|
).map_err(|e| (StatusCode::BAD_GATEWAY, format!("Invalid actor: {}", e)))?;
|
||||||
|
let public_key = PublicKey::from_pem(actor.public_key.pem.as_bytes())
|
||||||
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?;
|
||||||
|
if signature.verify(&public_key)
|
||||||
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))? != true
|
||||||
|
{
|
||||||
|
return Err((StatusCode::BAD_REQUEST, "Signature verification failed".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(Endpoint { actor, payload });
|
||||||
|
}
|
||||||
|
}
|
13
src/fetch.rs
Normal file
13
src/fetch.rs
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
use serde::de::DeserializeOwned;
|
||||||
|
|
||||||
|
pub async fn fetch<T>(client: &reqwest::Client, url: &str) -> Result<T, reqwest::Error>
|
||||||
|
where
|
||||||
|
T: DeserializeOwned,
|
||||||
|
{
|
||||||
|
client.get(url)
|
||||||
|
.header("accept", "application/activity+json")
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.json()
|
||||||
|
.await
|
||||||
|
}
|
118
src/main.rs
Normal file
118
src/main.rs
Normal file
|
@ -0,0 +1,118 @@
|
||||||
|
use axum::{
|
||||||
|
async_trait,
|
||||||
|
extract::{FromRequest, FromRef},
|
||||||
|
http::{header::CONTENT_TYPE, Request, StatusCode},
|
||||||
|
response::{IntoResponse, Response},
|
||||||
|
routing::{get, post},
|
||||||
|
Form, Json, RequestExt, Router,
|
||||||
|
};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use sigh::{PrivateKey, PublicKey, alg::{RsaSha256, Algorithm}, Key};
|
||||||
|
use std::{net::SocketAddr, sync::Arc};
|
||||||
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
|
|
||||||
|
mod fetch;
|
||||||
|
pub use fetch::fetch;
|
||||||
|
mod send;
|
||||||
|
pub use send::send;
|
||||||
|
mod activitypub;
|
||||||
|
mod endpoint;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct State {
|
||||||
|
client: Arc<reqwest::Client>,
|
||||||
|
private_key: PrivateKey,
|
||||||
|
public_key: PublicKey,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl FromRef<State> for Arc<reqwest::Client> {
|
||||||
|
fn from_ref(state: &State) -> Arc<reqwest::Client> {
|
||||||
|
state.client.clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn actor(axum::extract::State(state): axum::extract::State<State>) -> impl IntoResponse {
|
||||||
|
let id = "https://relay.fedi.buzz/".to_string();
|
||||||
|
Json(activitypub::Actor {
|
||||||
|
jsonld_context: serde_json::Value::String(
|
||||||
|
"https://www.w3.org/ns/activitystreams".to_string()
|
||||||
|
),
|
||||||
|
actor_type: "Application".to_string(),
|
||||||
|
id: id.clone(),
|
||||||
|
inbox: id.clone(),
|
||||||
|
outbox: id.clone(),
|
||||||
|
public_key: activitypub::ActorPublicKey {
|
||||||
|
id: id.clone(),
|
||||||
|
owner: Some(id),
|
||||||
|
pem: state.public_key.to_pem().unwrap(),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handler(
|
||||||
|
axum::extract::State(state): axum::extract::State<State>,
|
||||||
|
endpoint: endpoint::Endpoint,
|
||||||
|
) -> Response {
|
||||||
|
let action = match serde_json::from_value::<activitypub::Action<serde_json::Value>>(endpoint.payload.clone()) {
|
||||||
|
Ok(action) => action,
|
||||||
|
Err(e) => return (
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
format!("Bad action: {:?}", e)
|
||||||
|
).into_response(),
|
||||||
|
};
|
||||||
|
dbg!(&action);
|
||||||
|
|
||||||
|
if action.action_type == "Follow" {
|
||||||
|
let private_key = state.private_key.clone();
|
||||||
|
let client = state.client.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let accept = activitypub::Action {
|
||||||
|
action_type: "Accept".to_string(),
|
||||||
|
actor: "https://relay.fedi.buzz/".to_string(),
|
||||||
|
to: Some(endpoint.actor.id),
|
||||||
|
object: Some(endpoint.payload),
|
||||||
|
};
|
||||||
|
dbg!(serde_json::to_string_pretty(&accept));
|
||||||
|
send::send(
|
||||||
|
client.as_ref(), &endpoint.actor.inbox,
|
||||||
|
"https://relay.fedi.buzz/",
|
||||||
|
&private_key,
|
||||||
|
accept,
|
||||||
|
).await
|
||||||
|
.map_err(|e| tracing::error!("post: {}", e));
|
||||||
|
});
|
||||||
|
|
||||||
|
StatusCode::OK.into_response()
|
||||||
|
} else {
|
||||||
|
(StatusCode::BAD_REQUEST, "Not a recognized request").into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
tracing_subscriber::registry()
|
||||||
|
.with(
|
||||||
|
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
|
||||||
|
"buzzrelay=trace,tower_http=trace,axum=trace".into()
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.with(tracing_subscriber::fmt::layer())
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let (private_key, public_key) = RsaSha256.generate_keys().unwrap();
|
||||||
|
|
||||||
|
let app = Router::new()
|
||||||
|
.route("/", get(actor).post(handler))
|
||||||
|
.with_state(State {
|
||||||
|
client: Arc::new(reqwest::Client::new()),
|
||||||
|
private_key, public_key,
|
||||||
|
});
|
||||||
|
|
||||||
|
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
|
||||||
|
tracing::debug!("listening on {}", addr);
|
||||||
|
axum::Server::bind(&addr)
|
||||||
|
.serve(app.into_make_service())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
59
src/send.rs
Normal file
59
src/send.rs
Normal file
|
@ -0,0 +1,59 @@
|
||||||
|
use http_digest_headers::{DigestHeader, DigestMethod};
|
||||||
|
use serde::Serialize;
|
||||||
|
use sigh::{PrivateKey, SigningConfig, alg::RsaSha256};
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum SendError {
|
||||||
|
#[error("HTTP Digest generation error")]
|
||||||
|
Digest,
|
||||||
|
#[error("JSON encoding error")]
|
||||||
|
Json(#[from] serde_json::Error),
|
||||||
|
#[error("Signature error")]
|
||||||
|
Signature(#[from] sigh::Error),
|
||||||
|
#[error("HTTP request error")]
|
||||||
|
HttpReq(#[from] http::Error),
|
||||||
|
#[error("HTTP client error")]
|
||||||
|
Http(#[from] reqwest::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn send<T: Serialize>(
|
||||||
|
client: &reqwest::Client,
|
||||||
|
url: &str,
|
||||||
|
key_id: &str,
|
||||||
|
private_key: &PrivateKey,
|
||||||
|
body: T,
|
||||||
|
) -> Result<(), SendError> {
|
||||||
|
let body = serde_json::to_vec(&body)
|
||||||
|
.map_err(SendError::Json)?;
|
||||||
|
let mut digest_header = DigestHeader::new()
|
||||||
|
.with_method(DigestMethod::SHA256, &body)
|
||||||
|
.map(|h| format!("{}", h))
|
||||||
|
.map_err(|_| SendError::Digest)?;
|
||||||
|
if digest_header.starts_with("sha-") {
|
||||||
|
digest_header.replace_range(..4, "SHA-");
|
||||||
|
}
|
||||||
|
// mastodon uses base64::alphabet::STANDARD, not base64::alphabet::URL_SAFE
|
||||||
|
digest_header.replace_range(
|
||||||
|
7..,
|
||||||
|
&digest_header[7..].replace("-", "+").replace("_", "/")
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut req = http::Request::builder()
|
||||||
|
.method("POST")
|
||||||
|
.uri(url)
|
||||||
|
.header("content-type", "application/activity+json")
|
||||||
|
.header("date", chrono::Utc::now().to_rfc2822()
|
||||||
|
.replace("+0000", "GMT"))
|
||||||
|
.header("digest", digest_header)
|
||||||
|
.body(body)
|
||||||
|
.map_err(SendError::HttpReq)?;
|
||||||
|
SigningConfig::new(RsaSha256, private_key, key_id)
|
||||||
|
.sign(&mut req)?;
|
||||||
|
dbg!(&req);
|
||||||
|
let res = client.execute(req.try_into()?)
|
||||||
|
.await?;
|
||||||
|
dbg!(&res);
|
||||||
|
dbg!(res.text().await);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
Loading…
Reference in a new issue