Move apub business to jobs

This commit is contained in:
asonix 2020-03-30 12:10:04 -05:00
parent 680ccc511c
commit a52a32db8d
10 changed files with 481 additions and 235 deletions

View file

@ -293,7 +293,7 @@ impl ActorCache {
} }
} }
#[derive(Clone)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Actor { pub struct Actor {
pub id: XsdAnyUri, pub id: XsdAnyUri,
pub public_key: String, pub public_key: String,

View file

@ -0,0 +1,77 @@
use crate::{
config::{Config, UrlKind},
data::Actor,
error::MyError,
jobs::{
apub::{get_inboxes, prepare_activity},
DeliverMany, JobState,
},
};
use activitystreams::primitives::XsdAnyUri;
use background_jobs::{ActixJob, Processor};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Announce {
object_id: XsdAnyUri,
actor: Actor,
}
#[derive(Clone, Debug)]
pub struct AnnounceProcessor;
impl Announce {
pub fn new(object_id: XsdAnyUri, actor: Actor) -> Self {
Announce { object_id, actor }
}
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> {
let activity_id: XsdAnyUri = state.config.generate_url(UrlKind::Activity).parse()?;
let announce = generate_announce(&state.config, &activity_id, &self.object_id)?;
let inboxes = get_inboxes(&state.state, &self.actor, &self.object_id).await?;
state
.job_server
.queue(DeliverMany::new(inboxes, announce)?)?;
state.state.cache(self.object_id, activity_id).await;
Ok(())
}
}
// Generate a type that says "Look at this object"
fn generate_announce(
config: &Config,
activity_id: &XsdAnyUri,
object_id: &XsdAnyUri,
) -> Result<activitystreams::activity::Announce, MyError> {
let mut announce = activitystreams::activity::Announce::default();
announce
.announce_props
.set_object_xsd_any_uri(object_id.clone())?
.set_actor_xsd_any_uri(config.generate_url(UrlKind::Actor))?;
prepare_activity(
announce,
activity_id.clone(),
config.generate_url(UrlKind::Followers),
)
}
impl ActixJob for Announce {
type Processor = AnnounceProcessor;
type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
}
}
impl Processor for AnnounceProcessor {
type Job = Announce;
const NAME: &'static str = "AnnounceProcessor";
const QUEUE: &'static str = "default";
}

122
src/jobs/apub/follow.rs Normal file
View file

@ -0,0 +1,122 @@
use crate::{
apub::AcceptedObjects,
config::{Config, UrlKind},
data::Actor,
error::MyError,
jobs::{apub::prepare_activity, Deliver, JobState},
};
use activitystreams::primitives::XsdAnyUri;
use background_jobs::{ActixJob, Processor};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Follow {
is_listener: bool,
input: AcceptedObjects,
actor: Actor,
}
#[derive(Clone, Debug)]
pub struct FollowProcessor;
impl Follow {
pub fn new(is_listener: bool, input: AcceptedObjects, actor: Actor) -> Self {
Follow {
is_listener,
input,
actor,
}
}
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> {
if !self.is_listener {
state.db.add_listener(self.actor.inbox.clone()).await?;
}
let my_id: XsdAnyUri = state.config.generate_url(UrlKind::Actor).parse()?;
// if following relay directly, not just following 'public', followback
if self.input.object.is(&my_id) && !state.actors.is_following(&self.actor.id).await {
let follow = generate_follow(&state.config, &self.actor.id, &my_id)?;
state
.job_server
.queue(Deliver::new(self.actor.inbox.clone(), follow)?)?;
}
state.actors.follower(&self.actor).await?;
let accept = generate_accept_follow(&state.config, &self.actor.id, &self.input.id, &my_id)?;
state
.job_server
.queue(Deliver::new(self.actor.inbox, accept)?)?;
Ok(())
}
}
// Generate a type that says "I want to follow you"
fn generate_follow(
config: &Config,
actor_id: &XsdAnyUri,
my_id: &XsdAnyUri,
) -> Result<activitystreams::activity::Follow, MyError> {
let mut follow = activitystreams::activity::Follow::default();
follow
.follow_props
.set_object_xsd_any_uri(actor_id.clone())?
.set_actor_xsd_any_uri(my_id.clone())?;
prepare_activity(
follow,
config.generate_url(UrlKind::Activity),
actor_id.clone(),
)
}
// Generate a type that says "I accept your follow request"
fn generate_accept_follow(
config: &Config,
actor_id: &XsdAnyUri,
input_id: &XsdAnyUri,
my_id: &XsdAnyUri,
) -> Result<activitystreams::activity::Accept, MyError> {
let mut accept = activitystreams::activity::Accept::default();
accept
.accept_props
.set_actor_xsd_any_uri(my_id.clone())?
.set_object_base_box({
let mut follow = activitystreams::activity::Follow::default();
follow.object_props.set_id(input_id.clone())?;
follow
.follow_props
.set_object_xsd_any_uri(my_id.clone())?
.set_actor_xsd_any_uri(actor_id.clone())?;
follow
})?;
prepare_activity(
accept,
config.generate_url(UrlKind::Activity),
actor_id.clone(),
)
}
impl ActixJob for Follow {
type Processor = FollowProcessor;
type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
}
}
impl Processor for FollowProcessor {
type Job = Follow;
const NAME: &'static str = "FollowProcessor";
const QUEUE: &'static str = "default";
}

