From 50db596ce05a9d8eafd3c1208edd01148776d4ee Mon Sep 17 00:00:00 2001 From: Nutomic Date: Tue, 12 Dec 2023 11:30:21 +0100 Subject: [PATCH] Better error when activity receive fails (#89) * Minor refactoring * Better error when receive fails * clippy * add test case * comments * take ref --- src/activity_sending.rs | 40 +++++++----------------- src/actix_web/inbox.rs | 67 ++++++++++++++++++++++++++++++++--------- src/axum/inbox.rs | 13 +++----- src/error.rs | 3 ++ src/lib.rs | 41 +++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 53 deletions(-) diff --git a/src/activity_sending.rs b/src/activity_sending.rs index 911d827..849fde6 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -16,16 +16,12 @@ use futures::StreamExt; use httpdate::fmt_http_date; use itertools::Itertools; use openssl::pkey::{PKey, Private}; -use reqwest::{ - header::{HeaderMap, HeaderName, HeaderValue}, - Request, -}; -use reqwest_middleware::ClientWithMiddleware; +use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use serde::Serialize; use std::{ self, fmt::{Debug, Display}, - time::{Duration, SystemTime}, + time::SystemTime, }; use tracing::debug; use url::Url; @@ -96,33 +92,19 @@ impl SendActivityTask<'_> { /// convert a sendactivitydata to a request, signing and sending it pub async fn sign_and_send(&self, data: &Data) -> Result<(), Error> { - let req = self - .sign(&data.config.client, data.config.request_timeout) - .await?; - self.send(&data.config.client, req).await - } - async fn sign( - &self, - client: &ClientWithMiddleware, - timeout: Duration, - ) -> Result { - let task = self; + let client = &data.config.client; let request_builder = client - .post(task.inbox.to_string()) - .timeout(timeout) - .headers(generate_request_headers(&task.inbox)); + .post(self.inbox.to_string()) + .timeout(data.config.request_timeout) + .headers(generate_request_headers(&self.inbox)); let request = sign_request( request_builder, - task.actor_id, - task.activity.clone(), - task.private_key.clone(), - task.http_signature_compat, + self.actor_id, + self.activity.clone(), + self.private_key.clone(), + self.http_signature_compat, ) .await?; - Ok(request) - } - - async fn send(&self, client: &ClientWithMiddleware, request: Request) -> Result<(), Error> { let response = client.execute(request).await?; match response { @@ -286,7 +268,7 @@ mod tests { let start = Instant::now(); for _ in 0..num_messages { - message.sign_and_send(&data).await?; + message.clone().sign_and_send(&data).await?; } info!("Queue Sent: {:?}", start.elapsed()); diff --git a/src/actix_web/inbox.rs b/src/actix_web/inbox.rs index e7e19e8..a2b55d4 100644 --- a/src/actix_web/inbox.rs +++ b/src/actix_web/inbox.rs @@ -3,8 +3,8 @@ use crate::{ config::Data, error::Error, - fetch::object_id::ObjectId, http_signatures::{verify_body_hash, verify_signature}, + parse_received_activity, traits::{ActivityHandler, Actor, Object}, }; use actix_web::{web::Bytes, HttpRequest, HttpResponse}; @@ -29,11 +29,7 @@ where { verify_body_hash(request.headers().get("Digest"), &body)?; - let activity: Activity = serde_json::from_slice(&body).map_err(Error::Json)?; - data.config.verify_url_and_domain(&activity).await?; - let actor = ObjectId::::from(activity.actor().clone()) - .dereference(data) - .await?; + let (activity, actor) = parse_received_activity::(&body, data).await?; verify_signature( request.headers(), @@ -54,12 +50,14 @@ mod test { use crate::{ activity_sending::generate_request_headers, config::FederationConfig, + fetch::object_id::ObjectId, http_signatures::sign_request, traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR}, }; use actix_web::test::TestRequest; use reqwest::Client; use reqwest_middleware::ClientWithMiddleware; + use serde_json::json; use url::Url; #[tokio::test] @@ -105,22 +103,49 @@ mod test { assert_eq!(&err, &Error::ActivitySignatureInvalid) } - async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig) { + #[tokio::test] + async fn test_receive_unparseable_activity() { + let (_, _, config) = setup_receive_test().await; + + let actor = Url::parse("http://ds9.lemmy.ml/u/lemmy_alpha").unwrap(); + let id = "http://localhost:123/1"; + let activity = json!({ + "actor": actor.as_str(), + "to": ["https://www.w3.org/ns/activitystreams#Public"], + "object": "http://ds9.lemmy.ml/post/1", + "cc": ["http://enterprise.lemmy.ml/c/main"], + "type": "Delete", + "id": id + } + ); + let body: Bytes = serde_json::to_vec(&activity).unwrap().into(); + let incoming_request = construct_request(&body, &actor).await; + + // intentionally cause a parse error by using wrong type for deser + let res = receive_activity::( + incoming_request.to_http_request(), + body, + &config.to_request_data(), + ) + .await; + + match res { + Err(Error::ParseReceivedActivity(url, _)) => { + assert_eq!(id, url.as_str()); + } + _ => unreachable!(), + } + } + + async fn construct_request(body: &Bytes, actor: &Url) -> TestRequest { let inbox = "https://example.com/inbox"; let headers = generate_request_headers(&Url::parse(inbox).unwrap()); let request_builder = ClientWithMiddleware::from(Client::default()) .post(inbox) .headers(headers); - let activity = Follow { - actor: ObjectId::parse("http://localhost:123").unwrap(), - object: ObjectId::parse("http://localhost:124").unwrap(), - kind: Default::default(), - id: "http://localhost:123/1".try_into().unwrap(), - }; - let body: Bytes = serde_json::to_vec(&activity).unwrap().into(); let outgoing_request = sign_request( request_builder, - &activity.actor.into_inner(), + actor, body.clone(), DB_USER_KEYPAIR.private_key().unwrap(), false, @@ -131,6 +156,18 @@ mod test { for h in outgoing_request.headers() { incoming_request = incoming_request.append_header(h); } + incoming_request + } + + async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig) { + let activity = Follow { + actor: ObjectId::parse("http://localhost:123").unwrap(), + object: ObjectId::parse("http://localhost:124").unwrap(), + kind: Default::default(), + id: "http://localhost:123/1".try_into().unwrap(), + }; + let body: Bytes = serde_json::to_vec(&activity).unwrap().into(); + let incoming_request = construct_request(&body, activity.actor.inner()).await; let config = FederationConfig::builder() .domain("localhost:8002") diff --git a/src/axum/inbox.rs b/src/axum/inbox.rs index c110bb6..5bb147a 100644 --- a/src/axum/inbox.rs +++ b/src/axum/inbox.rs @@ -5,8 +5,8 @@ use crate::{ config::Data, error::Error, - fetch::object_id::ObjectId, - http_signatures::{verify_body_hash, verify_signature}, + http_signatures::verify_signature, + parse_received_activity, traits::{ActivityHandler, Actor, Object}, }; use axum::{ @@ -33,13 +33,8 @@ where ::Error: From, Datatype: Clone, { - verify_body_hash(activity_data.headers.get("Digest"), &activity_data.body)?; - - let activity: Activity = serde_json::from_slice(&activity_data.body).map_err(Error::Json)?; - data.config.verify_url_and_domain(&activity).await?; - let actor = ObjectId::::from(activity.actor().clone()) - .dereference(data) - .await?; + let (activity, actor) = + parse_received_activity::(&activity_data.body, data).await?; verify_signature( &activity_data.headers, diff --git a/src/error.rs b/src/error.rs index 2d179d1..89f6abf 100644 --- a/src/error.rs +++ b/src/error.rs @@ -38,6 +38,9 @@ pub enum Error { /// JSON Error #[error(transparent)] Json(#[from] serde_json::Error), + /// Failed to parse an activity received from another instance + #[error("Failed to parse incoming activity with id {0}: {1}")] + ParseReceivedActivity(Url, serde_json::Error), /// Reqwest Middleware Error #[error(transparent)] ReqwestMiddleware(#[from] reqwest_middleware::Error), diff --git a/src/lib.rs b/src/lib.rs index c660253..42da8df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,7 +23,48 @@ pub mod protocol; pub(crate) mod reqwest_shim; pub mod traits; +use crate::{ + config::Data, + error::Error, + fetch::object_id::ObjectId, + traits::{ActivityHandler, Actor, Object}, +}; pub use activitystreams_kinds as kinds; +use serde::{de::DeserializeOwned, Deserialize}; +use url::Url; + /// Mime type for Activitypub data, used for `Accept` and `Content-Type` HTTP headers pub static FEDERATION_CONTENT_TYPE: &str = "application/activity+json"; + +/// Deserialize incoming inbox activity to the given type, perform basic +/// validation and extract the actor. +async fn parse_received_activity( + body: &[u8], + data: &Data, +) -> Result<(Activity, ActorT), ::Error> +where + Activity: ActivityHandler + DeserializeOwned + Send + 'static, + ActorT: Object + Actor + Send + 'static, + for<'de2> ::Kind: serde::Deserialize<'de2>, + ::Error: From + From<::Error>, + ::Error: From, + Datatype: Clone, +{ + let activity: Activity = serde_json::from_slice(body).map_err(|e| { + // Attempt to include activity id in error message + #[derive(Deserialize)] + struct Id { + id: Url, + } + match serde_json::from_slice::(body) { + Ok(id) => Error::ParseReceivedActivity(id.id, e), + Err(e) => Error::Json(e), + } + })?; + data.config.verify_url_and_domain(&activity).await?; + let actor = ObjectId::::from(activity.actor().clone()) + .dereference(data) + .await?; + Ok((activity, actor)) +}