Impose limits on the size of downloaded content from foreign servers

asonix 2024-06-23 13:35:24 -05:00
parent 97567cf598
commit 5aa97212b3
7 changed files with 85 additions and 7 deletions

Cargo.lock (generated)

@@ -399,6 +399,7 @@ dependencies = [
"console-subscriber",
"dashmap",
"dotenv",
"futures-core",
"http-signature-normalization-actix",
"http-signature-normalization-reqwest",
"lru",

Cargo.toml

@@ -38,6 +38,7 @@ config = { version = "0.14.0", default-features = false, features = ["toml", "js
 console-subscriber = { version = "0.2", optional = true }
 dashmap = "5.1.0"
 dotenv = "0.15.0"
+futures-core = "0.3.30"
 lru = "0.12.0"
 metrics = "0.22.0"
 metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = [

src/error.rs

@@ -123,6 +123,9 @@ pub(crate) enum ErrorKind {
#[error("Couldn't sign request")]
SignRequest,
#[error("Response body from server exceeded limits")]
BodyTooLarge,
#[error("Couldn't make request")]
Reqwest(#[from] reqwest::Error),

src/main.rs

@@ -38,6 +38,7 @@ mod middleware;
 mod requests;
 mod routes;
 mod spawner;
+mod stream;
 mod telegram;
 use crate::config::UrlKind

src/requests.rs

@@ -2,6 +2,7 @@ use crate::{
     data::LastOnline,
     error::{Error, ErrorKind},
     spawner::Spawner,
+    stream::{aggregate, limit_stream},
 };
 use activitystreams::iri_string::types::IriString;
 use actix_web::http::header::Date;
@@ -24,6 +25,9 @@ const ONE_MINUTE: u64 = 60 * ONE_SECOND;
 const ONE_HOUR: u64 = 60 * ONE_MINUTE;
 const ONE_DAY: u64 = 24 * ONE_HOUR;
+// 20 KB
+const JSON_SIZE_LIMIT: usize = 20 * 1024;
 #[derive(Debug)]
 pub(crate) enum BreakerStrategy {
     // Requires a successful response
@@ -262,7 +266,7 @@ impl Requests {
     where
         T: serde::de::DeserializeOwned,
     {
-        let body = self
+        let stream = self
             .do_deliver(
                 url,
                 &serde_json::json!({}),
@@ -271,8 +275,9 @@
                 strategy,
             )
             .await?
-            .bytes()
-            .await?;
+            .bytes_stream();
+        let body = aggregate(limit_stream(stream, JSON_SIZE_LIMIT)).await?;
         Ok(serde_json::from_slice(&body)?)
     }
@@ -299,11 +304,12 @@
     where
         T: serde::de::DeserializeOwned,
     {
-        let body = self
+        let stream = self
             .do_fetch_response(url, accept, strategy)
             .await?
-            .bytes()
-            .await?;
+            .bytes_stream();
+        let body = aggregate(limit_stream(stream, JSON_SIZE_LIMIT)).await?;
         Ok(serde_json::from_slice(&body)?)
     }
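
For orientation, here is a hypothetical call-site sketch of the new pattern. It is not part of this commit: the function name, `client`, `url`, and the target type are placeholders, and it assumes `aggregate`, `limit_stream`, `JSON_SIZE_LIMIT`, and `Error` are in scope along with the same `From<reqwest::Error>` / `From<serde_json::Error>` conversions into `Error` that the hunks above already rely on.

async fn fetch_json_limited<T: serde::de::DeserializeOwned>(
    client: &reqwest::Client,
    url: &str,
) -> Result<T, Error> {
    // Stream the body instead of buffering it whole with `.bytes()`.
    let stream = client.get(url).send().await?.bytes_stream();

    // `limit_stream` fails with ErrorKind::BodyTooLarge once the running byte
    // count passes JSON_SIZE_LIMIT; `aggregate` collects the capped chunks.
    let body = aggregate(limit_stream(stream, JSON_SIZE_LIMIT)).await?;

    Ok(serde_json::from_slice(&body)?)
}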

src/routes/media.rs

@@ -2,10 +2,14 @@ use crate::{
     data::MediaCache,
     error::Error,
     requests::{BreakerStrategy, Requests},
+    stream::limit_stream,
 };
 use actix_web::{body::BodyStream, web, HttpResponse};
 use uuid::Uuid;
+// 16 MB
+const IMAGE_SIZE_LIMIT: usize = 16 * 1024 * 1024;
 #[tracing::instrument(name = "Media", skip(media, requests))]
 pub(crate) async fn route(
     media: web::Data<MediaCache>,
@@ -25,7 +29,10 @@ pub(crate) async fn route(
             response.insert_header((name.clone(), value.clone()));
         }
-        return Ok(response.body(BodyStream::new(res.bytes_stream())));
+        return Ok(response.body(BodyStream::new(limit_stream(
+            res.bytes_stream(),
+            IMAGE_SIZE_LIMIT,
+        ))));
     }
     Ok(HttpResponse::NotFound().finish())

src/stream.rs (new file)

@@ -0,0 +1,59 @@
use crate::error::{Error, ErrorKind};
use actix_web::web::{Bytes, BytesMut};
use futures_core::Stream;
use streem::IntoStreamer;

pub(crate) fn limit_stream<'a, S>(
    input: S,
    limit: usize,
) -> impl Stream<Item = Result<Bytes, Error>> + Send + 'a
where
    S: Stream<Item = reqwest::Result<Bytes>> + Send + 'a,
{
    streem::try_from_fn(move |yielder| async move {
        let stream = std::pin::pin!(input);
        let mut stream = stream.into_streamer();

        let mut count = 0;

        while let Some(bytes) = stream.try_next().await? {
            count += bytes.len();

            if count > limit {
                return Err(ErrorKind::BodyTooLarge.into());
            }

            yielder.yield_ok(bytes).await;
        }

        Ok(())
    })
}

pub(crate) async fn aggregate<S>(input: S) -> Result<Bytes, Error>
where
    S: Stream<Item = Result<Bytes, Error>>,
{
    let stream = std::pin::pin!(input);
    let mut streamer = stream.into_streamer();

    let mut buf = Vec::new();

    while let Some(bytes) = streamer.try_next().await? {
        buf.push(bytes);
    }

    if buf.len() == 1 {
        return Ok(buf.pop().expect("buf has exactly one element"));
    }

    let total_size: usize = buf.iter().map(|b| b.len()).sum();

    let mut bytes_mut = BytesMut::with_capacity(total_size);

    for bytes in &buf {
        bytes_mut.extend_from_slice(&bytes);
    }

    Ok(bytes_mut.freeze())
}
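
Two notes on the new module. `aggregate` takes a single-chunk fast path: a body that arrives as one `Bytes` chunk is handed back without copying, and only multi-chunk bodies are coalesced into a fresh `BytesMut`. The sketch below is illustrative and not part of this commit; it exercises `limit_stream` with an in-memory stream and assumes the `futures` crate is available (for example as a dev-dependency) for `stream::iter`, since the relay itself only pulls in `futures-core`.

use crate::error::Error;
use crate::stream::{aggregate, limit_stream};
use actix_web::web::Bytes;

async fn limit_stream_demo() -> Result<(), Error> {
    // Two 8-byte chunks standing in for a remote response body.
    let chunks = || {
        vec![
            Ok::<Bytes, reqwest::Error>(Bytes::from_static(b"01234567")),
            Ok(Bytes::from_static(b"89abcdef")),
        ]
    };

    // 16 bytes fit under a 32-byte limit, so aggregation succeeds.
    let body = aggregate(limit_stream(futures::stream::iter(chunks()), 32)).await?;
    assert_eq!(body.len(), 16);

    // A 10-byte limit is crossed by the second chunk, so the stream errors
    // with ErrorKind::BodyTooLarge before the body is fully buffered.
    assert!(aggregate(limit_stream(futures::stream::iter(chunks()), 10))
        .await
        .is_err());

    Ok(())
}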