From ab7fd9aaf7c6ac49c135fdf6cc2eb4f800255e86 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 24 Sep 2022 13:39:27 -0500 Subject: [PATCH 1/8] Start work on using rusty-s3 instead of rust-s3 --- Cargo.lock | 246 ++++++------------------------------- Cargo.toml | 14 +-- src/config/commandline.rs | 4 - src/config/primitives.rs | 5 - src/main.rs | 34 +++--- src/store/object_store.rs | 248 ++++++++++++++++++++++++-------------- 6 files changed, 215 insertions(+), 336 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2a18e9e..41c6cfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -351,29 +351,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "aws-creds" -version = "0.29.1" -source = "git+https://github.com/asonix/rust-s3?branch=asonix/generic-client#9e450d0038a29040ba5c47ffa570350c3b1ad976" -dependencies = [ - "dirs", - "rust-ini", - "serde", - "serde-xml-rs", - "serde_derive", - "thiserror", - "url", -] - -[[package]] -name = "aws-region" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bdd1c0f4aa70f72812a2f3ec325d6d6162fb80cff093f847b4c394fd78c3643" -dependencies = [ - "thiserror", -] - [[package]] name = "axum" version = "0.5.16" @@ -720,26 +697,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dirs" -version = "4.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" -dependencies = [ - "dirs-sys", -] - -[[package]] -name = "dirs-sys" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" -dependencies = [ - "libc", - "redox_users", - "winapi", -] - [[package]] name = "dlv-list" version = "0.3.0" @@ -1007,12 +964,6 @@ dependencies = [ "libc", ] -[[package]] -name = "hex" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" - [[package]] name = "hmac" version = "0.12.1" @@ -1092,19 +1043,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" -dependencies = [ - "http", - "hyper", - "rustls", - "tokio", - "tokio-rustls", -] - [[package]] name = "hyper-timeout" version = "0.4.1" @@ -1162,12 +1100,6 @@ dependencies = [ "libc", ] -[[package]] -name = "ipnet" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" - [[package]] name = "itertools" version = "0.10.5" @@ -1280,22 +1212,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" [[package]] -name = "maybe-async" -version = "0.2.6" +name = "md-5" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6007f9dad048e0a224f27ca599d669fca8cfa0dac804725aab542b2eb032bce6" +checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" dependencies = [ - "proc-macro2", - "quote", - "syn", + "digest", ] -[[package]] -name = "md5" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" - [[package]] name = "memchr" version = "2.5.0" @@ 
-1649,8 +1573,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "pin-project-lite", - "reqwest", - "rust-s3", + "rusty-s3", "serde", "serde_cbor", "serde_json", @@ -1809,6 +1732,16 @@ dependencies = [ "prost", ] +[[package]] +name = "quick-xml" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37dddbbe9df96afafcb8027fcf263971b726530e12f0787f620a7ba5b4846081" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.21" @@ -1857,17 +1790,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "redox_users" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" -dependencies = [ - "getrandom", - "redox_syscall", - "thiserror", -] - [[package]] name = "regex" version = "1.6.0" @@ -1903,46 +1825,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "reqwest" -version = "0.11.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" -dependencies = [ - "base64", - "bytes", - "encoding_rs", - "futures-core", - "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-rustls", - "ipnet", - "js-sys", - "log", - "mime", - "once_cell", - "percent-encoding", - "pin-project-lite", - "rustls", - "rustls-pemfile", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", - "tokio-rustls", - "tokio-util", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", - "webpki-roots", - "winreg", -] - [[package]] name = "ring" version = "0.16.20" @@ -1988,35 +1870,6 @@ dependencies = [ "ordered-multimap", ] -[[package]] -name = "rust-s3" -version = "0.31.0" -source = "git+https://github.com/asonix/rust-s3?branch=asonix/generic-client#9e450d0038a29040ba5c47ffa570350c3b1ad976" -dependencies = [ - "async-trait", - "aws-creds", - "aws-region", - "base64", - "cfg-if", - "hex", - "hmac", - "http", - "log", - "maybe-async", - "md5", - "percent-encoding", - "reqwest", - "serde", - "serde-xml-rs", - "serde_derive", - "sha2", - "thiserror", - "time", - "tokio", - "tokio-stream", - "url", -] - [[package]] name = "rustc-demangle" version = "0.1.21" @@ -2045,12 +1898,22 @@ dependencies = [ ] [[package]] -name = "rustls-pemfile" -version = "1.0.1" +name = "rusty-s3" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55" +checksum = "13332fd1e9538328a80183b9c0bde0cd7065ad2c4405f56b855a51a0a37fffd4" dependencies = [ "base64", + "hmac", + "md-5", + "percent-encoding", + "quick-xml", + "serde", + "serde_json", + "sha2", + "time", + "url", + "zeroize", ] [[package]] @@ -2089,25 +1952,13 @@ checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4" [[package]] name = "serde" -version = "1.0.144" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860" +checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" dependencies = [ "serde_derive", ] -[[package]] -name = "serde-xml-rs" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65162e9059be2f6a3421ebbb4fef3e74b7d9e7c60c50a0e292c6239f19f1edfa" -dependencies = [ - "log", - "serde", - "thiserror", - "xml-rs", -] - [[package]] name = "serde_cbor" version = 
"0.11.2" @@ -2120,9 +1971,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.144" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00" +checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c" dependencies = [ "proc-macro2", "quote", @@ -2843,18 +2694,6 @@ dependencies = [ "wasm-bindgen-shared", ] -[[package]] -name = "wasm-bindgen-futures" -version = "0.4.33" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d" -dependencies = [ - "cfg-if", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "wasm-bindgen-macro" version = "0.2.83" @@ -2998,21 +2837,6 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" -[[package]] -name = "winreg" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" -dependencies = [ - "winapi", -] - -[[package]] -name = "xml-rs" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3" - [[package]] name = "yaml-rust" version = "0.4.5" @@ -3021,3 +2845,9 @@ checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" dependencies = [ "linked-hash-map", ] + +[[package]] +name = "zeroize" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f" diff --git a/Cargo.toml b/Cargo.toml index 6364572..86e4ad5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,14 +39,7 @@ once_cell = "1.4.0" opentelemetry = { version = "0.18", features = ["rt-tokio"] } opentelemetry-otlp = "0.11" pin-project-lite = "0.2.7" -reqwest = { version = "0.11.5", default-features = false, features = [ - "rustls-tls", - "stream", -] } -rust-s3 = { version = "0.31.0", default-features = false, features = [ - "fail-on-err", - "with-reqwest", -], git = "https://github.com/asonix/rust-s3", branch = "asonix/generic-client" } +rusty-s3 = "0.3.2" serde = { version = "1.0", features = ["derive"] } serde_cbor = "0.11.2" serde_json = "1.0" @@ -57,7 +50,10 @@ thiserror = "1.0" time = { version = "0.3.0", features = ["serde", "serde-well-known"] } tokio = { version = "1", features = ["full", "tracing"] } tokio-uring = { version = "0.3", optional = true, features = ["bytes"] } -tokio-util = { version = "0.7", default-features = false, features = ["codec"] } +tokio-util = { version = "0.7", default-features = false, features = [ + "codec", + "io", +] } toml = "0.5.8" tracing = "0.1.15" tracing-error = "0.2.0" diff --git a/src/config/commandline.rs b/src/config/commandline.rs index b4aac13..0774d03 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -559,10 +559,6 @@ struct ObjectStorage { #[clap(short, long)] secret_key: Option, - /// The security token for accessing the bucket - #[clap(long)] - security_token: Option, - /// The session token for accessing the bucket #[clap(long)] session_token: Option, diff --git a/src/config/primitives.rs b/src/config/primitives.rs index f48027d..85aa354 100644 --- a/src/config/primitives.rs +++ b/src/config/primitives.rs @@ -79,11 +79,6 @@ pub(crate) struct 
ObjectStorage { #[clap(short, long)] pub(crate) secret_key: String, - /// The security token for accessing the bucket - #[clap(long)] - #[serde(skip_serializing_if = "Option::is_none")] - pub(crate) security_token: Option, - /// The session token for accessing the bucket #[clap(long)] #[serde(skip_serializing_if = "Option::is_none")] diff --git a/src/main.rs b/src/main.rs index 8db94a1..5ab88c2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -958,16 +958,10 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error { fn build_client() -> awc::Client { Client::builder() .wrap(Tracing) - .add_default_header(("User-Agent", "pict-rs v0.3.0-main")) + .add_default_header(("User-Agent", "pict-rs v0.4.0-main")) .finish() } -fn build_reqwest_client() -> reqwest::Result { - reqwest::Client::builder() - .user_agent("pict-rs v0.3.0-main") - .build() -} - fn next_worker_id() -> String { static WORKER_ID: AtomicU64 = AtomicU64::new(0); @@ -1092,22 +1086,24 @@ where } } config::Store::ObjectStorage(config::ObjectStorage { + endpoint, bucket_name, + url_style, region, access_key, secret_key, - security_token, session_token, }) => { let to = ObjectStore::build( + endpoint, bucket_name, - region.as_ref().clone(), + url_style, + region.as_ref(), Some(access_key.clone()), Some(secret_key.clone()), - security_token.clone(), session_token.clone(), repo.clone(), - build_reqwest_client()?, + build_client(), ) .await?; @@ -1136,22 +1132,24 @@ async fn main() -> color_eyre::Result<()> { migrate_inner(&repo, from, &to).await?; } config::Store::ObjectStorage(config::ObjectStorage { + endpoint, bucket_name, + url_style, region, access_key, secret_key, - security_token, session_token, }) => { let from = ObjectStore::build( + endpoint, &bucket_name, + url_style, Serde::into_inner(region), Some(access_key), Some(secret_key), - security_token, session_token, repo.clone(), - build_reqwest_client()?, + build_client(), ) .await?; @@ -1173,22 +1171,24 @@ async fn main() -> color_eyre::Result<()> { } } config::Store::ObjectStorage(config::ObjectStorage { + endpoint, bucket_name, + url_style, region, access_key, secret_key, - security_token, session_token, }) => { let store = ObjectStore::build( + endpoint, &bucket_name, + url_style, Serde::into_inner(region), Some(access_key), Some(secret_key), - security_token, session_token, repo.clone(), - build_reqwest_client()?, + build_client(), ) .await?; diff --git a/src/store/object_store.rs b/src/store/object_store.rs index 30af737..e832eb3 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -3,16 +3,25 @@ use crate::{ repo::{Repo, SettingsRepo}, store::Store, }; -use actix_web::web::Bytes; -use futures_util::{Stream, TryStreamExt}; -use s3::{ - client::Client, command::Command, creds::Credentials, error::S3Error, request_trait::Request, - Bucket, Region, +use actix_web::{ + http::{ + header::{ByteRangeSpec, Range, CONTENT_LENGTH}, + StatusCode, + }, + web::Bytes, }; -use std::{pin::Pin, string::FromUtf8Error}; +use awc::{Client, ClientRequest}; +use futures_util::{Stream, TryStreamExt}; +use rusty_s3::{ + actions::{PutObject, S3Action}, + Bucket, Credentials, UrlStyle, +}; +use std::{pin::Pin, string::FromUtf8Error, time::Duration}; use storage_path_generator::{Generator, Path}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio_util::io::ReaderStream; use tracing::Instrument; +use url::Url; mod object_id; pub(crate) use object_id::ObjectId; @@ -33,8 +42,8 @@ pub(crate) enum ObjectError { 
#[error("Invalid length")] Length, - #[error("Storage error")] - Anyhow(#[from] S3Error), + #[error("Invalid status")] + Status(StatusCode), } #[derive(Clone)] @@ -42,7 +51,8 @@ pub(crate) struct ObjectStore { path_gen: Generator, repo: Repo, bucket: Bucket, - client: reqwest::Client, + credentials: Credentials, + client: Client, } #[async_trait::async_trait(?Send)] @@ -55,26 +65,30 @@ impl Store for ObjectStore { where Reader: AsyncRead + Unpin, { - let path = self.next_file().await?; + let response = self + .put_object_request() + .await? + .send_stream(ReaderStream::new(reader)) + .await?; - self.bucket - .put_object_stream(&self.client, reader, &path) - .await - .map_err(ObjectError::from)?; + if response.status().is_success() { + return Ok(ObjectId::from_string(path)); + } - Ok(ObjectId::from_string(path)) + Err(ObjectError::Status(response.status()).into()) } #[tracing::instrument(skip(bytes))] async fn save_bytes(&self, bytes: Bytes) -> Result { - let path = self.next_file().await?; + let req = self.put_object_request().await?; - self.bucket - .put_object(&self.client, &path, &bytes) - .await - .map_err(ObjectError::from)?; + let response = req.send_body(bytes).await?; - Ok(ObjectId::from_string(path)) + if response.status().is_success() { + return Ok(ObjectId::from_string(path)); + } + + Err(ObjectError::Status(response.status()).into()) } #[tracing::instrument] @@ -84,38 +98,16 @@ impl Store for ObjectStore { from_start: Option, len: Option, ) -> Result { - let path = identifier.as_str(); + let response = self + .get_object_request(identifier, from_start, len) + .send() + .await?; - let start = from_start.unwrap_or(0); - let end = len.map(|len| start + len - 1); + if response.status.is_success() { + return Ok(Box::pin(response)); + } - let request_span = tracing::trace_span!(parent: None, "Get Object"); - - // NOTE: isolating reqwest in it's own span is to prevent the request's span from getting - // smuggled into a long-lived task. Unfortunately, I am unable to create a minimal - // reproduction of this problem so I can't open a bug about it. 
- let request = request_span.in_scope(|| { - Client::request( - &self.client, - &self.bucket, - path, - Command::GetObjectRange { start, end }, - ) - }); - - let response = request_span - .in_scope(|| request.response()) - .instrument(request_span.clone()) - .await - .map_err(ObjectError::from)?; - - let stream = request_span.in_scope(|| { - response - .bytes_stream() - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) - }); - - Ok(Box::pin(stream)) + Err(ObjectError::Status(response.status()).into()) } #[tracing::instrument(skip(writer))] @@ -127,39 +119,52 @@ impl Store for ObjectStore { where Writer: AsyncWrite + Send + Unpin, { - let path = identifier.as_str(); + let response = self + .get_object_request(identifier, from_start, len) + .send() + .await?; - self.bucket - .get_object_stream(&self.client, path, writer) - .await - .map_err(ObjectError::from) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, Error::from(e)))?; + if !response.status.is_success() { + return Err(ObjectError::Status(response.status()).into()); + } + + while let Some(res) = response.next().await { + let bytes = res?; + writer.write_all_buf(bytes).await?; + } + writer.flush().await?; Ok(()) } #[tracing::instrument] async fn len(&self, identifier: &Self::Identifier) -> Result { - let path = identifier.as_str(); + let response = self.head_object_request(identifier).send().await?; - let (head, _) = self - .bucket - .head_object(&self.client, path) - .await - .map_err(ObjectError::from)?; - let length = head.content_length.ok_or(ObjectError::Length)?; + if !response.status.is_success() { + return Err(ObjectError::Status(response.status()).into()); + } - Ok(length as u64) + let length = response + .headers() + .get(CONTENT_LENGTH) + .ok_or(ObjectError::Length)? + .to_str() + .ok_or(ObjectError::Length) + .parse::() + .map_err(|_| ObjectError::Length)?; + + Ok(length) } #[tracing::instrument] async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { - let path = identifier.as_str(); + let response = self.delete_object_request(identifier).send().await?; + + if !response.status.is_success() { + return Err(ObjectError::Status(response.status()).into()); + } - self.bucket - .delete_object(&self.client, path) - .await - .map_err(ObjectError::from)?; Ok(()) } } @@ -167,11 +172,12 @@ impl Store for ObjectStore { impl ObjectStore { #[allow(clippy::too_many_arguments)] pub(crate) async fn build( + endpoint: Url, bucket_name: &str, - region: Region, + url_style: UrlStyle, + region: &str, access_key: Option, secret_key: Option, - security_token: Option, session_token: Option, repo: Repo, client: reqwest::Client, @@ -181,28 +187,84 @@ impl ObjectStore { Ok(ObjectStore { path_gen, repo, - bucket: Bucket::new( - bucket_name, - match region { - Region::Custom { endpoint, .. } => Region::Custom { - region: String::from(""), - endpoint, - }, - region => region, - }, - Credentials { - access_key, - secret_key, - security_token, - session_token, - }, - ) - .map_err(ObjectError::from)? 
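// (Aside: the two URL flavors rusty_s3::Bucket::new supports, matching the
// `use_path_style` docs added later in this series:
//   UrlStyle::Path        -> {endpoint}/{bucket_name}/{object}   (e.g. minio)
//   UrlStyle::VirtualHost -> {bucket_name}.{endpoint}/{object}   (e.g. AWS)
// A minimal sketch with illustrative endpoint/region/bucket values, not taken
// from this patch; assumes the rusty-s3 0.3 API.)

use rusty_s3::{Bucket, BucketError, UrlStyle};
use url::Url;

fn example_buckets() -> Result<(Bucket, Bucket), BucketError> {
    // Path style, typical for a local minio deployment
    let minio = Bucket::new(
        Url::parse("http://localhost:9000").expect("valid url"),
        UrlStyle::Path,
        String::from("pict-rs"),
        String::from("minio"),
    )?;

    // Virtual-host style, typical for AWS S3
    let aws = Bucket::new(
        Url::parse("https://s3.eu-west-1.amazonaws.com").expect("valid url"),
        UrlStyle::VirtualHost,
        String::from("pict-rs"),
        String::from("eu-west-1"),
    )?;

    Ok((minio, aws))
}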
- .with_path_style(), + bucket: Bucket::new(endpoint, url_style, bucket_name, region) + .map_err(ObjectError::from)?, + credentials: Credentials::new_with_token(access_key, secret_key, session_token), client, }) } + async fn put_object_request(&self) -> Result { + let path = self.next_file().await?; + + let action = self.bucket.put_object(Some(&self.credentials), &path); + + Ok(self.build_request(action)) + } + + fn build_request(&self, action: A) -> ClientRequest { + let method = match A::METHOD { + rusty_s3::Method::Head => awc::http::Method::HEAD, + rusty_s3::Method::Get => awc::http::Method::GET, + rusty_s3::Method::Post => awc::http::Method::POST, + rusty_s3::Method::Put => awc::http::Method::PUT, + rusty_s3::Method::Delete => awc::http::Method::DELETE, + }; + + let url = action.sign(Duration::from_secs(5)); + + let req = self.client.request(method, url); + + action + .headers_mut() + .drain() + .fold(req, |req, tup| req.insert_header(tup)) + } + + fn get_object_request( + &self, + identifier: &ObjectId, + from_start: Option, + len: Option, + ) -> ClientRequest { + let action = self + .bucket + .get_object(Some(&self.credentials), identifier.as_str()); + + let req = self.build_request(action); + + let start = from_start.unwrap_or(0); + let end = len.map(|len| start + len - 1); + + let range = match (start, end) { + (Some(start), Some(end)) => Some(ByteRangeSpec::FromTo(start, end)), + (Some(start), None) => Some(ByteRangeSpec::From(start)), + _ => None, + }; + + if let Some(range) = range { + req.insert_header(Range::Bytes(vec![range])).send().await?; + } else { + req + } + } + + fn head_object_request(&self, identifier: &ObjectId) -> ClientRequest { + let action = self + .bucket + .head_object(Some(&self.credentials), identifier.as_str()); + + self.build_request(action) + } + + fn delete_object_request(&self, identifier: &ObjectId) -> ClientRequest { + let action = self + .bucket + .delete_object(Some(&self.credentials), identifier.as_str()); + + self.build_request(action) + } + async fn next_directory(&self) -> Result { let path = self.path_gen.next(); @@ -243,8 +305,8 @@ impl std::fmt::Debug for ObjectStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ObjectStore") .field("path_gen", &"generator") - .field("bucket", &self.bucket.name) - .field("region", &self.bucket.region) + .field("bucket", &self.bucket.name()) + .field("region", &self.bucket.region()) .finish() } } From ff1771e016e7febb0bc1e15719ca00fbe2783c20 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 24 Sep 2022 14:18:49 -0500 Subject: [PATCH 2/8] More migration work for rusty-s3 --- src/config/commandline.rs | 19 +++++++++++- src/config/primitives.rs | 22 ++++++++++++-- src/main.rs | 50 ++++++++++++++++++++----------- src/store.rs | 5 +++- src/store/file_store.rs | 5 ++++ src/store/object_store.rs | 63 +++++++++++++++++++++++---------------- 6 files changed, 116 insertions(+), 48 deletions(-) diff --git a/src/config/commandline.rs b/src/config/commandline.rs index 0774d03..69d98c4 100644 --- a/src/config/commandline.rs +++ b/src/config/commandline.rs @@ -543,13 +543,30 @@ struct Filesystem { #[derive(Clone, Debug, Parser, serde::Serialize)] #[serde(rename_all = "snake_case")] struct ObjectStorage { + /// The base endpoint for the object storage + /// + /// Examples: + /// - `http://localhost:9000` + /// - `https://s3.dualstack.eu-west-1.amazonaws.com` + #[clap(short, long)] + endpoint: Url, + + /// Determines whether to use path style or virtualhost style for accessing objects + 
/// + /// When this is true, objects will be fetched from {endpoint}/{bucket_name}/{object} + /// When false, objects will be fetched from {bucket_name}.{endpoint}/{object} + #[clap(short, long)] + use_path_style: bool, + /// The bucket in which to store media #[clap(short, long)] bucket_name: Option, /// The region the bucket is located in + /// + /// For minio deployments, this can just be 'minio' #[clap(short, long)] - region: Option>, + region: Option, /// The Access Key for the user accessing the bucket #[clap(short, long)] diff --git a/src/config/primitives.rs b/src/config/primitives.rs index 85aa354..1382499 100644 --- a/src/config/primitives.rs +++ b/src/config/primitives.rs @@ -1,8 +1,8 @@ use crate::magick::ValidInputType; -use crate::serde_str::Serde; use clap::ArgEnum; use std::{fmt::Display, path::PathBuf, str::FromStr}; use tracing::Level; +use url::Url; #[derive( Clone, @@ -63,13 +63,28 @@ pub(crate) struct Filesystem { #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, clap::Parser)] #[serde(rename_all = "snake_case")] pub(crate) struct ObjectStorage { + /// The base endpoint for the object storage + /// + /// Examples: + /// - `http://localhost:9000` + /// - `https://s3.dualstack.eu-west-1.amazonaws.com` + #[clap(short, long)] + pub(crate) endpoint: Url, + + /// Determines whether to use path style or virtualhost style for accessing objects + /// + /// When this is true, objects will be fetched from {endpoint}/{bucket_name}/{object} + /// When false, objects will be fetched from {bucket_name}.{endpoint}/{object} + #[clap(short, long)] + pub(crate) use_path_style: bool, + /// The bucket in which to store media #[clap(short, long)] pub(crate) bucket_name: String, /// The region the bucket is located in #[clap(short, long)] - pub(crate) region: Serde, + pub(crate) region: String, /// The Access Key for the user accessing the bucket #[clap(short, long)] @@ -219,7 +234,8 @@ impl Display for LogFormat { #[cfg(test)] mod tests { - use super::{Serde, Targets}; + use super::Targets; + use crate::serde_str::Serde; #[test] fn builds_info_targets() { diff --git a/src/main.rs b/src/main.rs index 5ab88c2..e8d44b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use futures_util::{ Stream, StreamExt, TryStreamExt, }; use once_cell::sync::Lazy; +use rusty_s3::UrlStyle; use std::{ future::ready, path::PathBuf, @@ -970,15 +971,15 @@ fn next_worker_id() -> String { format!("{}-{}", CONFIG.server.worker_id, next_id) } -async fn launch( +async fn launch( repo: R, - store: S, + store: S::Config, ) -> color_eyre::Result<()> { repo.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec()) .await?; HttpServer::new(move || { - let store = store.clone(); + let store = S::init(store.clone()); let repo = repo.clone(); tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { @@ -1081,6 +1082,8 @@ where match to { config::Store::Filesystem(config::Filesystem { path }) => { let to = FileStore::build(path.clone(), repo.clone()).await?; + let to = FileStore::init(to); + match repo { Repo::Sled(repo) => migrate_store(repo, from, to).await?, } @@ -1088,25 +1091,30 @@ where config::Store::ObjectStorage(config::ObjectStorage { endpoint, bucket_name, - url_style, + use_path_style, region, access_key, secret_key, session_token, }) => { let to = ObjectStore::build( - endpoint, + endpoint.clone(), bucket_name, - url_style, + if *use_path_style { + UrlStyle::Path + } else { + UrlStyle::VirtualHost + }, region.as_ref(), Some(access_key.clone()), Some(secret_key.clone()), 
session_token.clone(), repo.clone(), - build_client(), ) .await?; + let to = ObjectStore::init(to); + match repo { Repo::Sled(repo) => migrate_store(repo, from, to).await?, } @@ -1129,29 +1137,34 @@ async fn main() -> color_eyre::Result<()> { match from { config::Store::Filesystem(config::Filesystem { path }) => { let from = FileStore::build(path.clone(), repo.clone()).await?; + let from = FileStore::init(from); migrate_inner(&repo, from, &to).await?; } config::Store::ObjectStorage(config::ObjectStorage { endpoint, bucket_name, - url_style, + use_path_style, region, access_key, secret_key, session_token, }) => { let from = ObjectStore::build( - endpoint, + endpoint.clone(), &bucket_name, - url_style, + if *use_path_style { + UrlStyle::Path + } else { + UrlStyle::VirtualHost + }, Serde::into_inner(region), Some(access_key), Some(secret_key), session_token, repo.clone(), - build_client(), ) .await?; + let from = ObjectStore::init(from); migrate_inner(&repo, from, &to).await?; } @@ -1167,33 +1180,36 @@ async fn main() -> color_eyre::Result<()> { let store = FileStore::build(path, repo.clone()).await?; match repo { - Repo::Sled(sled_repo) => launch(sled_repo, store).await, + Repo::Sled(sled_repo) => launch::<_, FileStore>(sled_repo, store).await, } } config::Store::ObjectStorage(config::ObjectStorage { endpoint, bucket_name, - url_style, + use_path_style, region, access_key, secret_key, session_token, }) => { let store = ObjectStore::build( - endpoint, + endpoint.clone(), &bucket_name, - url_style, + if *use_path_style { + UrlStyle::Path + } else { + UrlStyle::VirtualHost + }, Serde::into_inner(region), Some(access_key), Some(secret_key), session_token, repo.clone(), - build_client(), ) .await?; match repo { - Repo::Sled(sled_repo) => launch(sled_repo, store).await, + Repo::Sled(sled_repo) => launch::<_, ObjectStore>(sled_repo, store).await, } } } diff --git a/src/store.rs b/src/store.rs index 1df8834..bfa4c66 100644 --- a/src/store.rs +++ b/src/store.rs @@ -16,10 +16,13 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug { } #[async_trait::async_trait(?Send)] -pub(crate) trait Store: Send + Sync + Clone + Debug { +pub(crate) trait Store: Clone + Debug { + type Config: Send + Sync + Clone; type Identifier: Identifier + 'static; type Stream: Stream> + 'static; + fn init(config: Self::Config) -> Self; + async fn save_async_read(&self, reader: &mut Reader) -> Result where Reader: AsyncRead + Unpin; diff --git a/src/store/file_store.rs b/src/store/file_store.rs index a93dee8..6d11ff6 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -49,9 +49,14 @@ pub(crate) struct FileStore { #[async_trait::async_trait(?Send)] impl Store for FileStore { + type Config = Self; type Identifier = FileId; type Stream = Pin>>>; + fn init(config: Self::Config) -> Self { + config + } + #[tracing::instrument(skip(reader))] async fn save_async_read(&self, reader: &mut Reader) -> Result where diff --git a/src/store/object_store.rs b/src/store/object_store.rs index e832eb3..a7e7888 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -12,15 +12,11 @@ use actix_web::{ }; use awc::{Client, ClientRequest}; use futures_util::{Stream, TryStreamExt}; -use rusty_s3::{ - actions::{PutObject, S3Action}, - Bucket, Credentials, UrlStyle, -}; +use rusty_s3::{actions::S3Action, Bucket, Credentials, UrlStyle}; use std::{pin::Pin, string::FromUtf8Error, time::Duration}; use storage_path_generator::{Generator, Path}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use 
tokio_util::io::ReaderStream; -use tracing::Instrument; use url::Url; mod object_id; @@ -55,24 +51,41 @@ pub(crate) struct ObjectStore { client: Client, } +#[derive(Clone)] +pub(crate) struct ObjectStoreConfig { + path_gen: Generator, + repo: Repo, + bucket: Bucket, + credentials: Credentials, +} + #[async_trait::async_trait(?Send)] impl Store for ObjectStore { + type Config = ObjectStoreConfig; type Identifier = ObjectId; type Stream = Pin>>>; + fn init(config: Self::Config) -> Self { + ObjectStore { + path_gen: config.path_gen, + repo: config.repo, + bucket: config.bucket, + credentials: config.credentials, + client: crate::build_client(), + } + } + #[tracing::instrument(skip(reader))] async fn save_async_read(&self, reader: &mut Reader) -> Result where Reader: AsyncRead + Unpin, { - let response = self - .put_object_request() - .await? - .send_stream(ReaderStream::new(reader)) - .await?; + let (req, object_id) = self.put_object_request().await?; + + let response = req.send_stream(ReaderStream::new(reader)).await?; if response.status().is_success() { - return Ok(ObjectId::from_string(path)); + return Ok(object_id); } Err(ObjectError::Status(response.status()).into()) @@ -80,12 +93,12 @@ impl Store for ObjectStore { #[tracing::instrument(skip(bytes))] async fn save_bytes(&self, bytes: Bytes) -> Result { - let req = self.put_object_request().await?; + let (req, object_id) = self.put_object_request().await?; let response = req.send_body(bytes).await?; if response.status().is_success() { - return Ok(ObjectId::from_string(path)); + return Ok(object_id); } Err(ObjectError::Status(response.status()).into()) @@ -103,7 +116,7 @@ impl Store for ObjectStore { .send() .await?; - if response.status.is_success() { + if response.status().is_success() { return Ok(Box::pin(response)); } @@ -120,11 +133,11 @@ impl Store for ObjectStore { Writer: AsyncWrite + Send + Unpin, { let response = self - .get_object_request(identifier, from_start, len) + .get_object_request(identifier, None, None) .send() .await?; - if !response.status.is_success() { + if !response.status().is_success() { return Err(ObjectError::Status(response.status()).into()); } @@ -141,7 +154,7 @@ impl Store for ObjectStore { async fn len(&self, identifier: &Self::Identifier) -> Result { let response = self.head_object_request(identifier).send().await?; - if !response.status.is_success() { + if !response.status().is_success() { return Err(ObjectError::Status(response.status()).into()); } @@ -161,7 +174,7 @@ impl Store for ObjectStore { async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> { let response = self.delete_object_request(identifier).send().await?; - if !response.status.is_success() { + if !response.status().is_success() { return Err(ObjectError::Status(response.status()).into()); } @@ -180,29 +193,27 @@ impl ObjectStore { secret_key: Option, session_token: Option, repo: Repo, - client: reqwest::Client, - ) -> Result { + ) -> Result { let path_gen = init_generator(&repo).await?; - Ok(ObjectStore { + Ok(ObjectStoreConfig { path_gen, repo, bucket: Bucket::new(endpoint, url_style, bucket_name, region) .map_err(ObjectError::from)?, credentials: Credentials::new_with_token(access_key, secret_key, session_token), - client, }) } - async fn put_object_request(&self) -> Result { + async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), Error> { let path = self.next_file().await?; let action = self.bucket.put_object(Some(&self.credentials), &path); - Ok(self.build_request(action)) + 
Ok((self.build_request(action), ObjectId::from_string(path))) } - fn build_request(&self, action: A) -> ClientRequest { + fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest { let method = match A::METHOD { rusty_s3::Method::Head => awc::http::Method::HEAD, rusty_s3::Method::Get => awc::http::Method::GET, @@ -243,7 +254,7 @@ impl ObjectStore { }; if let Some(range) = range { - req.insert_header(Range::Bytes(vec![range])).send().await?; + req.insert_header(Range::Bytes(vec![range])) } else { req } From 25209e29c035cec50ca3b0fac56be07ead8be2fb Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 24 Sep 2022 17:18:53 -0500 Subject: [PATCH 3/8] Make it compile --- Cargo.lock | 7 +- Cargo.toml | 2 +- src/backgrounded.rs | 8 +-- src/generate.rs | 4 +- src/ingest.rs | 9 +-- src/ingest/hasher.rs | 18 +++--- src/main.rs | 104 +++++++++++++++--------------- src/store.rs | 47 ++++++++++---- src/store/file_store.rs | 55 ++++++---------- src/store/object_store.rs | 130 ++++++++++++++++++++++++-------------- 10 files changed, 216 insertions(+), 168 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 41c6cfe..e07d1d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -22,7 +22,8 @@ dependencies = [ [[package]] name = "actix-form-data" version = "0.7.0-beta.0" -source = "git+https://git.asonix.dog/asonix/actix-form-data?branch=v0.7.x#3525bcd09cd030df3f2ed7684f2aad1bcc42d68b" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e721f3919cb43c566c0dbb6a9cb5ad5106ac42b6b3c0d21a7a3e762455de957a" dependencies = [ "actix-multipart", "actix-rt", @@ -1672,9 +1673,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.43" +version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" +checksum = "7bd7356a8122b6c4a24a82b278680c73357984ca2fc79a0f9fa6dea7dced7c58" dependencies = [ "unicode-ident", ] diff --git a/Cargo.toml b/Cargo.toml index 86e4ad5..cc58ce3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ io-uring = [ ] [dependencies] -actix-form-data = { version = "0.7.0-beta.0", git = "https://git.asonix.dog/asonix/actix-form-data", branch = "v0.7.x" } +actix-form-data = "0.7.0-beta.0" actix-rt = { version = "2.7.0", default-features = false } actix-server = "2.0.0" actix-web = { version = "4.0.0", default-features = false } diff --git a/src/backgrounded.rs b/src/backgrounded.rs index 73eb5e1..4049fbe 100644 --- a/src/backgrounded.rs +++ b/src/backgrounded.rs @@ -5,7 +5,6 @@ use crate::{ }; use actix_web::web::Bytes; use futures_util::{Stream, TryStreamExt}; -use tokio_util::io::StreamReader; use tracing::{Instrument, Span}; pub(crate) struct Backgrounded @@ -38,7 +37,7 @@ where pub(crate) async fn proxy
<P>
(repo: R, store: S, stream: P) -> Result where - P: Stream>, + P: Stream> + Unpin + 'static, { let mut this = Self { repo, @@ -53,14 +52,13 @@ where async fn do_proxy
<P>
(&mut self, store: S, stream: P) -> Result<(), Error> where - P: Stream>, + P: Stream> + Unpin + 'static, { UploadRepo::create(&self.repo, self.upload_id.expect("Upload id exists")).await?; let stream = stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); - let mut reader = StreamReader::new(Box::pin(stream)); - let identifier = store.save_async_read(&mut reader).await?; + let identifier = store.save_stream(stream).await?; self.identifier = Some(identifier); diff --git a/src/generate.rs b/src/generate.rs index 596a5a0..ae7ecb9 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -57,14 +57,14 @@ async fn process( identifier } else { let identifier = repo.identifier(hash.clone()).await?; - let mut reader = crate::ffmpeg::thumbnail( + let reader = crate::ffmpeg::thumbnail( store.clone(), identifier, InputFormat::Mp4, ThumbnailFormat::Jpeg, ) .await?; - let motion_identifier = store.save_async_read(&mut reader).await?; + let motion_identifier = store.save_async_read(reader).await?; repo.relate_motion_identifier(hash.clone(), &motion_identifier) .await?; diff --git a/src/ingest.rs b/src/ingest.rs index f4c49c7..e3a3fc6 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -55,7 +55,7 @@ where pub(crate) async fn ingest( repo: &R, store: &S, - stream: impl Stream>, + stream: impl Stream> + Unpin + 'static, declared_alias: Option, should_validate: bool, is_cached: bool, @@ -77,9 +77,10 @@ where ) .await?; - let mut hasher_reader = Hasher::new(validated_reader, Sha256::new()); + let hasher_reader = Hasher::new(validated_reader, Sha256::new()); + let hasher = hasher_reader.hasher(); - let identifier = store.save_async_read(&mut hasher_reader).await?; + let identifier = store.save_async_read(hasher_reader).await?; drop(permit); @@ -90,7 +91,7 @@ where identifier: Some(identifier.clone()), }; - let hash = hasher_reader.finalize_reset().await?; + let hash = hasher.borrow_mut().finalize_reset().to_vec(); session.hash = Some(hash.clone()); diff --git a/src/ingest/hasher.rs b/src/ingest/hasher.rs index 71708ef..a3ce995 100644 --- a/src/ingest/hasher.rs +++ b/src/ingest/hasher.rs @@ -1,8 +1,8 @@ -use crate::error::Error; -use actix_web::web; use sha2::{digest::FixedOutputReset, Digest}; use std::{ + cell::RefCell, pin::Pin, + rc::Rc, task::{Context, Poll}, }; use tokio::io::{AsyncRead, ReadBuf}; @@ -12,7 +12,7 @@ pin_project_lite::pin_project! 
{ #[pin] inner: I, - hasher: D, + hasher: Rc>, } } @@ -23,14 +23,12 @@ where pub(super) fn new(reader: I, digest: D) -> Self { Hasher { inner: reader, - hasher: digest, + hasher: Rc::new(RefCell::new(digest)), } } - pub(super) async fn finalize_reset(self) -> Result, Error> { - let mut hasher = self.hasher; - let hash = web::block(move || hasher.finalize_reset().to_vec()).await?; - Ok(hash) + pub(super) fn hasher(&self) -> Rc> { + Rc::clone(&self.hasher) } } @@ -53,7 +51,9 @@ where let poll_res = reader.poll_read(cx, buf); let after_len = buf.filled().len(); if after_len > before_len { - hasher.update(&buf.filled()[before_len..after_len]); + hasher + .borrow_mut() + .update(&buf.filled()[before_len..after_len]); } poll_res } diff --git a/src/main.rs b/src/main.rs index e8d44b9..2d83b9f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,7 +66,11 @@ use self::{ UploadResult, }, serde_str::Serde, - store::{file_store::FileStore, object_store::ObjectStore, Identifier, Store}, + store::{ + file_store::FileStore, + object_store::{ObjectStore, ObjectStoreConfig}, + Identifier, Store, StoreConfig, + }, stream::{StreamLimit, StreamTimeout}, }; @@ -449,7 +453,7 @@ async fn download( #[instrument(name = "Downloading file inline", skip(stream))] async fn do_download_inline( - stream: impl Stream>, + stream: impl Stream> + Unpin + 'static, repo: web::Data, store: web::Data, is_cached: bool, @@ -475,7 +479,7 @@ async fn do_download_inline( #[instrument(name = "Downloading file in background", skip(stream))] async fn do_download_backgrounded( - stream: impl Stream>, + stream: impl Stream> + Unpin + 'static, repo: web::Data, store: web::Data, is_cached: bool, @@ -971,15 +975,15 @@ fn next_worker_id() -> String { format!("{}-{}", CONFIG.server.worker_id, next_id) } -async fn launch( +async fn launch( repo: R, - store: S::Config, + store_config: SC, ) -> color_eyre::Result<()> { repo.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec()) .await?; HttpServer::new(move || { - let store = S::init(store.clone()); + let store = store_config.clone().build(); let repo = repo.clone(); tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { @@ -1008,20 +1012,23 @@ async fn launch( .service( web::resource("") .guard(guard::Post()) - .route(web::post().to(upload::)), + .route(web::post().to(upload::)), ) .service( web::scope("/backgrounded") .service( web::resource("") .guard(guard::Post()) - .route(web::post().to(upload_backgrounded::)), + .route(web::post().to(upload_backgrounded::)), ) .service( - web::resource("/claim").route(web::get().to(claim_upload::)), + web::resource("/claim") + .route(web::get().to(claim_upload::)), ), ) - .service(web::resource("/download").route(web::get().to(download::))) + .service( + web::resource("/download").route(web::get().to(download::)), + ) .service( web::resource("/delete/{delete_token}/{filename}") .route(web::delete().to(delete::)) @@ -1029,27 +1036,27 @@ async fn launch( ) .service( web::resource("/original/{filename}") - .route(web::get().to(serve::)) - .route(web::head().to(serve_head::)), + .route(web::get().to(serve::)) + .route(web::head().to(serve_head::)), ) .service( web::resource("/process.{ext}") - .route(web::get().to(process::)) - .route(web::head().to(process_head::)), + .route(web::get().to(process::)) + .route(web::head().to(process_head::)), ) .service( web::resource("/process_backgrounded.{ext}") - .route(web::get().to(process_backgrounded::)), + .route(web::get().to(process_backgrounded::)), ) .service( web::scope("/details") .service( 
web::resource("/original/{filename}") - .route(web::get().to(details::)), + .route(web::get().to(details::)), ) .service( web::resource("/process.{ext}") - .route(web::get().to(process_details::)), + .route(web::get().to(process_details::)), ), ), ) @@ -1058,7 +1065,7 @@ async fn launch( .wrap(Internal( CONFIG.server.api_key.as_ref().map(|s| s.to_owned()), )) - .service(web::resource("/import").route(web::post().to(import::))) + .service(web::resource("/import").route(web::post().to(import::))) .service( web::resource("/variants").route(web::delete().to(clean_variants::)), ) @@ -1075,14 +1082,13 @@ async fn launch( Ok(()) } -async fn migrate_inner(repo: &Repo, from: S1, to: &config::Store) -> color_eyre::Result<()> +async fn migrate_inner(repo: &Repo, from: S1, to: config::Store) -> color_eyre::Result<()> where S1: Store, { match to { config::Store::Filesystem(config::Filesystem { path }) => { - let to = FileStore::build(path.clone(), repo.clone()).await?; - let to = FileStore::init(to); + let to = FileStore::build(path.clone(), repo.clone()).await?.build(); match repo { Repo::Sled(repo) => migrate_store(repo, from, to).await?, @@ -1100,20 +1106,19 @@ where let to = ObjectStore::build( endpoint.clone(), bucket_name, - if *use_path_style { + if use_path_style { UrlStyle::Path } else { UrlStyle::VirtualHost }, - region.as_ref(), - Some(access_key.clone()), - Some(secret_key.clone()), - session_token.clone(), + region, + access_key, + secret_key, + session_token, repo.clone(), ) - .await?; - - let to = ObjectStore::init(to); + .await? + .build(); match repo { Repo::Sled(repo) => migrate_store(repo, from, to).await?, @@ -1136,9 +1141,8 @@ async fn main() -> color_eyre::Result<()> { Operation::MigrateStore { from, to } => { match from { config::Store::Filesystem(config::Filesystem { path }) => { - let from = FileStore::build(path.clone(), repo.clone()).await?; - let from = FileStore::init(from); - migrate_inner(&repo, from, &to).await?; + let from = FileStore::build(path.clone(), repo.clone()).await?.build(); + migrate_inner(&repo, from, to).await?; } config::Store::ObjectStorage(config::ObjectStorage { endpoint, @@ -1150,23 +1154,23 @@ async fn main() -> color_eyre::Result<()> { session_token, }) => { let from = ObjectStore::build( - endpoint.clone(), - &bucket_name, - if *use_path_style { + endpoint, + bucket_name, + if use_path_style { UrlStyle::Path } else { UrlStyle::VirtualHost }, - Serde::into_inner(region), - Some(access_key), - Some(secret_key), + region, + access_key, + secret_key, session_token, repo.clone(), ) - .await?; - let from = ObjectStore::init(from); + .await? 
+ .build(); - migrate_inner(&repo, from, &to).await?; + migrate_inner(&repo, from, to).await?; } } @@ -1193,23 +1197,23 @@ async fn main() -> color_eyre::Result<()> { session_token, }) => { let store = ObjectStore::build( - endpoint.clone(), - &bucket_name, - if *use_path_style { + endpoint, + bucket_name, + if use_path_style { UrlStyle::Path } else { UrlStyle::VirtualHost }, - Serde::into_inner(region), - Some(access_key), - Some(secret_key), + region, + access_key, + secret_key, session_token, repo.clone(), ) .await?; match repo { - Repo::Sled(sled_repo) => launch::<_, ObjectStore>(sled_repo, store).await, + Repo::Sled(sled_repo) => launch::<_, ObjectStoreConfig>(sled_repo, store).await, } } } @@ -1271,10 +1275,8 @@ where S2: Store, { let stream = from.to_stream(identifier, None, None).await?; - futures_util::pin_mut!(stream); - let mut reader = tokio_util::io::StreamReader::new(stream); - let new_identifier = to.save_async_read(&mut reader).await?; + let new_identifier = to.save_stream(stream).await?; Ok(new_identifier) } diff --git a/src/store.rs b/src/store.rs index bfa4c66..def70ce 100644 --- a/src/store.rs +++ b/src/store.rs @@ -15,17 +15,24 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug { Self: Sized; } +pub(crate) trait StoreConfig: Send + Sync + Clone { + type Store: Store; + + fn build(self) -> Self::Store; +} + #[async_trait::async_trait(?Send)] pub(crate) trait Store: Clone + Debug { - type Config: Send + Sync + Clone; type Identifier: Identifier + 'static; - type Stream: Stream> + 'static; + type Stream: Stream> + Unpin + 'static; - fn init(config: Self::Config) -> Self; - - async fn save_async_read(&self, reader: &mut Reader) -> Result + async fn save_async_read(&self, reader: Reader) -> Result where - Reader: AsyncRead + Unpin; + Reader: AsyncRead + Unpin + 'static; + + async fn save_stream(&self, stream: S) -> Result + where + S: Stream> + Unpin + 'static; async fn save_bytes(&self, bytes: Bytes) -> Result; @@ -42,7 +49,7 @@ pub(crate) trait Store: Clone + Debug { writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Send + Unpin; + Writer: AsyncWrite + Unpin; async fn len(&self, identifier: &Self::Identifier) -> Result; @@ -57,13 +64,20 @@ where type Identifier = T::Identifier; type Stream = T::Stream; - async fn save_async_read(&self, reader: &mut Reader) -> Result + async fn save_async_read(&self, reader: Reader) -> Result where - Reader: AsyncRead + Unpin, + Reader: AsyncRead + Unpin + 'static, { T::save_async_read(self, reader).await } + async fn save_stream(&self, stream: S) -> Result + where + S: Stream> + Unpin + 'static, + { + T::save_stream(self, stream).await + } + async fn save_bytes(&self, bytes: Bytes) -> Result { T::save_bytes(self, bytes).await } @@ -83,7 +97,7 @@ where writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Send + Unpin, + Writer: AsyncWrite + Unpin, { T::read_into(self, identifier, writer).await } @@ -105,13 +119,20 @@ where type Identifier = T::Identifier; type Stream = T::Stream; - async fn save_async_read(&self, reader: &mut Reader) -> Result + async fn save_async_read(&self, reader: Reader) -> Result where - Reader: AsyncRead + Unpin, + Reader: AsyncRead + Unpin + 'static, { T::save_async_read(self, reader).await } + async fn save_stream(&self, stream: S) -> Result + where + S: Stream> + Unpin + 'static, + { + T::save_stream(self, stream).await + } + async fn save_bytes(&self, bytes: Bytes) -> Result { T::save_bytes(self, bytes).await } @@ -131,7 +152,7 @@ where 
writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Send + Unpin, + Writer: AsyncWrite + Unpin, { T::read_into(self, identifier, writer).await } diff --git a/src/store/file_store.rs b/src/store/file_store.rs index 6d11ff6..fde1b81 100644 --- a/src/store/file_store.rs +++ b/src/store/file_store.rs @@ -2,7 +2,7 @@ use crate::{ error::Error, file::File, repo::{Repo, SettingsRepo}, - store::Store, + store::{Store, StoreConfig}, }; use actix_web::web::Bytes; use futures_util::stream::Stream; @@ -12,6 +12,7 @@ use std::{ }; use storage_path_generator::Generator; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_util::io::StreamReader; use tracing::{debug, error, instrument, Instrument}; mod file_id; @@ -47,24 +48,27 @@ pub(crate) struct FileStore { repo: Repo, } +impl StoreConfig for FileStore { + type Store = FileStore; + + fn build(self) -> Self::Store { + self + } +} + #[async_trait::async_trait(?Send)] impl Store for FileStore { - type Config = Self; type Identifier = FileId; type Stream = Pin>>>; - fn init(config: Self::Config) -> Self { - config - } - #[tracing::instrument(skip(reader))] - async fn save_async_read(&self, reader: &mut Reader) -> Result + async fn save_async_read(&self, mut reader: Reader) -> Result where - Reader: AsyncRead + Unpin, + Reader: AsyncRead + Unpin + 'static, { let path = self.next_file().await?; - if let Err(e) = self.safe_save_reader(&path, reader).await { + if let Err(e) = self.safe_save_reader(&path, &mut reader).await { self.safe_remove_file(&path).await?; return Err(e.into()); } @@ -72,6 +76,13 @@ impl Store for FileStore { Ok(self.file_id_from_path(path)?) } + async fn save_stream(&self, stream: S) -> Result + where + S: Stream> + Unpin + 'static, + { + self.save_async_read(StreamReader::new(stream)).await + } + #[tracing::instrument(skip(bytes))] async fn save_bytes(&self, bytes: Bytes) -> Result { let path = self.next_file().await?; @@ -114,7 +125,7 @@ impl Store for FileStore { writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Send + Unpin, + Writer: AsyncWrite + Unpin, { let path = self.path_from_file_id(identifier); @@ -260,30 +271,6 @@ impl FileStore { Ok(()) } - - // try moving a file - #[instrument(name = "Moving file", fields(from = tracing::field::debug(&from.as_ref()), to = tracing::field::debug(&to.as_ref())))] - pub(crate) async fn safe_move_file, Q: AsRef>( - &self, - from: P, - to: Q, - ) -> Result<(), FileError> { - safe_create_parent(&to).await?; - - debug!("Checking if {:?} already exists", to.as_ref()); - if let Err(e) = tokio::fs::metadata(&to).await { - if e.kind() != std::io::ErrorKind::NotFound { - return Err(e.into()); - } - } else { - return Err(FileError::FileExists); - } - - debug!("Moving {:?} to {:?}", from.as_ref(), to.as_ref()); - tokio::fs::copy(&from, &to).await?; - self.safe_remove_file(from).await?; - Ok(()) - } } pub(crate) async fn safe_create_parent>(path: P) -> Result<(), FileError> { diff --git a/src/store/object_store.rs b/src/store/object_store.rs index a7e7888..a124808 100644 --- a/src/store/object_store.rs +++ b/src/store/object_store.rs @@ -1,18 +1,19 @@ use crate::{ error::Error, repo::{Repo, SettingsRepo}, - store::Store, + store::{Store, StoreConfig}, }; use actix_web::{ + error::PayloadError, http::{ header::{ByteRangeSpec, Range, CONTENT_LENGTH}, StatusCode, }, web::Bytes, }; -use awc::{Client, ClientRequest}; -use futures_util::{Stream, TryStreamExt}; -use rusty_s3::{actions::S3Action, Bucket, Credentials, UrlStyle}; +use 
awc::{error::SendRequestError, Client, ClientRequest}; +use futures_util::{Stream, StreamExt, TryStreamExt}; +use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle}; use std::{pin::Pin, string::FromUtf8Error, time::Duration}; use storage_path_generator::{Generator, Path}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; @@ -32,6 +33,12 @@ pub(crate) enum ObjectError { #[error("Failed to generate path")] PathGenerator(#[from] storage_path_generator::PathError), + #[error("Failed to generate request")] + S3(#[from] BucketError), + + #[error("Error making request")] + SendRequest(String), + #[error("Failed to parse string")] Utf8(#[from] FromUtf8Error), @@ -42,6 +49,12 @@ pub(crate) enum ObjectError { Status(StatusCode), } +impl From for ObjectError { + fn from(e: SendRequestError) -> Self { + Self::SendRequest(e.to_string()) + } +} + #[derive(Clone)] pub(crate) struct ObjectStore { path_gen: Generator, @@ -59,30 +72,48 @@ pub(crate) struct ObjectStoreConfig { credentials: Credentials, } -#[async_trait::async_trait(?Send)] -impl Store for ObjectStore { - type Config = ObjectStoreConfig; - type Identifier = ObjectId; - type Stream = Pin>>>; +impl StoreConfig for ObjectStoreConfig { + type Store = ObjectStore; - fn init(config: Self::Config) -> Self { + fn build(self) -> Self::Store { ObjectStore { - path_gen: config.path_gen, - repo: config.repo, - bucket: config.bucket, - credentials: config.credentials, + path_gen: self.path_gen, + repo: self.repo, + bucket: self.bucket, + credentials: self.credentials, client: crate::build_client(), } } +} + +fn payload_to_io_error(e: PayloadError) -> std::io::Error { + match e { + PayloadError::Io(io) => io, + otherwise => std::io::Error::new(std::io::ErrorKind::Other, otherwise.to_string()), + } +} + +#[async_trait::async_trait(?Send)] +impl Store for ObjectStore { + type Identifier = ObjectId; + type Stream = Pin>>>; #[tracing::instrument(skip(reader))] - async fn save_async_read(&self, reader: &mut Reader) -> Result + async fn save_async_read(&self, reader: Reader) -> Result where - Reader: AsyncRead + Unpin, + Reader: AsyncRead + Unpin + 'static, + { + self.save_stream(ReaderStream::new(reader)).await + } + + #[tracing::instrument(skip(stream))] + async fn save_stream(&self, stream: S) -> Result + where + S: Stream> + Unpin + 'static, { let (req, object_id) = self.put_object_request().await?; - let response = req.send_stream(ReaderStream::new(reader)).await?; + let response = req.send_stream(stream).await.map_err(ObjectError::from)?; if response.status().is_success() { return Ok(object_id); @@ -95,7 +126,7 @@ impl Store for ObjectStore { async fn save_bytes(&self, bytes: Bytes) -> Result { let (req, object_id) = self.put_object_request().await?; - let response = req.send_body(bytes).await?; + let response = req.send_body(bytes).await.map_err(ObjectError::from)?; if response.status().is_success() { return Ok(object_id); @@ -114,10 +145,11 @@ impl Store for ObjectStore { let response = self .get_object_request(identifier, from_start, len) .send() - .await?; + .await + .map_err(ObjectError::from)?; if response.status().is_success() { - return Ok(Box::pin(response)); + return Ok(Box::pin(response.map_err(payload_to_io_error))); } Err(ObjectError::Status(response.status()).into()) @@ -130,20 +162,24 @@ impl Store for ObjectStore { writer: &mut Writer, ) -> Result<(), std::io::Error> where - Writer: AsyncWrite + Send + Unpin, + Writer: AsyncWrite + Unpin, { - let response = self + let mut response = self 
.get_object_request(identifier, None, None) .send() - .await?; + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, ObjectError::from(e)))?; if !response.status().is_success() { - return Err(ObjectError::Status(response.status()).into()); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + ObjectError::Status(response.status()), + )); } while let Some(res) = response.next().await { - let bytes = res?; - writer.write_all_buf(bytes).await?; + let mut bytes = res.map_err(payload_to_io_error)?; + writer.write_all_buf(&mut bytes).await?; } writer.flush().await?; @@ -152,7 +188,11 @@ impl Store for ObjectStore { #[tracing::instrument] async fn len(&self, identifier: &Self::Identifier) -> Result { - let response = self.head_object_request(identifier).send().await?; + let response = self + .head_object_request(identifier) + .send() + .await + .map_err(ObjectError::from)?; if !response.status().is_success() { return Err(ObjectError::Status(response.status()).into()); @@ -163,7 +203,7 @@ impl Store for ObjectStore { .get(CONTENT_LENGTH) .ok_or(ObjectError::Length)? .to_str() - .ok_or(ObjectError::Length) + .map_err(|_| ObjectError::Length)? .parse::() .map_err(|_| ObjectError::Length)?; @@ -186,11 +226,11 @@ impl ObjectStore { #[allow(clippy::too_many_arguments)] pub(crate) async fn build( endpoint: Url, - bucket_name: &str, + bucket_name: String, url_style: UrlStyle, - region: &str, - access_key: Option, - secret_key: Option, + region: String, + access_key: String, + secret_key: String, session_token: Option, repo: Repo, ) -> Result { @@ -201,7 +241,11 @@ impl ObjectStore { repo, bucket: Bucket::new(endpoint, url_style, bucket_name, region) .map_err(ObjectError::from)?, - credentials: Credentials::new_with_token(access_key, secret_key, session_token), + credentials: if let Some(token) = session_token { + Credentials::new_with_token(access_key, secret_key, token) + } else { + Credentials::new(access_key, secret_key) + }, }) } @@ -213,7 +257,7 @@ impl ObjectStore { Ok((self.build_request(action), ObjectId::from_string(path))) } - fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest { + fn build_request<'a, A: S3Action<'a>>(&'a self, mut action: A) -> ClientRequest { let method = match A::METHOD { rusty_s3::Method::Head => awc::http::Method::HEAD, rusty_s3::Method::Get => awc::http::Method::GET, @@ -224,11 +268,11 @@ impl ObjectStore { let url = action.sign(Duration::from_secs(5)); - let req = self.client.request(method, url); + let req = self.client.request(method, url.as_str()); action .headers_mut() - .drain() + .iter() .fold(req, |req, tup| req.insert_header(tup)) } @@ -247,17 +291,11 @@ impl ObjectStore { let start = from_start.unwrap_or(0); let end = len.map(|len| start + len - 1); - let range = match (start, end) { - (Some(start), Some(end)) => Some(ByteRangeSpec::FromTo(start, end)), - (Some(start), None) => Some(ByteRangeSpec::From(start)), - _ => None, - }; - - if let Some(range) = range { - req.insert_header(Range::Bytes(vec![range])) + req.insert_header(Range::Bytes(vec![if let Some(end) = end { + ByteRangeSpec::FromTo(start, end) } else { - req - } + ByteRangeSpec::From(start) + }])) } fn head_object_request(&self, identifier: &ObjectId) -> ClientRequest { From a6adde874e1a45d50fa2ee694354a3b2f307d7d9 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 24 Sep 2022 20:33:59 -0500 Subject: [PATCH 4/8] Implement s3 multipart uploads --- Cargo.lock | 2 + Cargo.toml | 2 + src/store/object_store.rs | 236 ++++++++++++++++++++++++++++++++++---- 3 
From a6adde874e1a45d50fa2ee694354a3b2f307d7d9 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sat, 24 Sep 2022 20:33:59 -0500
Subject: [PATCH 4/8] Implement s3 multipart uploads

---
 Cargo.lock                |   2 +
 Cargo.toml                |   2 +
 src/store/object_store.rs | 236 ++++++++++++++++++++++++++++++++++----
 3 files changed, 217 insertions(+), 23 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e07d1d5..aa85935 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1568,12 +1568,14 @@ dependencies = [
  "console-subscriber",
  "dashmap",
  "futures-util",
+ "md-5",
  "mime",
  "num_cpus",
  "once_cell",
  "opentelemetry",
  "opentelemetry-otlp",
  "pin-project-lite",
+ "quick-xml",
  "rusty-s3",
  "serde",
  "serde_cbor",
diff --git a/Cargo.toml b/Cargo.toml
index cc58ce3..b747da4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,6 +33,7 @@ config = "0.13.0"
 console-subscriber = "0.1"
 dashmap = "5.1.0"
 futures-util = "0.3.17"
+md-5 = "0.10.5"
 mime = "0.3.1"
 num_cpus = "1.13"
 once_cell = "1.4.0"
@@ -43,6 +44,7 @@ rusty-s3 = "0.3.2"
 serde = { version = "1.0", features = ["derive"] }
 serde_cbor = "0.11.2"
 serde_json = "1.0"
+quick-xml = { version = "0.24.1", features = ["serialize"] }
 sha2 = "0.10.0"
 sled = { version = "0.34.7" }
 storage-path-generator = "0.1.0"
diff --git a/src/store/object_store.rs b/src/store/object_store.rs
index a124808..5a7c0e3 100644
--- a/src/store/object_store.rs
+++ b/src/store/object_store.rs
@@ -9,9 +9,9 @@ use actix_web::{
         header::{ByteRangeSpec, Range, CONTENT_LENGTH},
         StatusCode,
     },
-    web::Bytes,
+    web::{Bytes, BytesMut},
 };
-use awc::{error::SendRequestError, Client, ClientRequest};
+use awc::{error::SendRequestError, Client, ClientRequest, SendClientRequest};
 use futures_util::{Stream, StreamExt, TryStreamExt};
 use rusty_s3::{actions::S3Action, Bucket, BucketError, Credentials, UrlStyle};
 use std::{pin::Pin, string::FromUtf8Error, time::Duration};
@@ -23,6 +23,8 @@ use url::Url;
 mod object_id;
 pub(crate) use object_id::ObjectId;
 
+const CHUNK_SIZE: usize = 8_388_608; // 8 Mebibytes; the S3 minimum is 5 (5_242_880)
+
 // - Settings Tree
 //   - last-path -> last generated path
@@ -42,11 +44,17 @@ pub(crate) enum ObjectError {
     #[error("Failed to parse string")]
     Utf8(#[from] FromUtf8Error),
 
+    #[error("Failed to parse xml")]
+    Xml(#[from] quick_xml::de::DeError),
+
     #[error("Invalid length")]
     Length,
 
-    #[error("Invalid status")]
-    Status(StatusCode),
+    #[error("Invalid etag response")]
+    Etag,
+
+    #[error("Invalid status: {0}\n{1}")]
+    Status(StatusCode, String),
 }
 
 impl From<SendRequestError> for ObjectError {
@@ -72,6 +80,16 @@ pub(crate) struct ObjectStoreConfig {
     credentials: Credentials,
 }
 
+#[derive(serde::Deserialize, Debug)]
+struct InitiateMultipartUploadResponse {
+    #[serde(rename = "Bucket")]
+    _bucket: String,
+    #[serde(rename = "Key")]
+    _key: String,
+    #[serde(rename = "UploadId")]
+    upload_id: String,
+}
+
 impl StoreConfig for ObjectStoreConfig {
     type Store = ObjectStore;
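Note (not part of the patches): S3 answers CreateMultipartUpload with a small XML document, and the struct above only really needs UploadId — Bucket and Key are bound to underscore fields and ignored. A quick illustration of what quick-xml 0.24's serde integration does with such a payload; the XML body here is a made-up example, and a struct that omits the unused elements works just as well because serde skips unknown fields:

    // Sketch: parsing an InitiateMultipartUploadResult with quick-xml's serde support.
    #[derive(serde::Deserialize, Debug)]
    struct InitiateMultipartUploadResponse {
        #[serde(rename = "UploadId")]
        upload_id: String,
    }

    fn main() -> Result<(), quick_xml::de::DeError> {
        // Hypothetical response body; real ones carry an xmlns and longer ids.
        let xml = "<InitiateMultipartUploadResult>\
            <Bucket>pict-rs</Bucket>\
            <Key>some/generated/path</Key>\
            <UploadId>example-upload-id</UploadId>\
        </InitiateMultipartUploadResult>";

        let parsed: InitiateMultipartUploadResponse = quick_xml::de::from_str(xml)?;
        assert_eq!(parsed.upload_id, "example-upload-id");
        Ok(())
    }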
@@ -107,32 +125,105 @@ impl Store for ObjectStore {
     }
 
     #[tracing::instrument(skip(stream))]
-    async fn save_stream<S>(&self, stream: S) -> Result<Self::Identifier, Error>
+    async fn save_stream<S>(&self, mut stream: S) -> Result<Self::Identifier, Error>
     where
         S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
     {
-        let (req, object_id) = self.put_object_request().await?;
+        let (req, object_id) = self.create_multipart_request().await?;
+        let mut response = req.send().await.map_err(ObjectError::from)?;
 
-        let response = req.send_stream(stream).await.map_err(ObjectError::from)?;
+        if !response.status().is_success() {
+            let body = String::from_utf8_lossy(&response.body().await?).to_string();
 
-        if response.status().is_success() {
-            return Ok(object_id);
+            return Err(ObjectError::Status(response.status(), body).into());
         }
 
-        Err(ObjectError::Status(response.status()).into())
+        let body = response.body().await?;
+        let body: InitiateMultipartUploadResponse =
+            quick_xml::de::from_reader(&*body).map_err(ObjectError::from)?;
+        let upload_id = &body.upload_id;
+
+        let res = async {
+            let mut etags = Vec::new();
+            let mut complete = false;
+            let mut part_number = 0;
+
+            while !complete {
+                part_number += 1;
+                let mut bytes = BytesMut::with_capacity(CHUNK_SIZE);
+
+                while bytes.len() < CHUNK_SIZE {
+                    if let Some(res) = stream.next().await {
+                        bytes.extend_from_slice(&res?);
+                    } else {
+                        complete = true;
+                        break;
+                    }
+                }
+
+                let mut response = self
+                    .create_upload_part_request(&bytes, &object_id, part_number, upload_id)
+                    .send_body(bytes)
+                    .await?;
+
+                if !response.status().is_success() {
+                    let body = String::from_utf8_lossy(&response.body().await?).to_string();
+
+                    return Err(ObjectError::Status(response.status(), body).into());
+                }
+
+                let etag = response
+                    .headers()
+                    .get("etag")
+                    .ok_or(ObjectError::Etag)?
+                    .to_str()
+                    .map_err(|_| ObjectError::Etag)?
+                    .to_string();
+
+                etags.push(etag);
+            }
+
+            let mut response = self
+                .send_complete_multipart_request(
+                    &object_id,
+                    upload_id,
+                    etags.iter().map(|s| s.as_ref()),
+                )
+                .await?;
+
+            if !response.status().is_success() {
+                let body = String::from_utf8_lossy(&response.body().await?).to_string();
+
+                return Err(ObjectError::Status(response.status(), body).into());
+            }
+
+            Ok(()) as Result<(), Error>
+        }
+        .await;
+
+        if let Err(e) = res {
+            self.create_abort_multipart_request(&object_id, upload_id)
+                .send()
+                .await?;
+            return Err(e);
+        }
+
+        Ok(object_id)
     }
 
     #[tracing::instrument(skip(bytes))]
     async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Error> {
         let (req, object_id) = self.put_object_request().await?;
 
-        let response = req.send_body(bytes).await.map_err(ObjectError::from)?;
+        let mut response = req.send_body(bytes).await.map_err(ObjectError::from)?;
 
         if response.status().is_success() {
             return Ok(object_id);
         }
 
-        Err(ObjectError::Status(response.status()).into())
+        let body = String::from_utf8_lossy(&response.body().await?).to_string();
+
+        Err(ObjectError::Status(response.status(), body).into())
     }
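Note (not part of the patches): save_stream now follows the standard three-step multipart protocol — initiate, upload parts, complete — with abort as the cleanup path, and CompleteMultipartUpload needs the parts' ETags in part order. A rough sketch of what that final step looks like at the rusty-s3 level, assuming rusty-s3 0.3.2 as pinned above; the endpoint, bucket, credentials, key, upload id, and etag strings are placeholders, and nothing here touches the network:

    use std::time::Duration;
    use rusty_s3::{actions::S3Action, Bucket, Credentials, UrlStyle};

    fn main() -> Result<(), rusty_s3::BucketError> {
        let bucket = Bucket::new(
            "http://minio:9000".parse().unwrap(),
            UrlStyle::Path,
            "pict-rs".to_string(),
            "minio".to_string(),
        )?;
        let creds = Credentials::new("pictrs".to_string(), "pictrspass".to_string());

        // ETags must be supplied in part order; S3 stitches parts 1..=N together.
        let etags = ["etag-part-1", "etag-part-2"];
        let action = bucket.complete_multipart_upload(
            Some(&creds),
            "some/object/key",
            "example-upload-id",
            etags.iter().copied(),
        );

        // Unlike the other actions, this one carries an XML body listing the parts,
        // which is why the patch adds build_request_inner to hand the action back.
        let url = action.sign(Duration::from_secs(5));
        println!("POST {}\n{}", url, action.body());
        Ok(())
    }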
     #[tracing::instrument]
@@ -142,7 +233,7 @@ impl Store for ObjectStore {
         from_start: Option<u64>,
         len: Option<u64>,
     ) -> Result<Self::Stream, Error> {
-        let response = self
+        let mut response = self
             .get_object_request(identifier, from_start, len)
             .send()
             .await
@@ -152,7 +243,9 @@ impl Store for ObjectStore {
             return Ok(Box::pin(response.map_err(payload_to_io_error)));
         }
 
-        Err(ObjectError::Status(response.status()).into())
+        let body = String::from_utf8_lossy(&response.body().await?).to_string();
+
+        Err(ObjectError::Status(response.status(), body).into())
     }
 
     #[tracing::instrument(skip(writer))]
@@ -171,9 +264,12 @@ impl Store for ObjectStore {
             .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, ObjectError::from(e)))?;
 
         if !response.status().is_success() {
+            let body = response.body().await.map_err(payload_to_io_error)?;
+            let body = String::from_utf8_lossy(&body).to_string();
+
             return Err(std::io::Error::new(
                 std::io::ErrorKind::Other,
-                ObjectError::Status(response.status()),
+                ObjectError::Status(response.status(), body),
             ));
         }
 
@@ -188,14 +284,16 @@ impl Store for ObjectStore {
 
     #[tracing::instrument]
     async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Error> {
-        let response = self
+        let mut response = self
             .head_object_request(identifier)
             .send()
             .await
             .map_err(ObjectError::from)?;
 
         if !response.status().is_success() {
-            return Err(ObjectError::Status(response.status()).into());
+            let body = String::from_utf8_lossy(&response.body().await?).to_string();
+
+            return Err(ObjectError::Status(response.status(), body).into());
         }
 
         let length = response
@@ -212,10 +310,12 @@ impl ObjectStore {
 
     #[tracing::instrument]
     async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Error> {
-        let response = self.delete_object_request(identifier).send().await?;
+        let mut response = self.delete_object_request(identifier).send().await?;
 
         if !response.status().is_success() {
-            return Err(ObjectError::Status(response.status()).into());
+            let body = String::from_utf8_lossy(&response.body().await?).to_string();
+
+            return Err(ObjectError::Status(response.status(), body).into());
         }
 
         Ok(())
@@ -252,12 +352,100 @@ impl ObjectStore {
     async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), Error> {
         let path = self.next_file().await?;
 
-        let action = self.bucket.put_object(Some(&self.credentials), &path);
+        let mut action = self.bucket.put_object(Some(&self.credentials), &path);
+
+        action
+            .headers_mut()
+            .insert("content-type", "application/octet-stream");
 
         Ok((self.build_request(action), ObjectId::from_string(path)))
     }
 
+    async fn create_multipart_request(&self) -> Result<(ClientRequest, ObjectId), Error> {
+        let path = self.next_file().await?;
+
+        let mut action = self
+            .bucket
+            .create_multipart_upload(Some(&self.credentials), &path);
+
+        action
+            .headers_mut()
+            .insert("content-type", "application/octet-stream");
+
+        Ok((self.build_request(action), ObjectId::from_string(path)))
+    }
+
+    fn create_upload_part_request(
+        &self,
+        bytes: &[u8],
+        object_id: &ObjectId,
+        part_number: u16,
+        upload_id: &str,
+    ) -> ClientRequest {
+        use md5::Digest;
+
+        let mut action = self.bucket.upload_part(
+            Some(&self.credentials),
+            object_id.as_str(),
+            part_number,
+            upload_id,
+        );
+
+        let mut hasher = md5::Md5::new();
+        hasher.update(bytes);
+        let hash = hasher.finalize();
+        let hash_string = base64::encode(&hash);
+
+        action
+            .headers_mut()
+            .insert("content-type", "application/octet-stream");
+        action.headers_mut().insert("content-md5", hash_string);
+
+        self.build_request(action)
+    }
+
+    fn send_complete_multipart_request<'a, I: Iterator<Item = &'a str>>(
+        &'a self,
+        object_id: &'a ObjectId,
+        upload_id: &'a str,
+        etags: I,
+    ) -> SendClientRequest {
+        let mut action = self.bucket.complete_multipart_upload(
+            Some(&self.credentials),
+            object_id.as_str(),
+            upload_id,
+            etags,
+        );
+
+        action
+            .headers_mut()
+            .insert("content-type", "application/octet-stream");
+
+        let (req, action) = self.build_request_inner(action);
+
+        req.send_body(action.body())
+    }
+
+    fn create_abort_multipart_request(
+        &self,
+        object_id: &ObjectId,
+        upload_id: &str,
+    ) -> ClientRequest {
+        let action = self.bucket.abort_multipart_upload(
+            Some(&self.credentials),
+            object_id.as_str(),
+            upload_id,
+        );
+
+        self.build_request(action)
+    }
+
-    fn build_request<'a, A: S3Action<'a>>(&'a self, mut action: A) -> ClientRequest {
+    fn build_request<'a, A: S3Action<'a>>(&'a self, action: A) -> ClientRequest {
+        let (req, _) = self.build_request_inner(action);
+        req
+    }
+
+    fn build_request_inner<'a, A: S3Action<'a>>(&'a self, mut action: A) -> (ClientRequest, A) {
         let method = match A::METHOD {
             rusty_s3::Method::Head => awc::http::Method::HEAD,
             rusty_s3::Method::Get => awc::http::Method::GET,
@@ -270,10 +458,12 @@ impl ObjectStore {
 
         let url = action.sign(Duration::from_secs(5));
 
         let req = self.client.request(method, url.as_str());
 
-        action
+        let req = action
            .headers_mut()
            .iter()
-            .fold(req, |req, tup| req.insert_header(tup))
+            .fold(req, |req, tup| req.insert_header(tup));
+
+        (req, action)
     }
 
     fn get_object_request(

From 29a998a665b4f9d43cabfaf04bad10f7f17b8760 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sat, 24 Sep 2022 20:34:13 -0500
Subject: [PATCH 5/8] Update object storage testing configs

---
 docker/object-storage/docker-compose.yml |  4 +---
 docker/object-storage/otel.yml           |  3 ++-
 docker/object-storage/pict-rs.toml       | 17 +++++++----------
 3 files changed, 10 insertions(+), 14 deletions(-)

diff --git a/docker/object-storage/docker-compose.yml b/docker/object-storage/docker-compose.yml
index 9f1f905..ac82284 100644
--- a/docker/object-storage/docker-compose.yml
+++ b/docker/object-storage/docker-compose.yml
@@ -14,8 +14,6 @@ services:
     environment:
       - PICTRS__TRACING__CONSOLE__ADDRESS=0.0.0.0:6669
       - PICTRS__TRACING__OPENTELEMETRY__URL=http://otel:4137
-    links:
-      - "minio:pict-rs.minio"
     stdin_open: true
     tty: true
     volumes:
@@ -23,7 +21,7 @@ services:
       - ../../:/opt/app
 
   pictrs_proxy:
-    image: asonix/pictrs-proxy:0.3
+    image: asonix/pictrs-proxy:0.4.0-alpha.4
     ports:
       - "8081:8081"
     environment:
diff --git a/docker/object-storage/otel.yml b/docker/object-storage/otel.yml
index 8270b08..91168c3 100644
--- a/docker/object-storage/otel.yml
+++ b/docker/object-storage/otel.yml
@@ -11,7 +11,8 @@ exporters:
   logging:
   jaeger:
     endpoint: jaeger:14250
-    insecure: true
+    tls:
+      insecure: true
 
 service:
   pipelines:
diff --git a/docker/object-storage/pict-rs.toml b/docker/object-storage/pict-rs.toml
index f710493..426f3fb 100644
--- a/docker/object-storage/pict-rs.toml
+++ b/docker/object-storage/pict-rs.toml
@@ -1,6 +1,7 @@
 [server]
 address = '0.0.0.0:8080'
 worker_id = 'pict-rs-1'
+
 [tracing.logging]
 format = 'normal'
 targets = 'warn,tracing_actix_web=info,actix_server=info,actix_web=info'
@@ -21,13 +22,7 @@ max_height = 10000
 max_area = 40000000
 max_file_size = 40
 enable_silent_video = true
-filters = [
-    'blur',
-    'crop',
-    'identity',
-    'resize',
-    'thumbnail',
-]
+filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail']
 skip_validate_imports = false
 
 [repo]
@@ -37,7 +32,9 @@ cache_capacity = 67108864
 
 [store]
 type = 'object_storage'
+endpoint = 'http://minio:9000'
+use_path_style = true
 bucket_name = 'pict-rs'
-region = 'http://minio:9000'
-access_key = 'Q7Z3AY3JO01N27UNH5IR'
-secret_key = 'bH3Kj6UVJF+btBtWsExVurN3ukEilC3saECsupzi'
+region = 'minio'
+access_key = 'pictrs'
+secret_key = 'pictrspass'
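Note (not part of the patches): the config patch above splits what used to be crammed into `region` into a proper endpoint/region pair, and `use_path_style = true` is why the `links: minio:pict-rs.minio` alias could be dropped from docker-compose — that alias only existed so virtual-host addressing (`pict-rs.minio`) would resolve. A sketch of how the two styles differ in rusty-s3 0.3, signing the same object both ways; the hosts and credentials are placeholders mirroring the test config:

    use std::time::Duration;
    use rusty_s3::{actions::S3Action, Bucket, Credentials, UrlStyle};

    fn main() -> Result<(), rusty_s3::BucketError> {
        let creds = Credentials::new("pictrs".to_string(), "pictrspass".to_string());

        for style in [UrlStyle::Path, UrlStyle::VirtualHost] {
            let bucket = Bucket::new(
                "http://minio:9000".parse().unwrap(),
                style,
                "pict-rs".to_string(),
                "minio".to_string(),
            )?;
            // Path style:         http://minio:9000/pict-rs/some/key?...
            // Virtual-host style: http://pict-rs.minio:9000/some/key?...
            let url = bucket
                .get_object(Some(&creds), "some/key")
                .sign(Duration::from_secs(5));
            println!("{url}");
        }
        Ok(())
    }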
From bf3c47e4579bbae4fb54e4b2b78b02b7f87e0499 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sat, 24 Sep 2022 22:07:06 -0500
Subject: [PATCH 6/8] Improve concurrency for upload streams

---
 src/store/object_store.rs | 143 ++++++++++++++++++++++++++++----------
 1 file changed, 107 insertions(+), 36 deletions(-)

diff --git a/src/store/object_store.rs b/src/store/object_store.rs
index 5a7c0e3..9fe0425 100644
--- a/src/store/object_store.rs
+++ b/src/store/object_store.rs
@@ -3,8 +3,9 @@ use crate::{
     repo::{Repo, SettingsRepo},
     store::{Store, StoreConfig},
 };
+use actix_rt::task::JoinError;
 use actix_web::{
-    error::PayloadError,
+    error::{BlockingError, PayloadError},
     http::{
         header::{ByteRangeSpec, Range, CONTENT_LENGTH},
         StatusCode,
@@ -18,6 +19,7 @@ use std::{pin::Pin, string::FromUtf8Error, time::Duration};
 use storage_path_generator::{Generator, Path};
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
 use tokio_util::io::ReaderStream;
+use tracing::Instrument;
 use url::Url;
 
 mod object_id;
@@ -53,6 +55,9 @@ pub(crate) enum ObjectError {
     #[error("Invalid etag response")]
     Etag,
 
+    #[error("Task cancelled")]
+    Cancelled,
+
     #[error("Invalid status: {0}\n{1}")]
     Status(StatusCode, String),
 }
@@ -63,6 +68,18 @@ impl From<SendRequestError> for ObjectError {
     }
 }
 
+impl From<JoinError> for ObjectError {
+    fn from(_: JoinError) -> Self {
+        Self::Cancelled
+    }
+}
+
+impl From<BlockingError> for ObjectError {
+    fn from(_: BlockingError) -> Self {
+        Self::Cancelled
+    }
+}
+
 #[derive(Clone)]
 pub(crate) struct ObjectStore {
     path_gen: Generator,
@@ -111,6 +128,34 @@ fn payload_to_io_error(e: PayloadError) -> std::io::Error {
     }
 }
 
+#[tracing::instrument(skip(stream))]
+async fn read_chunk<S>(stream: &mut S) -> std::io::Result<Bytes>
+where
+    S: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static,
+{
+    let mut buf = Vec::new();
+    let mut total_len = 0;
+
+    while total_len < CHUNK_SIZE {
+        if let Some(res) = stream.next().await {
+            let bytes = res?;
+            total_len += bytes.len();
+            buf.push(bytes);
+        } else {
+            break;
+        }
+    }
+
+    let bytes = buf
+        .iter()
+        .fold(BytesMut::with_capacity(total_len), |mut acc, item| {
+            acc.extend_from_slice(item);
+            acc
+        });
+
+    Ok(bytes.freeze())
+}
+
 #[async_trait::async_trait(?Send)]
 impl Store for ObjectStore {
     type Identifier = ObjectId;
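Note (not part of the patches): read_chunk keeps polling until at least CHUNK_SIZE bytes are buffered, so every part except the last clears S3's 5 MiB minimum, and a short final chunk doubles as the end-of-stream signal. A freestanding rendering of the same accumulate-then-freeze idea, condensed onto a BytesMut instead of the patch's Vec-then-fold, with a toy threshold and an in-memory stream:

    use actix_web::web::{Bytes, BytesMut};
    use futures_util::{stream, Stream, StreamExt};

    const TOY_CHUNK_SIZE: usize = 8;

    async fn read_chunk<S>(stream: &mut S) -> std::io::Result<Bytes>
    where
        S: Stream<Item = std::io::Result<Bytes>> + Unpin,
    {
        let mut buf = BytesMut::new();
        while buf.len() < TOY_CHUNK_SIZE {
            match stream.next().await {
                Some(res) => buf.extend_from_slice(&res?),
                None => break, // a short chunk means the upstream is exhausted
            }
        }
        Ok(buf.freeze())
    }

    #[actix_rt::main]
    async fn main() -> std::io::Result<()> {
        let mut input = stream::iter(
            ["hello", " ", "world", "!"].map(|s| Ok(Bytes::from_static(s.as_bytes()))),
        );
        // First call crosses the threshold; the next returns the short tail.
        let first = read_chunk(&mut input).await?;
        assert!(first.len() >= TOY_CHUNK_SIZE);
        let tail = read_chunk(&mut input).await?;
        assert!(tail.len() < TOY_CHUNK_SIZE);
        Ok(())
    }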
@@ -144,43 +189,61 @@ impl Store for ObjectStore {
         let upload_id = &body.upload_id;
 
         let res = async {
-            let mut etags = Vec::new();
             let mut complete = false;
             let mut part_number = 0;
+            let mut futures = Vec::new();
 
             while !complete {
                 part_number += 1;
-                let mut bytes = BytesMut::with_capacity(CHUNK_SIZE);
 
-                while bytes.len() < CHUNK_SIZE {
-                    if let Some(res) = stream.next().await {
-                        bytes.extend_from_slice(&res?);
-                    } else {
-                        complete = true;
-                        break;
+                let bytes = read_chunk(&mut stream).await?;
+                complete = bytes.len() < CHUNK_SIZE;
+
+                let this = self.clone();
+
+                let object_id2 = object_id.clone();
+                let upload_id2 = upload_id.clone();
+                let handle = actix_rt::spawn(
+                    async move {
+                        let mut response = this
+                            .create_upload_part_request(
+                                bytes.clone(),
+                                &object_id2,
+                                part_number,
+                                &upload_id2,
+                            )
+                            .await?
+                            .send_body(bytes)
+                            .await?;
+
+                        if !response.status().is_success() {
+                            let body = String::from_utf8_lossy(&response.body().await?).to_string();
+
+                            return Err(ObjectError::Status(response.status(), body).into());
+                        }
+
+                        let etag = response
+                            .headers()
+                            .get("etag")
+                            .ok_or(ObjectError::Etag)?
+                            .to_str()
+                            .map_err(|_| ObjectError::Etag)?
+                            .to_string();
+
+                        drop(response);
+
+                        Ok(etag) as Result<String, Error>
                     }
-                }
+                    .instrument(tracing::info_span!("Upload Part")),
+                );
 
-                let mut response = self
-                    .create_upload_part_request(&bytes, &object_id, part_number, upload_id)
-                    .send_body(bytes)
-                    .await?;
+                futures.push(handle);
+            }
 
-                if !response.status().is_success() {
-                    let body = String::from_utf8_lossy(&response.body().await?).to_string();
+            let mut etags = Vec::new();
 
-                    return Err(ObjectError::Status(response.status(), body).into());
-                }
-
-                let etag = response
-                    .headers()
-                    .get("etag")
-                    .ok_or(ObjectError::Etag)?
-                    .to_str()
-                    .map_err(|_| ObjectError::Etag)?
-                    .to_string();
-
-                etags.push(etag);
+            for future in futures {
+                etags.push(future.await.map_err(ObjectError::from)??);
             }
 
             let mut response = self
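Note (not part of the patches): the rewritten loop spawns each part upload onto the arbiter as soon as its chunk is read, then awaits the JoinHandles in push order, so ETags still land in part-number order even though the uploads themselves overlap. The skeleton of that pattern, minus the S3 specifics — the work closure here is a stand-in for create_upload_part_request(...).send_body(...):

    use actix_rt::task::JoinHandle;

    #[actix_rt::main]
    async fn main() {
        let mut handles: Vec<JoinHandle<Result<String, std::io::Error>>> = Vec::new();

        for part_number in 1u16..=3 {
            // Spawn immediately; the task starts running before anyone awaits it.
            handles.push(actix_rt::spawn(async move {
                // Stand-in for the real upload-part request
                Ok(format!("etag-{part_number}"))
            }));
        }

        // Awaiting in push order keeps the etags aligned with part numbers.
        let mut etags = Vec::new();
        for handle in handles {
            // JoinError (cancellation/panic) surfaces separately from the
            // upload's own error, mirroring the double-? in the patch.
            etags.push(handle.await.expect("task cancelled").expect("upload failed"));
        }
        assert_eq!(etags.len(), 3);
    }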
@@ -375,13 +438,13 @@ impl ObjectStore {
         Ok((self.build_request(action), ObjectId::from_string(path)))
     }
 
-    fn create_upload_part_request(
+    async fn create_upload_part_request(
         &self,
-        bytes: &[u8],
+        bytes: Bytes,
         object_id: &ObjectId,
         part_number: u16,
         upload_id: &str,
-    ) -> ClientRequest {
+    ) -> Result<ClientRequest, Error> {
         use md5::Digest;
 
         let mut action = self.bucket.upload_part(
@@ -391,17 +454,25 @@ impl ObjectStore {
             upload_id,
         );
 
-        let mut hasher = md5::Md5::new();
-        hasher.update(bytes);
-        let hash = hasher.finalize();
-        let hash_string = base64::encode(&hash);
+        let hashing_span = tracing::info_span!("Hashing request body");
+        let hash_string = actix_web::web::block(move || {
+            let guard = hashing_span.enter();
+            let mut hasher = md5::Md5::new();
+            hasher.update(&bytes);
+            let hash = hasher.finalize();
+            let hash_string = base64::encode(&hash);
+            drop(guard);
+            hash_string
+        })
+        .await
+        .map_err(ObjectError::from)?;
 
         action
             .headers_mut()
             .insert("content-type", "application/octet-stream");
         action.headers_mut().insert("content-md5", hash_string);
 
-        self.build_request(action)
+        Ok(self.build_request(action))
     }
 
     fn send_complete_multipart_request<'a, I: Iterator<Item = &'a str>>(

From dc9541784614b044f671ce26a4ca4729a7a4b06b Mon Sep 17 00:00:00 2001
From: asonix
Date: Sun, 25 Sep 2022 08:27:11 -0500
Subject: [PATCH 7/8] Improve concurrency for upload streams part 2

---
 src/store/object_store.rs | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/store/object_store.rs b/src/store/object_store.rs
index 9fe0425..f9b01b2 100644
--- a/src/store/object_store.rs
+++ b/src/store/object_store.rs
@@ -188,6 +188,7 @@ impl Store for ObjectStore {
             quick_xml::de::from_reader(&*body).map_err(ObjectError::from)?;
         let upload_id = &body.upload_id;
 
+        // hack-ish: use async block as Result boundary
         let res = async {
             let mut complete = false;
             let mut part_number = 0;
@@ -230,6 +231,7 @@ impl Store for ObjectStore {
                             .map_err(|_| ObjectError::Etag)?
                             .to_string();
 
+                        // early-drop response to close its tracing spans
                         drop(response);
 
                         Ok(etag) as Result<String, Error>
@@ -240,6 +242,9 @@ impl Store for ObjectStore {
                 futures.push(handle);
             }
 
+            // early-drop stream to allow the next Part to be polled concurrently
+            drop(stream);
+
             let mut etags = Vec::new();
 
             for future in futures {

From 04bc586a265bfe245074859778897c5718de18a9 Mon Sep 17 00:00:00 2001
From: asonix
Date: Sun, 25 Sep 2022 09:09:05 -0500
Subject: [PATCH 8/8] Fix hasher test

---
 src/ingest/hasher.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/ingest/hasher.rs b/src/ingest/hasher.rs
index a3ce995..811600b 100644
--- a/src/ingest/hasher.rs
+++ b/src/ingest/hasher.rs
@@ -89,11 +89,11 @@ mod test {
         let hash = test_on_arbiter!(async move {
             let file1 = tokio::fs::File::open("./client-examples/earth.gif").await?;
 
-            let mut hasher = Hasher::new(file1, Sha256::new());
+            let mut reader = Hasher::new(file1, Sha256::new());
 
-            tokio::io::copy(&mut hasher, &mut tokio::io::sink()).await?;
+            tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
 
-            hasher.finalize_reset().await
+            Ok(reader.hasher().borrow_mut().finalize_reset().to_vec()) as std::io::Result<_>
         })
         .unwrap();
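Note (not part of the patches): in the hashing hunk above, actix_web::web::block moves the md5 digest of an 8 MiB part onto the blocking thread pool, so the arbiter thread keeps polling the other in-flight uploads instead of stalling on CPU work. A minimal form of that offload, hashing a stand-in buffer; this assumes the md-5 crate (imported as md5) and base64 as pinned in this series:

    use md5::Digest; // from the md-5 crate

    #[actix_rt::main]
    async fn main() -> Result<(), actix_web::error::BlockingError> {
        let bytes = actix_web::web::Bytes::from_static(b"a stand-in for an 8 MiB part");

        // web::block runs the closure on the blocking pool and resolves
        // once the digest is done, without tying up the async executor.
        let content_md5 = actix_web::web::block(move || {
            let mut hasher = md5::Md5::new();
            hasher.update(&bytes);
            base64::encode(hasher.finalize())
        })
        .await?;

        // This value goes out as the `content-md5` header on the UploadPart request.
        println!("content-md5: {content_md5}");
        Ok(())
    }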