From 82e8042c6663defb8c5700c43b10dac3592b24a2 Mon Sep 17 00:00:00 2001 From: asonix Date: Tue, 21 Apr 2020 12:07:39 -0500 Subject: [PATCH] Clean db code, switch to deadpool --- Cargo.lock | 82 +++++++++++-------- Cargo.toml | 7 +- src/config.rs | 8 +- src/data/node.rs | 2 +- src/db.rs | 188 ++++++++++++++++++-------------------------- src/error.rs | 15 ++-- src/jobs/storage.rs | 6 +- src/main.rs | 2 +- src/notify.rs | 2 +- 9 files changed, 149 insertions(+), 163 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c8d53d3..b645480 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -470,8 +470,9 @@ dependencies = [ [[package]] name = "background-jobs" -version = "0.8.0-alpha.1" -source = "git+https://git.asonix.dog/Aardwolf/background-jobs#f8fa1bb5ef71724053c8fe68b75d900a51cb4f45" +version = "0.8.0-alpha.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb38c4a5de33324650e9023829b0f4129eb5418b29f5dfe69a52100ff5bc50d7" dependencies = [ "background-jobs-actix", "background-jobs-core", @@ -479,8 +480,9 @@ dependencies = [ [[package]] name = "background-jobs-actix" -version = "0.8.0-alpha.0" -source = "git+https://git.asonix.dog/Aardwolf/background-jobs#f8fa1bb5ef71724053c8fe68b75d900a51cb4f45" +version = "0.8.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bb7f892dcd3ee34aab169d60587232d47aa054e4401c3067a64a6871eda806a" dependencies = [ "actix", "actix-rt", @@ -501,7 +503,8 @@ dependencies = [ [[package]] name = "background-jobs-core" version = "0.8.0-alpha.0" -source = "git+https://git.asonix.dog/Aardwolf/background-jobs#f8fa1bb5ef71724053c8fe68b75d900a51cb4f45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c3447d183c7f1c6e2f9c564860712fb5b11ffa9be12caa28791674b865c85fb" dependencies = [ "actix", "anyhow", @@ -556,30 +559,6 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d5ca2cd0adc3f48f9e9ea5a6bbdf9ccc0bfade884847e484d452414c7ccffb3" -[[package]] -name = "bb8" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7744a35a99f0ae3a6f5681f5af800e9074c658b1d0d314e9f0c3166455a1c3f6" -dependencies = [ - "async-trait", - "futures", - "tokio", -] - -[[package]] -name = "bb8-postgres" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39a233af6ea3952e20d01863c87b4f6689b2f806249688b0908b5f02d4fa41ac" -dependencies = [ - "async-trait", - "bb8", - "futures", - "tokio", - "tokio-postgres", -] - [[package]] name = "bit-vec" version = "0.6.1" @@ -781,6 +760,16 @@ dependencies = [ "maybe-uninit", ] +[[package]] +name = "crossbeam-queue" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -802,6 +791,36 @@ dependencies = [ "subtle 1.0.0", ] +[[package]] +name = "deadpool" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38ce52b0b1ad88ed0b2be2bc3c65ad39dd1a5d9633b1a8a314fc017fbe0027d2" +dependencies = [ + "async-trait", + "config", + "crossbeam-queue", + "num_cpus", + "serde 1.0.106", + "tokio", +] + +[[package]] +name = "deadpool-postgres" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45575d9acf1535dddcfc5841fd8f1776287bdc328f8d9e76531f4dfd2eb9788f" +dependencies = [ + "async-trait", + "config", + "deadpool", + "futures", + "log", + "serde 1.0.106", + "tokio", + "tokio-postgres", +] + [[package]] name = "derive_more" version = "0.99.5" @@ -1953,11 +1972,11 @@ dependencies = [ "anyhow", "async-trait", "background-jobs", - "background-jobs-core", "base64 0.12.0", - "bb8-postgres", "bytes", "config", + "deadpool", + "deadpool-postgres", "dotenv", "env_logger", "futures", @@ -1978,6 +1997,7 @@ dependencies = [ "structopt", "thiserror", "tokio", + "tokio-postgres", "ttl_cache", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index 980600f..f599f92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,12 +21,12 @@ actix-webfinger = "0.3.0-alpha.3" activitystreams = "0.5.0" ammonia = "3.1.0" async-trait = "0.1.24" -background-jobs = { version = "0.8.0-alpha.1", git = "https://git.asonix.dog/Aardwolf/background-jobs", default-features = false, features = ["background-jobs-actix"] } -background-jobs-core = { version = "0.8.0-alpha.0", git = "https://git.asonix.dog/Aardwolf/background-jobs" } +background-jobs = "0.8.0-alpha.2" bytes = "0.5.4" base64 = "0.12" -bb8-postgres = { version = "0.4.0", features = ["with-serde_json-1", "with-uuid-0_8", "with-chrono-0_4"] } config = "0.10.1" +deadpool = "0.5.1" +deadpool-postgres = "0.5.5" dotenv = "0.15.0" env_logger = "0.7.1" futures = "0.3.4" @@ -46,6 +46,7 @@ sha2 = "0.8" structopt = "0.3.12" thiserror = "1.0" tokio = { version = "0.2.13", features = ["sync"] } +tokio-postgres = { version = "0.5.1", features = ["with-serde_json-1", "with-uuid-0_8", "with-chrono-0_4"] } ttl_cache = "0.5.1" uuid = { version = "0.8", features = ["v4", "serde"] } diff --git a/src/config.rs b/src/config.rs index 820f70d..1ee0498 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,7 +17,7 @@ pub struct Config { database_url: String, pretty_log: bool, publish_blocks: bool, - connections_per_core: u32, + max_connections: usize, } pub enum UrlKind { @@ -46,7 +46,7 @@ impl Config { .set_default("https", false)? .set_default("pretty_log", true)? .set_default("publish_blocks", false)? - .set_default("connections_per_core", 2)? + .set_default("max_connections", 2)? .merge(Environment::new())?; Ok(config.try_into()?) @@ -56,8 +56,8 @@ impl Config { self.pretty_log } - pub fn connections_per_core(&self) -> u32 { - self.connections_per_core + pub fn max_connections(&self) -> usize { + self.max_connections } pub fn validate_signatures(&self) -> bool { diff --git a/src/data/node.rs b/src/data/node.rs index c816431..cfae9bd 100644 --- a/src/data/node.rs +++ b/src/data/node.rs @@ -1,6 +1,5 @@ use crate::{db::Db, error::MyError}; use activitystreams::primitives::XsdAnyUri; -use bb8_postgres::tokio_postgres::types::Json; use log::{debug, error}; use std::{ collections::{HashMap, HashSet}, @@ -8,6 +7,7 @@ use std::{ time::{Duration, SystemTime}, }; use tokio::sync::RwLock; +use tokio_postgres::types::Json; use uuid::Uuid; pub type ListenersCache = Arc>>; diff --git a/src/db.rs b/src/db.rs index 1ac300f..7ffbffc 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,20 +1,15 @@ use crate::error::MyError; use activitystreams::primitives::XsdAnyUri; -use bb8_postgres::{ - bb8, - tokio_postgres::{ - error::{Error, SqlState}, - row::Row, - Client, Config, NoTls, - }, - PostgresConnectionManager, -}; +use deadpool_postgres::{Manager, Pool}; use log::{info, warn}; use rsa::RSAPrivateKey; use rsa_pem::KeyExt; -use std::{collections::HashSet, convert::TryInto}; - -pub type Pool = bb8::Pool>; +use std::collections::HashSet; +use tokio_postgres::{ + error::{Error, SqlState}, + row::Row, + Client, Config, NoTls, +}; #[derive(Clone)] pub struct Db { @@ -22,19 +17,15 @@ pub struct Db { } impl Db { - pub async fn build(config: &crate::config::Config) -> Result { - let cpus: u32 = num_cpus::get().try_into()?; - let max_conns = cpus * config.connections_per_core(); - + pub fn build(config: &crate::config::Config) -> Result { + let max_conns = config.max_connections(); let config: Config = config.database_url().parse()?; - let manager = PostgresConnectionManager::new(config, NoTls); - let pool = bb8::Pool::builder() - .max_size(max_conns) - .build(manager) - .await?; + let manager = Manager::new(config, NoTls); - Ok(Db { pool }) + Ok(Db { + pool: Pool::new(manager, max_conns), + }) } pub fn pool(&self) -> &Pool { @@ -42,16 +33,33 @@ impl Db { } pub async fn remove_listener(&self, inbox: XsdAnyUri) -> Result<(), MyError> { - let conn = self.pool.get().await?; + info!("DELETE FROM listeners WHERE actor_id = {};", inbox.as_str()); + self.pool + .get() + .await? + .execute( + "DELETE FROM listeners WHERE actor_id = $1::TEXT;", + &[&inbox.as_str()], + ) + .await?; - remove_listener(&conn, &inbox).await?; Ok(()) } pub async fn add_listener(&self, inbox: XsdAnyUri) -> Result<(), MyError> { - let conn = self.pool.get().await?; + info!( + "INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now'); [{}]", + inbox.as_str(), + ); + self.pool + .get() + .await? + .execute( + "INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now');", + &[&inbox.as_str()], + ) + .await?; - add_listener(&conn, &inbox).await?; Ok(()) } @@ -98,33 +106,64 @@ impl Db { } pub async fn hydrate_blocks(&self) -> Result, MyError> { - let conn = self.pool.get().await?; + info!("SELECT domain_name FROM blocks"); + let rows = self + .pool + .get() + .await? + .query("SELECT domain_name FROM blocks", &[]) + .await?; - Ok(hydrate_blocks(&conn).await?) + parse_rows(rows) } pub async fn hydrate_whitelists(&self) -> Result, MyError> { - let conn = self.pool.get().await?; + info!("SELECT domain_name FROM whitelists"); + let rows = self + .pool + .get() + .await? + .query("SELECT domain_name FROM whitelists", &[]) + .await?; - Ok(hydrate_whitelists(&conn).await?) + parse_rows(rows) } pub async fn hydrate_listeners(&self) -> Result, MyError> { - let conn = self.pool.get().await?; + info!("SELECT actor_id FROM listeners"); + let rows = self + .pool + .get() + .await? + .query("SELECT actor_id FROM listeners", &[]) + .await?; - Ok(hydrate_listeners(&conn).await?) + parse_rows(rows) } pub async fn hydrate_private_key(&self) -> Result, MyError> { - let conn = self.pool.get().await?; + info!("SELECT value FROM settings WHERE key = 'private_key'"); + let rows = self + .pool + .get() + .await? + .query("SELECT value FROM settings WHERE key = 'private_key'", &[]) + .await?; - Ok(hydrate_private_key(&conn).await?) + if let Some(row) = rows.into_iter().next() { + let key_str: String = row.get(0); + return Ok(Some(KeyExt::from_pem_pkcs8(&key_str)?)); + } + + Ok(None) } pub async fn update_private_key(&self, private_key: &RSAPrivateKey) -> Result<(), MyError> { - let conn = self.pool.get().await?; + let pem_pkcs8 = private_key.to_pem_pkcs8()?; - Ok(update_private_key(&conn, private_key).await?) + info!("INSERT INTO settings (key, value, created_at) VALUES ('private_key', $1::TEXT, 'now');"); + self.pool.get().await?.execute("INSERT INTO settings (key, value, created_at) VALUES ('private_key', $1::TEXT, 'now');", &[&pem_pkcs8]).await?; + Ok(()) } } @@ -148,28 +187,6 @@ pub async fn listen(client: &Client) -> Result<(), Error> { Ok(()) } -async fn hydrate_private_key(client: &Client) -> Result, MyError> { - info!("SELECT value FROM settings WHERE key = 'private_key'"); - let rows = client - .query("SELECT value FROM settings WHERE key = 'private_key'", &[]) - .await?; - - if let Some(row) = rows.into_iter().next() { - let key_str: String = row.get(0); - return Ok(Some(KeyExt::from_pem_pkcs8(&key_str)?)); - } - - Ok(None) -} - -async fn update_private_key(client: &Client, key: &RSAPrivateKey) -> Result<(), MyError> { - let pem_pkcs8 = key.to_pem_pkcs8()?; - - info!("INSERT INTO settings (key, value, created_at) VALUES ('private_key', $1::TEXT, 'now');"); - client.execute("INSERT INTO settings (key, value, created_at) VALUES ('private_key', $1::TEXT, 'now');", &[&pem_pkcs8]).await?; - Ok(()) -} - async fn add_block(client: &Client, domain: &str) -> Result<(), Error> { info!( "INSERT INTO blocks (domain_name, created_at) VALUES ($1::TEXT, 'now'); [{}]", @@ -230,60 +247,7 @@ async fn remove_whitelist(client: &Client, domain: &str) -> Result<(), Error> { Ok(()) } -async fn remove_listener(client: &Client, listener: &XsdAnyUri) -> Result<(), Error> { - info!( - "DELETE FROM listeners WHERE actor_id = {};", - listener.as_str() - ); - client - .execute( - "DELETE FROM listeners WHERE actor_id = $1::TEXT;", - &[&listener.as_str()], - ) - .await?; - - Ok(()) -} - -async fn add_listener(client: &Client, listener: &XsdAnyUri) -> Result<(), Error> { - info!( - "INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now'); [{}]", - listener.as_str(), - ); - client - .execute( - "INSERT INTO listeners (actor_id, created_at) VALUES ($1::TEXT, 'now');", - &[&listener.as_str()], - ) - .await?; - - Ok(()) -} - -async fn hydrate_blocks(client: &Client) -> Result, Error> { - info!("SELECT domain_name FROM blocks"); - let rows = client.query("SELECT domain_name FROM blocks", &[]).await?; - - parse_rows(rows) -} - -async fn hydrate_whitelists(client: &Client) -> Result, Error> { - info!("SELECT domain_name FROM whitelists"); - let rows = client - .query("SELECT domain_name FROM whitelists", &[]) - .await?; - - parse_rows(rows) -} - -async fn hydrate_listeners(client: &Client) -> Result, Error> { - info!("SELECT actor_id FROM listeners"); - let rows = client.query("SELECT actor_id FROM listeners", &[]).await?; - - parse_rows(rows) -} - -fn parse_rows(rows: Vec) -> Result, Error> +fn parse_rows(rows: Vec) -> Result, MyError> where T: std::str::FromStr + Eq + std::hash::Hash, E: std::fmt::Display, diff --git a/src/error.rs b/src/error.rs index 2c16796..903a70b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,6 +4,7 @@ use actix_web::{ http::StatusCode, HttpResponse, }; +use deadpool::managed::{PoolError, TimeoutType}; use log::error; use rsa_pem::KeyError; use std::{convert::Infallible, fmt::Debug, io::Error}; @@ -17,7 +18,7 @@ pub enum MyError { Config(#[from] config::ConfigError), #[error("Error in db, {0}")] - DbError(#[from] bb8_postgres::tokio_postgres::error::Error), + DbError(#[from] tokio_postgres::error::Error), #[error("Couldn't parse key, {0}")] Key(#[from] KeyError), @@ -76,8 +77,8 @@ pub enum MyError { #[error("Couldn't flush buffer")] FlushBuffer, - #[error("Timed out while waiting on db pool")] - DbTimeout, + #[error("Timed out while waiting on db pool, {0:?}")] + DbTimeout(TimeoutType), #[error("Invalid algorithm provided to verifier")] Algorithm, @@ -136,14 +137,14 @@ where } } -impl From> for MyError +impl From> for MyError where T: Into, { - fn from(e: bb8_postgres::bb8::RunError) -> Self { + fn from(e: PoolError) -> Self { match e { - bb8_postgres::bb8::RunError::User(e) => e.into(), - bb8_postgres::bb8::RunError::TimedOut => MyError::DbTimeout, + PoolError::Backend(e) => e.into(), + PoolError::Timeout(t) => MyError::DbTimeout(t), } } } diff --git a/src/jobs/storage.rs b/src/jobs/storage.rs index d1cbe5e..a25e1ab 100644 --- a/src/jobs/storage.rs +++ b/src/jobs/storage.rs @@ -1,7 +1,7 @@ use crate::{db::Db, error::MyError}; -use background_jobs_core::{JobInfo, Stats}; -use bb8_postgres::tokio_postgres::types::Json; +use background_jobs::{dev::JobInfo, Stats}; use log::debug; +use tokio_postgres::types::Json; use uuid::Uuid; #[derive(Clone)] @@ -16,7 +16,7 @@ impl Storage { } #[async_trait::async_trait] -impl background_jobs_core::Storage for Storage { +impl background_jobs::dev::Storage for Storage { type Error = MyError; async fn generate_id(&self) -> Result { diff --git a/src/main.rs b/src/main.rs index a2ad7b4..00ae9cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,7 +47,7 @@ async fn main() -> Result<(), anyhow::Error> { env_logger::init(); } - let db = Db::build(&config).await?; + let db = Db::build(&config)?; let args = Args::new(); diff --git a/src/notify.rs b/src/notify.rs index 2f2a4d1..c1d290e 100644 --- a/src/notify.rs +++ b/src/notify.rs @@ -5,10 +5,10 @@ use crate::{ }; use activitystreams::primitives::XsdAnyUri; use actix::clock::{delay_for, Duration}; -use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config}; use futures::stream::{poll_fn, StreamExt}; use log::{debug, error, warn}; use std::{collections::HashMap, sync::Arc}; +use tokio_postgres::{tls::NoTls, AsyncMessage, Config}; use uuid::Uuid; pub trait Listener {