51
src/jobs/apub/forward.rs Normal file
View file

@ -0,0 +1,51 @@
use crate::{
apub::AcceptedObjects,
data::Actor,
jobs::{apub::get_inboxes, DeliverMany, JobState},
};
use background_jobs::{ActixJob, Processor};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Forward {
input: AcceptedObjects,
actor: Actor,
}
#[derive(Clone, Debug)]
pub struct ForwardProcessor;
impl Forward {
pub fn new(input: AcceptedObjects, actor: Actor) -> Self {
Forward { input, actor }
}
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> {
let object_id = self.input.object.id();
let inboxes = get_inboxes(&state.state, &self.actor, object_id).await?;
state
.job_server
.queue(DeliverMany::new(inboxes, self.input)?)?;
Ok(())
}
}
impl ActixJob for Forward {
type Processor = ForwardProcessor;
type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
}
}
impl Processor for ForwardProcessor {
type Job = Forward;
const NAME: &'static str = "ForwardProcessor";
const QUEUE: &'static str = "default";
}

View file

@ -1 +1,80 @@
use crate::{
config::{Config, UrlKind},
data::{Actor, State},
error::MyError,
};
use activitystreams::{
context, object::properties::ObjectProperties, primitives::XsdAnyUri, security,
};
use std::convert::TryInto;
mod announce; mod announce;
mod follow;
mod forward;
mod reject;
mod undo;
pub use self::{
announce::{Announce, AnnounceProcessor},
follow::{Follow, FollowProcessor},
forward::{Forward, ForwardProcessor},
reject::{Reject, RejectProcessor},
undo::{Undo, UndoProcessor},
};
async fn get_inboxes(
state: &State,
actor: &Actor,
object_id: &XsdAnyUri,
) -> Result<Vec<XsdAnyUri>, MyError> {
let domain = object_id
.as_url()
.host()
.ok_or(MyError::Domain)?
.to_string();
Ok(state.listeners_without(&actor.inbox, &domain).await)
}
fn prepare_activity<T, U, V>(
mut t: T,
id: impl TryInto<XsdAnyUri, Error = U>,
to: impl TryInto<XsdAnyUri, Error = V>,
) -> Result<T, MyError>
where
T: AsMut<ObjectProperties>,
MyError: From<U> + From<V>,
{
t.as_mut()
.set_id(id.try_into()?)?
.set_many_to_xsd_any_uris(vec![to.try_into()?])?
.set_many_context_xsd_any_uris(vec![context(), security()])?;
Ok(t)
}
// Generate a type that says "I want to stop following you"
fn generate_undo_follow(
config: &Config,
actor_id: &XsdAnyUri,
my_id: &XsdAnyUri,
) -> Result<activitystreams::activity::Undo, MyError> {
let mut undo = activitystreams::activity::Undo::default();
undo.undo_props
.set_actor_xsd_any_uri(my_id.clone())?
.set_object_base_box({
let mut follow = activitystreams::activity::Follow::default();
follow
.object_props
.set_id(config.generate_url(UrlKind::Activity))?;
follow
.follow_props
.set_actor_xsd_any_uri(actor_id.clone())?
.set_object_xsd_any_uri(actor_id.clone())?;
follow
})?;
prepare_activity(undo, config.generate_url(UrlKind::Actor), actor_id.clone())
}

