diff --git a/.drone.yml b/.drone.yml index af9793f..5641110 100644 --- a/.drone.yml +++ b/.drone.yml @@ -14,7 +14,7 @@ steps: - /root/.cargo/bin/cargo fmt -- --check - name: cargo check - image: rust:1.61-bullseye + image: rust:1.65-bullseye environment: CARGO_HOME: .cargo commands: @@ -35,7 +35,7 @@ steps: - cargo clippy --workspace --all-features -- -D clippy::unwrap_used - name: cargo test - image: rust:1.61-bullseye + image: rust:1.65-bullseye environment: CARGO_HOME: .cargo RUST_BACKTRACE: 1 @@ -43,14 +43,14 @@ steps: - cargo test --workspace --no-fail-fast - name: cargo run actix - image: rust:1.61-bullseye + image: rust:1.65-bullseye environment: CARGO_HOME: .cargo RUST_BACKTRACE: 1 commands: - cargo run --example simple_federation_actix - name: cargo run axum - image: rust:1.61-bullseye + image: rust:1.65-bullseye environment: CARGO_HOME: .cargo RUST_BACKTRACE: 1 diff --git a/Cargo.lock b/Cargo.lock index c4ff2c6..4bd97c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,11 +14,13 @@ dependencies = [ "axum", "background-jobs", "base64", + "bytes", "chrono", "derive_builder", "dyn-clone", "enum_delegate", "env_logger", + "futures-core", "http", "http-signature-normalization", "http-signature-normalization-reqwest", @@ -27,6 +29,7 @@ dependencies = [ "itertools", "once_cell", "openssl", + "pin-project-lite", "rand", "reqwest", "reqwest-middleware", @@ -417,9 +420,9 @@ checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" [[package]] name = "bytes" -version = "1.2.1" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" [[package]] name = "bytestring" @@ -1481,6 +1484,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-native-tls", + "tokio-util", "tower-service", "url", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index eff0b93..6724f50 
100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1.58" url = { version = "2.3.1", features = ["serde"] } serde_json = { version = "1.0.87", features = ["preserve_order"] } anyhow = "1.0.66" -reqwest = { version = "0.11.12", features = ["json"] } +reqwest = { version = "0.11.12", features = ["json", "stream"] } reqwest-middleware = "0.2.0" tracing = "0.1.37" base64 = "0.13.1" @@ -33,6 +33,9 @@ httpdate = "1.0.2" http-signature-normalization-reqwest = { version = "0.7.1", default-features = false, features = ["sha-2", "middleware"] } http-signature-normalization = "0.6.0" actix-rt = { version = "2.7.0" } +bytes = "1.3.0" +futures-core = { version = "0.3.25", default-features = false } +pin-project-lite = "0.2.9" actix-web = { version = "4.2.1", default-features = false, optional = true } axum = { version = "0.6.0", features = ["json", "headers", "macros", "original-uri"], optional = true } diff --git a/src/core/activity_queue.rs b/src/core/activity_queue.rs index be37cfb..cac04c7 100644 --- a/src/core/activity_queue.rs +++ b/src/core/activity_queue.rs @@ -1,6 +1,7 @@ use crate::{ core::signatures::{sign_request, PublicKey}, traits::ActivityHandler, + utils::reqwest_shim::ResponseExt, Error, InstanceSettings, LocalInstance, @@ -167,7 +168,7 @@ async fn do_send( } Ok(o) => { let status = o.status(); - let text = o.text().await.map_err(Error::conv)?; + let text = o.text_limited().await.map_err(Error::conv)?; Err(anyhow!( "Queueing activity {} to {} for retry after failure with status {}: {}", task.activity_id, diff --git a/src/lib.rs b/src/lib.rs index 29efc55..a1f0148 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -173,6 +173,8 @@ pub enum Error { NotFound, #[error("Request limit was reached during fetch")] RequestLimit, + #[error("Response body limit was reached during fetch")] + ResponseBodyLimit, #[error("Object to be fetched was deleted")] ObjectDeleted, #[error("{0}")] diff --git a/src/utils.rs b/src/utils.rs index 5f658b9..296904f 100644 
--- a/src/utils.rs +++ b/src/utils.rs @@ -1,10 +1,12 @@ -use crate::{Error, LocalInstance, APUB_JSON_CONTENT_TYPE}; +use crate::{utils::reqwest_shim::ResponseExt, Error, LocalInstance, APUB_JSON_CONTENT_TYPE}; use http::{header::HeaderName, HeaderValue, StatusCode}; use serde::de::DeserializeOwned; use std::collections::BTreeMap; use tracing::info; use url::Url; +pub(crate) mod reqwest_shim; + pub async fn fetch_object_http( url: &Url, instance: &LocalInstance, @@ -33,7 +35,7 @@ pub async fn fetch_object_http( return Err(Error::ObjectDeleted); } - res.json().await.map_err(Error::conv) + res.json_limited().await } /// Check that both urls have the same domain. If not, return UrlVerificationError.
diff --git a/src/utils/reqwest_shim.rs b/src/utils/reqwest_shim.rs
new file mode 100644
index 0000000..74d345b
--- /dev/null
+++ b/src/utils/reqwest_shim.rs
@@ -0,0 +1,141 @@
+use crate::Error;
+use bytes::{BufMut, Bytes, BytesMut};
+use futures_core::{ready, stream::BoxStream, Stream};
+use pin_project_lite::pin_project;
+use reqwest::Response;
+use serde::de::DeserializeOwned;
+use std::{
+    future::Future,
+    marker::PhantomData,
+    mem,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+/// Maximum number of response body bytes that will be buffered (100 KiB).
+const MAX_BODY_SIZE: usize = 100 * 1024;
+
+pin_project! {
+    /// Future that collects a response body into [`Bytes`], failing with
+    /// [`Error::ResponseBodyLimit`] once more than `limit` bytes were received.
+    pub struct BytesFuture {
+        #[pin]
+        stream: BoxStream<'static, reqwest::Result<Bytes>>,
+        limit: usize,
+        aggregator: BytesMut,
+    }
+}
+
+impl Future for BytesFuture {
+    type Output = Result<Bytes, Error>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        loop {
+            let this = self.as_mut().project();
+            let next = ready!(this.stream.poll_next(cx))
+                .transpose()
+                .map_err(Error::conv)?;
+
+            match next {
+                Some(chunk) => {
+                    // Check the limit before buffering so that an oversized chunk is never copied.
+                    if this.aggregator.len().saturating_add(chunk.len()) > *this.limit {
+                        return Poll::Ready(Err(Error::ResponseBodyLimit));
+                    }
+                    this.aggregator.put(chunk);
+                }
+                // End of stream: hand out the collected body and leave an empty buffer behind.
+                None => return Poll::Ready(Ok(mem::take(this.aggregator).freeze())),
+            }
+        }
+    }
+}
+
+pin_project! {
+    /// Future that collects a size-limited response body and deserializes it from JSON.
+    pub struct JsonFuture<T> {
+        _t: PhantomData<T>,
+        #[pin]
+        future: BytesFuture,
+    }
+}
+
+impl<T> Future for JsonFuture<T>
+where
+    T: DeserializeOwned,
+{
+    type Output = Result<T, Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.project();
+        let bytes = ready!(this.future.poll(cx))?;
+        Poll::Ready(serde_json::from_slice(&bytes).map_err(Error::conv))
+    }
+}
+
+pin_project! {
+    /// Future that collects a size-limited response body and decodes it as UTF-8 text.
+    pub struct TextFuture {
+        #[pin]
+        future: BytesFuture,
+    }
+}
+
+impl Future for TextFuture {
+    type Output = Result<String, Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.project();
+        let bytes = ready!(this.future.poll(cx))?;
+        // `Vec::from` may reuse the existing allocation, whereas `to_vec` always copies.
+        Poll::Ready(String::from_utf8(Vec::from(bytes)).map_err(Error::conv))
+    }
+}
+
+/// Response shim to work around [an issue in reqwest](https://github.com/seanmonstar/reqwest/issues/1234) (there is an [open pull request](https://github.com/seanmonstar/reqwest/pull/1532) fixing this).
+///
+/// Reqwest doesn't limit the response body size by default nor does it offer an option to configure one.
+/// Since we have to fetch data from untrusted sources, not restricting the maximum size is a DoS hazard for us.
+///
+/// This shim reimplements the `bytes`, `json`, and `text` functions and restricts the bodies to 100KB.
+///
+/// TODO: Remove this shim as soon as reqwest gets support for size-limited bodies.
+pub trait ResponseExt {
+    type BytesFuture;
+    type JsonFuture<T>;
+    type TextFuture;
+
+    /// Size limited version of `bytes` to work around a reqwest issue. Check [`ResponseExt`] docs for details.
+    fn bytes_limited(self) -> Self::BytesFuture;
+    /// Size limited version of `json` to work around a reqwest issue. Check [`ResponseExt`] docs for details.
+    fn json_limited<T>(self) -> Self::JsonFuture<T>;
+    /// Size limited version of `text` to work around a reqwest issue. Check [`ResponseExt`] docs for details.
+    fn text_limited(self) -> Self::TextFuture;
+}
+
+impl ResponseExt for Response {
+    type BytesFuture = BytesFuture;
+    type JsonFuture<T> = JsonFuture<T>;
+    type TextFuture = TextFuture;
+
+    fn bytes_limited(self) -> Self::BytesFuture {
+        BytesFuture {
+            stream: Box::pin(self.bytes_stream()),
+            limit: MAX_BODY_SIZE,
+            aggregator: BytesMut::new(),
+        }
+    }
+
+    fn json_limited<T>(self) -> Self::JsonFuture<T> {
+        JsonFuture {
+            _t: PhantomData,
+            future: self.bytes_limited(),
+        }
+    }
+
+    fn text_limited(self) -> Self::TextFuture {
+        TextFuture {
+            future: self.bytes_limited(),
+        }
+    }
+}