Replace awc with reqwest

This commit is contained in:
asonix 2023-07-21 16:58:31 -05:00
parent da0719392e
commit 2f0a3618d8
8 changed files with 296 additions and 189 deletions

266
Cargo.lock generated
View file

@ -159,25 +159,6 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "actix-tls"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fde0cf292f7cdc7f070803cb9a0d45c018441321a78b1042ffbbb81ec333297"
dependencies = [
"actix-codec",
"actix-rt",
"actix-service",
"actix-utils",
"futures-core",
"http",
"log",
"pin-project-lite",
"tokio-rustls",
"tokio-util",
"webpki-roots",
]
[[package]]
name = "actix-utils"
version = "3.0.1"
@ -367,40 +348,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "awc"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ef547a81796eb2dfe9b345aba34c2e08391a0502493711395b36dd64052b69"
dependencies = [
"actix-codec",
"actix-http",
"actix-rt",
"actix-service",
"actix-tls",
"actix-utils",
"ahash 0.7.6",
"base64 0.21.2",
"bytes",
"cfg-if",
"derive_more",
"futures-core",
"futures-util",
"h2",
"http",
"itoa",
"log",
"mime",
"percent-encoding",
"pin-project-lite",
"rand",
"rustls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
]
[[package]]
name = "axum"
version = "0.6.18"
@ -972,8 +919,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi",
"wasm-bindgen",
]
[[package]]
@ -1126,6 +1075,20 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97"
dependencies = [
"futures-util",
"http",
"hyper",
"rustls",
"tokio",
"tokio-rustls",
]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
@ -1193,6 +1156,12 @@ dependencies = [
"libc",
]
[[package]]
name = "ipnet"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6"
[[package]]
name = "is-terminal"
version = "0.4.9"
@ -1348,6 +1317,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -1642,7 +1621,7 @@ dependencies = [
[[package]]
name = "pict-rs"
version = "0.5.0-alpha.3"
version = "0.5.0-alpha.4"
dependencies = [
"actix-form-data",
"actix-rt",
@ -1650,7 +1629,6 @@ dependencies = [
"actix-web",
"anyhow",
"async-trait",
"awc",
"base64 0.21.2",
"clap",
"color-eyre",
@ -1667,6 +1645,9 @@ dependencies = [
"opentelemetry-otlp",
"pin-project-lite",
"quick-xml 0.29.0",
"reqwest",
"reqwest-middleware",
"reqwest-tracing",
"rusty-s3",
"serde",
"serde_cbor",
@ -1683,7 +1664,6 @@ dependencies = [
"toml 0.7.6",
"tracing",
"tracing-actix-web",
"tracing-awc",
"tracing-error",
"tracing-futures",
"tracing-log",
@ -1893,6 +1873,81 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ea92a5b6195c6ef2a0295ea818b312502c6fc94dde986c5553242e18fd4ce2"
[[package]]
name = "reqwest"
version = "0.11.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55"
dependencies = [
"base64 0.21.2",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-rustls",
"ipnet",
"js-sys",
"log",
"mime",
"mime_guess",
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls-pemfile",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-rustls",
"tokio-util",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots",
"winreg",
]
[[package]]
name = "reqwest-middleware"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4531c89d50effe1fac90d095c8b133c20c5c714204feee0bfc3fd158e784209d"
dependencies = [
"anyhow",
"async-trait",
"http",
"reqwest",
"serde",
"task-local-extensions",
"thiserror",
]
[[package]]
name = "reqwest-tracing"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b97ad83c2fc18113346b7158d79732242002427c30f620fa817c1f32901e0a8"
dependencies = [
"anyhow",
"async-trait",
"getrandom",
"matchit",
"opentelemetry",
"reqwest",
"reqwest-middleware",
"task-local-extensions",
"tracing",
"tracing-opentelemetry",
]
[[package]]
name = "ring"
version = "0.16.20"
@ -1968,14 +2023,33 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.20.8"
version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36"
dependencies = [
"log",
"ring",
"rustls-webpki",
"sct",
"webpki",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2"
dependencies = [
"base64 0.21.2",
]
[[package]]
name = "rustls-webpki"
version = "0.101.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e"
dependencies = [
"ring",
"untrusted",
]
[[package]]
@ -2242,6 +2316,15 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "task-local-extensions"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba323866e5d033818e3240feeb9f7db2c4296674e4d9e16b97b7bf8f490434e8"
dependencies = [
"pin-utils",
]
[[package]]
name = "thiserror"
version = "1.0.43"
@ -2358,13 +2441,12 @@ dependencies = [
[[package]]
name = "tokio-rustls"
version = "0.23.4"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls",
"tokio",
"webpki",
]
[[package]]
@ -2580,23 +2662,6 @@ dependencies = [
"syn 2.0.26",
]
[[package]]
name = "tracing-awc"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaa1a68fce4d1a7fad459f81ddcafbdd7c6f6bcda5c7e07d5f42db637931fac7"
dependencies = [
"actix-http",
"actix-service",
"awc",
"bytes",
"futures-core",
"opentelemetry",
"pin-project-lite",
"tracing",
"tracing-opentelemetry",
]
[[package]]
name = "tracing-core"
version = "0.1.31"
@ -2701,6 +2766,15 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.13"
@ -2814,6 +2888,18 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.87"
@ -2843,6 +2929,19 @@ version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
[[package]]
name = "wasm-streams"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "web-sys"
version = "0.3.64"
@ -2969,6 +3068,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi",
]
[[package]]
name = "yaml-rust"
version = "0.4.5"

View file

@ -1,7 +1,7 @@
[package]
name = "pict-rs"
description = "A simple image hosting service"
version = "0.5.0-alpha.3"
version = "0.5.0-alpha.4"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"
@ -11,12 +11,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = []
io-uring = [
"actix-rt/io-uring",
"actix-server/io-uring",
"tokio-uring",
"sled/io_uring",
]
io-uring = ["actix-rt/io-uring", "actix-server/io-uring", "tokio-uring", "sled/io_uring"]
[dependencies]
actix-form-data = "0.7.0-beta.4"
@ -25,7 +20,6 @@ actix-server = "2.0.0"
actix-web = { version = "4.0.0", default-features = false }
anyhow = "1.0"
async-trait = "0.1.51"
awc = { version = "3.0.0", default-features = false, features = ["rustls"] }
base64 = "0.21.0"
clap = { version = "4.0.2", features = ["derive"] }
color-eyre = "0.6"
@ -42,6 +36,9 @@ opentelemetry = { version = "0.19", features = ["rt-tokio"] }
opentelemetry-otlp = "0.12"
pin-project-lite = "0.2.7"
quick-xml = { version = "0.29.0", features = ["serialize"] }
reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls", "stream"] }
reqwest-middleware = "0.2.2"
reqwest-tracing = { version = "0.4.5", features = ["opentelemetry_0_19"] }
rusty-s3 = "0.4.1"
serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0.11.2"
@ -79,8 +76,3 @@ uuid = { version = "1", features = ["v4", "serde"] }
version = "0.7.5"
default-features = false
features = ["opentelemetry_0_19"]
[dependencies.tracing-awc]
version = "0.1.7"
default-features = false
features = ["opentelemetry_0_19"]

View file

@ -12,7 +12,7 @@
rustPlatform.buildRustPackage {
pname = "pict-rs";
version = "0.5.0-alpha.3";
version = "0.5.0-alpha.4";
src = ./.;
cargoLock = {

View file

@ -31,13 +31,7 @@ max_file_count = 1
# environment variable: PICTRS__CLIENT__POOL_SIZE
# default: 100
#
# This number is multiplied the number of cores available to pict-rs. Running on a 2 core machine
# with the default value will result in 200 pooled connections. Running on a 32 core machine with
# the default value will result in 3200 pooled connections.
#
# This number can be lowered to keep pict-rs within ulimit bounds if you encounter errors related to
# "Too many open files". Alternatively, increasing the ulimit of your system can solve this problem
# as well.
# Sets the maximum number of allowed idle connections per-host.
pool_size = 100
## Optional: time (in seconds) the client will wait for a response before giving up

View file

@ -2,8 +2,10 @@ use actix_web::{
body::MessageBody,
web::{Bytes, BytesMut},
};
use futures_util::Stream;
use std::{
collections::{vec_deque::IntoIter, VecDeque},
convert::Infallible,
pin::Pin,
task::{Context, Poll},
};
@ -76,3 +78,11 @@ impl MessageBody for BytesStream {
Ok(self.into_bytes())
}
}
impl Stream for BytesStream {
type Item = Result<Bytes, Infallible>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.get_mut().inner.pop_front().map(Ok))
}
}

View file

@ -82,6 +82,15 @@ pub(crate) enum UploadError {
#[error("Error in exiftool")]
Exiftool(#[from] crate::exiftool::ExifError),
#[error("Error building reqwest client")]
BuildClient(#[source] reqwest::Error),
#[error("Error making request")]
RequestMiddleware(#[from] reqwest_middleware::Error),
#[error("Error in request response")]
Request(#[from] reqwest::Error),
#[error("pict-rs is in read-only mode")]
ReadOnly,
@ -115,12 +124,6 @@ pub(crate) enum UploadError {
#[error("Unable to download image, bad response {0}")]
Download(actix_web::http::StatusCode),
#[error("Unable to download image")]
Payload(#[from] awc::error::PayloadError),
#[error("Unable to send request, {0}")]
SendRequest(String),
#[error("Tried to save an image with an already-taken name")]
DuplicateAlias,
@ -140,12 +143,6 @@ pub(crate) enum UploadError {
Timeout(#[from] crate::stream::TimeoutError),
}
impl From<awc::error::SendRequestError> for UploadError {
fn from(e: awc::error::SendRequestError) -> Self {
UploadError::SendRequest(e.to_string())
}
}
impl From<actix_web::error::BlockingError> for UploadError {
fn from(_: actix_web::error::BlockingError) -> Self {
UploadError::Canceled

View file

@ -33,7 +33,6 @@ use actix_web::{
http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES},
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
};
use awc::{Client, Connector};
use formats::InputProcessableFormat;
use futures_util::{
stream::{empty, once},
@ -41,6 +40,8 @@ use futures_util::{
};
use once_cell::sync::{Lazy, OnceCell};
use repo::sled::SledRepo;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_tracing::TracingMiddleware;
use rusty_s3::UrlStyle;
use std::{
future::ready,
@ -51,7 +52,6 @@ use std::{
};
use tokio::sync::Semaphore;
use tracing_actix_web::TracingLogger;
use tracing_awc::Tracing;
use tracing_futures::Instrument;
use self::{
@ -470,7 +470,7 @@ struct UrlQuery {
/// download an image from a URL
#[tracing::instrument(name = "Downloading file", skip(client, repo, store))]
async fn download<R: FullRepo + 'static, S: Store + 'static>(
client: web::Data<Client>,
client: web::Data<ClientWithMiddleware>,
repo: web::Data<R>,
store: web::Data<S>,
query: web::Query<UrlQuery>,
@ -486,6 +486,7 @@ async fn download<R: FullRepo + 'static, S: Store + 'static>(
}
let stream = res
.bytes_stream()
.map_err(Error::from)
.limit((CONFIG.media.max_file_size * MEGABYTES) as u64);
@ -1182,15 +1183,17 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error {
error
}
fn build_client() -> awc::Client {
let connector = Connector::new().limit(CONFIG.client.pool_size);
fn build_client() -> Result<ClientWithMiddleware, Error> {
let client = reqwest::Client::builder()
.user_agent("pict-rs v0.5.0-main")
.use_rustls_tls()
.pool_max_idle_per_host(CONFIG.client.pool_size)
.build()
.map_err(UploadError::BuildClient)?;
Client::builder()
.connector(connector)
.wrap(Tracing)
.add_default_header(("User-Agent", "pict-rs v0.4.1"))
.timeout(Duration::from_secs(CONFIG.client.timeout))
.finish()
Ok(ClientBuilder::new(client)
.with(TracingMiddleware::default())
.build())
}
fn next_worker_id() -> String {
@ -1209,7 +1212,7 @@ fn configure_endpoints<
config: &mut web::ServiceConfig,
repo: R,
store: S,
client: Client,
client: ClientWithMiddleware,
extra_config: F,
) {
config
@ -1301,10 +1304,11 @@ where
async fn launch_file_store<R: FullRepo + 'static, F: Fn(&mut web::ServiceConfig) + Send + Clone>(
repo: R,
store: FileStore,
client: ClientWithMiddleware,
extra_config: F,
) -> std::io::Result<()> {
HttpServer::new(move || {
let client = build_client();
let client = client.clone();
let store = store.clone();
let repo = repo.clone();
@ -1328,10 +1332,11 @@ async fn launch_object_store<
>(
repo: R,
store_config: ObjectStoreConfig,
client: ClientWithMiddleware,
extra_config: F,
) -> std::io::Result<()> {
HttpServer::new(move || {
let client = build_client();
let client = client.clone();
let store = store_config.clone().build(client.clone());
let repo = repo.clone();
@ -1351,7 +1356,7 @@ async fn launch_object_store<
async fn migrate_inner<S1>(
repo: Repo,
client: Client,
client: ClientWithMiddleware,
from: S1,
to: config::primitives::Store,
skip_missing_files: bool,
@ -1481,6 +1486,7 @@ fn sled_extra_config(sc: &mut web::ServiceConfig) {
pub async fn run() -> color_eyre::Result<()> {
let repo = Repo::open(CONFIG.repo.clone())?;
repo.migrate_from_db(CONFIG.old_db.path.clone()).await?;
let client = build_client()?;
match (*OPERATION).clone() {
Operation::Run => (),
@ -1489,8 +1495,6 @@ pub async fn run() -> color_eyre::Result<()> {
from,
to,
} => {
let client = build_client();
match from {
config::primitives::Store::Filesystem(config::Filesystem { path }) => {
let from = FileStore::build(path.clone(), repo.clone()).await?;
@ -1551,7 +1555,7 @@ pub async fn run() -> color_eyre::Result<()> {
.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
.await?;
launch_file_store(sled_repo, store, sled_extra_config).await?;
launch_file_store(sled_repo, store, client, sled_extra_config).await?;
}
}
}
@ -1592,7 +1596,7 @@ pub async fn run() -> color_eyre::Result<()> {
.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
.await?;
launch_object_store(sled_repo, store, sled_extra_config).await?;
launch_object_store(sled_repo, store, client, sled_extra_config).await?;
}
}
}

View file

@ -5,16 +5,17 @@ use crate::{
};
use actix_rt::task::JoinError;
use actix_web::{
error::{BlockingError, PayloadError},
error::BlockingError,
http::{
header::{ByteRangeSpec, Range, CONTENT_LENGTH},
StatusCode,
},
web::Bytes,
};
use awc::{error::SendRequestError, Client, ClientRequest, ClientResponse, SendClientRequest};
use base64::{prelude::BASE64_STANDARD, Engine};
use futures_util::{Stream, StreamExt, TryStreamExt};
use reqwest::{header::RANGE, Body, Response};
use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle};
use std::{pin::Pin, string::FromUtf8Error, time::Duration};
use storage_path_generator::{Generator, Path};
@ -46,8 +47,11 @@ pub(crate) enum ObjectError {
#[error("IO Error")]
IO(#[from] std::io::Error),
#[error("Error making request: {0}")]
SendRequest(String),
#[error("Error making request")]
RequestMiddleware(#[from] reqwest_middleware::Error),
#[error("Error in request response")]
Request(#[from] reqwest::Error),
#[error("Failed to parse string")]
Utf8(#[from] FromUtf8Error),
@ -66,15 +70,6 @@ pub(crate) enum ObjectError {
#[error("Invalid status: {0}\n{1}")]
Status(StatusCode, String),
#[error("Unable to upload image")]
Upload(awc::error::PayloadError),
}
impl From<SendRequestError> for ObjectError {
fn from(e: SendRequestError) -> Self {
Self::SendRequest(e.to_string())
}
}
impl From<JoinError> for ObjectError {
@ -95,7 +90,7 @@ pub(crate) struct ObjectStore {
repo: Repo,
bucket: Bucket,
credentials: Credentials,
client: Client,
client: ClientWithMiddleware,
signature_expiration: Duration,
client_timeout: Duration,
public_endpoint: Option<Url>,
@ -123,7 +118,7 @@ struct InitiateMultipartUploadResponse {
}
impl ObjectStoreConfig {
pub(crate) fn build(self, client: Client) -> ObjectStore {
pub(crate) fn build(self, client: ClientWithMiddleware) -> ObjectStore {
ObjectStore {
path_gen: self.path_gen,
repo: self.repo,
@ -137,11 +132,8 @@ impl ObjectStoreConfig {
}
}
fn payload_to_io_error(e: PayloadError) -> std::io::Error {
match e {
PayloadError::Io(io) => io,
otherwise => std::io::Error::new(std::io::ErrorKind::Other, otherwise.to_string()),
}
fn payload_to_io_error(e: reqwest::Error) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
}
#[tracing::instrument(skip(stream))]
@ -162,15 +154,15 @@ where
Ok(buf)
}
async fn status_error(mut response: ClientResponse) -> StoreError {
let body = match response.body().await {
Err(e) => return ObjectError::Upload(e).into(),
async fn status_error(response: Response) -> StoreError {
let status = response.status();
let body = match response.text().await {
Err(e) => return ObjectError::Request(e).into(),
Ok(body) => body,
};
let body = String::from_utf8_lossy(&body).to_string();
ObjectError::Status(response.status(), body).into()
ObjectError::Status(status, body).into()
}
#[async_trait::async_trait(?Send)]
@ -220,7 +212,8 @@ impl Store for ObjectStore {
drop(stream);
let (req, object_id) = self.put_object_request(content_type).await?;
let response = req
.send_body(first_chunk)
.body(Body::wrap_stream(first_chunk))
.send()
.await
.map_err(ObjectError::from)?;
@ -234,13 +227,13 @@ impl Store for ObjectStore {
let mut first_chunk = Some(first_chunk);
let (req, object_id) = self.create_multipart_request(content_type).await?;
let mut response = req.send().await.map_err(ObjectError::from)?;
let response = req.send().await.map_err(ObjectError::from)?;
if !response.status().is_success() {
return Err(status_error(response).await);
}
let body = response.body().await.map_err(ObjectError::Upload)?;
let body = response.bytes().await.map_err(ObjectError::Request)?;
let body: InitiateMultipartUploadResponse =
quick_xml::de::from_reader(&*body).map_err(ObjectError::from)?;
let upload_id = &body.upload_id;
@ -276,7 +269,8 @@ impl Store for ObjectStore {
&upload_id2,
)
.await?
.send_body(buf)
.body(Body::wrap_stream(buf))
.send()
.await
.map_err(ObjectError::from)?;
@ -348,7 +342,7 @@ impl Store for ObjectStore {
) -> Result<Self::Identifier, StoreError> {
let (req, object_id) = self.put_object_request(content_type).await?;
let response = req.send_body(bytes).await.map_err(ObjectError::from)?;
let response = req.body(bytes).send().await.map_err(ObjectError::from)?;
if !response.status().is_success() {
return Err(status_error(response).await);
@ -381,7 +375,9 @@ impl Store for ObjectStore {
return Err(status_error(response).await);
}
Ok(Box::pin(response.map_err(payload_to_io_error)))
Ok(Box::pin(
response.bytes_stream().map_err(payload_to_io_error),
))
}
#[tracing::instrument(skip(self, writer))]
@ -393,7 +389,7 @@ impl Store for ObjectStore {
where
Writer: AsyncWrite + Unpin,
{
let mut response = self
let response = self
.get_object_request(identifier, None, None)
.send()
.await
@ -406,7 +402,9 @@ impl Store for ObjectStore {
));
}
while let Some(res) = response.next().await {
let mut stream = response.bytes_stream();
while let Some(res) = stream.next().await {
let mut bytes = res.map_err(payload_to_io_error)?;
writer.write_all_buf(&mut bytes).await?;
}
@ -489,7 +487,7 @@ impl ObjectStore {
})
}
async fn head_bucket_request(&self) -> Result<ClientRequest, StoreError> {
async fn head_bucket_request(&self) -> Result<RequestBuilder, StoreError> {
let action = self.bucket.head_bucket(Some(&self.credentials));
Ok(self.build_request(action))
@ -498,7 +496,7 @@ impl ObjectStore {
async fn put_object_request(
&self,
content_type: mime::Mime,
) -> Result<(ClientRequest, ObjectId), StoreError> {
) -> Result<(RequestBuilder, ObjectId), StoreError> {
let path = self.next_file().await?;
let mut action = self.bucket.put_object(Some(&self.credentials), &path);
@ -513,7 +511,7 @@ impl ObjectStore {
async fn create_multipart_request(
&self,
content_type: mime::Mime,
) -> Result<(ClientRequest, ObjectId), StoreError> {
) -> Result<(RequestBuilder, ObjectId), StoreError> {
let path = self.next_file().await?;
let mut action = self
@ -533,7 +531,7 @@ impl ObjectStore {
object_id: &ObjectId,
part_number: u16,
upload_id: &str,
) -> Result<ClientRequest, ObjectError> {
) -> Result<RequestBuilder, ObjectError> {
use md5::Digest;
let mut action = self.bucket.upload_part(
@ -571,12 +569,12 @@ impl ObjectStore {
Ok(self.build_request(action))
}
fn send_complete_multipart_request<'a, I: Iterator<Item = &'a str>>(
async fn send_complete_multipart_request<'a, I: Iterator<Item = &'a str>>(
&'a self,
object_id: &'a ObjectId,
upload_id: &'a str,
etags: I,
) -> SendClientRequest {
) -> Result<Response, reqwest_middleware::Error> {
let mut action = self.bucket.complete_multipart_upload(
Some(&self.credentials),
object_id.as_str(),
@ -590,14 +588,14 @@ impl ObjectStore {
let (req, action) = self.build_request_inner(action);
req.send_body(action.body())
req.body(action.body()).send().await
}
fn create_abort_multipart_request(
&self,
object_id: &ObjectId,
upload_id: &str,
) -> ClientRequest {
) -> RequestBuilder {
let action = self.bucket.abort_multipart_upload(
Some(&self.credentials),
object_id.as_str(),
@ -607,18 +605,18 @@ impl ObjectStore {
self.build_request(action)
}
fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest {
fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> RequestBuilder {
let (req, _) = self.build_request_inner(action);
req
}
fn build_request_inner<'a, A: S3Action<'a>>(&'a self, mut action: A) -> (ClientRequest, A) {
fn build_request_inner<'a, A: S3Action<'a>>(&'a self, mut action: A) -> (RequestBuilder, A) {
let method = match A::METHOD {
rusty_s3::Method::Head => awc::http::Method::HEAD,
rusty_s3::Method::Get => awc::http::Method::GET,
rusty_s3::Method::Post => awc::http::Method::POST,
rusty_s3::Method::Put => awc::http::Method::PUT,
rusty_s3::Method::Delete => awc::http::Method::DELETE,
rusty_s3::Method::Head => reqwest::Method::HEAD,
rusty_s3::Method::Get => reqwest::Method::GET,
rusty_s3::Method::Post => reqwest::Method::POST,
rusty_s3::Method::Put => reqwest::Method::PUT,
rusty_s3::Method::Delete => reqwest::Method::DELETE,
};
let url = action.sign(self.signature_expiration);
@ -631,7 +629,7 @@ impl ObjectStore {
let req = action
.headers_mut()
.iter()
.fold(req, |req, tup| req.insert_header(tup));
.fold(req, |req, (name, value)| req.header(name, value));
(req, action)
}
@ -641,7 +639,7 @@ impl ObjectStore {
identifier: &ObjectId,
from_start: Option<u64>,
len: Option<u64>,
) -> ClientRequest {
) -> RequestBuilder {
let action = self
.bucket
.get_object(Some(&self.credentials), identifier.as_str());
@ -651,14 +649,18 @@ impl ObjectStore {
let start = from_start.unwrap_or(0);
let end = len.map(|len| start + len - 1);
req.insert_header(Range::Bytes(vec![if let Some(end) = end {
ByteRangeSpec::FromTo(start, end)
} else {
ByteRangeSpec::From(start)
}]))
req.header(
RANGE,
Range::Bytes(vec![if let Some(end) = end {
ByteRangeSpec::FromTo(start, end)
} else {
ByteRangeSpec::From(start)
}])
.to_string(),
)
}
fn head_object_request(&self, identifier: &ObjectId) -> ClientRequest {
fn head_object_request(&self, identifier: &ObjectId) -> RequestBuilder {
let action = self
.bucket
.head_object(Some(&self.credentials), identifier.as_str());
@ -666,7 +668,7 @@ impl ObjectStore {
self.build_request(action)
}
fn delete_object_request(&self, identifier: &ObjectId) -> ClientRequest {
fn delete_object_request(&self, identifier: &ObjectId) -> RequestBuilder {
let action = self
.bucket
.delete_object(Some(&self.credentials), identifier.as_str());