diff --git a/src/activity_queue.rs b/src/activity_queue.rs index 10a0fa1..9eca4e8 100644 --- a/src/activity_queue.rs +++ b/src/activity_queue.rs @@ -7,12 +7,12 @@ use crate::{ filter_inboxes, generate_request_headers, get_pkey_cached, + send, serialize_activity, }, config::Data, error::Error, http_signatures::sign_request, - reqwest_shim::ResponseExt, traits::{ActivityHandler, Actor}, }; use bytes::Bytes; @@ -20,7 +20,6 @@ use futures::StreamExt; use futures_core::Future; use itertools::Itertools; use openssl::pkey::{PKey, Private}; -use reqwest::Request; use reqwest_middleware::ClientWithMiddleware; use serde::Serialize; use std::{ @@ -125,6 +124,12 @@ pub(crate) struct SendActivityTask { http_signature_compat: bool, } +impl Display for SendActivityTask { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} to {}", self.activity_id, self.inbox) + } +} + async fn sign_and_send( task: &SendActivityTask, client: &ClientWithMiddleware, @@ -160,44 +165,6 @@ async fn sign_and_send( .await } -async fn send( - task: &SendActivityTask, - client: &ClientWithMiddleware, - request: Request, -) -> Result<(), Error> { - let response = client.execute(request).await; - - match response { - Ok(o) if o.status().is_success() => { - debug!( - "Activity {} delivered successfully to {}", - task.activity_id, task.inbox - ); - Ok(()) - } - Ok(o) if o.status().is_client_error() => { - let text = o.text_limited().await?; - debug!( - "Activity {} was rejected by {}, aborting: {}", - task.activity_id, task.inbox, text, - ); - Ok(()) - } - Ok(o) => { - let status = o.status(); - let text = o.text_limited().await?; - Err(Error::Other(format!( - "Queueing activity {} to {} for retry after failure with status {}: {}", - task.activity_id, task.inbox, status, text, - ))) - } - Err(e) => Err(Error::Other(format!( - "Queueing activity {} to {} for retry after connection failure: {}", - task.activity_id, task.inbox, e - ))), - } -} - /// A simple activity queue which spawns tokio workers to send out requests /// When creating a queue, it will spawn a task per worker thread /// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory) @@ -520,8 +487,8 @@ async fn retry>, A: FnMut mod tests { use axum::extract::State; use bytes::Bytes; - use http::StatusCode; - use std::time::Instant;use http::HeaderMap; + use http::{HeaderMap, StatusCode}; + use std::time::Instant; use crate::http_signatures::generate_actor_keypair; diff --git a/src/activity_sending.rs b/src/activity_sending.rs index 9940a96..3dda441 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -3,20 +3,23 @@ #![doc = include_str!("../docs/09_sending_activities.md")] use crate::{ - config::Data, + config::{Data, FederationConfig}, error::Error, http_signatures::sign_request, reqwest_shim::ResponseExt, traits::{ActivityHandler, Actor}, FEDERATION_CONTENT_TYPE, }; -use crate::config::FederationConfig; use bytes::Bytes; -use futures::{StreamExt}; +use futures::StreamExt; use httpdate::fmt_http_date; -use itertools::{Itertools}; +use itertools::Itertools; use openssl::pkey::{PKey, Private}; -use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; +use reqwest::{ + header::{HeaderMap, HeaderName, HeaderValue}, + Request, +}; +use reqwest_middleware::ClientWithMiddleware; use serde::Serialize; use std::{ self, @@ -37,6 +40,7 @@ pub struct SendActivityTask<'a> { pub(crate) private_key: PKey, pub(crate) http_signature_compat: bool, } + impl Display for SendActivityTask<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{} to {}", self.activity_id, self.inbox) @@ -99,26 +103,35 @@ impl SendActivityTask<'_> { self.http_signature_compat, ) .await?; - let response = client.execute(request).await?; - match response { - o if o.status().is_success() => { - debug!("Activity {self} delivered successfully"); - Ok(()) - } - o if o.status().is_client_error() => { - let text = o.text_limited().await?; - debug!("Activity {self} was rejected, aborting: {text}"); - Ok(()) - } - o => { - let status = o.status(); - let text = o.text_limited().await?; + send(&self, client, request).await + } +} - Err(Error::Other(format!( - "Activity {self} failure with status {status}: {text}", - ))) - } +pub(crate) async fn send( + activity: &T, + client: &ClientWithMiddleware, + request: Request, +) -> Result<(), Error> { + let response = client.execute(request).await?; + + match response { + o if o.status().is_success() => { + debug!("Activity {activity} delivered successfully"); + Ok(()) + } + o if o.status().is_client_error() => { + let text = o.text_limited().await?; + debug!("Activity {activity} was rejected, aborting: {text}"); + Ok(()) + } + o => { + let status = o.status(); + let text = o.text_limited().await?; + + Err(Error::Other(format!( + "Activity {activity} failure with status {status}: {text}", + ))) } } }