Share client between object store and request extractor

Add config to limit connection pool size
This commit is contained in:
asonix 2023-06-23 11:20:20 -05:00
parent e139765e6d
commit a1d48cbb42
9 changed files with 188 additions and 116 deletions

View file

@ -1,6 +1,7 @@
[server] [server]
address = "0.0.0.0:8080" address = "0.0.0.0:8080"
worker_id = "pict-rs-1" worker_id = "pict-rs-1"
client_pool_size = 100
[tracing.logging] [tracing.logging]
format = "normal" format = "normal"
@ -33,7 +34,6 @@ filters = [
"thumbnail", "thumbnail",
] ]
skip_validate_imports = false skip_validate_imports = false
cache_duration = 168
[media.gif] [media.gif]
max_width = 128 max_width = 128

View file

@ -20,6 +20,19 @@ worker_id = 'pict-rs-1'
# Not specifying api_key disables internal endpoints # Not specifying api_key disables internal endpoints
api_key = 'API_KEY' api_key = 'API_KEY'
## Optional: connection pool size for internal http client
# environment variable: PICTRS__SERVER__CLIENT_POOL_SIZE
# default: 100
#
# This number is multiplied the number of cores available to pict-rs. Running on a 2 core machine
# with the default value will result in 200 pooled connections. Running on a 32 core machine with
# the default value will result in 3200 pooled connections.
#
# This number can be lowered to keep pict-rs within ulimit bounds if you encounter errors related to
# "Too many open files". Alternatively, increasing the ulimit of your system can solve this problem
# as well.
client_pool_size = 100
## Logging configuration ## Logging configuration
[tracing.logging] [tracing.logging]

View file

@ -45,6 +45,7 @@ impl Args {
address, address,
api_key, api_key,
worker_id, worker_id,
client_pool_size,
media_preprocess_steps, media_preprocess_steps,
media_skip_validate_imports, media_skip_validate_imports,
media_max_width, media_max_width,
@ -67,6 +68,7 @@ impl Args {
address, address,
api_key, api_key,
worker_id, worker_id,
client_pool_size,
}; };
let gif = if media_gif_max_width.is_none() let gif = if media_gif_max_width.is_none()
&& media_gif_max_height.is_none() && media_gif_max_height.is_none()
@ -281,6 +283,8 @@ struct Server {
worker_id: Option<String>, worker_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
api_key: Option<String>, api_key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
client_pool_size: Option<usize>,
} }
#[derive(Debug, Default, serde::Serialize)] #[derive(Debug, Default, serde::Serialize)]
@ -438,6 +442,14 @@ struct Run {
#[arg(long)] #[arg(long)]
worker_id: Option<String>, worker_id: Option<String>,
/// Number of connections the internel HTTP client should maintain in its pool
///
/// This number defaults to 100, and the total number is multiplied by the number of cores
/// available to the program. This means that running on a 2 core system will result in 200
/// pooled connections, and running on a 32 core system will result in 3200 pooled connections.
#[arg(long)]
client_pool_size: Option<usize>,
/// Optional pre-processing steps for uploaded media. /// Optional pre-processing steps for uploaded media.
/// ///
/// All still images will be put through these steps before saving /// All still images will be put through these steps before saving

View file

@ -20,6 +20,7 @@ pub(crate) struct Defaults {
struct ServerDefaults { struct ServerDefaults {
address: SocketAddr, address: SocketAddr,
worker_id: String, worker_id: String,
client_pool_size: usize,
} }
#[derive(Clone, Debug, Default, serde::Serialize)] #[derive(Clone, Debug, Default, serde::Serialize)]
@ -115,6 +116,7 @@ impl Default for ServerDefaults {
ServerDefaults { ServerDefaults {
address: "0.0.0.0:8080".parse().expect("Valid address string"), address: "0.0.0.0:8080".parse().expect("Valid address string"),
worker_id: String::from("pict-rs-1"), worker_id: String::from("pict-rs-1"),
client_pool_size: 100,
} }
} }
} }

View file

@ -38,6 +38,9 @@ pub(crate) struct Server {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub(crate) api_key: Option<String>, pub(crate) api_key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) client_pool_size: Option<usize>,
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]

View file

