diff --git a/Cargo.lock b/Cargo.lock index 4fdfe0d5..56ee296c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16,6 +16,11 @@ name = "antidote" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "array_tool" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "arrayvec" version = "0.4.7" @@ -801,6 +806,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "plume" version = "0.1.0" dependencies = [ + "array_tool 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "base64 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "bcrypt 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1575,6 +1581,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum adler32 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6cbd0b9af8587c72beadc9f72d35b9fbb070982c9e6203e46e93f10df25f8f45" "checksum aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d6531d44de723825aa81398a6415283229725a00fa30713812ab9323faa82fc4" "checksum antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" +"checksum array_tool 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8f8cb5d814eb646a863c4f24978cff2880c4be96ad8cde2c0f0678732902e271" "checksum arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "a1e964f9e24d588183fcb43503abda40d288c8657dfc27311516ce2f05675aef" "checksum backtrace 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ebbe525f66f42d207968308ee86bc2dd60aa5fab535b22e616323a173d097d8e" "checksum backtrace-sys 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "44585761d6161b0f57afc49482ab6bd067e4edef48c12a152c237eb0203f7661" diff --git a/Cargo.toml b/Cargo.toml index fa438fde..71a3234d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ authors = ["Bat' "] name = "plume" version = "0.1.0" [dependencies] +array_tool = "1.0" base64 = "0.9" bcrypt = "0.2" dotenv = "*" diff --git a/src/activity_pub/actor.rs b/src/activity_pub/actor.rs index 2699aceb..ab5f6c5e 100644 --- a/src/activity_pub/actor.rs +++ b/src/activity_pub/actor.rs @@ -1,12 +1,8 @@ use diesel::PgConnection; -use reqwest::Client; use serde_json; use BASE_URL; use activity_pub::{activity_pub, ActivityPub, context, ap_url}; -use activity_pub::activity::Activity; -use activity_pub::request; -use activity_pub::sign::*; use models::instance::Instance; pub enum ActorType { @@ -38,6 +34,8 @@ pub trait Actor: Sized { fn get_inbox_url(&self) -> String; + fn get_shared_inbox_url(&self) -> Option; + fn custom_props(&self, _conn: &PgConnection) -> serde_json::Map { serde_json::Map::new() } @@ -84,23 +82,5 @@ pub trait Actor: Sized { )) } - fn send_to_inbox(&self, conn: &PgConnection, sender: &S, act: A) { - let mut act = act.serialize(); - act["@context"] = context(); - let signed = act.sign(sender, conn); - - let res = Client::new() - .post(&self.get_inbox_url()[..]) - .headers(request::headers()) - .header(request::signature(sender, request::headers(), conn)) - .header(request::digest(signed.to_string())) - .body(signed.to_string()) - .send(); - match res { - Ok(mut r) => println!("Successfully sent activity to inbox ({})\n\n{:?}", self.get_inbox_url(), r.text().unwrap()), - Err(e) => println!("Error while sending to inbox ({:?})", e) - } - } - fn from_url(conn: &PgConnection, url: String) -> Option; } diff --git a/src/activity_pub/inbox.rs b/src/activity_pub/inbox.rs index 0a49fe2f..2eb36ccc 100644 --- a/src/activity_pub/inbox.rs +++ b/src/activity_pub/inbox.rs @@ -3,6 +3,7 @@ use serde_json; use activity_pub::activity; use activity_pub::actor::Actor; +use activity_pub::outbox::broadcast; use activity_pub::sign::*; use models::blogs::Blog; use models::comments::*; @@ -79,7 +80,7 @@ pub trait Inbox: Actor + Sized { } } - fn accept_follow( + fn accept_follow( &self, conn: &PgConnection, from: &A, @@ -94,6 +95,6 @@ pub trait Inbox: Actor + Sized { }); let accept = activity::Accept::new(target, follow, conn); - from.send_to_inbox(conn, target, accept) + broadcast(conn, from, accept, vec![target.clone()]); } } diff --git a/src/activity_pub/outbox.rs b/src/activity_pub/outbox.rs index 3c991fdb..8b310625 100644 --- a/src/activity_pub/outbox.rs +++ b/src/activity_pub/outbox.rs @@ -1,4 +1,6 @@ +use array_tool::vec::Uniq; use diesel::PgConnection; +use reqwest::Client; use rocket::http::Status; use rocket::response::{Response, Responder}; use rocket::request::Request; @@ -8,8 +10,8 @@ use std::sync::Arc; use activity_pub::{activity_pub, ActivityPub, context}; use activity_pub::activity::Activity; use activity_pub::actor::Actor; -use activity_pub::sign::Signer; -use models::users::User; +use activity_pub::request; +use activity_pub::sign::*; pub struct Outbox { id: String, @@ -42,8 +44,28 @@ impl<'r> Responder<'r> for Outbox { } } -pub fn broadcast(conn: &PgConnection, sender: &S, act: A, to: Vec) { - for user in to { - user.send_to_inbox(conn, sender, act.clone()); // TODO: run it in Sidekiq or something like that +pub fn broadcast(conn: &PgConnection, sender: &S, act: A, to: Vec) { + let boxes = to.into_iter() + .map(|u| u.get_shared_inbox_url().unwrap_or(u.get_inbox_url())) + .collect::>() + .unique(); + for inbox in boxes { + // TODO: run it in Sidekiq or something like that + + let mut act = act.serialize(); + act["@context"] = context(); + let signed = act.sign(sender, conn); + + let res = Client::new() + .post(&inbox[..]) + .headers(request::headers()) + .header(request::signature(sender, request::headers(), conn)) + .header(request::digest(signed.to_string())) + .body(signed.to_string()) + .send(); + match res { + Ok(mut r) => println!("Successfully sent activity to inbox ({})\n\n{:?}", inbox, r.text().unwrap()), + Err(e) => println!("Error while sending to inbox ({:?})", e) + } } } diff --git a/src/main.rs b/src/main.rs index 2445a2ca..2d16db20 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ #![feature(plugin, custom_derive, iterator_find_map)] #![plugin(rocket_codegen)] +extern crate array_tool; extern crate base64; extern crate bcrypt; extern crate chrono; diff --git a/src/models/blogs.rs b/src/models/blogs.rs index 46117dcc..1dc3240e 100644 --- a/src/models/blogs.rs +++ b/src/models/blogs.rs @@ -15,7 +15,7 @@ use models::instance::Instance; use schema::blogs; -#[derive(Queryable, Identifiable, Serialize)] +#[derive(Queryable, Identifiable, Serialize, Clone)] pub struct Blog { pub id: i32, pub actor_id: String, @@ -130,6 +130,10 @@ impl Actor for Blog { self.inbox_url.clone() } + fn get_shared_inbox_url(&self) -> Option { + None + } + fn from_url(conn: &PgConnection, url: String) -> Option { blogs::table.filter(blogs::ap_url.eq(url)) .limit(1) diff --git a/src/models/users.rs b/src/models/users.rs index 50e6d0e3..6d16165d 100644 --- a/src/models/users.rs +++ b/src/models/users.rs @@ -300,6 +300,10 @@ impl Actor for User { self.inbox_url.clone() } + fn get_shared_inbox_url(&self) -> Option { + None + } + fn custom_props(&self, conn: &PgConnection) -> serde_json::Map { let mut res = serde_json::Map::new(); res.insert("publicKey".to_string(), json!({ diff --git a/src/routes/user.rs b/src/routes/user.rs index 863fa0c6..2f053f47 100644 --- a/src/routes/user.rs +++ b/src/routes/user.rs @@ -6,7 +6,7 @@ use serde_json; use activity_pub::{activity, activity_pub, ActivityPub, context}; use activity_pub::actor::Actor; use activity_pub::inbox::Inbox; -use activity_pub::outbox::Outbox; +use activity_pub::outbox::{broadcast, Outbox}; use db_conn::DbConn; use models::follows::*; use models::instance::Instance; @@ -48,7 +48,7 @@ fn follow(name: String, conn: DbConn, user: User) -> Redirect { follower_id: user.id, following_id: target.id }); - target.send_to_inbox(&*conn, &user, activity::Follow::new(&user, &target, &*conn)); + broadcast(&*conn, &user, activity::Follow::new(&user, &target, &*conn), vec![target]); Redirect::to(format!("/@/{}", name).as_ref()) }