Broadcast activities to all known instances

We consider everything posted with Plume public (for the moment at least)
This commit is contained in:
Bat 2018-09-09 12:19:11 +01:00
parent b4391b55f2
commit 08cb337df6
6 changed files with 23 additions and 17 deletions

View file

@ -91,7 +91,7 @@ impl<'a, 'r> FromRequest<'a, 'r> for ApRequest {
} }
} }
pub fn broadcast<A: Activity, S: sign::Signer, T: inbox::WithInbox + Actor>(sender: &S, act: A, to: Vec<T>) { pub fn broadcast<S: sign::Signer, A: Activity, T: inbox::WithInbox + Actor>(sender: &S, act: A, to: Vec<T>) {
let boxes = to.into_iter() let boxes = to.into_iter()
.filter(|u| !u.is_local()) .filter(|u| !u.is_local())
.map(|u| u.get_shared_inbox_url().unwrap_or(u.get_inbox_url())) .map(|u| u.get_shared_inbox_url().unwrap_or(u.get_inbox_url()))

View file

@ -100,6 +100,12 @@ impl User {
find_by!(users, find_by_name, username as String, instance_id as i32); find_by!(users, find_by_name, username as String, instance_id as i32);
find_by!(users, find_by_ap_url, ap_url as String); find_by!(users, find_by_ap_url, ap_url as String);
pub fn one_by_instance(&self, conn: &PgConnection) -> Vec<User> {
users::table.distinct_on(users::instance_id)
.get_results::<User>(conn)
.expect("Error in User::on_by_instance")
}
pub fn delete(&self, conn: &PgConnection) { pub fn delete(&self, conn: &PgConnection) {
diesel::delete(self).execute(conn).expect("Couldn't remove user from DB"); diesel::delete(self).execute(conn).expect("Couldn't remove user from DB");
} }

View file

@ -43,9 +43,9 @@ fn create(blog_name: String, slug: String, data: LenientForm<NewCommentForm>, us
let instance = Instance::get_local(&*conn).unwrap(); let instance = Instance::get_local(&*conn).unwrap();
instance.received(&*conn, serde_json::to_value(new_comment.clone()).expect("JSON serialization error")) instance.received(&*conn, serde_json::to_value(new_comment.clone()).expect("JSON serialization error"))
.expect("We are not compatible with ourselve: local broadcast failed (new comment)"); .expect("We are not compatible with ourselve: local broadcast failed (new comment)");
let followers = user.get_followers(&*conn); let dest = User::one_by_instance(&*conn);
let user_clone = user.clone(); let user_clone = user.clone();
worker.execute(Thunk::of(move || broadcast(&user_clone, new_comment, followers))); worker.execute(Thunk::of(move || broadcast(&user_clone, new_comment, dest)));
Redirect::to(format!(uri!(super::posts::details: blog_name = blog_name, slug = slug)) Redirect::to(format!(uri!(super::posts::details: blog_name = blog_name, slug = slug))
}) })

View file

@ -25,14 +25,14 @@ fn create(blog: String, slug: String, user: User, conn: DbConn, worker: State<Po
like.update_ap_url(&*conn); like.update_ap_url(&*conn);
like.notify(&*conn); like.notify(&*conn);
let followers = user.get_followers(&*conn); let dest = User::one_by_instance(&*conn);
let act = like.into_activity(&*conn); let act = like.into_activity(&*conn);
worker.execute(Thunk::of(move || broadcast(&user, act, followers))); worker.execute(Thunk::of(move || broadcast(&user, act, dest)));
} else { } else {
let like = likes::Like::find_by_user_on_post(&*conn, user.id, post.id).unwrap(); let like = likes::Like::find_by_user_on_post(&*conn, user.id, post.id).unwrap();
let delete_act = like.delete(&*conn); let delete_act = like.delete(&*conn);
let followers = user.get_followers(&*conn); let dest = User::one_by_instance(&*conn);
worker.execute(Thunk::of(move || broadcast(&user, delete_act, followers))); worker.execute(Thunk::of(move || broadcast(&user, delete_act, dest)));
} }
Redirect::to(uri!(super::posts::details: blog = blog, slug = slug)) Redirect::to(uri!(super::posts::details: blog = blog, slug = slug))

View file

@ -196,8 +196,8 @@ fn update(blog: String, slug: String, user: User, conn: DbConn, data: LenientFor
} }
let act = post.update_activity(&*conn); let act = post.update_activity(&*conn);
let followers = user.get_followers(&*conn); let dest = User::one_by_instance(&*conn);
worker.execute(Thunk::of(move || broadcast(&user, act, followers))); worker.execute(Thunk::of(move || broadcast(&user, act, dest)));
Ok(Redirect::to(uri!(details: blog = blog, slug = new_slug))) Ok(Redirect::to(uri!(details: blog = blog, slug = new_slug)))
} }
@ -294,8 +294,8 @@ fn create(blog_name: String, data: LenientForm<NewPostForm>, user: User, conn: D
} }
let act = post.create_activity(&*conn); let act = post.create_activity(&*conn);
let followers = user.get_followers(&*conn); let dest = User::one_by_instance(&*conn);
worker.execute(Thunk::of(move || broadcast(&user, act, followers))); worker.execute(Thunk::of(move || broadcast(&user, act, dest)));
Ok(Redirect::to(uri!(details: blog = blog_name, slug = slug))) Ok(Redirect::to(uri!(details: blog = blog_name, slug = slug)))
} }
@ -320,9 +320,9 @@ fn delete(blog_name: String, slug: String, conn: DbConn, user: User, worker: Sta
if !post.get_authors(&*conn).into_iter().any(|a| a.id == user.id) { if !post.get_authors(&*conn).into_iter().any(|a| a.id == user.id) {
Redirect::to(uri!(details: blog = blog_name.clone(), slug = slug.clone())) Redirect::to(uri!(details: blog = blog_name.clone(), slug = slug.clone()))
} else { } else {
let audience = user.get_followers(&*conn); let dest = User::one_by_instance(&*conn);
let delete_activity = post.delete(&*conn); let delete_activity = post.delete(&*conn);
worker.execute(Thunk::of(move || broadcast(&user, delete_activity, audience))); worker.execute(Thunk::of(move || broadcast(&user, delete_activity, dest)));
Redirect::to(uri!(super::blogs::details: name = blog_name)) Redirect::to(uri!(super::blogs::details: name = blog_name))
} }

View file

@ -25,14 +25,14 @@ fn create(blog: String, slug: String, user: User, conn: DbConn, worker: State<Po
reshare.update_ap_url(&*conn); reshare.update_ap_url(&*conn);
reshare.notify(&*conn); reshare.notify(&*conn);
let followers = user.get_followers(&*conn); let dest = User::one_by_instance(&*conn);
let act = reshare.into_activity(&*conn); let act = reshare.into_activity(&*conn);
worker.execute(Thunk::of(move || broadcast(&user, act, followers))); worker.execute(Thunk::of(move || broadcast(&user, act, dest)));
} else { } else {
let reshare = Reshare::find_by_user_on_post(&*conn, user.id, post.id).unwrap(); let reshare = Reshare::find_by_user_on_post(&*conn, user.id, post.id).unwrap();
let delete_act = reshare.delete(&*conn); let delete_act = reshare.delete(&*conn);
let followers = user.get_followers(&*conn); let dest = User::one_by_instance(&*conn);
worker.execute(Thunk::of(move || broadcast(&user, delete_act, followers))); worker.execute(Thunk::of(move || broadcast(&user, delete_act, dest)));
} }
Redirect::to(uri!(super::posts::details: blog = blog, slug = slug)) Redirect::to(uri!(super::posts::details: blog = blog, slug = slug))