diff --git a/src/activity_sending.rs b/src/activity_sending.rs index 911d827..24a25f4 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -16,16 +16,12 @@ use futures::StreamExt; use httpdate::fmt_http_date; use itertools::Itertools; use openssl::pkey::{PKey, Private}; -use reqwest::{ - header::{HeaderMap, HeaderName, HeaderValue}, - Request, -}; -use reqwest_middleware::ClientWithMiddleware; +use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use serde::Serialize; use std::{ self, fmt::{Debug, Display}, - time::{Duration, SystemTime}, + time::SystemTime, }; use tracing::debug; use url::Url; @@ -33,12 +29,12 @@ use url::Url; #[derive(Clone, Debug)] /// all info needed to send one activity to one inbox pub struct SendActivityTask<'a> { - actor_id: &'a Url, + pub(crate) actor_id: &'a Url, activity_id: &'a Url, - activity: Bytes, - inbox: Url, - private_key: PKey, - http_signature_compat: bool, + pub(crate) activity: Bytes, + pub(crate) inbox: Url, + pub(crate) private_key: PKey, + pub(crate) http_signature_compat: bool, } impl Display for SendActivityTask<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -95,35 +91,27 @@ impl SendActivityTask<'_> { } /// convert a sendactivitydata to a request, signing and sending it - pub async fn sign_and_send(&self, data: &Data) -> Result<(), Error> { - let req = self - .sign(&data.config.client, data.config.request_timeout) - .await?; - self.send(&data.config.client, req).await - } - async fn sign( - &self, - client: &ClientWithMiddleware, - timeout: Duration, - ) -> Result { - let task = self; - let request_builder = client - .post(task.inbox.to_string()) - .timeout(timeout) - .headers(generate_request_headers(&task.inbox)); + pub async fn sign_and_send(self, data: &Data) -> Result<(), Error> { + let request_builder = data + .config + .client + .post(self.inbox.to_string()) + .timeout(data.config.request_timeout) + .headers(generate_request_headers(&self.inbox)); let request = sign_request( request_builder, - task.actor_id, - task.activity.clone(), - task.private_key.clone(), - task.http_signature_compat, + self.actor_id, + self.activity.clone(), + self.private_key.clone(), + self.http_signature_compat, ) .await?; - Ok(request) - } - - async fn send(&self, client: &ClientWithMiddleware, request: Request) -> Result<(), Error> { - let response = client.execute(request).await?; + let response = data + .config + .client + .execute(request) + .await + .map_err(|e| Error::FetchError(self.inbox.clone(), e))?; match response { o if o.status().is_success() => { diff --git a/src/error.rs b/src/error.rs index 2d179d1..1b326d9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -39,11 +39,17 @@ pub enum Error { #[error(transparent)] Json(#[from] serde_json::Error), /// Reqwest Middleware Error - #[error(transparent)] - ReqwestMiddleware(#[from] reqwest_middleware::Error), + //#[error(transparent)] + //ReqwestMiddleware(#[from] reqwest_middleware::Error), /// Reqwest Error + //#[error(transparent)] + //Reqwest(#[from] reqwest::Error), + #[error("Failed to fetch object from {0}: {1}")] + FetchError(Url, reqwest_middleware::Error), + #[error("Failed to send activity to {0}: {1}")] + SendActivityError(Url, reqwest::Error), #[error(transparent)] - Reqwest(#[from] reqwest::Error), + ReqwestPollStreamError(reqwest::Error), /// UTF-8 error #[error(transparent)] Utf8(#[from] FromUtf8Error), diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index 4a0d502..54b008b 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -83,10 +83,11 @@ async fn fetch_object_http_with_accept( data.config.http_signature_compat, ) .await?; - config.client.execute(req).await? + config.client.execute(req).await } else { - req.send().await? - }; + req.send().await + } + .map_err(|e| Error::FetchError(url.clone(), e))?; if res.status() == StatusCode::GONE { return Err(Error::ObjectDeleted(url.clone())); diff --git a/src/http_signatures.rs b/src/http_signatures.rs index 59d1d48..4064a17 100644 --- a/src/http_signatures.rs +++ b/src/http_signatures.rs @@ -100,6 +100,7 @@ pub(crate) async fn sign_request( true => CONFIG_COMPAT.clone(), }; request_builder + // TODO: this should simply return SignError and wrap reqwest::Error .signature_with_digest( sig_conf.clone(), key_id, diff --git a/src/reqwest_shim.rs b/src/reqwest_shim.rs index 9db846e..980b0b9 100644 --- a/src/reqwest_shim.rs +++ b/src/reqwest_shim.rs @@ -30,7 +30,10 @@ impl Future for BytesFuture { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { let this = self.as_mut().project(); - if let Some(chunk) = ready!(this.stream.poll_next(cx)).transpose()? { + if let Some(chunk) = ready!(this.stream.poll_next(cx)) + .transpose() + .map_err(Error::ReqwestPollStreamError)? + { this.aggregator.put(chunk); if this.aggregator.len() > *this.limit { return Poll::Ready(Err(Error::ResponseBodyLimit));