From 5a49c1f10cd357eb6739b4206ef3a7e90bcc8591 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 21 Mar 2020 15:24:05 -0500 Subject: [PATCH] Use background jobs for delivery, support Accept and Reject --- Cargo.lock | 68 +++++++++++++++++++++----- Cargo.toml | 1 + README.md | 2 + src/apub.rs | 39 +++++++++++++-- src/error.rs | 7 ++- src/inbox.rs | 101 ++++++++++++++++++++++++++++----------- src/jobs/deliver.rs | 56 ++++++++++++++++++++++ src/jobs/deliver_many.rs | 56 ++++++++++++++++++++++ src/jobs/mod.rs | 64 +++++++++++++++++++++++++ src/main.rs | 16 ++++++- src/requests.rs | 35 ++++---------- 11 files changed, 371 insertions(+), 74 deletions(-) create mode 100644 src/jobs/deliver.rs create mode 100644 src/jobs/deliver_many.rs create mode 100644 src/jobs/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 9947ae0..d2f6cbc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -452,6 +452,50 @@ dependencies = [ "serde_urlencoded", ] +[[package]] +name = "background-jobs" +version = "0.8.0-alpha.0" +source = "git+https://git.asonix.dog/Aardwolf/background-jobs#3144b71abb5991643353d8e9f046a173ce9d6d4e" +dependencies = [ + "background-jobs-actix", + "background-jobs-core", +] + +[[package]] +name = "background-jobs-actix" +version = "0.7.0-alpha.0" +source = "git+https://git.asonix.dog/Aardwolf/background-jobs#3144b71abb5991643353d8e9f046a173ce9d6d4e" +dependencies = [ + "actix", + "actix-rt", + "anyhow", + "async-trait", + "background-jobs-core", + "chrono", + "log", + "num_cpus", + "rand", + "serde 1.0.105", + "serde_json", + "thiserror", + "tokio", +] + +[[package]] +name = "background-jobs-core" +version = "0.7.0" +source = "git+https://git.asonix.dog/Aardwolf/background-jobs#3144b71abb5991643353d8e9f046a173ce9d6d4e" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "futures", + "log", + "serde 1.0.105", + "serde_json", + "thiserror", +] + [[package]] name = "backtrace" version = "0.3.45" @@ -628,6 +672,7 @@ checksum = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2" dependencies = [ "num-integer", "num-traits 0.2.11", + "serde 1.0.105", "time 0.1.42", ] @@ -1781,6 +1826,7 @@ dependencies = [ "actix-web", "actix-webfinger", "anyhow", + "background-jobs", "base64 0.12.0", "bb8-postgres", "config", @@ -2102,9 +2148,9 @@ dependencies = [ [[package]] name = "siphasher" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83da420ee8d1a89e640d0948c646c1c088758d3a3c538f943bfa97bdac17929d" +checksum = "8e88f89a550c01e4cd809f3df4f52dc9e939f3273a2017eabd5c6d12fd98bb23" [[package]] name = "slab" @@ -2251,9 +2297,9 @@ checksum = "7c65d530b10ccaeac294f349038a597e435b18fb456aadd0840a623f83b9e941" [[package]] name = "syn" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "123bd9499cfb380418d509322d7a6d52e5315f064fe4b3ad18a53d6b92c07859" +checksum = "0df0eb663f387145cab623dea85b09c2c5b4b0aef44e945d928e682fce71bb03" dependencies = [ "proc-macro2", "quote", @@ -2303,18 +2349,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee14bf8e6767ab4c687c9e8bc003879e042a96fd67a3ba5934eadb6536bef4db" +checksum = "268c0f167625b8b0cc90a91787b158a372b4edadb31d6e20479dc787309defad" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7b51e1fbc44b5a0840be594fbc0f960be09050f2617e61e6aa43bef97cd3ef4" +checksum = "9a3ecbaa927a1d5a73d14a20af52463fa433c0727d07ef5e208f0546841d2efd" dependencies = [ "proc-macro2", "quote", @@ -2427,7 +2473,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "tokio", - "tokio-util 0.3.0", + "tokio-util 0.3.1", ] [[package]] @@ -2458,9 +2504,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af67cdce2b40f8dffb0ee04c853a24217b5d0d3e358f0f5ccc0b5332174ed9a8" +checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ "bytes", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 8613906..9ca7d6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ actix-rt = "1.0.0" actix-web = { version = "3.0.0-alpha.1", features = ["rustls"] } actix-webfinger = "0.3.0-alpha.3" activitystreams = "0.5.0-alpha.11" +background-jobs = { version = "0.8.0-alpha.0", git = "https://git.asonix.dog/Aardwolf/background-jobs", default-features = false, features = ["background-jobs-actix"] } base64 = "0.12" bb8-postgres = "0.4.0" config = "0.10.1" diff --git a/README.md b/README.md index 008940a..b04b8b9 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,8 @@ ecample, if the server is `https://relay.my.tld`, the correct URL would be `https://relay.my.tld/actor`. ### Supported Activities +- Accept Follow {self}, this is a no-op +- Reject Follow {self}, an Undo Follow is sent back - Announce {anything}, {anything} is Announced to listening servers - Create {anything}, {anything} is Announced to listening servers - Follow {self}, become a listener of the relay, a Follow will be sent back diff --git a/src/apub.rs b/src/apub.rs index 1f976f9..0f019c6 100644 --- a/src/apub.rs +++ b/src/apub.rs @@ -37,10 +37,12 @@ pub struct AnyExistingObject { #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "PascalCase")] pub enum ValidTypes { + Accept, Announce, Create, Delete, Follow, + Reject, Undo, Update, } @@ -133,20 +135,49 @@ impl ValidObjects { } } - pub fn child_object_is(&self, uri: &XsdAnyUri) -> bool { + pub fn child_object_id(&self) -> Option { match self { - ValidObjects::Id(_) => false, + ValidObjects::Id(_) => None, ValidObjects::Object(AnyExistingObject { ext, .. }) => { if let Some(o) = ext.get("object") { if let Ok(child_uri) = serde_json::from_value::(o.clone()) { - return child_uri == *uri; + return Some(child_uri); } } - false + None } } } + + pub fn child_object_is(&self, uri: &XsdAnyUri) -> bool { + if let Some(child_object_id) = self.child_object_id() { + return *uri == child_object_id; + } + false + } + + pub fn child_actor_id(&self) -> Option { + match self { + ValidObjects::Id(_) => None, + ValidObjects::Object(AnyExistingObject { ext, .. }) => { + if let Some(o) = ext.get("actor") { + if let Ok(child_uri) = serde_json::from_value::(o.clone()) { + return Some(child_uri); + } + } + + None + } + } + } + + pub fn child_actor_is(&self, uri: &XsdAnyUri) -> bool { + if let Some(child_actor_id) = self.child_actor_id() { + return *uri == child_actor_id; + } + false + } } impl AcceptedActors { diff --git a/src/error.rs b/src/error.rs index 23253a1..bcbaae4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -10,6 +10,9 @@ use std::{convert::Infallible, fmt::Debug, io::Error}; #[derive(Debug, thiserror::Error)] pub enum MyError { + #[error("Error queueing job, {0}")] + Queue(anyhow::Error), + #[error("Error in configuration, {0}")] Config(#[from] config::ConfigError), @@ -82,8 +85,8 @@ pub enum MyError { #[error("Couldn't receive request response")] ReceiveResponse, - #[error("Response has invalid status code")] - Status, + #[error("Response has invalid status code, {0}")] + Status(StatusCode), #[error("URI is missing domain field")] Domain, diff --git a/src/inbox.rs b/src/inbox.rs index da3d736..5701de8 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -3,6 +3,8 @@ use crate::{ config::{Config, UrlKind}, db::Db, error::MyError, + jobs::JobServer, + jobs::{Deliver, DeliverMany}, requests::Requests, responses::accepted, state::State, @@ -25,6 +27,7 @@ pub async fn inbox( state: web::Data, config: web::Data, client: web::Data, + jobs: web::Data, input: web::Json, verified: Option, digest_verified: Option, @@ -66,22 +69,74 @@ pub async fn inbox( } match input.kind { + ValidTypes::Accept => handle_accept(&config, input).await, + ValidTypes::Reject => handle_reject(&db, &config, &jobs, input, actor).await, ValidTypes::Announce | ValidTypes::Create => { - handle_announce(&state, &config, &client, input, actor).await + handle_announce(&state, &config, &jobs, input, actor).await } - ValidTypes::Follow => handle_follow(&db, &config, &client, input, actor, is_listener).await, + ValidTypes::Follow => handle_follow(&db, &config, &jobs, input, actor, is_listener).await, ValidTypes::Delete | ValidTypes::Update => { - handle_forward(&state, &client, input, actor).await + handle_forward(&state, &jobs, input, actor).await } - ValidTypes::Undo => handle_undo(&db, &state, &config, &client, input, actor).await, + ValidTypes::Undo => handle_undo(&db, &state, &config, &jobs, input, actor).await, } } +async fn handle_accept(config: &Config, input: AcceptedObjects) -> Result { + if !input.object.is_kind("Follow") { + return Err(MyError::Kind( + input.object.kind().unwrap_or("unknown").to_owned(), + )); + } + + if !input + .object + .child_actor_is(&config.generate_url(UrlKind::Actor).parse()?) + { + return Err(MyError::WrongActor(input.object.id().to_string())); + } + + Ok(accepted(serde_json::json!({}))) +} + +async fn handle_reject( + db: &Db, + config: &Config, + jobs: &JobServer, + input: AcceptedObjects, + actor: AcceptedActors, +) -> Result { + if !input.object.is_kind("Follow") { + return Err(MyError::Kind( + input.object.kind().unwrap_or("unknown").to_owned(), + )); + } + + if !input + .object + .child_actor_is(&config.generate_url(UrlKind::Actor).parse()?) + { + return Err(MyError::WrongActor(input.object.id().to_string())); + } + + let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?; + + let inbox = actor.inbox().to_owned(); + db.remove_listener(inbox).await?; + + let undo = generate_undo_follow(config, &actor.id, &my_id)?; + + let inbox = actor.inbox().to_owned(); + jobs.queue(Deliver::new(inbox, undo.clone())?)?; + + Ok(accepted(undo)) +} + async fn handle_undo( db: &Db, state: &State, config: &Config, - client: &Requests, + jobs: &JobServer, input: AcceptedObjects, actor: AcceptedActors, ) -> Result { @@ -95,7 +150,7 @@ async fn handle_undo( } if !input.object.is_kind("Follow") { - return handle_forward(state, client, input, actor).await; + return handle_forward(state, jobs, input, actor).await; } let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?; @@ -109,26 +164,22 @@ async fn handle_undo( let undo = generate_undo_follow(config, &actor.id, &my_id)?; - let client2 = client.clone(); - let inbox = actor.inbox().clone(); - let undo2 = undo.clone(); - actix::Arbiter::spawn(async move { - let _ = client2.deliver(inbox, &undo2).await; - }); + let inbox = actor.inbox().to_owned(); + jobs.queue(Deliver::new(inbox, undo.clone())?)?; Ok(accepted(undo)) } async fn handle_forward( state: &State, - client: &Requests, + jobs: &JobServer, input: AcceptedObjects, actor: AcceptedActors, ) -> Result { let object_id = input.object.id(); let inboxes = get_inboxes(state, &actor, &object_id).await?; - client.deliver_many(inboxes, input.clone()); + jobs.queue(DeliverMany::new(inboxes, input.clone())?)?; Ok(accepted(input)) } @@ -136,7 +187,7 @@ async fn handle_forward( async fn handle_announce( state: &State, config: &Config, - client: &Requests, + jobs: &JobServer, input: AcceptedObjects, actor: AcceptedActors, ) -> Result { @@ -150,7 +201,7 @@ async fn handle_announce( let announce = generate_announce(config, &activity_id, object_id)?; let inboxes = get_inboxes(state, &actor, &object_id).await?; - client.deliver_many(inboxes, announce.clone()); + jobs.queue(DeliverMany::new(inboxes, announce.clone())?)?; state.cache(object_id.to_owned(), activity_id).await; @@ -160,7 +211,7 @@ async fn handle_announce( async fn handle_follow( db: &Db, config: &Config, - client: &Requests, + jobs: &JobServer, input: AcceptedObjects, actor: AcceptedActors, is_listener: bool, @@ -178,23 +229,15 @@ async fn handle_follow( // if following relay directly, not just following 'public', followback if input.object.is(&my_id) { let follow = generate_follow(config, &actor.id, &my_id)?; - let client2 = client.clone(); - let inbox = actor.inbox().clone(); - let follow2 = follow.clone(); - actix::Arbiter::spawn(async move { - let _ = client2.deliver(inbox, &follow2).await; - }); + let inbox = actor.inbox().to_owned(); + jobs.queue(Deliver::new(inbox, follow)?)?; } } let accept = generate_accept_follow(config, &actor.id, &input.id, &my_id)?; - let client2 = client.clone(); - let inbox = actor.inbox().clone(); - let accept2 = accept.clone(); - actix::Arbiter::spawn(async move { - let _ = client2.deliver(inbox, &accept2).await; - }); + let inbox = actor.inbox().to_owned(); + jobs.queue(Deliver::new(inbox, accept.clone())?)?; Ok(accepted(accept)) } diff --git a/src/jobs/deliver.rs b/src/jobs/deliver.rs new file mode 100644 index 0000000..eebf7f7 --- /dev/null +++ b/src/jobs/deliver.rs @@ -0,0 +1,56 @@ +use crate::{error::MyError, jobs::JobState}; +use activitystreams::primitives::XsdAnyUri; +use anyhow::Error; +use background_jobs::{Job, Processor}; +use std::{future::Future, pin::Pin}; +use tokio::sync::oneshot; + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct Deliver { + to: XsdAnyUri, + data: serde_json::Value, +} + +impl Deliver { + pub fn new(to: XsdAnyUri, data: T) -> Result + where + T: serde::ser::Serialize, + { + Ok(Deliver { + to, + data: serde_json::to_value(data)?, + }) + } + + async fn perform(self, state: JobState) -> Result<(), Error> { + state.requests.deliver(self.to, &self.data).await?; + + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub struct DeliverProcessor; + +impl Job for Deliver { + type State = JobState; + type Processor = DeliverProcessor; + type Future = Pin> + Send>>; + + fn run(self, state: Self::State) -> Self::Future { + let (tx, rx) = oneshot::channel(); + + actix::spawn(async move { + let _ = tx.send(self.perform(state).await); + }); + + Box::pin(async move { rx.await? }) + } +} + +impl Processor for DeliverProcessor { + type Job = Deliver; + + const NAME: &'static str = "DeliverProcessor"; + const QUEUE: &'static str = "default"; +} diff --git a/src/jobs/deliver_many.rs b/src/jobs/deliver_many.rs new file mode 100644 index 0000000..995c8a3 --- /dev/null +++ b/src/jobs/deliver_many.rs @@ -0,0 +1,56 @@ +use crate::{ + error::MyError, + jobs::{Deliver, JobState}, +}; +use activitystreams::primitives::XsdAnyUri; +use anyhow::Error; +use background_jobs::{Job, Processor}; +use futures::future::{ready, Ready}; + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct DeliverMany { + to: Vec, + data: serde_json::Value, +} + +impl DeliverMany { + pub fn new(to: Vec, data: T) -> Result + where + T: serde::ser::Serialize, + { + Ok(DeliverMany { + to, + data: serde_json::to_value(data)?, + }) + } + + fn perform(self, state: JobState) -> Result<(), Error> { + for inbox in self.to { + state + .job_server + .queue(Deliver::new(inbox, self.data.clone())?)?; + } + + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub struct DeliverManyProcessor; + +impl Job for DeliverMany { + type State = JobState; + type Processor = DeliverManyProcessor; + type Future = Ready>; + + fn run(self, state: Self::State) -> Self::Future { + ready(self.perform(state)) + } +} + +impl Processor for DeliverManyProcessor { + type Job = DeliverMany; + + const NAME: &'static str = "DeliverManyProcessor"; + const QUEUE: &'static str = "default"; +} diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs new file mode 100644 index 0000000..5ba38d2 --- /dev/null +++ b/src/jobs/mod.rs @@ -0,0 +1,64 @@ +mod deliver; +mod deliver_many; +pub use self::{deliver::Deliver, deliver_many::DeliverMany}; + +use crate::{ + error::MyError, + jobs::{deliver::DeliverProcessor, deliver_many::DeliverManyProcessor}, + requests::Requests, + state::State, +}; +use background_jobs::{memory_storage::Storage, Job, QueueHandle, WorkerConfig}; + +pub fn create_server() -> JobServer { + JobServer::new(background_jobs::create_server(Storage::new())) +} + +pub fn create_workers(state: State, job_server: JobServer) { + let queue_handle = job_server.queue_handle(); + + WorkerConfig::new(move || JobState::new(state.requests(), job_server.clone())) + .register(DeliverProcessor) + .register(DeliverManyProcessor) + .set_processor_count("default", 4) + .start(queue_handle); +} + +#[derive(Clone)] +pub struct JobState { + requests: Requests, + job_server: JobServer, +} + +#[derive(Clone)] +pub struct JobServer { + inner: QueueHandle, +} + +impl JobState { + fn new(requests: Requests, job_server: JobServer) -> Self { + JobState { + requests, + job_server, + } + } +} + +impl JobServer { + fn new(queue_handle: QueueHandle) -> Self { + JobServer { + inner: queue_handle, + } + } + + pub fn queue_handle(&self) -> QueueHandle { + self.inner.clone() + } + + pub fn queue(&self, job: J) -> Result<(), MyError> + where + J: Job, + { + self.inner.queue(job).map_err(MyError::Queue) + } +} diff --git a/src/main.rs b/src/main.rs index 0180e62..bf58e8a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ mod config; mod db; mod error; mod inbox; +mod jobs; mod nodeinfo; mod notify; mod rehydrate; @@ -27,8 +28,14 @@ mod verifier; mod webfinger; use self::{ - args::Args, config::Config, db::Db, error::MyError, state::State, - templates::statics::StaticFile, webfinger::RelayResolver, + args::Args, + config::Config, + db::Db, + error::MyError, + jobs::{create_server, create_workers}, + state::State, + templates::statics::StaticFile, + webfinger::RelayResolver, }; async fn index( @@ -102,16 +109,21 @@ async fn main() -> Result<(), anyhow::Error> { rehydrate::spawn(db.clone(), state.clone()); + let job_server = create_server(); + let _ = notify::NotifyHandler::start_handler(state.clone(), pg_config.clone()); let bind_address = config.bind_address(); HttpServer::new(move || { + create_workers(state.clone(), job_server.clone()); + App::new() .wrap(Logger::default()) .data(db.clone()) .data(state.clone()) .data(state.requests()) .data(config.clone()) + .data(job_server.clone()) .service(web::resource("/").route(web::get().to(index))) .service( web::resource("/inbox") diff --git a/src/requests.rs b/src/requests.rs index b67a144..88ff91d 100644 --- a/src/requests.rs +++ b/src/requests.rs @@ -1,8 +1,6 @@ use crate::{apub::AcceptedActors, error::MyError, state::ActorCache}; use activitystreams::primitives::XsdAnyUri; -use actix::Arbiter; use actix_web::client::Client; -use futures::stream::StreamExt; use http_signature_normalization_actix::prelude::*; use log::error; use rsa::{hash::Hashes, padding::PaddingScheme, RSAPrivateKey}; @@ -67,14 +65,15 @@ impl Requests { })?; if !res.status().is_success() { - error!("Invalid status code for fetch, {}", res.status()); if let Ok(bytes) = res.body().await { if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { - error!("Response, {}", s); + if !s.is_empty() { + error!("Response, {}", s); + } } } - return Err(MyError::Status); + return Err(MyError::Status(res.status())); } res.json().await.map_err(|e| { @@ -83,23 +82,6 @@ impl Requests { }) } - pub fn deliver_many(&self, inboxes: Vec, item: T) - where - T: serde::ser::Serialize + 'static, - { - let this = self.clone(); - - Arbiter::spawn(async move { - let mut unordered = futures::stream::FuturesUnordered::new(); - - for inbox in inboxes { - unordered.push(this.deliver(inbox, &item)); - } - - while let Some(_) = unordered.next().await {} - }); - } - pub async fn deliver(&self, inbox: XsdAnyUri, item: &T) -> Result<(), MyError> where T: serde::ser::Serialize, @@ -129,19 +111,20 @@ impl Requests { })?; if !res.status().is_success() { - error!("Invalid response status from {}, {}", inbox, res.status()); if let Ok(bytes) = res.body().await { if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { - error!("Response, {}", s); + if !s.is_empty() { + error!("Response, {}", s); + } } } - return Err(MyError::Status); + return Err(MyError::Status(res.status())); } Ok(()) } - fn sign(&self, signing_string: &str) -> Result { + fn sign(&self, signing_string: &str) -> Result { let hashed = Sha256::digest(signing_string.as_bytes()); let bytes = self.private_key