Better fetch errors

This commit is contained in:
Felix Ableitner 2023-12-12 00:02:35 +01:00
parent 12aad8bf3c
commit e50dbfa8f0
5 changed files with 42 additions and 43 deletions

View file

@ -16,16 +16,12 @@ use futures::StreamExt;
use httpdate::fmt_http_date; use httpdate::fmt_http_date;
use itertools::Itertools; use itertools::Itertools;
use openssl::pkey::{PKey, Private}; use openssl::pkey::{PKey, Private};
use reqwest::{ use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
header::{HeaderMap, HeaderName, HeaderValue},
Request,
};
use reqwest_middleware::ClientWithMiddleware;
use serde::Serialize; use serde::Serialize;
use std::{ use std::{
self, self,
fmt::{Debug, Display}, fmt::{Debug, Display},
time::{Duration, SystemTime}, time::SystemTime,
}; };
use tracing::debug; use tracing::debug;
use url::Url; use url::Url;
@ -33,12 +29,12 @@ use url::Url;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
/// all info needed to send one activity to one inbox /// all info needed to send one activity to one inbox
pub struct SendActivityTask<'a> { pub struct SendActivityTask<'a> {
actor_id: &'a Url, pub(crate) actor_id: &'a Url,
activity_id: &'a Url, activity_id: &'a Url,
activity: Bytes, pub(crate) activity: Bytes,
inbox: Url, pub(crate) inbox: Url,
private_key: PKey<Private>, pub(crate) private_key: PKey<Private>,
http_signature_compat: bool, pub(crate) http_signature_compat: bool,
} }
impl Display for SendActivityTask<'_> { impl Display for SendActivityTask<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@ -95,35 +91,27 @@ impl SendActivityTask<'_> {
} }
/// convert a sendactivitydata to a request, signing and sending it /// convert a sendactivitydata to a request, signing and sending it
pub async fn sign_and_send<Datatype: Clone>(&self, data: &Data<Datatype>) -> Result<(), Error> { pub async fn sign_and_send<Datatype: Clone>(self, data: &Data<Datatype>) -> Result<(), Error> {
let req = self let request_builder = data
.sign(&data.config.client, data.config.request_timeout) .config
.await?; .client
self.send(&data.config.client, req).await .post(self.inbox.to_string())
} .timeout(data.config.request_timeout)
async fn sign( .headers(generate_request_headers(&self.inbox));
&self,
client: &ClientWithMiddleware,
timeout: Duration,
) -> Result<Request, Error> {
let task = self;
let request_builder = client
.post(task.inbox.to_string())
.timeout(timeout)
.headers(generate_request_headers(&task.inbox));
let request = sign_request( let request = sign_request(
request_builder, request_builder,
task.actor_id, self.actor_id,
task.activity.clone(), self.activity.clone(),
task.private_key.clone(), self.private_key.clone(),
task.http_signature_compat, self.http_signature_compat,
) )
.await?; .await?;
Ok(request) let response = data
} .config
.client
async fn send(&self, client: &ClientWithMiddleware, request: Request) -> Result<(), Error> { .execute(request)
let response = client.execute(request).await?; .await
.map_err(|e| Error::FetchError(self.inbox.clone(), e))?;
match response { match response {
o if o.status().is_success() => { o if o.status().is_success() => {

View file

@ -39,11 +39,17 @@ pub enum Error {
#[error(transparent)] #[error(transparent)]
Json(#[from] serde_json::Error), Json(#[from] serde_json::Error),
/// Reqwest Middleware Error /// Reqwest Middleware Error
#[error(transparent)] //#[error(transparent)]
ReqwestMiddleware(#[from] reqwest_middleware::Error), //ReqwestMiddleware(#[from] reqwest_middleware::Error),
/// Reqwest Error /// Reqwest Error
//#[error(transparent)]
//Reqwest(#[from] reqwest::Error),
#[error("Failed to fetch object from {0}: {1}")]
FetchError(Url, reqwest_middleware::Error),
#[error("Failed to send activity to {0}: {1}")]
SendActivityError(Url, reqwest::Error),
#[error(transparent)] #[error(transparent)]
Reqwest(#[from] reqwest::Error), ReqwestPollStreamError(reqwest::Error),
/// UTF-8 error /// UTF-8 error
#[error(transparent)] #[error(transparent)]
Utf8(#[from] FromUtf8Error), Utf8(#[from] FromUtf8Error),

View file

@ -83,10 +83,11 @@ async fn fetch_object_http_with_accept<T: Clone, Kind: DeserializeOwned>(
data.config.http_signature_compat, data.config.http_signature_compat,
) )
.await?; .await?;
config.client.execute(req).await? config.client.execute(req).await
} else { } else {
req.send().await? req.send().await
}; }
.map_err(|e| Error::FetchError(url.clone(), e))?;
if res.status() == StatusCode::GONE { if res.status() == StatusCode::GONE {
return Err(Error::ObjectDeleted(url.clone())); return Err(Error::ObjectDeleted(url.clone()));

View file

@ -100,6 +100,7 @@ pub(crate) async fn sign_request(
true => CONFIG_COMPAT.clone(), true => CONFIG_COMPAT.clone(),
}; };
request_builder request_builder
// TODO: this should simply return SignError and wrap reqwest::Error
.signature_with_digest( .signature_with_digest(
sig_conf.clone(), sig_conf.clone(),
key_id, key_id,

View file

@ -30,7 +30,10 @@ impl Future for BytesFuture {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop { loop {
let this = self.as_mut().project(); let this = self.as_mut().project();
if let Some(chunk) = ready!(this.stream.poll_next(cx)).transpose()? { if let Some(chunk) = ready!(this.stream.poll_next(cx))
.transpose()
.map_err(Error::ReqwestPollStreamError)?
{
this.aggregator.put(chunk); this.aggregator.put(chunk);
if this.aggregator.len() > *this.limit { if this.aggregator.len() > *this.limit {
return Poll::Ready(Err(Error::ResponseBodyLimit)); return Poll::Ready(Err(Error::ResponseBodyLimit));