@ -30,7 +30,7 @@ use actix_web::{
http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES}, http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES},
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer, web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
}; };
use awc::Client; use awc::{Client, Connector};
use futures_util::{ use futures_util::{
stream::{empty, once}, stream::{empty, once},
Stream, StreamExt, TryStreamExt, Stream, StreamExt, TryStreamExt,
@ -61,14 +61,14 @@ use self::{
middleware::{Deadline, Internal}, middleware::{Deadline, Internal},
queue::queue_generate, queue::queue_generate,
repo::{ repo::{
Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, Repo, SettingsRepo, UploadId, Alias, DeleteToken, FullRepo, HashRepo, IdentifierRepo, QueueRepo, Repo, SettingsRepo,
UploadResult, UploadId, UploadResult,
}, },
serde_str::Serde, serde_str::Serde,
store::{ store::{
file_store::FileStore, file_store::FileStore,
object_store::{ObjectStore, ObjectStoreConfig}, object_store::{ObjectStore, ObjectStoreConfig},
Identifier, Store, StoreConfig, Identifier, Store,
}, },
stream::{StreamLimit, StreamTimeout}, stream::{StreamLimit, StreamTimeout},
}; };
@ -981,7 +981,14 @@ fn transform_error(error: actix_form_data::Error) -> actix_web::Error {
} }
fn build_client() -> awc::Client { fn build_client() -> awc::Client {
let connector = CONFIG
.server
.client_pool_size
.map(|size| Connector::new().limit(size))
.unwrap_or_else(Connector::new);
Client::builder() Client::builder()
.connector(connector)
.wrap(Tracing) .wrap(Tracing)
.add_default_header(("User-Agent", "pict-rs v0.4.0-main")) .add_default_header(("User-Agent", "pict-rs v0.4.0-main"))
.timeout(Duration::from_secs(30)) .timeout(Duration::from_secs(30))
@ -996,15 +1003,88 @@ fn next_worker_id() -> String {
format!("{}-{}", CONFIG.server.worker_id, next_id) format!("{}-{}", CONFIG.server.worker_id, next_id)
} }
async fn launch<R: FullRepo + 'static, SC: StoreConfig + 'static>( fn configure_endpoints<R: FullRepo + 'static, S: Store + 'static>(
config: &mut web::ServiceConfig,
repo: R, repo: R,
store_config: SC, store: S,
) -> color_eyre::Result<()> { client: Client,
repo.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec()) ) {
.await?; config
.app_data(web::Data::new(repo))
.app_data(web::Data::new(store))
.app_data(web::Data::new(client))
.route("/healthz", web::get().to(healthz::<R>))
.service(
web::scope("/image")
.service(
web::resource("")
.guard(guard::Post())
.route(web::post().to(upload::<R, S>)),
)
.service(
web::scope("/backgrounded")
.service(
web::resource("")
.guard(guard::Post())
.route(web::post().to(upload_backgrounded::<R, S>)),
)
.service(
web::resource("/claim").route(web::get().to(claim_upload::<R, S>)),
),
)
.service(web::resource("/download").route(web::get().to(download::<R, S>)))
.service(
web::resource("/delete/{delete_token}/{filename}")
.route(web::delete().to(delete::<R>))
.route(web::get().to(delete::<R>)),
)
.service(
web::resource("/original/{filename}")
.route(web::get().to(serve::<R, S>))
.route(web::head().to(serve_head::<R, S>)),
)
.service(
web::resource("/process.{ext}")
.route(web::get().to(process::<R, S>))
.route(web::head().to(process_head::<R, S>)),
)
.service(
web::resource("/process_backgrounded.{ext}")
.route(web::get().to(process_backgrounded::<R, S>)),
)
.service(
web::scope("/details")
.service(
web::resource("/original/{filename}")
.route(web::get().to(details::<R, S>)),
)
.service(
web::resource("/process.{ext}")
.route(web::get().to(process_details::<R, S>)),
),
),
)
.service(
web::scope("/internal")
.wrap(Internal(
CONFIG.server.api_key.as_ref().map(|s| s.to_owned()),
))
.service(web::resource("/import").route(web::post().to(import::<R, S>)))
.service(web::resource("/variants").route(web::delete().to(clean_variants::<R>)))
.service(web::resource("/purge").route(web::post().to(purge::<R>)))
.service(web::resource("/aliases").route(web::get().to(aliases::<R>)))
.service(web::resource("/identifier").route(web::get().to(identifier::<R, S>))),
);
}
async fn launch_file_store<R: FullRepo + 'static>(
repo: R,
store: FileStore,
) -> std::io::Result<()> {
HttpServer::new(move || { HttpServer::new(move || {
let store = store_config.clone().build(); let client = build_client();
let store = store.clone();
let repo = repo.clone(); let repo = repo.clone();
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| { tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
@ -1025,91 +1105,51 @@ async fn launch<R: FullRepo + 'static, SC: StoreConfig + 'static>(
App::new() App::new()
.wrap(TracingLogger::default()) .wrap(TracingLogger::default())
.wrap(Deadline) .wrap(Deadline)
.app_data(web::Data::new(repo)) .configure(move |sc| configure_endpoints(sc, repo, store, client))
.app_data(web::Data::new(store))
.app_data(web::Data::new(build_client()))
.route("/healthz", web::get().to(healthz::<R>))
.service(
web::scope("/image")
.service(
web::resource("")
.guard(guard::Post())
.route(web::post().to(upload::<R, SC::Store>)),
)
.service(
web::scope("/backgrounded")
.service(
web::resource("")
.guard(guard::Post())
.route(web::post().to(upload_backgrounded::<R, SC::Store>)),
)
.service(
web::resource("/claim")
.route(web::get().to(claim_upload::<R, SC::Store>)),
),
)
.service(
web::resource("/download").route(web::get().to(download::<R, SC::Store>)),
)
.service(
web::resource("/delete/{delete_token}/{filename}")
.route(web::delete().to(delete::<R>))
.route(web::get().to(delete::<R>)),
)
.service(
web::resource("/original/{filename}")
.route(web::get().to(serve::<R, SC::Store>))
.route(web::head().to(serve_head::<R, SC::Store>)),
)
.service(
web::resource("/process.{ext}")
.route(web::get().to(process::<R, SC::Store>))
.route(web::head().to(process_head::<R, SC::Store>)),
)
.service(
web::resource("/process_backgrounded.{ext}")
.route(web::get().to(process_backgrounded::<R, SC::Store>)),
)
.service(
web::scope("/details")
.service(
web::resource("/original/{filename}")
.route(web::get().to(details::<R, SC::Store>)),
)
.service(
web::resource("/process.{ext}")
.route(web::get().to(process_details::<R, SC::Store>)),
),
),
)
.service(
web::scope("/internal")
.wrap(Internal(
CONFIG.server.api_key.as_ref().map(|s| s.to_owned()),
))
.service(web::resource("/import").route(web::post().to(import::<R, SC::Store>)))
.service(
web::resource("/variants").route(web::delete().to(clean_variants::<R>)),
)
.service(web::resource("/purge").route(web::post().to(purge::<R>)))
.service(web::resource("/aliases").route(web::get().to(aliases::<R>)))
.service(
web::resource("/identifier")
.route(web::get().to(identifier::<R, SC::Store>)),
),
)
}) })
.bind(CONFIG.server.address)? .bind(CONFIG.server.address)?
.run() .run()
.await?; .await
}
self::tmp_file::remove_tmp_dir().await?; async fn launch_object_store<R: FullRepo + 'static>(
repo: R,
store_config: ObjectStoreConfig,
) -> std::io::Result<()> {
HttpServer::new(move || {
let client = build_client();
Ok(()) let store = store_config.clone().build(client.clone());
let repo = repo.clone();
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(queue::process_cleanup(
repo.clone(),
store.clone(),
next_worker_id(),
))
});
tracing::trace_span!(parent: None, "Spawn task").in_scope(|| {
actix_rt::spawn(queue::process_images(
repo.clone(),
store.clone(),
next_worker_id(),
))
});
App::new()
.wrap(TracingLogger::default())
.wrap(Deadline)
.configure(move |sc| configure_endpoints(sc, repo, store, client))
})
.bind(CONFIG.server.address)?
.run()
.await
} }
async fn migrate_inner<S1>( async fn migrate_inner<S1>(
repo: &Repo, repo: &Repo,
client: Client,
from: S1, from: S1,
to: config::Store, to: config::Store,
skip_missing_files: bool, skip_missing_files: bool,
@ -1119,7 +1159,7 @@ where
{ {
match to { match to {
config::Store::Filesystem(config::Filesystem { path }) => { config::Store::Filesystem(config::Filesystem { path }) => {
let to = FileStore::build(path.clone(), repo.clone()).await?.build(); let to = FileStore::build(path.clone(), repo.clone()).await?;
match repo { match repo {
Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?, Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?,
@ -1149,7 +1189,7 @@ where
repo.clone(), repo.clone(),
) )
.await? .await?
.build(); .build(client);
match repo { match repo {
Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?, Repo::Sled(repo) => migrate_store(repo, from, to, skip_missing_files).await?,
@ -1229,10 +1269,12 @@ pub async fn run() -> color_eyre::Result<()> {
from, from,
to, to,
} => { } => {
let client = build_client();
match from { match from {
config::Store::Filesystem(config::Filesystem { path }) => { config::Store::Filesystem(config::Filesystem { path }) => {
let from = FileStore::build(path.clone(), repo.clone()).await?.build(); let from = FileStore::build(path.clone(), repo.clone()).await?;
migrate_inner(&repo, from, to, skip_missing_files).await?; migrate_inner(&repo, client, from, to, skip_missing_files).await?;
} }
config::Store::ObjectStorage(config::ObjectStorage { config::Store::ObjectStorage(config::ObjectStorage {
endpoint, endpoint,
@ -1258,9 +1300,9 @@ pub async fn run() -> color_eyre::Result<()> {
repo.clone(), repo.clone(),
) )
.await? .await?
.build(); .build(client.clone());
migrate_inner(&repo, from, to, skip_missing_files).await?; migrate_inner(&repo, client, from, to, skip_missing_files).await?;
} }
} }
@ -1274,7 +1316,13 @@ pub async fn run() -> color_eyre::Result<()> {
let store = FileStore::build(path, repo.clone()).await?; let store = FileStore::build(path, repo.clone()).await?;
match repo { match repo {
Repo::Sled(sled_repo) => launch::<_, FileStore>(sled_repo, store).await, Repo::Sled(sled_repo) => {
sled_repo
.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
.await?;
launch_file_store(sled_repo, store).await?;
}
} }
} }
config::Store::ObjectStorage(config::ObjectStorage { config::Store::ObjectStorage(config::ObjectStorage {
@ -1303,10 +1351,20 @@ pub async fn run() -> color_eyre::Result<()> {
.await?; .await?;
match repo { match repo {
Repo::Sled(sled_repo) => launch::<_, ObjectStoreConfig>(sled_repo, store).await, Repo::Sled(sled_repo) => {
sled_repo
.requeue_in_progress(CONFIG.server.worker_id.as_bytes().to_vec())
.await?;
launch_object_store(sled_repo, store).await?;
} }
} }
} }
}
self::tmp_file::remove_tmp_dir().await?;
Ok(())
} }
const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress"; const STORE_MIGRATION_PROGRESS: &str = "store-migration-progress";

View file

@ -62,12 +62,6 @@ pub(crate) trait Identifier: Send + Sync + Clone + Debug {
fn string_repr(&self) -> String; fn string_repr(&self) -> String;
} }
pub(crate) trait StoreConfig: Send + Sync + Clone {
type Store: Store;
fn build(self) -> Self::Store;
}
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
pub(crate) trait Store: Clone + Debug { pub(crate) trait Store: Clone + Debug {
type Identifier: Identifier + 'static; type Identifier: Identifier + 'static;

View file

@ -1,7 +1,7 @@
use crate::{ use crate::{
file::File, file::File,
repo::{Repo, SettingsRepo}, repo::{Repo, SettingsRepo},
store::{Store, StoreConfig}, store::Store,
}; };
use actix_web::web::Bytes; use actix_web::web::Bytes;
use futures_util::stream::Stream; use futures_util::stream::Stream;
@ -49,14 +49,6 @@ pub(crate) struct FileStore {
repo: Repo, repo: Repo,
} }
impl StoreConfig for FileStore {
type Store = FileStore;
fn build(self) -> Self::Store {
self
}
}
#[async_trait::async_trait(?Send)] #[async_trait::async_trait(?Send)]
impl Store for FileStore { impl Store for FileStore {
type Identifier = FileId; type Identifier = FileId;

View file

@ -1,7 +1,7 @@
use crate::{ use crate::{
bytes_stream::BytesStream, bytes_stream::BytesStream,
repo::{Repo, SettingsRepo}, repo::{Repo, SettingsRepo},
store::{Store, StoreConfig}, store::Store,
}; };
use actix_rt::task::JoinError; use actix_rt::task::JoinError;
use actix_web::{ use actix_web::{
@ -116,16 +116,14 @@ struct InitiateMultipartUploadResponse {
upload_id: String, upload_id: String,
} }
impl StoreConfig for ObjectStoreConfig { impl ObjectStoreConfig {
type Store = ObjectStore; pub(crate) fn build(self, client: Client) -> ObjectStore {
fn build(self) -> Self::Store {
ObjectStore { ObjectStore {
path_gen: self.path_gen, path_gen: self.path_gen,
repo: self.repo, repo: self.repo,
bucket: self.bucket, bucket: self.bucket,
credentials: self.credentials, credentials: self.credentials,
client: crate::build_client(), client,
} }
} }
} }