From 52d860d402e320b11f0da2700ef50e27b01f85d2 Mon Sep 17 00:00:00 2001 From: Violet White Date: Wed, 30 Oct 2019 06:22:28 -0400 Subject: [PATCH] Paginate the outbox responses. Fixes #669 (#681) * Paginate the outbox responses. Fixes #669 * Address Ana's review * Make outbox_fetch page through instance outboxes * Fix infinite loop in fetch_outbox * Fix off by one --- plume-models/src/blogs.rs | 48 +++++++++++++- plume-models/src/lib.rs | 2 +- plume-models/src/users.rs | 128 +++++++++++++++++++++++++++++++++----- src/main.rs | 2 + src/routes/blogs.rs | 13 +++- src/routes/mod.rs | 4 +- src/routes/user.rs | 15 ++++- 7 files changed, 186 insertions(+), 26 deletions(-) diff --git a/plume-models/src/blogs.rs b/plume-models/src/blogs.rs index bd065533..571cdb25 100644 --- a/plume-models/src/blogs.rs +++ b/plume-models/src/blogs.rs @@ -1,4 +1,9 @@ -use activitypub::{actor::Group, collection::OrderedCollection, object::Image, CustomObject}; +use activitypub::{ + actor::Group, + collection::{OrderedCollection, OrderedCollectionPage}, + object::Image, + CustomObject, +}; use chrono::NaiveDateTime; use diesel::{self, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, SaveChangesDsl}; use openssl::{ @@ -22,7 +27,7 @@ use safe_string::SafeString; use schema::blogs; use search::Searcher; use users::User; -use {Connection, Error, PlumeRocket, Result}; +use {ap_url, Connection, Error, PlumeRocket, Result, ITEMS_PER_PAGE}; pub type CustomGroup = CustomObject; @@ -220,12 +225,49 @@ impl Blog { coll.collection_props.items = serde_json::to_value(self.get_activities(conn)?)?; coll.collection_props .set_total_items_u64(self.get_activities(conn)?.len() as u64)?; + coll.collection_props + .set_first_link(Id::new(ap_url(&format!("{}?page=1", &self.outbox_url))))?; + coll.collection_props + .set_last_link(Id::new(ap_url(&format!( + "{}?page={}", + &self.outbox_url, + (self.get_activities(conn)?.len() as u64 + ITEMS_PER_PAGE as u64 - 1) as u64 + / ITEMS_PER_PAGE as u64 + ))))?; + Ok(ActivityStream::new(coll)) + } + pub fn outbox_page( + &self, + conn: &Connection, + (min, max): (i32, i32), + ) -> Result> { + let mut coll = OrderedCollectionPage::default(); + let acts = self.get_activity_page(&conn, (min, max))?; + //This still doesn't do anything because the outbox + //doesn't do anything yet + coll.collection_page_props.set_next_link(Id::new(&format!( + "{}?page={}", + &self.outbox_url, + min / ITEMS_PER_PAGE + 1 + )))?; + coll.collection_page_props.set_prev_link(Id::new(&format!( + "{}?page={}", + &self.outbox_url, + min / ITEMS_PER_PAGE - 1 + )))?; + coll.collection_props.items = serde_json::to_value(acts)?; Ok(ActivityStream::new(coll)) } - fn get_activities(&self, _conn: &Connection) -> Result> { Ok(vec![]) } + fn get_activity_page( + &self, + _conn: &Connection, + (_min, _max): (i32, i32), + ) -> Result> { + Ok(vec![]) + } pub fn get_keypair(&self) -> Result> { PKey::from_rsa(Rsa::private_key_from_pem( diff --git a/plume-models/src/lib.rs b/plume-models/src/lib.rs index ecffa483..3e2d4f93 100644 --- a/plume-models/src/lib.rs +++ b/plume-models/src/lib.rs @@ -75,7 +75,7 @@ impl From for Error { Error::Signature } } - +pub const ITEMS_PER_PAGE: i32 = 12; impl From for Error { fn from(_: openssl::error::ErrorStack) -> Self { Error::Signature diff --git a/plume-models/src/users.rs b/plume-models/src/users.rs index d530b002..c8af1b83 100644 --- a/plume-models/src/users.rs +++ b/plume-models/src/users.rs @@ -1,7 +1,7 @@ use activitypub::{ activity::Delete, actor::Person, - collection::OrderedCollection, + collection::{OrderedCollection, OrderedCollectionPage}, object::{Image, Tombstone}, Activity, CustomObject, Endpoint, }; @@ -49,7 +49,7 @@ use safe_string::SafeString; use schema::users; use search::Searcher; use timeline::Timeline; -use {ap_url, Connection, Error, PlumeRocket, Result}; +use {ap_url, Connection, Error, PlumeRocket, Result, ITEMS_PER_PAGE}; pub type CustomPerson = CustomObject; @@ -320,16 +320,77 @@ impl User { .load::(conn) .map_err(Error::from) } - pub fn outbox(&self, conn: &Connection) -> Result> { - let acts = self.get_activities(conn)?; - let n_acts = acts.len(); let mut coll = OrderedCollection::default(); - coll.collection_props.items = serde_json::to_value(acts)?; - coll.collection_props.set_total_items_u64(n_acts as u64)?; + let first = &format!("{}?page=1", &self.outbox_url); + let last = &format!( + "{}?page={}", + &self.outbox_url, + self.get_activities_count(&conn) / i64::from(ITEMS_PER_PAGE) + 1 + ); + coll.collection_props.set_first_link(Id::new(first))?; + coll.collection_props.set_last_link(Id::new(last))?; + coll.collection_props + .set_total_items_u64(self.get_activities_count(&conn) as u64)?; Ok(ActivityStream::new(coll)) } + pub fn outbox_page( + &self, + conn: &Connection, + (min, max): (i32, i32), + ) -> Result> { + let acts = self.get_activities_page(conn, (min, max))?; + let n_acts = self.get_activities_count(&conn); + let mut coll = OrderedCollectionPage::default(); + if n_acts - i64::from(min) >= i64::from(ITEMS_PER_PAGE) { + coll.collection_page_props.set_next_link(Id::new(&format!( + "{}?page={}", + &self.outbox_url, + min / ITEMS_PER_PAGE + 2 + )))?; + } + if min > 0 { + coll.collection_page_props.set_prev_link(Id::new(&format!( + "{}?page={}", + &self.outbox_url, + min / ITEMS_PER_PAGE + )))?; + } + coll.collection_props.items = serde_json::to_value(acts)?; + coll.collection_page_props + .set_part_of_link(Id::new(&self.outbox_url))?; + Ok(ActivityStream::new(coll)) + } + fn fetch_outbox_page(&self, url: &str) -> Result<(Vec, Option)> { + let mut res = ClientBuilder::new() + .connect_timeout(Some(std::time::Duration::from_secs(5))) + .build()? + .get(url) + .header( + ACCEPT, + HeaderValue::from_str( + &ap_accept_header() + .into_iter() + .collect::>() + .join(", "), + )?, + ) + .send()?; + let text = &res.text()?; + let json: serde_json::Value = serde_json::from_str(text)?; + let items = json["items"] + .as_array() + .unwrap_or(&vec![]) + .iter() + .filter_map(|j| serde_json::from_value(j.clone()).ok()) + .collect::>(); + let next = match json.get("next") { + Some(x) => Some(x.as_str().unwrap().to_owned()), + None => None, + }; + Ok((items, next)) + } pub fn fetch_outbox(&self) -> Result> { let mut res = ClientBuilder::new() .connect_timeout(Some(std::time::Duration::from_secs(5))) @@ -347,12 +408,32 @@ impl User { .send()?; let text = &res.text()?; let json: serde_json::Value = serde_json::from_str(text)?; - Ok(json["items"] - .as_array() - .unwrap_or(&vec![]) - .iter() - .filter_map(|j| serde_json::from_value(j.clone()).ok()) - .collect::>()) + if let Some(first) = json.get("first") { + let mut items: Vec = Vec::new(); + let mut next = first.as_str().unwrap().to_owned(); + while let Ok((mut page, nxt)) = self.fetch_outbox_page(&next) { + if page.is_empty() { + break; + } + items.extend(page.drain(..)); + if let Some(n) = nxt { + if n == next { + break; + } + next = n; + } else { + break; + } + } + Ok(items) + } else { + Ok(json["items"] + .as_array() + .unwrap_or(&vec![]) + .iter() + .filter_map(|j| serde_json::from_value(j.clone()).ok()) + .collect::>()) + } } pub fn fetch_followers_ids(&self) -> Result> { @@ -379,14 +460,31 @@ impl User { .filter_map(|j| serde_json::from_value(j.clone()).ok()) .collect::>()) } - - fn get_activities(&self, conn: &Connection) -> Result> { + fn get_activities_count(&self, conn: &Connection) -> i64 { + use schema::post_authors; + use schema::posts; + let posts_by_self = PostAuthor::belonging_to(self).select(post_authors::post_id); + posts::table + .filter(posts::published.eq(true)) + .filter(posts::id.eq_any(posts_by_self)) + .count() + .first(conn) + .unwrap() + } + fn get_activities_page( + &self, + conn: &Connection, + (min, max): (i32, i32), + ) -> Result> { use schema::post_authors; use schema::posts; let posts_by_self = PostAuthor::belonging_to(self).select(post_authors::post_id); let posts = posts::table .filter(posts::published.eq(true)) .filter(posts::id.eq_any(posts_by_self)) + .order(posts::creation_date.desc()) + .offset(min.into()) + .limit((max - min).into()) .load::(conn)?; Ok(posts .into_iter() diff --git a/src/main.rs b/src/main.rs index 4df0b0be..eb755a04 100644 --- a/src/main.rs +++ b/src/main.rs @@ -182,6 +182,7 @@ Then try to restart Plume routes::blogs::details, routes::blogs::activity_details, routes::blogs::outbox, + routes::blogs::outbox_page, routes::blogs::new, routes::blogs::new_auth, routes::blogs::create, @@ -262,6 +263,7 @@ Then try to restart Plume routes::user::follow_auth, routes::user::activity_details, routes::user::outbox, + routes::user::outbox_page, routes::user::inbox, routes::user::ap_followers, routes::user::new, diff --git a/src/routes/blogs.rs b/src/routes/blogs.rs index 8ae69084..131ad444 100644 --- a/src/routes/blogs.rs +++ b/src/routes/blogs.rs @@ -1,4 +1,4 @@ -use activitypub::collection::OrderedCollection; +use activitypub::collection::{OrderedCollection, OrderedCollectionPage}; use atom_syndication::{Entry, FeedBuilder}; use diesel::SaveChangesDsl; use rocket::{ @@ -347,7 +347,16 @@ pub fn outbox(name: String, rockets: PlumeRocket) -> Option/outbox?")] +pub fn outbox_page( + name: String, + page: Page, + rockets: PlumeRocket, +) -> Option> { + let blog = Blog::find_by_fqn(&rockets, &name).ok()?; + Some(blog.outbox_page(&*rockets.conn, page.limits()).ok()?) +} #[get("/~//atom.xml")] pub fn atom_feed(name: String, rockets: PlumeRocket) -> Option> { let blog = Blog::find_by_fqn(&rockets, &name).ok()?; diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 171de90b..01822323 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -1,5 +1,6 @@ #![warn(clippy::too_many_arguments)] use atom_syndication::{ContentBuilder, Entry, EntryBuilder, LinkBuilder, Person, PersonBuilder}; +use plume_models::{posts::Post, Connection, CONFIG, ITEMS_PER_PAGE}; use rocket::{ http::{ hyper::header::{CacheControl, CacheDirective, ETag, EntityTag}, @@ -17,9 +18,6 @@ use std::{ }; use template_utils::Ructe; -use plume_models::{posts::Post, Connection, CONFIG}; -const ITEMS_PER_PAGE: i32 = 12; - /// Special return type used for routes that "cannot fail", and instead /// `Redirect`, or `Flash`, when we cannot deliver a `Ructe` Response #[allow(clippy::large_enum_variant)] diff --git a/src/routes/user.rs b/src/routes/user.rs index 6eea8407..84815922 100644 --- a/src/routes/user.rs +++ b/src/routes/user.rs @@ -1,4 +1,7 @@ -use activitypub::{activity::Create, collection::OrderedCollection}; +use activitypub::{ + activity::Create, + collection::{OrderedCollection, OrderedCollectionPage}, +}; use atom_syndication::{Entry, FeedBuilder}; use diesel::SaveChangesDsl; use rocket::{ @@ -553,7 +556,15 @@ pub fn outbox(name: String, rockets: PlumeRocket) -> Option/outbox?")] +pub fn outbox_page( + name: String, + page: Page, + rockets: PlumeRocket, +) -> Option> { + let user = User::find_by_fqn(&rockets, &name).ok()?; + user.outbox_page(&*rockets.conn, page.limits()).ok() +} #[post("/@//inbox", data = "")] pub fn inbox( name: String,