From 75df271b589e33cda797625f55700ec498ef15b7 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 17 Aug 2023 17:09:35 -0500 Subject: [PATCH] Switch from awc to reqwest, enable HTTP Proxies --- Cargo.lock | 321 ++++++++++++++++++++++--------------------- Cargo.toml | 14 +- README.md | 12 +- src/admin/client.rs | 50 ++++--- src/config.rs | 46 ++++++- src/data/state.rs | 50 +++---- src/error.rs | 18 ++- src/extractors.rs | 6 + src/jobs.rs | 13 +- src/jobs/contact.rs | 3 + src/jobs/deliver.rs | 2 +- src/jobs/instance.rs | 6 + src/jobs/nodeinfo.rs | 5 +- src/main.rs | 85 ++++++++---- src/requests.rs | 129 ++++++----------- src/routes/index.rs | 2 +- src/routes/media.rs | 2 +- src/spawner.rs | 29 ++++ 18 files changed, 454 insertions(+), 339 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7069d7d..bb9b6ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -100,7 +100,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -140,7 +140,7 @@ dependencies = [ "futures-util", "mio", "num_cpus", - "socket2", + "socket2 0.4.9", "tokio", "tracing", ] @@ -167,7 +167,6 @@ dependencies = [ "actix-service", "actix-utils", "futures-core", - "http", "log", "pin-project-lite", "tokio-rustls 0.23.4", @@ -219,20 +218,19 @@ dependencies = [ "serde_json", "serde_urlencoded", "smallvec", - "socket2", + "socket2 0.4.9", "time", "url", ] [[package]] name = "actix-webfinger" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e64f0f9b28305d38058daaff76a608684a43cbf67e9a9289bdd124a2a45b5e" +checksum = "74a22b44deff50693521b489885151fd65a2a596f7aef6d8c0753485b8915082" dependencies = [ "actix-rt", "actix-web", - "awc", "serde", "serde_derive", "thiserror", @@ -287,9 +285,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.2" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" +checksum = "6748e8def348ed4d14996fa801f4122cd763fff530258cdc03f64b25f89d3a5a" dependencies = [ "memchr", ] @@ -375,9 +373,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" +checksum = "c677ab05e09154296dd37acecd46420c17b9713e8366facafa8fc0885167cf4c" dependencies = [ "anstyle", "windows-sys", @@ -385,9 +383,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.72" +version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b13c32d80ecc7ab747b80c3784bce54ee8a7a0cc4fbda9bf4cda2cf6fe90854" +checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" [[package]] name = "ap-relay" @@ -400,7 +398,6 @@ dependencies = [ "actix-webfinger", "ammonia", "anyhow", - "awc", "background-jobs", "base64 0.21.2", "bcrypt", @@ -412,6 +409,7 @@ dependencies = [ "flume", "futures-util", "http-signature-normalization-actix", + "http-signature-normalization-reqwest", "lru", "metrics", "metrics-exporter-prometheus", @@ -423,6 +421,9 @@ dependencies = [ "pin-project-lite", "quanta", "rand", + "reqwest", + "reqwest-middleware", + "reqwest-tracing", "ring", "rsa", "rsa-magic-public-key", @@ -439,7 +440,6 @@ dependencies = [ "toml 0.7.6", "tracing", "tracing-actix-web", - "tracing-awc", "tracing-error", "tracing-futures", "tracing-log", @@ -469,13 +469,13 @@ checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" [[package]] name = "async-trait" -version = "0.1.72" +version = "0.1.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -484,40 +484,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" -[[package]] -name = "awc" -version = "3.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87ef547a81796eb2dfe9b345aba34c2e08391a0502493711395b36dd64052b69" -dependencies = [ - "actix-codec", - "actix-http", - "actix-rt", - "actix-service", - "actix-tls", - "actix-utils", - "ahash 0.7.6", - "base64 0.21.2", - "bytes", - "cfg-if", - "derive_more", - "futures-core", - "futures-util", - "h2", - "http", - "itoa", - "log", - "mime", - "percent-encoding", - "pin-project-lite", - "rand", - "rustls 0.20.8", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", -] - [[package]] name = "axum" version = "0.6.20" @@ -667,9 +633,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.3.3" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" +checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" [[package]] name = "block-buffer" @@ -746,9 +712,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.81" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c6b2562119bf28c3439f7f02db99faf0aa1a8cdfe5772a2ee155d32227239f0" +checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01" dependencies = [ "libc", ] @@ -781,9 +747,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.19" +version = "4.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd304a20bff958a57f04c4e96a2e7594cc4490a0e809cbd48bb6437edaa452d" +checksum = "b417ae4361bca3f5de378294fc7472d3c4ed86a5ef9f49e93ae722f432aae8d2" dependencies = [ "clap_builder", "clap_derive", @@ -792,9 +758,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.3.19" +version = "4.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01c6a3f08f1fe5662a35cfe393aec09c4df95f60ee93b7556505260f75eee9e1" +checksum = "9c90dc0f0e42c64bff177ca9d7be6fcc9ddb0f26a6e062174a61c84dd6c644d4" dependencies = [ "anstream", "anstyle", @@ -811,7 +777,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -883,9 +849,9 @@ dependencies = [ [[package]] name = "const-oid" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "795bc6e66a8e340f075fcf6227e417a2dc976b92b91f3cdc778bb858778b6747" +checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" [[package]] name = "convert_case" @@ -1014,9 +980,9 @@ dependencies = [ [[package]] name = "der" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c7ed52955ce76b1554f509074bb357d3fb8ac9b51288a65a3fd480d1dfba946" +checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" dependencies = [ "const-oid", "pem-rfc7468", @@ -1149,9 +1115,9 @@ checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" [[package]] name = "flate2" -version = "1.0.26" +version = "1.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743" +checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" dependencies = [ "crc32fast", "miniz_oxide", @@ -1261,7 +1227,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -1458,7 +1424,6 @@ dependencies = [ "actix-http", "actix-rt", "actix-web", - "awc", "base64 0.13.1", "futures-util", "http-signature-normalization", @@ -1470,6 +1435,22 @@ dependencies = [ "tracing-futures", ] +[[package]] +name = "http-signature-normalization-reqwest" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10cfb84663420ec12c4422820bfdf5e8e5e57467892587f43ac432e73ebce880" +dependencies = [ + "async-trait", + "base64 0.13.1", + "http-signature-normalization", + "httpdate", + "reqwest", + "reqwest-middleware", + "ring", + "thiserror", +] + [[package]] name = "httparse" version = "1.8.0" @@ -1478,9 +1459,9 @@ checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" [[package]] name = "httpdate" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "humantime" @@ -1505,7 +1486,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.9", "tokio", "tower-service", "tracing", @@ -1741,9 +1722,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.19" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "lru" @@ -1861,7 +1842,7 @@ checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -1870,7 +1851,7 @@ version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" dependencies = [ - "aho-corasick 1.0.2", + "aho-corasick 1.0.4", "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.13.1", @@ -2327,7 +2308,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -2381,29 +2362,29 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" [[package]] name = "pin-utils" @@ -2533,9 +2514,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.32" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -2618,13 +2599,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.1" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" dependencies = [ - "aho-corasick 1.0.2", + "aho-corasick 1.0.4", "memchr", - "regex-automata 0.3.4", + "regex-automata 0.3.6", "regex-syntax 0.7.4", ] @@ -2639,11 +2620,11 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294" +checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" dependencies = [ - "aho-corasick 1.0.2", + "aho-corasick 1.0.4", "memchr", "regex-syntax 0.7.4", ] @@ -2702,6 +2683,37 @@ dependencies = [ "winreg", ] +[[package]] +name = "reqwest-middleware" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff44108c7925d082f2861e683a88618b68235ad9cdc60d64d9d1188efc951cdb" +dependencies = [ + "anyhow", + "async-trait", + "http", + "reqwest", + "serde", + "task-local-extensions", + "thiserror", +] + +[[package]] +name = "reqwest-tracing" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b97ad83c2fc18113346b7158d79732242002427c30f620fa817c1f32901e0a8" +dependencies = [ + "anyhow", + "async-trait", + "getrandom", + "matchit", + "reqwest", + "reqwest-middleware", + "task-local-extensions", + "tracing", +] + [[package]] name = "ring" version = "0.16.20" @@ -2827,11 +2839,11 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.6" +version = "0.38.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ee020b1716f0a80e2ace9b03441a749e402e86712f15f16fe8a8f75afac732f" +checksum = "19ed4fa021d81c8392ce04db050a3da9a60299050b7ae1cf482d862b54a7218f" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.0", "errno", "libc", "linux-raw-sys", @@ -2873,9 +2885,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.2" +version = "0.101.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59" +checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0" dependencies = [ "ring", "untrusted", @@ -2917,29 +2929,29 @@ checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" [[package]] name = "serde" -version = "1.0.181" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d3e73c93c3240c0bda063c239298e633114c69a888c3e37ca8bb33f343e9890" +checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.181" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be02f6cb0cd3a5ec20bbcfbcbd749f57daddb1a0882dc2e46a6c236c90b977ed" +checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] name = "serde_json" -version = "1.0.104" +version = "1.0.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "076066c5f1078eac5b722a31827a8832fe108bed65dfa75e233c89f8206e976c" +checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" dependencies = [ "itoa", "ryu", @@ -3082,6 +3094,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "spin" version = "0.5.2" @@ -3158,9 +3180,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.28" +version = "2.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04361975b3f5e348b2189d8dc55bc942f278b2d482a6a0365de5bdd62d351567" +checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" dependencies = [ "proc-macro2", "quote", @@ -3185,6 +3207,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20f34339676cdcab560c9a82300c4c2581f68b9369aedf0fae86f2ff9565ff3e" +[[package]] +name = "task-local-extensions" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba323866e5d033818e3240feeb9f7db2c4296674e4d9e16b97b7bf8f490434e8" +dependencies = [ + "pin-utils", +] + [[package]] name = "teloxide" version = "0.12.2" @@ -3267,22 +3298,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.44" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90" +checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.44" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" +checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -3340,11 +3371,10 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.29.1" +version = "1.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" dependencies = [ - "autocfg", "backtrace", "bytes", "libc", @@ -3352,7 +3382,7 @@ dependencies = [ "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.3", "tokio-macros", "tracing", "windows-sys", @@ -3376,7 +3406,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -3561,22 +3591,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", -] - -[[package]] -name = "tracing-awc" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afd0c52e66eec56d4fbddbfa1d15261ee48a78360d7d3ee3d3900c4c3489d8ad" -dependencies = [ - "actix-http", - "actix-service", - "awc", - "bytes", - "futures-core", - "pin-project-lite", - "tracing", + "syn 2.0.29", ] [[package]] @@ -3794,7 +3809,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", "wasm-bindgen-shared", ] @@ -3828,7 +3843,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3914,9 +3929,9 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.48.1" +version = "0.48.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f" +checksum = "27f51fb4c64f8b770a823c043c7fad036323e1c48f55287b7bbb7987b2fcdf3b" dependencies = [ "windows_aarch64_gnullvm", "windows_aarch64_msvc", @@ -3929,51 +3944,51 @@ dependencies = [ [[package]] name = "windows_aarch64_gnullvm" -version = "0.48.0" +version = "0.48.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" +checksum = "fde1bb55ae4ce76a597a8566d82c57432bc69c039449d61572a7a353da28f68c" [[package]] name = "windows_aarch64_msvc" -version = "0.48.0" +version = "0.48.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" +checksum = "1513e8d48365a78adad7322fd6b5e4c4e99d92a69db8df2d435b25b1f1f286d4" [[package]] name = "windows_i686_gnu" -version = "0.48.0" +version = "0.48.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" +checksum = "60587c0265d2b842298f5858e1a5d79d146f9ee0c37be5782e92a6eb5e1d7a83" [[package]] name = "windows_i686_msvc" -version = "0.48.0" +version = "0.48.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" +checksum = "224fe0e0ffff5d2ea6a29f82026c8f43870038a0ffc247aa95a52b47df381ac4" [[package]] name = "windows_x86_64_gnu" -version = "0.48.0" +version = "0.48.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" +checksum = "62fc52a0f50a088de499712cbc012df7ebd94e2d6eb948435449d76a6287e7ad" [[package]] name = "windows_x86_64_gnullvm" -version = "0.48.0" +version = "0.48.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" +checksum = "2093925509d91ea3d69bcd20238f4c2ecdb1a29d3c281d026a09705d0dd35f3d" [[package]] name = "windows_x86_64_msvc" -version = "0.48.0" +version = "0.48.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" +checksum = "b6ade45bc8bf02ae2aa34a9d54ba660a1a58204da34ba793c00d83ca3730b5f1" [[package]] name = "winnow" -version = "0.5.3" +version = "0.5.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f46aab759304e4d7b2075a9aecba26228bb073ee8c50db796b2c72c676b5d807" +checksum = "d09770118a7eb1ccaf4a594a221334119a44a814fcb0d31c5b85e83e97227a97" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 306f61f..f67b019 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,11 +28,10 @@ actix-web = { version = "4.0.1", default-features = false, features = [ "compress-brotli", "compress-gzip", ] } -actix-webfinger = "0.4.0" +actix-webfinger = { version = "0.5.0", default-features = false } activitystreams = "0.7.0-alpha.25" activitystreams-ext = "0.1.0-alpha.3" ammonia = "3.1.0" -awc = { version = "3.0.0", default-features = false, features = ["rustls"] } bcrypt = "0.15" base64 = "0.21" clap = { version = "4.0.0", features = ["derive"] } @@ -55,6 +54,9 @@ opentelemetry-otlp = "0.13" pin-project-lite = "0.2.9" quanta = "0.11.0" rand = "0.8" +reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "stream"]} +reqwest-middleware = "0.2" +reqwest-tracing = "0.4.5" ring = "0.16.20" rsa = { version = "0.9" } rsa-magic-public-key = "0.8.0" @@ -71,7 +73,6 @@ teloxide = { version = "0.12.0", default-features = false, features = [ thiserror = "1.0" time = { version = "0.3.17", features = ["serde"] } tracing = "0.1" -tracing-awc = "0.1.8" tracing-error = "0.2" tracing-futures = "0.2" tracing-log = "0.1" @@ -92,7 +93,12 @@ features = ["background-jobs-actix", "error-logging"] [dependencies.http-signature-normalization-actix] version = "0.10.1" default-features = false -features = ["client", "server", "ring"] +features = ["server", "ring"] + +[dependencies.http-signature-normalization-reqwest] +version = "0.10.0" +default-features = false +features = ["middleware", "ring"] [dependencies.tracing-actix-web] version = "0.7.6" diff --git a/README.md b/README.md index 24b1566..72c2e22 100644 --- a/README.md +++ b/README.md @@ -106,7 +106,6 @@ LOCAL_BLURB="

Welcome to my cool relay where I have cool relay things happenin PROMETHEUS_ADDR=0.0.0.0 PROMETHEUS_PORT=9000 CLIENT_TIMEOUT=10 -CLIENT_POOL_SIZE=20 DELIVER_CONCURRENCY=8 SIGNATURE_THREADS=2 ``` @@ -161,11 +160,6 @@ Optional - Port to bind to for serving the prometheus scrape endpoint ##### `CLIENT_TIMEOUT` Optional - How long the relay will hold open a connection (in seconds) to a remote server during fetches and deliveries. This defaults to 10 -##### `CLIENT_POOL_SIZE` -Optional - How many connections the relay should maintain per thread. This value will be multiplied -by the number of cores available to the relay. This defaults to 20, so a 4-core machine will have a -maximum of 160 simultaneous outbound connections. If you run into problems related to "Too many open -files", you can either decrease this number or increase the ulimit for your system. ##### `DELIVER_CONCURRENCY` Optional - How many deliver requests the relay should allow to be in-flight per thread. the default is 8 @@ -173,6 +167,12 @@ is 8 Optional - Override number of threads used for signing and verifying requests. Default is `std::thread::available_parallelism()` (It tries to detect how many cores you have). If it cannot detect the correct number of cores, it falls back to 1. +##### 'PROXY_URL' +Optional - URL of an HTTP proxy to forward outbound requests through +##### 'PROXY_USERNAME' +Optional - username to provide to the HTTP proxy set with `PROXY_URL` through HTTP Basic Auth +##### 'PROXY_PASSWORD' +Optional - password to provide to the HTTP proxy set with `PROXY_URL` through HTTP Basic Auth ### Subscribing Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings. diff --git a/src/admin/client.rs b/src/admin/client.rs index fdb1687..88151e1 100644 --- a/src/admin/client.rs +++ b/src/admin/client.rs @@ -3,12 +3,14 @@ use crate::{ collector::Snapshot, config::{AdminUrlKind, Config}, error::{Error, ErrorKind}, + extractors::XApiToken, }; -use awc::Client; +use actix_web::http::header::Header; +use reqwest_middleware::ClientWithMiddleware; use serde::de::DeserializeOwned; pub(crate) async fn allow( - client: &Client, + client: &ClientWithMiddleware, config: &Config, domains: Vec, ) -> Result<(), Error> { @@ -16,7 +18,7 @@ pub(crate) async fn allow( } pub(crate) async fn disallow( - client: &Client, + client: &ClientWithMiddleware, config: &Config, domains: Vec, ) -> Result<(), Error> { @@ -24,7 +26,7 @@ pub(crate) async fn disallow( } pub(crate) async fn block( - client: &Client, + client: &ClientWithMiddleware, config: &Config, domains: Vec, ) -> Result<(), Error> { @@ -32,35 +34,50 @@ pub(crate) async fn block( } pub(crate) async fn unblock( - client: &Client, + client: &ClientWithMiddleware, config: &Config, domains: Vec, ) -> Result<(), Error> { post_domains(client, config, domains, AdminUrlKind::Unblock).await } -pub(crate) async fn allowed(client: &Client, config: &Config) -> Result { +pub(crate) async fn allowed( + client: &ClientWithMiddleware, + config: &Config, +) -> Result { get_results(client, config, AdminUrlKind::Allowed).await } -pub(crate) async fn blocked(client: &Client, config: &Config) -> Result { +pub(crate) async fn blocked( + client: &ClientWithMiddleware, + config: &Config, +) -> Result { get_results(client, config, AdminUrlKind::Blocked).await } -pub(crate) async fn connected(client: &Client, config: &Config) -> Result { +pub(crate) async fn connected( + client: &ClientWithMiddleware, + config: &Config, +) -> Result { get_results(client, config, AdminUrlKind::Connected).await } -pub(crate) async fn stats(client: &Client, config: &Config) -> Result { +pub(crate) async fn stats( + client: &ClientWithMiddleware, + config: &Config, +) -> Result { get_results(client, config, AdminUrlKind::Stats).await } -pub(crate) async fn last_seen(client: &Client, config: &Config) -> Result { +pub(crate) async fn last_seen( + client: &ClientWithMiddleware, + config: &Config, +) -> Result { get_results(client, config, AdminUrlKind::LastSeen).await } async fn get_results( - client: &Client, + client: &ClientWithMiddleware, config: &Config, url_kind: AdminUrlKind, ) -> Result { @@ -68,9 +85,9 @@ async fn get_results( let iri = config.generate_admin_url(url_kind); - let mut res = client + let res = client .get(iri.as_str()) - .insert_header(x_api_token) + .header(XApiToken::name(), x_api_token.to_string()) .send() .await .map_err(|e| ErrorKind::SendRequest(iri.to_string(), e.to_string()))?; @@ -88,7 +105,7 @@ async fn get_results( } async fn post_domains( - client: &Client, + client: &ClientWithMiddleware, config: &Config, domains: Vec, url_kind: AdminUrlKind, @@ -99,8 +116,9 @@ async fn post_domains( let res = client .post(iri.as_str()) - .insert_header(x_api_token) - .send_json(&Domains { domains }) + .header(XApiToken::name(), x_api_token.to_string()) + .json(&Domains { domains }) + .send() .await .map_err(|e| ErrorKind::SendRequest(iri.to_string(), e.to_string()))?; diff --git a/src/config.rs b/src/config.rs index 9dfa589..35520fd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -46,7 +46,9 @@ pub(crate) struct ParsedConfig { prometheus_port: Option, deliver_concurrency: u64, client_timeout: u64, - client_pool_size: usize, + proxy_url: Option, + proxy_username: Option, + proxy_password: Option, signature_threads: Option, } @@ -73,7 +75,7 @@ pub struct Config { prometheus_config: Option, deliver_concurrency: u64, client_timeout: u64, - client_pool_size: usize, + proxy_config: Option, signature_threads: Option, } @@ -89,6 +91,12 @@ struct PrometheusConfig { port: u16, } +#[derive(Clone, Debug)] +struct ProxyConfig { + url: IriString, + auth: Option<(String, String)>, +} + #[derive(Debug)] pub enum UrlKind { Activity, @@ -144,7 +152,7 @@ impl std::fmt::Debug for Config { .field("prometheus_config", &self.prometheus_config) .field("deliver_concurrency", &self.deliver_concurrency) .field("client_timeout", &self.client_timeout) - .field("client_pool_size", &self.client_pool_size) + .field("proxy_config", &self.proxy_config) .field("signature_threads", &self.signature_threads) .finish() } @@ -177,7 +185,9 @@ impl Config { .set_default("prometheus_port", None as Option)? .set_default("deliver_concurrency", 8u64)? .set_default("client_timeout", 10u64)? - .set_default("client_pool_size", 20u64)? + .set_default("proxy_url", None as Option<&str>)? + .set_default("proxy_username", None as Option<&str>)? + .set_default("proxy_password", None as Option<&str>)? .set_default("signature_threads", None as Option)? .add_source(Environment::default()) .build()?; @@ -220,6 +230,26 @@ impl Config { (None, None) => None, }; + let proxy_config = match (config.proxy_username, config.proxy_password) { + (Some(username), Some(password)) => config.proxy_url.map(|url| ProxyConfig { + url, + auth: Some((username, password)), + }), + (Some(_), None) => { + tracing::warn!( + "PROXY_USERNAME is set but PROXY_PASSWORD is not set, not setting Proxy Auth" + ); + config.proxy_url.map(|url| ProxyConfig { url, auth: None }) + } + (None, Some(_)) => { + tracing::warn!( + "PROXY_PASSWORD is set but PROXY_USERNAME is not set, not setting Proxy Auth" + ); + config.proxy_url.map(|url| ProxyConfig { url, auth: None }) + } + (None, None) => config.proxy_url.map(|url| ProxyConfig { url, auth: None }), + }; + let source_url = match Self::git_hash() { Some(hash) => format!( "{}{}{hash}", @@ -252,7 +282,7 @@ impl Config { prometheus_config, deliver_concurrency: config.deliver_concurrency, client_timeout: config.client_timeout, - client_pool_size: config.client_pool_size, + proxy_config, signature_threads: config.signature_threads, }) } @@ -468,8 +498,10 @@ impl Config { ) } - pub(crate) fn client_pool_size(&self) -> usize { - self.client_pool_size + pub(crate) fn proxy_config(&self) -> Option<(&IriString, Option<(&str, &str)>)> { + self.proxy_config.as_ref().map(|ProxyConfig { url, auth }| { + (url, auth.as_ref().map(|(u, p)| (u.as_str(), p.as_str()))) + }) } pub(crate) fn source_code(&self) -> &IriString { diff --git a/src/data/state.rs b/src/data/state.rs index 19798d3..d3b699d 100644 --- a/src/data/state.rs +++ b/src/data/state.rs @@ -1,5 +1,4 @@ use crate::{ - config::{Config, UrlKind}, data::NodeCache, db::Db, error::Error, @@ -10,6 +9,7 @@ use activitystreams::iri_string::types::IriString; use actix_web::web; use lru::LruCache; use rand::thread_rng; +use reqwest_middleware::ClientWithMiddleware; use rsa::{RsaPrivateKey, RsaPublicKey}; use std::sync::{Arc, RwLock}; @@ -17,10 +17,10 @@ use super::LastOnline; #[derive(Clone)] pub struct State { + pub(crate) requests: Requests, pub(crate) public_key: RsaPublicKey, - private_key: RsaPrivateKey, object_cache: Arc>>, - node_cache: NodeCache, + pub(crate) node_cache: NodeCache, breakers: Breakers, pub(crate) last_online: Arc, pub(crate) db: Db, @@ -37,23 +37,6 @@ impl std::fmt::Debug for State { } impl State { - pub(crate) fn node_cache(&self) -> NodeCache { - self.node_cache.clone() - } - - pub(crate) fn requests(&self, config: &Config, spawner: Spawner) -> Requests { - Requests::new( - config.generate_url(UrlKind::MainKey).to_string(), - self.private_key.clone(), - config.user_agent(), - self.breakers.clone(), - self.last_online.clone(), - config.client_pool_size(), - config.client_timeout(), - spawner, - ) - } - #[tracing::instrument( level = "debug", name = "Get inboxes for other domains", @@ -98,7 +81,12 @@ impl State { } #[tracing::instrument(level = "debug", name = "Building state", skip_all)] - pub(crate) async fn build(db: Db) -> Result { + pub(crate) async fn build( + db: Db, + key_id: String, + spawner: Spawner, + client: ClientWithMiddleware, + ) -> Result { let private_key = if let Ok(Some(key)) = db.private_key().await { tracing::debug!("Using existing key"); key @@ -117,16 +105,28 @@ impl State { let public_key = private_key.to_public_key(); - let state = State { - public_key, + let breakers = Breakers::default(); + let last_online = Arc::new(LastOnline::empty()); + + let requests = Requests::new( + key_id, private_key, + breakers.clone(), + last_online.clone(), + spawner, + client, + ); + + let state = State { + requests, + public_key, object_cache: Arc::new(RwLock::new(LruCache::new( (1024 * 8).try_into().expect("nonzero"), ))), node_cache: NodeCache::new(db.clone()), - breakers: Breakers::default(), + breakers, db, - last_online: Arc::new(LastOnline::empty()), + last_online, }; Ok(state) diff --git a/src/error.rs b/src/error.rs index e009a7c..e1461db 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,7 +5,7 @@ use actix_web::{ http::StatusCode, HttpResponse, }; -use http_signature_normalization_actix::PrepareSignError; +use http_signature_normalization_reqwest::SignError; use std::{convert::Infallible, fmt::Debug, io}; use tracing_error::SpanTrace; @@ -84,6 +84,12 @@ pub(crate) enum ErrorKind { #[error("Couldn't sign request")] SignRequest, + #[error("Couldn't make request")] + Reqwest(#[from] reqwest::Error), + + #[error("Couldn't build client")] + ReqwestMiddleware(#[from] reqwest_middleware::Error), + #[error("Couldn't parse IRI, {0}")] ParseIri(#[from] activitystreams::iri_string::validate::Error), @@ -102,8 +108,8 @@ pub(crate) enum ErrorKind { #[error("Couldn't do the json thing, {0}")] Json(#[from] serde_json::Error), - #[error("Couldn't build signing string, {0}")] - PrepareSign(#[from] PrepareSignError), + #[error("Couldn't sign request, {0}")] + Sign(#[from] SignError), #[error("Couldn't sign digest")] Signature(#[from] rsa::signature::Error), @@ -251,3 +257,9 @@ impl From for ErrorKind { Self::Canceled } } + +impl From for ErrorKind { + fn from(_: http_signature_normalization_reqwest::Canceled) -> Self { + Self::Canceled + } +} diff --git a/src/extractors.rs b/src/extractors.rs index 2301ee4..3226398 100644 --- a/src/extractors.rs +++ b/src/extractors.rs @@ -243,3 +243,9 @@ impl FromStr for XApiToken { Ok(XApiToken(s.to_string())) } } + +impl std::fmt::Display for XApiToken { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} diff --git a/src/jobs.rs b/src/jobs.rs index 7635b3c..6298938 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -14,11 +14,9 @@ pub(crate) use self::{ use crate::{ config::Config, - data::{ActorCache, MediaCache, NodeCache, State}, + data::{ActorCache, MediaCache, State}, error::{Error, ErrorKind}, jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline}, - requests::Requests, - spawner::Spawner, }; use background_jobs::{ memory_storage::{ActixTimer, Storage}, @@ -45,7 +43,6 @@ pub(crate) fn create_workers( actors: ActorCache, media: MediaCache, config: Config, - spawner: Spawner, ) -> JobServer { let deliver_concurrency = config.deliver_concurrency(); @@ -56,7 +53,6 @@ pub(crate) fn create_workers( JobServer::new(queue_handle), media.clone(), config.clone(), - spawner.clone(), ) }) .register::() @@ -84,12 +80,10 @@ pub(crate) fn create_workers( #[derive(Clone, Debug)] pub(crate) struct JobState { - requests: Requests, state: State, actors: ActorCache, config: Config, media: MediaCache, - node_cache: NodeCache, job_server: JobServer, } @@ -113,15 +107,12 @@ impl JobState { job_server: JobServer, media: MediaCache, config: Config, - spawner: Spawner, ) -> Self { JobState { - requests: state.requests(&config, spawner), - node_cache: state.node_cache(), + state, actors, config, media, - state, job_server, } } diff --git a/src/jobs/contact.rs b/src/jobs/contact.rs index 3c880af..f59affe 100644 --- a/src/jobs/contact.rs +++ b/src/jobs/contact.rs @@ -32,6 +32,7 @@ impl QueryContact { async fn perform(self, state: JobState) -> Result<(), Error> { let contact_outdated = state + .state .node_cache .is_contact_outdated(self.actor_id.clone()) .await; @@ -41,6 +42,7 @@ impl QueryContact { } let contact = match state + .state .requests .fetch::(&self.contact_id) .await @@ -57,6 +59,7 @@ impl QueryContact { to_contact(contact).ok_or(ErrorKind::Extract("contact"))?; state + .state .node_cache .set_contact(self.actor_id, username, display_name, url, avatar) .await?; diff --git a/src/jobs/deliver.rs b/src/jobs/deliver.rs index 72f4aec..10c5fce 100644 --- a/src/jobs/deliver.rs +++ b/src/jobs/deliver.rs @@ -35,7 +35,7 @@ impl Deliver { #[tracing::instrument(name = "Deliver", skip(state))] async fn permform(self, state: JobState) -> Result<(), Error> { - if let Err(e) = state.requests.deliver(&self.to, &self.data).await { + if let Err(e) = state.state.requests.deliver(&self.to, &self.data).await { if e.is_breaker() { tracing::debug!("Not trying due to failed breaker"); return Ok(()); diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index 59826b5..2b44e60 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -40,6 +40,7 @@ impl QueryInstance { InstanceApiType::Mastodon => { let mastodon_instance_uri = iri!(format!("{scheme}://{authority}/api/v1/instance")); state + .state .requests .fetch_json::(&mastodon_instance_uri) .await @@ -47,6 +48,7 @@ impl QueryInstance { InstanceApiType::Misskey => { let msky_meta_uri = iri!(format!("{scheme}://{authority}/api/meta")); state + .state .requests .fetch_json_msky::(&msky_meta_uri) .await @@ -58,10 +60,12 @@ impl QueryInstance { #[tracing::instrument(name = "Query instance", skip(state))] async fn perform(self, state: JobState) -> Result<(), Error> { let contact_outdated = state + .state .node_cache .is_contact_outdated(self.actor_id.clone()) .await; let instance_outdated = state + .state .node_cache .is_instance_outdated(self.actor_id.clone()) .await; @@ -123,6 +127,7 @@ impl QueryInstance { let avatar = state.config.generate_url(UrlKind::Media(uuid)); state + .state .node_cache .set_contact( self.actor_id.clone(), @@ -137,6 +142,7 @@ impl QueryInstance { let description = ammonia::clean(&description); state + .state .node_cache .set_instance( self.actor_id, diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs index fa4812f..60f41ce 100644 --- a/src/jobs/nodeinfo.rs +++ b/src/jobs/nodeinfo.rs @@ -27,6 +27,7 @@ impl QueryNodeinfo { #[tracing::instrument(name = "Query node info", skip(state))] async fn perform(self, state: JobState) -> Result<(), Error> { if !state + .state .node_cache .is_nodeinfo_outdated(self.actor_id.clone()) .await @@ -42,6 +43,7 @@ impl QueryNodeinfo { let well_known_uri = iri!(format!("{scheme}://{authority}/.well-known/nodeinfo")); let well_known = match state + .state .requests .fetch_json::(&well_known_uri) .await @@ -60,7 +62,7 @@ impl QueryNodeinfo { return Ok(()); }; - let nodeinfo = match state.requests.fetch_json::(&href).await { + let nodeinfo = match state.state.requests.fetch_json::(&href).await { Ok(nodeinfo) => nodeinfo, Err(e) if e.is_breaker() => { tracing::debug!("Not retrying due to failed breaker"); @@ -70,6 +72,7 @@ impl QueryNodeinfo { }; state + .state .node_cache .set_info( self.actor_id.clone(), diff --git a/src/main.rs b/src/main.rs index 7832880..482a260 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,21 @@ // need this for ructe #![allow(clippy::needless_borrow)] +use std::time::Duration; + use activitystreams::iri_string::types::IriString; use actix_rt::task::JoinHandle; use actix_web::{middleware::Compress, web, App, HttpServer}; use collector::MemoryCollector; #[cfg(feature = "console")] use console_subscriber::ConsoleLayer; +use error::Error; use http_signature_normalization_actix::middleware::VerifySignature; use metrics_exporter_prometheus::PrometheusBuilder; use metrics_util::layers::FanoutBuilder; use opentelemetry::{sdk::Resource, KeyValue}; use opentelemetry_otlp::WithExportConfig; +use reqwest_middleware::ClientWithMiddleware; use rustls::ServerConfig; use tracing_actix_web::TracingLogger; use tracing_error::ErrorLayer; @@ -34,6 +38,8 @@ mod routes; mod spawner; mod telegram; +use crate::config::UrlKind; + use self::{ args::Args, config::Config, @@ -100,6 +106,38 @@ fn init_subscriber( Ok(()) } +fn build_client( + user_agent: &str, + timeout_seconds: u64, + proxy: Option<(&IriString, Option<(&str, &str)>)>, +) -> Result { + let builder = reqwest::Client::builder().user_agent(user_agent.to_string()); + + let builder = if let Some((url, auth)) = proxy { + let proxy = reqwest::Proxy::all(url.as_str())?; + + let proxy = if let Some((username, password)) = auth { + proxy.basic_auth(username, password) + } else { + proxy + }; + + builder.proxy(proxy) + } else { + builder + }; + + let client = builder + .timeout(Duration::from_secs(timeout_seconds)) + .build()?; + + let client_with_middleware = reqwest_middleware::ClientBuilder::new(client) + .with(reqwest_tracing::TracingMiddleware::default()) + .build(); + + Ok(client_with_middleware) +} + #[actix_rt::main] async fn main() -> Result<(), anyhow::Error> { dotenv::dotenv().ok(); @@ -150,11 +188,11 @@ fn client_main(config: Config, args: Args) -> JoinHandle Result<(), anyhow::Error> { - let client = requests::build_client( + let client = build_client( &config.user_agent(), - config.client_pool_size(), config.client_timeout(), - ); + config.proxy_config(), + )?; if !args.blocks().is_empty() || !args.allowed().is_empty() { if args.undo() { @@ -251,15 +289,13 @@ async fn do_server_main( collector: MemoryCollector, config: Config, ) -> Result<(), anyhow::Error> { + let client = build_client( + &config.user_agent(), + config.client_timeout(), + config.proxy_config(), + )?; + tracing::warn!("Creating state"); - let state = State::build(db.clone()).await?; - - if let Some((token, admin_handle)) = config.telegram_info() { - tracing::warn!("Creating telegram handler"); - telegram::start(admin_handle.to_owned(), db.clone(), token); - } - - let keys = config.open_keys()?; let (signature_threads, verify_threads) = match config.signature_threads() { 0 | 1 => (1, 1), @@ -272,26 +308,29 @@ async fn do_server_main( } }; - let spawner = Spawner::build("sign-cpu", signature_threads)?; let verify_spawner = Spawner::build("verify-cpu", verify_threads)?; + let sign_spawner = Spawner::build("sign-cpu", signature_threads)?; + + let key_id = config.generate_url(UrlKind::MainKey).to_string(); + let state = State::build(db.clone(), key_id, sign_spawner, client).await?; + + if let Some((token, admin_handle)) = config.telegram_info() { + tracing::warn!("Creating telegram handler"); + telegram::start(admin_handle.to_owned(), db.clone(), token); + } + + let keys = config.open_keys()?; let bind_address = config.bind_address(); let server = HttpServer::new(move || { - let requests = state.requests(&config, spawner.clone()); - - let job_server = create_workers( - state.clone(), - actors.clone(), - media.clone(), - config.clone(), - spawner.clone(), - ); + let job_server = + create_workers(state.clone(), actors.clone(), media.clone(), config.clone()); let app = App::new() .app_data(web::Data::new(db.clone())) .app_data(web::Data::new(state.clone())) .app_data(web::Data::new( - requests.clone().spawner(verify_spawner.clone()), + state.requests.clone().spawner(verify_spawner.clone()), )) .app_data(web::Data::new(actors.clone())) .app_data(web::Data::new(config.clone())) @@ -317,7 +356,7 @@ async fn do_server_main( .wrap(config.digest_middleware().spawner(verify_spawner.clone())) .wrap(VerifySignature::new( MyVerify( - requests.spawner(verify_spawner.clone()), + state.requests.clone().spawner(verify_spawner.clone()), actors.clone(), state.clone(), verify_spawner.clone(), diff --git a/src/requests.rs b/src/requests.rs index 8b77028..1921f27 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -5,10 +5,10 @@ use crate::{ }; use activitystreams::iri_string::types::IriString; use actix_web::http::header::Date; -use awc::{error::SendRequestError, Client, ClientResponse, Connector}; use base64::{engine::general_purpose::STANDARD, Engine}; use dashmap::DashMap; -use http_signature_normalization_actix::{digest::ring::Sha256, prelude::*}; +use http_signature_normalization_reqwest::{digest::ring::Sha256, prelude::*}; +use reqwest_middleware::ClientWithMiddleware; use ring::{ rand::SystemRandom, signature::{RsaKeyPair, RSA_PKCS1_SHA256}, @@ -18,7 +18,6 @@ use std::{ sync::Arc, time::{Duration, SystemTime}, }; -use tracing_awc::Tracing; const ONE_SECOND: u64 = 1; const ONE_MINUTE: u64 = 60 * ONE_SECOND; @@ -139,10 +138,8 @@ impl Default for Breaker { #[derive(Clone)] pub(crate) struct Requests { - pool_size: usize, - client: Client, + client: ClientWithMiddleware, key_id: String, - user_agent: String, private_key: Arc, rng: SystemRandom, config: Config, @@ -153,66 +150,39 @@ pub(crate) struct Requests { impl std::fmt::Debug for Requests { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Requests") - .field("pool_size", &self.pool_size) .field("key_id", &self.key_id) - .field("user_agent", &self.user_agent) .field("config", &self.config) .field("breakers", &self.breakers) .finish() } } -thread_local! { - static CLIENT: std::cell::OnceCell = std::cell::OnceCell::new(); -} - -pub(crate) fn build_client(user_agent: &str, pool_size: usize, timeout_seconds: u64) -> Client { - CLIENT.with(|client| { - client - .get_or_init(|| { - let connector = Connector::new().limit(pool_size); - - Client::builder() - .connector(connector) - .wrap(Tracing) - .add_default_header(("User-Agent", user_agent.to_string())) - .timeout(Duration::from_secs(timeout_seconds)) - .finish() - }) - .clone() - }) -} - impl Requests { #[allow(clippy::too_many_arguments)] pub(crate) fn new( key_id: String, private_key: RsaPrivateKey, - user_agent: String, breakers: Breakers, last_online: Arc, - pool_size: usize, - timeout_seconds: u64, spawner: Spawner, + client: ClientWithMiddleware, ) -> Self { let private_key_der = private_key.to_pkcs1_der().expect("Can encode der"); let private_key = ring::signature::RsaKeyPair::from_der(private_key_der.as_bytes()) .expect("Key is valid"); Requests { - pool_size, - client: build_client(&user_agent, pool_size, timeout_seconds), + client, key_id, - user_agent, private_key: Arc::new(private_key), rng: SystemRandom::new(), - config: Config::new().mastodon_compat().spawner(spawner), + config: Config::new_with_spawner(spawner).mastodon_compat(), breakers, last_online, } } pub(crate) fn spawner(mut self, spawner: Spawner) -> Self { - self.config = self.config.spawner(spawner); + self.config = self.config.set_spawner(spawner); self } @@ -223,27 +193,26 @@ impl Requests { async fn check_response( &self, parsed_url: &IriString, - res: Result, - ) -> Result { + res: Result, + ) -> Result { if res.is_err() { self.breakers.fail(&parsed_url); } - let mut res = - res.map_err(|e| ErrorKind::SendRequest(parsed_url.to_string(), e.to_string()))?; + let res = res?; - if res.status().is_server_error() { + let status = res.status(); + + if status.is_server_error() { self.breakers.fail(&parsed_url); - if let Ok(bytes) = res.body().await { - if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { - if !s.is_empty() { - tracing::debug!("Response from {parsed_url}, {s}"); - } + if let Ok(s) = res.text().await { + if !s.is_empty() { + tracing::debug!("Response from {parsed_url}, {s}"); } } - return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into()); + return Err(ErrorKind::Status(parsed_url.to_string(), status).into()); } self.last_online.mark_seen(&parsed_url); @@ -265,21 +234,18 @@ impl Requests { where T: serde::de::DeserializeOwned, { - let mut res = self + let body = self .do_deliver( url, &serde_json::json!({}), "application/json", "application/json", ) + .await? + .bytes() .await?; - let body = res - .body() - .await - .map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?; - - Ok(serde_json::from_slice(body.as_ref())?) + Ok(serde_json::from_slice(&body)?) } #[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))] @@ -294,18 +260,13 @@ impl Requests { where T: serde::de::DeserializeOwned, { - let mut res = self.do_fetch_response(url, accept).await?; + let body = self.do_fetch_response(url, accept).await?.bytes().await?; - let body = res - .body() - .await - .map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?; - - Ok(serde_json::from_slice(body.as_ref())?) + Ok(serde_json::from_slice(&body)?) } #[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))] - pub(crate) async fn fetch_response(&self, url: &IriString) -> Result { + pub(crate) async fn fetch_response(&self, url: &IriString) -> Result { self.do_fetch_response(url, "*/*").await } @@ -313,7 +274,7 @@ impl Requests { &self, url: &IriString, accept: &str, - ) -> Result { + ) -> Result { if !self.breakers.should_try(url) { return Err(ErrorKind::Breaker.into()); } @@ -321,23 +282,18 @@ impl Requests { let signer = self.signer(); let span = tracing::Span::current(); - let res = self + let request = self .client .get(url.as_str()) - .insert_header(("Accept", accept)) - .insert_header(Date(SystemTime::now().into())) - .no_decompress() - .signature( - self.config.clone(), - self.key_id.clone(), - move |signing_string| { - span.record("signing_string", signing_string); - span.in_scope(|| signer.sign(signing_string)) - }, - ) - .await? - .send() - .await; + .header("Accept", accept) + .header("Date", Date(SystemTime::now().into()).to_string()) + .signature(&self.config, self.key_id.clone(), move |signing_string| { + span.record("signing_string", signing_string); + span.in_scope(|| signer.sign(signing_string)) + }) + .await?; + + let res = self.client.execute(request).await; let res = self.check_response(url, res).await?; @@ -369,7 +325,7 @@ impl Requests { item: &T, content_type: &str, accept: &str, - ) -> Result + ) -> Result where T: serde::ser::Serialize + std::fmt::Debug, { @@ -381,12 +337,12 @@ impl Requests { let span = tracing::Span::current(); let item_string = serde_json::to_string(item)?; - let (req, body) = self + let request = self .client .post(inbox.as_str()) - .insert_header(("Accept", accept)) - .insert_header(("Content-Type", content_type)) - .insert_header(Date(SystemTime::now().into())) + .header("Accept", accept) + .header("Content-Type", content_type) + .header("Date", Date(SystemTime::now().into()).to_string()) .signature_with_digest( self.config.clone(), self.key_id.clone(), @@ -397,10 +353,9 @@ impl Requests { span.in_scope(|| signer.sign(signing_string)) }, ) - .await? - .split(); + .await?; - let res = req.send_body(body).await; + let res = self.client.execute(request).await; let res = self.check_response(inbox, res).await?; diff --git a/src/routes/index.rs b/src/routes/index.rs index 0d1bb23..7c94759 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -36,7 +36,7 @@ pub(crate) async fn route( state: web::Data, config: web::Data, ) -> Result { - let all_nodes = state.node_cache().nodes().await?; + let all_nodes = state.node_cache.nodes().await?; let mut nodes = Vec::new(); let mut local = Vec::new(); diff --git a/src/routes/media.rs b/src/routes/media.rs index 7cc3ed9..b99caef 100644 --- a/src/routes/media.rs +++ b/src/routes/media.rs @@ -19,7 +19,7 @@ pub(crate) async fn route( response.insert_header((name.clone(), value.clone())); } - return Ok(response.body(BodyStream::new(res))); + return Ok(response.body(BodyStream::new(res.bytes_stream()))); } Ok(HttpResponse::NotFound().finish()) diff --git a/src/spawner.rs b/src/spawner.rs index a6c4c87..3b611d7 100644 --- a/src/spawner.rs +++ b/src/spawner.rs @@ -162,3 +162,32 @@ impl Spawn for Spawner { }) } } + +impl http_signature_normalization_reqwest::Spawn for Spawner { + type Future = std::pin::Pin> + Send>> where T: Send; + + fn spawn_blocking(&self, func: Func) -> Self::Future + where + Func: FnOnce() -> Out + Send + 'static, + Out: Send + 'static, + { + let sender = self.sender.as_ref().expect("Sender exists").clone(); + + Box::pin(async move { + let (tx, rx) = flume::bounded(1); + + let _ = sender + .send_async(Box::new(move || { + if tx.try_send((func)()).is_err() { + tracing::warn!("Requestor hung up"); + metrics::increment_counter!("relay.spawner.disconnected"); + } + })) + .await; + + timer(rx.recv_async()) + .await + .map_err(|_| http_signature_normalization_reqwest::Canceled) + }) + } +}