From a52a32db8d658fe889f2cbfd62f3c27119d01caf Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 30 Mar 2020 12:10:04 -0500 Subject: [PATCH] Move apub business to jobs --- src/data/actor.rs | 2 +- src/jobs/apub/announce.rs | 77 +++++++++++ src/jobs/apub/follow.rs | 122 ++++++++++++++++++ src/jobs/apub/forward.rs | 51 ++++++++ src/jobs/apub/mod.rs | 79 ++++++++++++ src/jobs/apub/reject.rs | 46 +++++++ src/jobs/apub/undo.rs | 59 +++++++++ src/jobs/mod.rs | 12 ++ src/main.rs | 4 +- src/routes/inbox.rs | 264 +++++--------------------------------- 10 files changed, 481 insertions(+), 235 deletions(-) create mode 100644 src/jobs/apub/follow.rs create mode 100644 src/jobs/apub/forward.rs create mode 100644 src/jobs/apub/reject.rs create mode 100644 src/jobs/apub/undo.rs diff --git a/src/data/actor.rs b/src/data/actor.rs index 5a0a79b..f0b51f9 100644 --- a/src/data/actor.rs +++ b/src/data/actor.rs @@ -293,7 +293,7 @@ impl ActorCache { } } -#[derive(Clone)] +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Actor { pub id: XsdAnyUri, pub public_key: String, diff --git a/src/jobs/apub/announce.rs b/src/jobs/apub/announce.rs index e69de29..e85b567 100644 --- a/src/jobs/apub/announce.rs +++ b/src/jobs/apub/announce.rs @@ -0,0 +1,77 @@ +use crate::{ + config::{Config, UrlKind}, + data::Actor, + error::MyError, + jobs::{ + apub::{get_inboxes, prepare_activity}, + DeliverMany, JobState, + }, +}; +use activitystreams::primitives::XsdAnyUri; +use background_jobs::{ActixJob, Processor}; +use std::{future::Future, pin::Pin}; + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct Announce { + object_id: XsdAnyUri, + actor: Actor, +} + +#[derive(Clone, Debug)] +pub struct AnnounceProcessor; + +impl Announce { + pub fn new(object_id: XsdAnyUri, actor: Actor) -> Self { + Announce { object_id, actor } + } + + async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { + let activity_id: XsdAnyUri = state.config.generate_url(UrlKind::Activity).parse()?; + + let announce = generate_announce(&state.config, &activity_id, &self.object_id)?; + let inboxes = get_inboxes(&state.state, &self.actor, &self.object_id).await?; + state + .job_server + .queue(DeliverMany::new(inboxes, announce)?)?; + + state.state.cache(self.object_id, activity_id).await; + Ok(()) + } +} + +// Generate a type that says "Look at this object" +fn generate_announce( + config: &Config, + activity_id: &XsdAnyUri, + object_id: &XsdAnyUri, +) -> Result { + let mut announce = activitystreams::activity::Announce::default(); + + announce + .announce_props + .set_object_xsd_any_uri(object_id.clone())? + .set_actor_xsd_any_uri(config.generate_url(UrlKind::Actor))?; + + prepare_activity( + announce, + activity_id.clone(), + config.generate_url(UrlKind::Followers), + ) +} + +impl ActixJob for Announce { + type Processor = AnnounceProcessor; + type State = JobState; + type Future = Pin>>>; + + fn run(self, state: Self::State) -> Self::Future { + Box::pin(self.perform(state)) + } +} + +impl Processor for AnnounceProcessor { + type Job = Announce; + + const NAME: &'static str = "AnnounceProcessor"; + const QUEUE: &'static str = "default"; +} diff --git a/src/jobs/apub/follow.rs b/src/jobs/apub/follow.rs new file mode 100644 index 0000000..86d7364 --- /dev/null +++ b/src/jobs/apub/follow.rs @@ -0,0 +1,122 @@ +use crate::{ + apub::AcceptedObjects, + config::{Config, UrlKind}, + data::Actor, + error::MyError, + jobs::{apub::prepare_activity, Deliver, JobState}, +}; +use activitystreams::primitives::XsdAnyUri; +use background_jobs::{ActixJob, Processor}; +use std::{future::Future, pin::Pin}; + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct Follow { + is_listener: bool, + input: AcceptedObjects, + actor: Actor, +} + +#[derive(Clone, Debug)] +pub struct FollowProcessor; + +impl Follow { + pub fn new(is_listener: bool, input: AcceptedObjects, actor: Actor) -> Self { + Follow { + is_listener, + input, + actor, + } + } + + async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { + if !self.is_listener { + state.db.add_listener(self.actor.inbox.clone()).await?; + } + let my_id: XsdAnyUri = state.config.generate_url(UrlKind::Actor).parse()?; + + // if following relay directly, not just following 'public', followback + if self.input.object.is(&my_id) && !state.actors.is_following(&self.actor.id).await { + let follow = generate_follow(&state.config, &self.actor.id, &my_id)?; + state + .job_server + .queue(Deliver::new(self.actor.inbox.clone(), follow)?)?; + } + + state.actors.follower(&self.actor).await?; + + let accept = generate_accept_follow(&state.config, &self.actor.id, &self.input.id, &my_id)?; + + state + .job_server + .queue(Deliver::new(self.actor.inbox, accept)?)?; + Ok(()) + } +} + +// Generate a type that says "I want to follow you" +fn generate_follow( + config: &Config, + actor_id: &XsdAnyUri, + my_id: &XsdAnyUri, +) -> Result { + let mut follow = activitystreams::activity::Follow::default(); + + follow + .follow_props + .set_object_xsd_any_uri(actor_id.clone())? + .set_actor_xsd_any_uri(my_id.clone())?; + + prepare_activity( + follow, + config.generate_url(UrlKind::Activity), + actor_id.clone(), + ) +} + +// Generate a type that says "I accept your follow request" +fn generate_accept_follow( + config: &Config, + actor_id: &XsdAnyUri, + input_id: &XsdAnyUri, + my_id: &XsdAnyUri, +) -> Result { + let mut accept = activitystreams::activity::Accept::default(); + + accept + .accept_props + .set_actor_xsd_any_uri(my_id.clone())? + .set_object_base_box({ + let mut follow = activitystreams::activity::Follow::default(); + + follow.object_props.set_id(input_id.clone())?; + follow + .follow_props + .set_object_xsd_any_uri(my_id.clone())? + .set_actor_xsd_any_uri(actor_id.clone())?; + + follow + })?; + + prepare_activity( + accept, + config.generate_url(UrlKind::Activity), + actor_id.clone(), + ) +} + +impl ActixJob for Follow { + type Processor = FollowProcessor; + type State = JobState; + type Future = Pin>>>; + + fn run(self, state: Self::State) -> Self::Future { + Box::pin(self.perform(state)) + } +} + +impl Processor for FollowProcessor { + type Job = Follow; + + const NAME: &'static str = "FollowProcessor"; + const QUEUE: &'static str = "default"; +} diff --git a/src/jobs/apub/forward.rs b/src/jobs/apub/forward.rs new file mode 100644 index 0000000..63e9a52 --- /dev/null +++ b/src/jobs/apub/forward.rs @@ -0,0 +1,51 @@ +use crate::{ + apub::AcceptedObjects, + data::Actor, + jobs::{apub::get_inboxes, DeliverMany, JobState}, +}; +use background_jobs::{ActixJob, Processor}; +use std::{future::Future, pin::Pin}; + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct Forward { + input: AcceptedObjects, + actor: Actor, +} + +#[derive(Clone, Debug)] +pub struct ForwardProcessor; + +impl Forward { + pub fn new(input: AcceptedObjects, actor: Actor) -> Self { + Forward { input, actor } + } + + async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { + let object_id = self.input.object.id(); + + let inboxes = get_inboxes(&state.state, &self.actor, object_id).await?; + + state + .job_server + .queue(DeliverMany::new(inboxes, self.input)?)?; + + Ok(()) + } +} + +impl ActixJob for Forward { + type Processor = ForwardProcessor; + type State = JobState; + type Future = Pin>>>; + + fn run(self, state: Self::State) -> Self::Future { + Box::pin(self.perform(state)) + } +} + +impl Processor for ForwardProcessor { + type Job = Forward; + + const NAME: &'static str = "ForwardProcessor"; + const QUEUE: &'static str = "default"; +} diff --git a/src/jobs/apub/mod.rs b/src/jobs/apub/mod.rs index 0706c10..6bab73c 100644 --- a/src/jobs/apub/mod.rs +++ b/src/jobs/apub/mod.rs @@ -1 +1,80 @@ +use crate::{ + config::{Config, UrlKind}, + data::{Actor, State}, + error::MyError, +}; +use activitystreams::{ + context, object::properties::ObjectProperties, primitives::XsdAnyUri, security, +}; +use std::convert::TryInto; + mod announce; +mod follow; +mod forward; +mod reject; +mod undo; + +pub use self::{ + announce::{Announce, AnnounceProcessor}, + follow::{Follow, FollowProcessor}, + forward::{Forward, ForwardProcessor}, + reject::{Reject, RejectProcessor}, + undo::{Undo, UndoProcessor}, +}; + +async fn get_inboxes( + state: &State, + actor: &Actor, + object_id: &XsdAnyUri, +) -> Result, MyError> { + let domain = object_id + .as_url() + .host() + .ok_or(MyError::Domain)? + .to_string(); + + Ok(state.listeners_without(&actor.inbox, &domain).await) +} + +fn prepare_activity( + mut t: T, + id: impl TryInto, + to: impl TryInto, +) -> Result +where + T: AsMut, + MyError: From + From, +{ + t.as_mut() + .set_id(id.try_into()?)? + .set_many_to_xsd_any_uris(vec![to.try_into()?])? + .set_many_context_xsd_any_uris(vec![context(), security()])?; + Ok(t) +} + +// Generate a type that says "I want to stop following you" +fn generate_undo_follow( + config: &Config, + actor_id: &XsdAnyUri, + my_id: &XsdAnyUri, +) -> Result { + let mut undo = activitystreams::activity::Undo::default(); + + undo.undo_props + .set_actor_xsd_any_uri(my_id.clone())? + .set_object_base_box({ + let mut follow = activitystreams::activity::Follow::default(); + + follow + .object_props + .set_id(config.generate_url(UrlKind::Activity))?; + follow + .follow_props + .set_actor_xsd_any_uri(actor_id.clone())? + .set_object_xsd_any_uri(actor_id.clone())?; + + follow + })?; + + prepare_activity(undo, config.generate_url(UrlKind::Actor), actor_id.clone()) +} diff --git a/src/jobs/apub/reject.rs b/src/jobs/apub/reject.rs new file mode 100644 index 0000000..68b25cc --- /dev/null +++ b/src/jobs/apub/reject.rs @@ -0,0 +1,46 @@ +use crate::{ + config::UrlKind, + data::Actor, + jobs::{apub::generate_undo_follow, Deliver, JobState}, +}; +use activitystreams::primitives::XsdAnyUri; +use background_jobs::{ActixJob, Processor}; +use std::{future::Future, pin::Pin}; + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct Reject(pub Actor); + +#[derive(Clone, Debug)] +pub struct RejectProcessor; + +impl Reject { + async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { + if let Some(_) = state.actors.unfollower(&self.0).await? { + state.db.remove_listener(self.0.inbox.clone()).await?; + } + + let my_id: XsdAnyUri = state.config.generate_url(UrlKind::Actor).parse()?; + let undo = generate_undo_follow(&state.config, &self.0.id, &my_id)?; + + state.job_server.queue(Deliver::new(self.0.inbox, undo)?)?; + + Ok(()) + } +} + +impl ActixJob for Reject { + type Processor = RejectProcessor; + type State = JobState; + type Future = Pin>>>; + + fn run(self, state: Self::State) -> Self::Future { + Box::pin(self.perform(state)) + } +} + +impl Processor for RejectProcessor { + type Job = Reject; + + const NAME: &'static str = "RejectProcessor"; + const QUEUE: &'static str = "default"; +} diff --git a/src/jobs/apub/undo.rs b/src/jobs/apub/undo.rs new file mode 100644 index 0000000..713f898 --- /dev/null +++ b/src/jobs/apub/undo.rs @@ -0,0 +1,59 @@ +use crate::{ + apub::AcceptedObjects, + config::UrlKind, + data::Actor, + jobs::{apub::generate_undo_follow, Deliver, JobState}, +}; +use activitystreams::primitives::XsdAnyUri; +use background_jobs::{ActixJob, Processor}; +use std::{future::Future, pin::Pin}; + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct Undo { + input: AcceptedObjects, + actor: Actor, +} + +#[derive(Clone, Debug)] +pub struct UndoProcessor; + +impl Undo { + pub fn new(input: AcceptedObjects, actor: Actor) -> Self { + Undo { input, actor } + } + + async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { + let was_following = state.actors.is_following(&self.actor.id).await; + + if let Some(_) = state.actors.unfollower(&self.actor).await? { + state.db.remove_listener(self.actor.inbox.clone()).await?; + } + + if was_following { + let my_id: XsdAnyUri = state.config.generate_url(UrlKind::Actor).parse()?; + let undo = generate_undo_follow(&state.config, &self.actor.id, &my_id)?; + state + .job_server + .queue(Deliver::new(self.actor.inbox, undo)?)?; + } + + Ok(()) + } +} + +impl ActixJob for Undo { + type Processor = UndoProcessor; + type State = JobState; + type Future = Pin>>>; + + fn run(self, state: Self::State) -> Self::Future { + Box::pin(self.perform(state)) + } +} + +impl Processor for UndoProcessor { + type Job = Undo; + + const NAME: &'static str = "UndoProcessor"; + const QUEUE: &'static str = "default"; +} diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index 802a0f4..846acfd 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -1,9 +1,11 @@ +pub mod apub; mod deliver; mod deliver_many; mod instance; mod nodeinfo; mod process_listeners; mod storage; + pub use self::{ deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, nodeinfo::QueryNodeinfo, }; @@ -35,6 +37,7 @@ pub fn create_server(db: Db) -> JobServer { } pub fn create_workers( + db: Db, state: State, actors: ActorCache, job_server: JobServer, @@ -45,6 +48,7 @@ pub fn create_workers( WorkerConfig::new(move || { JobState::new( + db.clone(), state.clone(), actors.clone(), job_server.clone(), @@ -57,12 +61,18 @@ pub fn create_workers( .register(NodeinfoProcessor) .register(InstanceProcessor) .register(ListenersProcessor) + .register(apub::AnnounceProcessor) + .register(apub::FollowProcessor) + .register(apub::ForwardProcessor) + .register(apub::RejectProcessor) + .register(apub::UndoProcessor) .set_processor_count("default", 4) .start(remote_handle); } #[derive(Clone)] pub struct JobState { + db: Db, requests: Requests, state: State, actors: ActorCache, @@ -79,6 +89,7 @@ pub struct JobServer { impl JobState { fn new( + db: Db, state: State, actors: ActorCache, job_server: JobServer, @@ -88,6 +99,7 @@ impl JobState { JobState { requests: state.requests(), node_cache: state.node_cache(), + db, actors, config, media, diff --git a/src/main.rs b/src/main.rs index 88d96b1..a6d4b37 100644 --- a/src/main.rs +++ b/src/main.rs @@ -93,9 +93,10 @@ async fn main() -> Result<(), anyhow::Error> { let job_server = job_server.clone(); let media = media.clone(); let config = config.clone(); + let db = db.clone(); Arbiter::new().exec_fn(move || { - create_workers(state, actors, job_server, media, config); + create_workers(db, state, actors, job_server, media, config); }); } actix_rt::signal::ctrl_c().await?; @@ -108,6 +109,7 @@ async fn main() -> Result<(), anyhow::Error> { HttpServer::new(move || { if !no_jobs { create_workers( + db.clone(), state.clone(), actors.clone(), job_server.clone(), diff --git a/src/routes/inbox.rs b/src/routes/inbox.rs index 579d3b2..e3e9156 100644 --- a/src/routes/inbox.rs +++ b/src/routes/inbox.rs @@ -2,28 +2,19 @@ use crate::{ apub::{AcceptedObjects, ValidTypes}, config::{Config, UrlKind}, data::{Actor, ActorCache, State}, - db::Db, error::MyError, + jobs::apub::{Announce, Follow, Forward, Reject, Undo}, jobs::JobServer, - jobs::{Deliver, DeliverMany}, requests::Requests, routes::accepted, }; -use activitystreams::{ - activity::{Accept, Announce, Follow, Undo}, - context, - object::properties::ObjectProperties, - primitives::XsdAnyUri, - public, security, -}; +use activitystreams::{primitives::XsdAnyUri, public}; use actix_web::{web, HttpResponse}; use futures::join; use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified}; use log::error; -use std::convert::TryInto; pub async fn route( - db: web::Data, state: web::Data, actors: web::Data, config: web::Data, @@ -70,31 +61,17 @@ pub async fn route( } match input.kind { - ValidTypes::Accept => handle_accept(&config, input).await, - ValidTypes::Reject => handle_reject(&db, &actors, &config, &jobs, input, actor).await, + ValidTypes::Accept => handle_accept(&config, input).await?, + ValidTypes::Reject => handle_reject(&config, &jobs, input, actor).await?, ValidTypes::Announce | ValidTypes::Create => { - handle_announce(&state, &config, &jobs, input, actor).await + handle_announce(&state, &jobs, input, actor).await? } - ValidTypes::Follow => { - handle_follow(&db, &actors, &config, &jobs, input, actor, is_listener).await - } - ValidTypes::Delete | ValidTypes::Update => { - handle_forward(&state, &jobs, input, actor).await - } - ValidTypes::Undo => { - handle_undo( - &db, - &state, - &actors, - &config, - &jobs, - input, - actor, - is_listener, - ) - .await - } - } + ValidTypes::Follow => handle_follow(&config, &jobs, input, actor, is_listener).await?, + ValidTypes::Delete | ValidTypes::Update => handle_forward(&jobs, input, actor).await?, + ValidTypes::Undo => handle_undo(&config, &jobs, input, actor, is_listener).await?, + }; + + Ok(accepted(serde_json::json!({}))) } fn valid_without_listener(input: &AcceptedObjects) -> bool { @@ -105,7 +82,7 @@ fn valid_without_listener(input: &AcceptedObjects) -> bool { } } -async fn handle_accept(config: &Config, input: AcceptedObjects) -> Result { +async fn handle_accept(config: &Config, input: AcceptedObjects) -> Result<(), MyError> { if !input.object.is_kind("Follow") { return Err(MyError::Kind( input.object.kind().unwrap_or("unknown").to_owned(), @@ -119,17 +96,15 @@ async fn handle_accept(config: &Config, input: AcceptedObjects) -> Result Result { +) -> Result<(), MyError> { if !input.object.is_kind("Follow") { return Err(MyError::Kind( input.object.kind().unwrap_or("unknown").to_owned(), @@ -143,29 +118,18 @@ async fn handle_reject( return Err(MyError::WrongActor(input.object.id().to_string())); } - let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?; + jobs.queue(Reject(actor))?; - if let Some(_) = actors.unfollower(&actor).await? { - db.remove_listener(actor.inbox.clone()).await?; - } - - let undo = generate_undo_follow(config, &actor.id, &my_id)?; - - jobs.queue(Deliver::new(actor.inbox, undo.clone())?)?; - - Ok(accepted(undo)) + Ok(()) } async fn handle_undo( - db: &Db, - state: &State, - actors: &ActorCache, config: &Config, jobs: &JobServer, input: AcceptedObjects, actor: Actor, is_listener: bool, -) -> Result { +) -> Result<(), MyError> { match input.object.kind() { Some("Follow") | Some("Announce") | Some("Create") => (), _ => { @@ -177,7 +141,8 @@ async fn handle_undo( if !input.object.is_kind("Follow") { if is_listener { - return handle_forward(state, jobs, input, actor).await; + jobs.queue(Forward::new(input, actor))?; + return Ok(()); } else { return Err(MyError::Kind( input.object.kind().unwrap_or("unknown").to_owned(), @@ -192,221 +157,54 @@ async fn handle_undo( } if !is_listener { - return Ok(accepted(serde_json::json!({}))); + return Ok(()); } - let was_following = actors.is_following(&actor.id).await; - - if let Some(_) = actors.unfollower(&actor).await? { - db.remove_listener(actor.inbox.clone()).await?; - } - - if was_following { - let undo = generate_undo_follow(config, &actor.id, &my_id)?; - jobs.queue(Deliver::new(actor.inbox, undo.clone())?)?; - - return Ok(accepted(undo)); - } - - Ok(accepted(serde_json::json!({}))) + jobs.queue(Undo::new(input, actor))?; + Ok(()) } async fn handle_forward( - state: &State, jobs: &JobServer, input: AcceptedObjects, actor: Actor, -) -> Result { - let object_id = input.object.id(); +) -> Result<(), MyError> { + jobs.queue(Forward::new(input, actor))?; - let inboxes = get_inboxes(state, &actor, &object_id).await?; - jobs.queue(DeliverMany::new(inboxes, input.clone())?)?; - - Ok(accepted(input)) + Ok(()) } async fn handle_announce( state: &State, - config: &Config, jobs: &JobServer, input: AcceptedObjects, actor: Actor, -) -> Result { +) -> Result<(), MyError> { let object_id = input.object.id(); if state.is_cached(object_id).await { return Err(MyError::Duplicate); } - let activity_id: XsdAnyUri = config.generate_url(UrlKind::Activity).parse()?; + jobs.queue(Announce::new(object_id.to_owned(), actor))?; - let announce = generate_announce(config, &activity_id, object_id)?; - let inboxes = get_inboxes(state, &actor, &object_id).await?; - jobs.queue(DeliverMany::new(inboxes, announce.clone())?)?; - - state.cache(object_id.to_owned(), activity_id).await; - - Ok(accepted(announce)) + Ok(()) } async fn handle_follow( - db: &Db, - actors: &ActorCache, config: &Config, jobs: &JobServer, input: AcceptedObjects, actor: Actor, is_listener: bool, -) -> Result { +) -> Result<(), MyError> { let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?; if !input.object.is(&my_id) && !input.object.is(&public()) { return Err(MyError::WrongActor(input.object.id().to_string())); } - if !is_listener { - db.add_listener(actor.inbox.clone()).await?; - } + jobs.queue(Follow::new(is_listener, input, actor))?; - // if following relay directly, not just following 'public', followback - if input.object.is(&my_id) && !actors.is_following(&actor.id).await { - let follow = generate_follow(config, &actor.id, &my_id)?; - jobs.queue(Deliver::new(actor.inbox.clone(), follow)?)?; - } - - actors.follower(&actor).await?; - - let accept = generate_accept_follow(config, &actor.id, &input.id, &my_id)?; - - jobs.queue(Deliver::new(actor.inbox, accept.clone())?)?; - - Ok(accepted(accept)) -} - -// Generate a type that says "I want to stop following you" -fn generate_undo_follow( - config: &Config, - actor_id: &XsdAnyUri, - my_id: &XsdAnyUri, -) -> Result { - let mut undo = Undo::default(); - - undo.undo_props - .set_actor_xsd_any_uri(my_id.clone())? - .set_object_base_box({ - let mut follow = Follow::default(); - - follow - .object_props - .set_id(config.generate_url(UrlKind::Activity))?; - follow - .follow_props - .set_actor_xsd_any_uri(actor_id.clone())? - .set_object_xsd_any_uri(actor_id.clone())?; - - follow - })?; - - prepare_activity(undo, config.generate_url(UrlKind::Actor), actor_id.clone()) -} - -// Generate a type that says "Look at this object" -fn generate_announce( - config: &Config, - activity_id: &XsdAnyUri, - object_id: &XsdAnyUri, -) -> Result { - let mut announce = Announce::default(); - - announce - .announce_props - .set_object_xsd_any_uri(object_id.clone())? - .set_actor_xsd_any_uri(config.generate_url(UrlKind::Actor))?; - - prepare_activity( - announce, - activity_id.clone(), - config.generate_url(UrlKind::Followers), - ) -} - -// Generate a type that says "I want to follow you" -fn generate_follow( - config: &Config, - actor_id: &XsdAnyUri, - my_id: &XsdAnyUri, -) -> Result { - let mut follow = Follow::default(); - - follow - .follow_props - .set_object_xsd_any_uri(actor_id.clone())? - .set_actor_xsd_any_uri(my_id.clone())?; - - prepare_activity( - follow, - config.generate_url(UrlKind::Activity), - actor_id.clone(), - ) -} - -// Generate a type that says "I accept your follow request" -fn generate_accept_follow( - config: &Config, - actor_id: &XsdAnyUri, - input_id: &XsdAnyUri, - my_id: &XsdAnyUri, -) -> Result { - let mut accept = Accept::default(); - - accept - .accept_props - .set_actor_xsd_any_uri(my_id.clone())? - .set_object_base_box({ - let mut follow = Follow::default(); - - follow.object_props.set_id(input_id.clone())?; - follow - .follow_props - .set_object_xsd_any_uri(my_id.clone())? - .set_actor_xsd_any_uri(actor_id.clone())?; - - follow - })?; - - prepare_activity( - accept, - config.generate_url(UrlKind::Activity), - actor_id.clone(), - ) -} - -fn prepare_activity( - mut t: T, - id: impl TryInto, - to: impl TryInto, -) -> Result -where - T: AsMut, - MyError: From + From, -{ - t.as_mut() - .set_id(id.try_into()?)? - .set_many_to_xsd_any_uris(vec![to.try_into()?])? - .set_many_context_xsd_any_uris(vec![context(), security()])?; - Ok(t) -} - -async fn get_inboxes( - state: &State, - actor: &Actor, - object_id: &XsdAnyUri, -) -> Result, MyError> { - let domain = object_id - .as_url() - .host() - .ok_or(MyError::Domain)? - .to_string(); - - Ok(state.listeners_without(&actor.inbox, &domain).await) + Ok(()) }