46
src/jobs/apub/reject.rs Normal file
View file

@ -0,0 +1,46 @@
use crate::{
config::UrlKind,
data::Actor,
jobs::{apub::generate_undo_follow, Deliver, JobState},
};
use activitystreams::primitives::XsdAnyUri;
use background_jobs::{ActixJob, Processor};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Reject(pub Actor);
#[derive(Clone, Debug)]
pub struct RejectProcessor;
impl Reject {
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> {
if let Some(_) = state.actors.unfollower(&self.0).await? {
state.db.remove_listener(self.0.inbox.clone()).await?;
}
let my_id: XsdAnyUri = state.config.generate_url(UrlKind::Actor).parse()?;
let undo = generate_undo_follow(&state.config, &self.0.id, &my_id)?;
state.job_server.queue(Deliver::new(self.0.inbox, undo)?)?;
Ok(())
}
}
impl ActixJob for Reject {
type Processor = RejectProcessor;
type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
}
}
impl Processor for RejectProcessor {
type Job = Reject;
const NAME: &'static str = "RejectProcessor";
const QUEUE: &'static str = "default";
}

59
src/jobs/apub/undo.rs Normal file
View file

@ -0,0 +1,59 @@
use crate::{
apub::AcceptedObjects,
config::UrlKind,
data::Actor,
jobs::{apub::generate_undo_follow, Deliver, JobState},
};
use activitystreams::primitives::XsdAnyUri;
use background_jobs::{ActixJob, Processor};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Undo {
input: AcceptedObjects,
actor: Actor,
}
#[derive(Clone, Debug)]
pub struct UndoProcessor;
impl Undo {
pub fn new(input: AcceptedObjects, actor: Actor) -> Self {
Undo { input, actor }
}
async fn perform(self, state: JobState) -> Result<(), anyhow::Error> {
let was_following = state.actors.is_following(&self.actor.id).await;
if let Some(_) = state.actors.unfollower(&self.actor).await? {
state.db.remove_listener(self.actor.inbox.clone()).await?;
}
if was_following {
let my_id: XsdAnyUri = state.config.generate_url(UrlKind::Actor).parse()?;
let undo = generate_undo_follow(&state.config, &self.actor.id, &my_id)?;
state
.job_server
.queue(Deliver::new(self.actor.inbox, undo)?)?;
}
Ok(())
}
}
impl ActixJob for Undo {
type Processor = UndoProcessor;
type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
}
}
impl Processor for UndoProcessor {
type Job = Undo;
const NAME: &'static str = "UndoProcessor";
const QUEUE: &'static str = "default";
}

View file

