more dedup

This commit is contained in:
Felix Ableitner 2024-02-29 20:15:22 +01:00
parent 3c2f105868
commit 40d0419a4a
2 changed files with 45 additions and 65 deletions

View file

@ -7,12 +7,12 @@ use crate::{
filter_inboxes, filter_inboxes,
generate_request_headers, generate_request_headers,
get_pkey_cached, get_pkey_cached,
send,
serialize_activity, serialize_activity,
}, },
config::Data, config::Data,
error::Error, error::Error,
http_signatures::sign_request, http_signatures::sign_request,
reqwest_shim::ResponseExt,
traits::{ActivityHandler, Actor}, traits::{ActivityHandler, Actor},
}; };
use bytes::Bytes; use bytes::Bytes;
@ -20,7 +20,6 @@ use futures::StreamExt;
use futures_core::Future; use futures_core::Future;
use itertools::Itertools; use itertools::Itertools;
use openssl::pkey::{PKey, Private}; use openssl::pkey::{PKey, Private};
use reqwest::Request;
use reqwest_middleware::ClientWithMiddleware; use reqwest_middleware::ClientWithMiddleware;
use serde::Serialize; use serde::Serialize;
use std::{ use std::{
@ -125,6 +124,12 @@ pub(crate) struct SendActivityTask {
http_signature_compat: bool, http_signature_compat: bool,
} }
impl Display for SendActivityTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} to {}", self.activity_id, self.inbox)
}
}
async fn sign_and_send( async fn sign_and_send(
task: &SendActivityTask, task: &SendActivityTask,
client: &ClientWithMiddleware, client: &ClientWithMiddleware,
@ -160,44 +165,6 @@ async fn sign_and_send(
.await .await
} }
async fn send(
task: &SendActivityTask,
client: &ClientWithMiddleware,
request: Request,
) -> Result<(), Error> {
let response = client.execute(request).await;
match response {
Ok(o) if o.status().is_success() => {
debug!(
"Activity {} delivered successfully to {}",
task.activity_id, task.inbox
);
Ok(())
}
Ok(o) if o.status().is_client_error() => {
let text = o.text_limited().await?;
debug!(
"Activity {} was rejected by {}, aborting: {}",
task.activity_id, task.inbox, text,
);
Ok(())
}
Ok(o) => {
let status = o.status();
let text = o.text_limited().await?;
Err(Error::Other(format!(
"Queueing activity {} to {} for retry after failure with status {}: {}",
task.activity_id, task.inbox, status, text,
)))
}
Err(e) => Err(Error::Other(format!(
"Queueing activity {} to {} for retry after connection failure: {}",
task.activity_id, task.inbox, e
))),
}
}
/// A simple activity queue which spawns tokio workers to send out requests /// A simple activity queue which spawns tokio workers to send out requests
/// When creating a queue, it will spawn a task per worker thread /// When creating a queue, it will spawn a task per worker thread
/// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory) /// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory)
@ -520,8 +487,8 @@ async fn retry<T, E: Display + Debug, F: Future<Output = Result<T, E>>, A: FnMut
mod tests { mod tests {
use axum::extract::State; use axum::extract::State;
use bytes::Bytes; use bytes::Bytes;
use http::StatusCode; use http::{HeaderMap, StatusCode};
use std::time::Instant;use http::HeaderMap; use std::time::Instant;
use crate::http_signatures::generate_actor_keypair; use crate::http_signatures::generate_actor_keypair;

View file

@ -3,20 +3,23 @@
#![doc = include_str!("../docs/09_sending_activities.md")] #![doc = include_str!("../docs/09_sending_activities.md")]
use crate::{ use crate::{
config::Data, config::{Data, FederationConfig},
error::Error, error::Error,
http_signatures::sign_request, http_signatures::sign_request,
reqwest_shim::ResponseExt, reqwest_shim::ResponseExt,
traits::{ActivityHandler, Actor}, traits::{ActivityHandler, Actor},
FEDERATION_CONTENT_TYPE, FEDERATION_CONTENT_TYPE,
}; };
use crate::config::FederationConfig;
use bytes::Bytes; use bytes::Bytes;
use futures::{StreamExt}; use futures::StreamExt;
use httpdate::fmt_http_date; use httpdate::fmt_http_date;
use itertools::{Itertools}; use itertools::Itertools;
use openssl::pkey::{PKey, Private}; use openssl::pkey::{PKey, Private};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use reqwest::{
header::{HeaderMap, HeaderName, HeaderValue},
Request,
};
use reqwest_middleware::ClientWithMiddleware;
use serde::Serialize; use serde::Serialize;
use std::{ use std::{
self, self,
@ -37,6 +40,7 @@ pub struct SendActivityTask<'a> {
pub(crate) private_key: PKey<Private>, pub(crate) private_key: PKey<Private>,
pub(crate) http_signature_compat: bool, pub(crate) http_signature_compat: bool,
} }
impl Display for SendActivityTask<'_> { impl Display for SendActivityTask<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} to {}", self.activity_id, self.inbox) write!(f, "{} to {}", self.activity_id, self.inbox)
@ -99,26 +103,35 @@ impl SendActivityTask<'_> {
self.http_signature_compat, self.http_signature_compat,
) )
.await?; .await?;
let response = client.execute(request).await?;
match response { send(&self, client, request).await
o if o.status().is_success() => { }
debug!("Activity {self} delivered successfully"); }
Ok(())
}
o if o.status().is_client_error() => {
let text = o.text_limited().await?;
debug!("Activity {self} was rejected, aborting: {text}");
Ok(())
}
o => {
let status = o.status();
let text = o.text_limited().await?;
Err(Error::Other(format!( pub(crate) async fn send<T: Display>(
"Activity {self} failure with status {status}: {text}", activity: &T,
))) client: &ClientWithMiddleware,
} request: Request,
) -> Result<(), Error> {
let response = client.execute(request).await?;
match response {
o if o.status().is_success() => {
debug!("Activity {activity} delivered successfully");
Ok(())
}
o if o.status().is_client_error() => {
let text = o.text_limited().await?;
debug!("Activity {activity} was rejected, aborting: {text}");
Ok(())
}
o => {
let status = o.status();
let text = o.text_limited().await?;
Err(Error::Other(format!(
"Activity {activity} failure with status {status}: {text}",
)))
} }
} }
} }