Make object storage timeouts configurable

This commit is contained in:
asonix 2023-07-11 13:01:58 -05:00
parent f5c39f9be5
commit 691bca286c
7 changed files with 164 additions and 17 deletions

View file

@ -4,7 +4,7 @@ use std::path::{Path, PathBuf};
mod commandline; mod commandline;
mod defaults; mod defaults;
mod file; mod file;
mod primitives; pub mod primitives;
use commandline::{Args, Output}; use commandline::{Args, Output};
use config::Config; use config::Config;
@ -12,11 +12,10 @@ use defaults::Defaults;
pub(crate) use commandline::Operation; pub(crate) use commandline::Operation;
pub(crate) use file::{ pub(crate) use file::{
ConfigFile as Configuration, Media as MediaConfiguration, OpenTelemetry, Repo, Sled, Tracing, ConfigFile as Configuration, Media as MediaConfiguration, ObjectStorage, OpenTelemetry, Repo,
}; Sled, Store, Tracing,
pub(crate) use primitives::{
AudioCodec, Filesystem, ImageFormat, LogFormat, ObjectStorage, Store, VideoCodec,
}; };
pub(crate) use primitives::{AudioCodec, Filesystem, ImageFormat, LogFormat, VideoCodec};
/// Source for pict-rs configuration when embedding as a library /// Source for pict-rs configuration when embedding as a library
pub enum ConfigSource<P, T> { pub enum ConfigSource<P, T> {

View file

@ -519,6 +519,8 @@ struct Run {
#[derive(Clone, Debug, Subcommand, serde::Serialize)] #[derive(Clone, Debug, Subcommand, serde::Serialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
#[serde(tag = "type")] #[serde(tag = "type")]
// allow large enum variant - this is an instantiated-once config
#[allow(clippy::large_enum_variant)]
enum Store { enum Store {
/// configure filesystem storage /// configure filesystem storage
Filesystem(Filesystem), Filesystem(Filesystem),
@ -529,6 +531,8 @@ enum Store {
/// Run pict-rs with the provided storage /// Run pict-rs with the provided storage
#[derive(Debug, Subcommand)] #[derive(Debug, Subcommand)]
// allow large enum variant - this is an instantiated-once config
#[allow(clippy::large_enum_variant)]
enum RunStore { enum RunStore {
/// Run pict-rs with filesystem storage /// Run pict-rs with filesystem storage
Filesystem(RunFilesystem), Filesystem(RunFilesystem),
@ -550,6 +554,8 @@ struct MigrateStore {
/// Configure the pict-rs storage migration /// Configure the pict-rs storage migration
#[derive(Debug, Subcommand)] #[derive(Debug, Subcommand)]
// allow large enum variant - this is an instantiated-once config
#[allow(clippy::large_enum_variant)]
enum MigrateStoreFrom { enum MigrateStoreFrom {
/// Migrate from the provided filesystem storage /// Migrate from the provided filesystem storage
Filesystem(MigrateFilesystem), Filesystem(MigrateFilesystem),
@ -560,6 +566,8 @@ enum MigrateStoreFrom {
/// Configure the destination storage for pict-rs storage migration /// Configure the destination storage for pict-rs storage migration
#[derive(Debug, Subcommand)] #[derive(Debug, Subcommand)]
// allow large enum variant - this is an instantiated-once config
#[allow(clippy::large_enum_variant)]
enum MigrateStoreTo { enum MigrateStoreTo {
/// Migrate to the provided filesystem storage /// Migrate to the provided filesystem storage
Filesystem(MigrateFilesystemInner), Filesystem(MigrateFilesystemInner),
@ -667,25 +675,44 @@ struct ObjectStorage {
/// The bucket in which to store media /// The bucket in which to store media
#[arg(short, long)] #[arg(short, long)]
#[serde(skip_serializing_if = "Option::is_none")]
bucket_name: Option<String>, bucket_name: Option<String>,
/// The region the bucket is located in /// The region the bucket is located in
/// ///
/// For minio deployments, this can just be 'minio' /// For minio deployments, this can just be 'minio'
#[arg(short, long)] #[arg(short, long)]
#[serde(skip_serializing_if = "Option::is_none")]
region: Option<String>, region: Option<String>,
/// The Access Key for the user accessing the bucket /// The Access Key for the user accessing the bucket
#[arg(short, long)] #[arg(short, long)]
#[serde(skip_serializing_if = "Option::is_none")]
access_key: Option<String>, access_key: Option<String>,
/// The secret key for the user accessing the bucket /// The secret key for the user accessing the bucket
#[arg(short, long)] #[arg(short, long)]
#[serde(skip_serializing_if = "Option::is_none")]
secret_key: Option<String>, secret_key: Option<String>,
/// The session token for accessing the bucket /// The session token for accessing the bucket
#[arg(long)] #[arg(long)]
#[serde(skip_serializing_if = "Option::is_none")]
session_token: Option<String>, session_token: Option<String>,
/// How long signatures for object storage requests are valid (in seconds)
///
/// This defaults to 15 seconds
#[arg(long)]
#[serde(skip_serializing_if = "Option::is_none")]
signature_duration: Option<u64>,
/// How long a client can wait on an object storage request before giving up (in seconds)
///
/// This defaults to 30 seconds
#[arg(long)]
#[serde(skip_serializing_if = "Option::is_none")]
client_timeout: Option<u64>,
} }
/// Configuration for the sled-backed data repository /// Configuration for the sled-backed data repository

View file

@ -101,9 +101,15 @@ struct SledDefaults {
#[derive(Clone, Debug, serde::Serialize)] #[derive(Clone, Debug, serde::Serialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
#[serde(tag = "type")] pub(super) struct StoreDefaults {
pub(super) enum StoreDefaults { #[serde(rename = "type")]
Filesystem(FilesystemDefaults), type_: String,
#[serde(flatten)]
pub(super) filesystem: FilesystemDefaults,
#[serde(flatten)]
pub(super) object_storage: ObjectStorageDefaults,
} }
#[derive(Clone, Debug, serde::Serialize)] #[derive(Clone, Debug, serde::Serialize)]
@ -112,6 +118,14 @@ pub(super) struct FilesystemDefaults {
path: PathBuf, path: PathBuf,
} }
#[derive(Clone, Debug, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(super) struct ObjectStorageDefaults {
signature_duration: u64,
client_timeout: u64,
}
impl Default for ServerDefaults { impl Default for ServerDefaults {
fn default() -> Self { fn default() -> Self {
ServerDefaults { ServerDefaults {
@ -211,7 +225,11 @@ impl Default for SledDefaults {
impl Default for StoreDefaults { impl Default for StoreDefaults {
fn default() -> Self { fn default() -> Self {
Self::Filesystem(FilesystemDefaults::default()) Self {
type_: String::from("filesystem"),
filesystem: FilesystemDefaults::default(),
object_storage: ObjectStorageDefaults::default(),
}
} }
} }
@ -223,6 +241,15 @@ impl Default for FilesystemDefaults {
} }
} }
impl Default for ObjectStorageDefaults {
fn default() -> Self {
Self {
signature_duration: 15,
client_timeout: 30,
}
}
}
impl From<crate::config::commandline::Filesystem> for crate::config::primitives::Filesystem { impl From<crate::config::commandline::Filesystem> for crate::config::primitives::Filesystem {
fn from(value: crate::config::commandline::Filesystem) -> Self { fn from(value: crate::config::commandline::Filesystem) -> Self {
Self { Self {

View file

@ -1,5 +1,5 @@
use crate::{ use crate::{
config::primitives::{AudioCodec, ImageFormat, LogFormat, Store, Targets, VideoCodec}, config::primitives::{AudioCodec, Filesystem, ImageFormat, LogFormat, Targets, VideoCodec},
serde_str::Serde, serde_str::Serde,
}; };
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
@ -22,6 +22,59 @@ pub(crate) struct ConfigFile {
pub(crate) store: Store, pub(crate) store: Store,
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type")]
// allow large enum variant - this is an instantiated-once config
#[allow(clippy::large_enum_variant)]
pub(crate) enum Store {
Filesystem(Filesystem),
ObjectStorage(ObjectStorage),
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) struct ObjectStorage {
/// The base endpoint for the object storage
///
/// Examples:
/// - `http://localhost:9000`
/// - `https://s3.dualstack.eu-west-1.amazonaws.com`
pub(crate) endpoint: Url,
/// Determines whether to use path style or virtualhost style for accessing objects
///
/// When this is true, objects will be fetched from {endpoint}/{bucket_name}/{object}
/// When false, objects will be fetched from {bucket_name}.{endpoint}/{object}
pub(crate) use_path_style: bool,
/// The bucket in which to store media
pub(crate) bucket_name: String,
/// The region the bucket is located in
pub(crate) region: String,
/// The Access Key for the user accessing the bucket
pub(crate) access_key: String,
/// The secret key for the user accessing the bucket
pub(crate) secret_key: String,
/// The session token for accessing the bucket
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) session_token: Option<String>,
/// How long signatures for object storage requests are valid (in seconds)
///
/// This defaults to 15 seconds
pub(crate) signature_duration: u64,
/// How long a client can wait on an object storage request before giving up (in seconds)
///
/// This defaults to 30 seconds
pub(crate) client_timeout: u64,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
#[serde(tag = "type")] #[serde(tag = "type")]

View file

@ -142,11 +142,27 @@ pub(crate) struct ObjectStorage {
#[arg(long)] #[arg(long)]
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub(crate) session_token: Option<String>, pub(crate) session_token: Option<String>,
/// How long signatures for object storage requests are valid (in seconds)
///
/// This defaults to 15 seconds
#[arg(long)]
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) signature_duration: Option<u64>,
/// How long a client can wait on an object storage request before giving up (in seconds)
///
/// This defaults to 30 seconds
#[arg(long)]
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) client_timeout: Option<u64>,
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
#[serde(tag = "type")] #[serde(tag = "type")]
// allow large enum variant - this is an instantiated-once config
#[allow(clippy::large_enum_variant)]
pub(crate) enum Store { pub(crate) enum Store {
Filesystem(Filesystem), Filesystem(Filesystem),

View file

@ -1266,21 +1266,21 @@ async fn migrate_inner<S1>(
repo: &Repo, repo: &Repo,
client: Client, client: Client,
from: S1, from: S1,
to: config::Store, to: config::primitives::Store,
skip_missing_files: bool, skip_missing_files: bool,
) -> color_eyre::Result<()> ) -> color_eyre::Result<()>
where where
S1: Store, S1: Store,
{ {
match to { match to {
config::Store::Filesystem(config::Filesystem { path }) => { config::primitives::Store::Filesystem(config::Filesystem { path }) => {
let to = FileStore::build(path.clone(), repo.clone()).await?; let to = FileStore::build(path.clone(), repo.clone()).await?;
match repo { match repo {
Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?, Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?,
} }
} }
config::Store::ObjectStorage(config::ObjectStorage { config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
endpoint, endpoint,
bucket_name, bucket_name,
use_path_style, use_path_style,
@ -1288,6 +1288,8 @@ where
access_key, access_key,
secret_key, secret_key,
session_token, session_token,
signature_duration,
client_timeout,
}) => { }) => {
let to = ObjectStore::build( let to = ObjectStore::build(
endpoint.clone(), endpoint.clone(),
@ -1301,6 +1303,8 @@ where
access_key, access_key,
secret_key, secret_key,
session_token, session_token,
signature_duration.unwrap_or(15),
client_timeout.unwrap_or(30),
repo.clone(), repo.clone(),
) )
.await? .await?
@ -1399,11 +1403,11 @@ pub async fn run() -> color_eyre::Result<()> {
let client = build_client(); let client = build_client();
match from { match from {
config::Store::Filesystem(config::Filesystem { path }) => { config::primitives::Store::Filesystem(config::Filesystem { path }) => {
let from = FileStore::build(path.clone(), repo.clone()).await?; let from = FileStore::build(path.clone(), repo.clone()).await?;
migrate_inner(&repo, client, from, to, skip_missing_files).await?; migrate_inner(&repo, client, from, to, skip_missing_files).await?;
} }
config::Store::ObjectStorage(config::ObjectStorage { config::primitives::Store::ObjectStorage(config::primitives::ObjectStorage {
endpoint, endpoint,
bucket_name, bucket_name,
use_path_style, use_path_style,
@ -1411,6 +1415,8 @@ pub async fn run() -> color_eyre::Result<()> {
access_key, access_key,
secret_key, secret_key,
session_token, session_token,
signature_duration,
client_timeout,
}) => { }) => {
let from = ObjectStore::build( let from = ObjectStore::build(
endpoint, endpoint,
@ -1424,6 +1430,8 @@ pub async fn run() -> color_eyre::Result<()> {
access_key, access_key,
secret_key, secret_key,
session_token, session_token,
signature_duration.unwrap_or(15),
client_timeout.unwrap_or(30),
repo.clone(), repo.clone(),
) )
.await? .await?
@ -1460,6 +1468,8 @@ pub async fn run() -> color_eyre::Result<()> {
access_key, access_key,
secret_key, secret_key,
session_token, session_token,
signature_duration,
client_timeout,
}) => { }) => {
let store = ObjectStore::build( let store = ObjectStore::build(
endpoint, endpoint,
@ -1473,6 +1483,8 @@ pub async fn run() -> color_eyre::Result<()> {
access_key, access_key,
secret_key, secret_key,
session_token, session_token,
signature_duration,
client_timeout,
repo.clone(), repo.clone(),
) )
.await?; .await?;

View file

@ -96,6 +96,8 @@ pub(crate) struct ObjectStore {
bucket: Bucket, bucket: Bucket,
credentials: Credentials, credentials: Credentials,
client: Client, client: Client,
signature_expiration: Duration,
client_timeout: Duration,
} }
#[derive(Clone)] #[derive(Clone)]
@ -104,6 +106,8 @@ pub(crate) struct ObjectStoreConfig {
repo: Repo, repo: Repo,
bucket: Bucket, bucket: Bucket,
credentials: Credentials, credentials: Credentials,
signature_expiration: u64,
client_timeout: u64,
} }
#[derive(serde::Deserialize, Debug)] #[derive(serde::Deserialize, Debug)]
@ -124,6 +128,8 @@ impl ObjectStoreConfig {
bucket: self.bucket, bucket: self.bucket,
credentials: self.credentials, credentials: self.credentials,
client, client,
signature_expiration: Duration::from_secs(self.signature_expiration),
client_timeout: Duration::from_secs(self.client_timeout),
} }
} }
} }
@ -437,6 +443,8 @@ impl ObjectStore {
access_key: String, access_key: String,
secret_key: String, secret_key: String,
session_token: Option<String>, session_token: Option<String>,
signature_expiration: u64,
client_timeout: u64,
repo: Repo, repo: Repo,
) -> Result<ObjectStoreConfig, StoreError> { ) -> Result<ObjectStoreConfig, StoreError> {
let path_gen = init_generator(&repo).await?; let path_gen = init_generator(&repo).await?;
@ -451,6 +459,8 @@ impl ObjectStore {
} else { } else {
Credentials::new(access_key, secret_key) Credentials::new(access_key, secret_key)
}, },
signature_expiration,
client_timeout,
}) })
} }
@ -580,9 +590,12 @@ impl ObjectStore {
rusty_s3::Method::Delete => awc::http::Method::DELETE, rusty_s3::Method::Delete => awc::http::Method::DELETE,
}; };
let url = action.sign(Duration::from_secs(15)); let url = action.sign(self.signature_expiration);
let req = self.client.request(method, url.as_str()); let req = self
.client
.request(method, url.as_str())
.timeout(self.client_timeout);
let req = action let req = action
.headers_mut() .headers_mut()