@ -1,9 +1,11 @@
pub mod apub;
mod deliver; mod deliver;
mod deliver_many; mod deliver_many;
mod instance; mod instance;
mod nodeinfo; mod nodeinfo;
mod process_listeners; mod process_listeners;
mod storage; mod storage;
pub use self::{ pub use self::{
deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, nodeinfo::QueryNodeinfo, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, nodeinfo::QueryNodeinfo,
}; };
@ -35,6 +37,7 @@ pub fn create_server(db: Db) -> JobServer {
} }
pub fn create_workers( pub fn create_workers(
db: Db,
state: State, state: State,
actors: ActorCache, actors: ActorCache,
job_server: JobServer, job_server: JobServer,
@ -45,6 +48,7 @@ pub fn create_workers(
WorkerConfig::new(move || { WorkerConfig::new(move || {
JobState::new( JobState::new(
db.clone(),
state.clone(), state.clone(),
actors.clone(), actors.clone(),
job_server.clone(), job_server.clone(),
@ -57,12 +61,18 @@ pub fn create_workers(
.register(NodeinfoProcessor) .register(NodeinfoProcessor)
.register(InstanceProcessor) .register(InstanceProcessor)
.register(ListenersProcessor) .register(ListenersProcessor)
.register(apub::AnnounceProcessor)
.register(apub::FollowProcessor)
.register(apub::ForwardProcessor)
.register(apub::RejectProcessor)
.register(apub::UndoProcessor)
.set_processor_count("default", 4) .set_processor_count("default", 4)
.start(remote_handle); .start(remote_handle);
} }
#[derive(Clone)] #[derive(Clone)]
pub struct JobState { pub struct JobState {
db: Db,
requests: Requests, requests: Requests,
state: State, state: State,
actors: ActorCache, actors: ActorCache,
@ -79,6 +89,7 @@ pub struct JobServer {
impl JobState { impl JobState {
fn new( fn new(
db: Db,
state: State, state: State,
actors: ActorCache, actors: ActorCache,
job_server: JobServer, job_server: JobServer,
@ -88,6 +99,7 @@ impl JobState {
JobState { JobState {
requests: state.requests(), requests: state.requests(),
node_cache: state.node_cache(), node_cache: state.node_cache(),
db,
actors, actors,
config, config,
media, media,

View file

@ -93,9 +93,10 @@ async fn main() -> Result<(), anyhow::Error> {
let job_server = job_server.clone(); let job_server = job_server.clone();
let media = media.clone(); let media = media.clone();
let config = config.clone(); let config = config.clone();
let db = db.clone();
Arbiter::new().exec_fn(move || { Arbiter::new().exec_fn(move || {
create_workers(state, actors, job_server, media, config); create_workers(db, state, actors, job_server, media, config);
}); });
} }
actix_rt::signal::ctrl_c().await?; actix_rt::signal::ctrl_c().await?;
@ -108,6 +109,7 @@ async fn main() -> Result<(), anyhow::Error> {
HttpServer::new(move || { HttpServer::new(move || {
if !no_jobs { if !no_jobs {
create_workers( create_workers(
db.clone(),
state.clone(), state.clone(),
actors.clone(), actors.clone(),
job_server.clone(), job_server.clone(),

View file

@ -2,28 +2,19 @@ use crate::{
apub::{AcceptedObjects, ValidTypes}, apub::{AcceptedObjects, ValidTypes},
config::{Config, UrlKind}, config::{Config, UrlKind},
data::{Actor, ActorCache, State}, data::{Actor, ActorCache, State},
db::Db,
error::MyError, error::MyError,
jobs::apub::{Announce, Follow, Forward, Reject, Undo},
jobs::JobServer, jobs::JobServer,
jobs::{Deliver, DeliverMany},
requests::Requests, requests::Requests,
routes::accepted, routes::accepted,
}; };
use activitystreams::{ use activitystreams::{primitives::XsdAnyUri, public};
activity::{Accept, Announce, Follow, Undo},
context,
object::properties::ObjectProperties,
primitives::XsdAnyUri,
public, security,
};
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use futures::join; use futures::join;
use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified}; use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};
use log::error; use log::error;
use std::convert::TryInto;
pub async fn route( pub async fn route(
db: web::Data<Db>,
state: web::Data<State>, state: web::Data<State>,
actors: web::Data<ActorCache>, actors: web::Data<ActorCache>,
config: web::Data<Config>, config: web::Data<Config>,
@ -70,31 +61,17 @@ pub async fn route(
} }
match input.kind { match input.kind {
ValidTypes::Accept => handle_accept(&config, input).await, ValidTypes::Accept => handle_accept(&config, input).await?,
ValidTypes::Reject => handle_reject(&db, &actors, &config, &jobs, input, actor).await, ValidTypes::Reject => handle_reject(&config, &jobs, input, actor).await?,
ValidTypes::Announce | ValidTypes::Create => { ValidTypes::Announce | ValidTypes::Create => {
handle_announce(&state, &config, &jobs, input, actor).await handle_announce(&state, &jobs, input, actor).await?
} }
ValidTypes::Follow => { ValidTypes::Follow => handle_follow(&config, &jobs, input, actor, is_listener).await?,
handle_follow(&db, &actors, &config, &jobs, input, actor, is_listener).await ValidTypes::Delete | ValidTypes::Update => handle_forward(&jobs, input, actor).await?,
} ValidTypes::Undo => handle_undo(&config, &jobs, input, actor, is_listener).await?,
ValidTypes::Delete | ValidTypes::Update => { };
handle_forward(&state, &jobs, input, actor).await
} Ok(accepted(serde_json::json!({})))
ValidTypes::Undo => {
handle_undo(
&db,
&state,
&actors,
&config,
&jobs,
input,
actor,
is_listener,
)
.await
}
}
} }
fn valid_without_listener(input: &AcceptedObjects) -> bool { fn valid_without_listener(input: &AcceptedObjects) -> bool {
@ -105,7 +82,7 @@ fn valid_without_listener(input: &AcceptedObjects) -> bool {
} }
} }
async fn handle_accept(config: &Config, input: AcceptedObjects) -> Result<HttpResponse, MyError> { async fn handle_accept(config: &Config, input: AcceptedObjects) -> Result<(), MyError> {
if !input.object.is_kind("Follow") { if !input.object.is_kind("Follow") {
return Err(MyError::Kind( return Err(MyError::Kind(
input.object.kind().unwrap_or("unknown").to_owned(), input.object.kind().unwrap_or("unknown").to_owned(),
@ -119,17 +96,15 @@ async fn handle_accept(config: &Config, input: AcceptedObjects) -> Result<HttpRe
return Err(MyError::WrongActor(input.object.id().to_string())); return Err(MyError::WrongActor(input.object.id().to_string()));
} }
Ok(accepted(serde_json::json!({}))) Ok(())
} }
async fn handle_reject( async fn handle_reject(
db: &Db,
actors: &ActorCache,
config: &Config, config: &Config,
jobs: &JobServer, jobs: &JobServer,
input: AcceptedObjects, input: AcceptedObjects,
actor: Actor, actor: Actor,
) -> Result<HttpResponse, MyError> { ) -> Result<(), MyError> {
if !input.object.is_kind("Follow") { if !input.object.is_kind("Follow") {
return Err(MyError::Kind( return Err(MyError::Kind(
input.object.kind().unwrap_or("unknown").to_owned(), input.object.kind().unwrap_or("unknown").to_owned(),
@ -143,29 +118,18 @@ async fn handle_reject(
return Err(MyError::WrongActor(input.object.id().to_string())); return Err(MyError::WrongActor(input.object.id().to_string()));
} }
let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?; jobs.queue(Reject(actor))?;
if let Some(_) = actors.unfollower(&actor).await? { Ok(())
db.remove_listener(actor.inbox.clone()).await?;
}
let undo = generate_undo_follow(config, &actor.id, &my_id)?;
jobs.queue(Deliver::new(actor.inbox, undo.clone())?)?;
Ok(accepted(undo))
} }
async fn handle_undo( async fn handle_undo(
db: &Db,
state: &State,
actors: &ActorCache,
config: &Config, config: &Config,
jobs: &JobServer, jobs: &JobServer,
input: AcceptedObjects, input: AcceptedObjects,
actor: Actor, actor: Actor,
is_listener: bool, is_listener: bool,
) -> Result<HttpResponse, MyError> { ) -> Result<(), MyError> {
match input.object.kind() { match input.object.kind() {
Some("Follow") | Some("Announce") | Some("Create") => (), Some("Follow") | Some("Announce") | Some("Create") => (),
_ => { _ => {
@ -177,7 +141,8 @@ async fn handle_undo(
if !input.object.is_kind("Follow") { if !input.object.is_kind("Follow") {
if is_listener { if is_listener {
return handle_forward(state, jobs, input, actor).await; jobs.queue(Forward::new(input, actor))?;
return Ok(());
} else { } else {
return Err(MyError::Kind( return Err(MyError::Kind(
input.object.kind().unwrap_or("unknown").to_owned(), input.object.kind().unwrap_or("unknown").to_owned(),
@ -192,221 +157,54 @@ async fn handle_undo(
} }
if !is_listener { if !is_listener {
return Ok(accepted(serde_json::json!({}))); return Ok(());
} }
let was_following = actors.is_following(&actor.id).await; jobs.queue(Undo::new(input, actor))?;
Ok(())
if let Some(_) = actors.unfollower(&actor).await? {
db.remove_listener(actor.inbox.clone()).await?;
}
if was_following {
let undo = generate_undo_follow(config, &actor.id, &my_id)?;
jobs.queue(Deliver::new(actor.inbox, undo.clone())?)?;
return Ok(accepted(undo));
}
Ok(accepted(serde_json::json!({})))
} }
async fn handle_forward( async fn handle_forward(
state: &State,
jobs: &JobServer, jobs: &JobServer,
input: AcceptedObjects, input: AcceptedObjects,
actor: Actor, actor: Actor,
) -> Result<HttpResponse, MyError> { ) -> Result<(), MyError> {
let object_id = input.object.id(); jobs.queue(Forward::new(input, actor))?;
let inboxes = get_inboxes(state, &actor, &object_id).await?; Ok(())
jobs.queue(DeliverMany::new(inboxes, input.clone())?)?;
Ok(accepted(input))
} }
async fn handle_announce( async fn handle_announce(
state: &State, state: &State,
config: &Config,
jobs: &JobServer, jobs: &JobServer,
input: AcceptedObjects, input: AcceptedObjects,
actor: Actor, actor: Actor,
) -> Result<HttpResponse, MyError> { ) -> Result<(), MyError> {
let object_id = input.object.id(); let object_id = input.object.id();
if state.is_cached(object_id).await { if state.is_cached(object_id).await {
return Err(MyError::Duplicate); return Err(MyError::Duplicate);
} }
let activity_id: XsdAnyUri = config.generate_url(UrlKind::Activity).parse()?; jobs.queue(Announce::new(object_id.to_owned(), actor))?;
let announce = generate_announce(config, &activity_id, object_id)?; Ok(())
let inboxes = get_inboxes(state, &actor, &object_id).await?;
jobs.queue(DeliverMany::new(inboxes, announce.clone())?)?;
state.cache(object_id.to_owned(), activity_id).await;
Ok(accepted(announce))
} }
async fn handle_follow( async fn handle_follow(
db: &Db,
actors: &ActorCache,
config: &Config, config: &Config,
jobs: &JobServer, jobs: &JobServer,
input: AcceptedObjects, input: AcceptedObjects,
actor: Actor, actor: Actor,
is_listener: bool, is_listener: bool,
) -> Result<HttpResponse, MyError> { ) -> Result<(), MyError> {
let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?; let my_id: XsdAnyUri = config.generate_url(UrlKind::Actor).parse()?;
if !input.object.is(&my_id) && !input.object.is(&public()) { if !input.object.is(&my_id) && !input.object.is(&public()) {
return Err(MyError::WrongActor(input.object.id().to_string())); return Err(MyError::WrongActor(input.object.id().to_string()));
} }
if !is_listener { jobs.queue(Follow::new(is_listener, input, actor))?;
db.add_listener(actor.inbox.clone()).await?;
}
// if following relay directly, not just following 'public', followback Ok(())
if input.object.is(&my_id) && !actors.is_following(&actor.id).await {
let follow = generate_follow(config, &actor.id, &my_id)?;
jobs.queue(Deliver::new(actor.inbox.clone(), follow)?)?;
}
actors.follower(&actor).await?;
let accept = generate_accept_follow(config, &actor.id, &input.id, &my_id)?;
jobs.queue(Deliver::new(actor.inbox, accept.clone())?)?;
Ok(accepted(accept))
}
// Generate a type that says "I want to stop following you"
fn generate_undo_follow(
config: &Config,
actor_id: &XsdAnyUri,
my_id: &XsdAnyUri,
) -> Result<Undo, MyError> {
let mut undo = Undo::default();
undo.undo_props
.set_actor_xsd_any_uri(my_id.clone())?
.set_object_base_box({
let mut follow = Follow::default();
follow
.object_props
.set_id(config.generate_url(UrlKind::Activity))?;
follow
.follow_props
.set_actor_xsd_any_uri(actor_id.clone())?
.set_object_xsd_any_uri(actor_id.clone())?;
follow
})?;
prepare_activity(undo, config.generate_url(UrlKind::Actor), actor_id.clone())
}
// Generate a type that says "Look at this object"
fn generate_announce(
config: &Config,
activity_id: &XsdAnyUri,
object_id: &XsdAnyUri,
) -> Result<Announce, MyError> {
let mut announce = Announce::default();
announce
.announce_props
.set_object_xsd_any_uri(object_id.clone())?
.set_actor_xsd_any_uri(config.generate_url(UrlKind::Actor))?;
prepare_activity(
announce,
activity_id.clone(),
config.generate_url(UrlKind::Followers),
)
}
// Generate a type that says "I want to follow you"
fn generate_follow(
config: &Config,
actor_id: &XsdAnyUri,
my_id: &XsdAnyUri,
) -> Result<Follow, MyError> {
let mut follow = Follow::default();
follow
.follow_props
.set_object_xsd_any_uri(actor_id.clone())?
.set_actor_xsd_any_uri(my_id.clone())?;
prepare_activity(
follow,
config.generate_url(UrlKind::Activity),
actor_id.clone(),
)
}
// Generate a type that says "I accept your follow request"
fn generate_accept_follow(
config: &Config,
actor_id: &XsdAnyUri,
input_id: &XsdAnyUri,
my_id: &XsdAnyUri,
) -> Result<Accept, MyError> {
let mut accept = Accept::default();
accept
.accept_props
.set_actor_xsd_any_uri(my_id.clone())?
.set_object_base_box({
let mut follow = Follow::default();
follow.object_props.set_id(input_id.clone())?;
follow
.follow_props
.set_object_xsd_any_uri(my_id.clone())?
.set_actor_xsd_any_uri(actor_id.clone())?;
follow
})?;
prepare_activity(
accept,
config.generate_url(UrlKind::Activity),
actor_id.clone(),
)
}
fn prepare_activity<T, U, V>(
mut t: T,
id: impl TryInto<XsdAnyUri, Error = U>,
to: impl TryInto<XsdAnyUri, Error = V>,
) -> Result<T, MyError>
where
T: AsMut<ObjectProperties>,
MyError: From<U> + From<V>,
{
t.as_mut()
.set_id(id.try_into()?)?
.set_many_to_xsd_any_uris(vec![to.try_into()?])?
.set_many_context_xsd_any_uris(vec![context(), security()])?;
Ok(t)
}
async fn get_inboxes(
state: &State,
actor: &Actor,
object_id: &XsdAnyUri,
) -> Result<Vec<XsdAnyUri>, MyError> {
let domain = object_id
.as_url()
.host()
.ok_or(MyError::Domain)?
.to_string();
Ok(state.listeners_without(&actor.inbox, &domain).await)
} }