diff --git a/crates/db_views/src/comment_view.rs b/crates/db_views/src/comment_view.rs index 7588943b9..e021578f8 100644 --- a/crates/db_views/src/comment_view.rs +++ b/crates/db_views/src/comment_view.rs @@ -1,5 +1,4 @@ use crate::structs::{CommentView, LocalUserView}; -use chrono::{DateTime, Utc}; use diesel::{ dsl::{exists, not}, pg::Pg, @@ -63,17 +62,6 @@ fn queries<'a>() -> Queries< ) }; - let is_saved = |person_id| { - comment_saved::table - .filter( - comment::id - .eq(comment_saved::comment_id) - .and(comment_saved::person_id.eq(person_id)), - ) - .select(comment_saved::published.nullable()) - .single_value() - }; - let is_community_followed = |person_id| { community_follower::table .filter( @@ -147,14 +135,6 @@ fn queries<'a>() -> Queries< Box::new(None::.into_sql::>()) }; - let is_saved_selection: Box< - dyn BoxableExpression<_, Pg, SqlType = sql_types::Nullable>, - > = if let Some(person_id) = my_person_id { - Box::new(is_saved(person_id)) - } else { - Box::new(None::>.into_sql::>()) - }; - let is_creator_blocked_selection: Box> = if let Some(person_id) = my_person_id { Box::new(is_creator_blocked(person_id)) @@ -167,6 +147,13 @@ fn queries<'a>() -> Queries< .inner_join(post::table) .inner_join(community::table.on(post::community_id.eq(community::id))) .inner_join(comment_aggregates::table) + .left_join( + comment_saved::table.on( + comment::id + .eq(comment_saved::comment_id) + .and(comment_saved::person_id.eq(my_person_id.unwrap_or(PersonId(-1)))), + ), + ) .select(( comment::all_columns, person::all_columns, @@ -178,7 +165,7 @@ fn queries<'a>() -> Queries< creator_is_moderator, creator_is_admin, subscribed_type_selection, - is_saved_selection.is_not_null(), + comment_saved::person_id.nullable().is_not_null(), is_creator_blocked_selection, score_selection, )) @@ -260,8 +247,8 @@ fn queries<'a>() -> Queries< // If its saved only, then filter, and order by the saved time, not the comment creation time. if options.saved_only { query = query - .filter(is_saved(person_id_join).is_not_null()) - .then_order_by(is_saved(person_id_join).desc()); + .filter(comment_saved::person_id.is_not_null()) + .then_order_by(comment_saved::published.desc()); } if let Some(my_id) = my_person_id { diff --git a/crates/db_views/src/post_view.rs b/crates/db_views/src/post_view.rs index afb0f435f..eac44bb39 100644 --- a/crates/db_views/src/post_view.rs +++ b/crates/db_views/src/post_view.rs @@ -1,5 +1,4 @@ use crate::structs::{LocalUserView, PaginationCursor, PostView}; -use chrono::{DateTime, Utc}; use diesel::{ debug_query, dsl::{exists, not, IntervalDsl}, @@ -100,17 +99,6 @@ fn queries<'a>() -> Queries< ), ); - let is_saved = |person_id| { - post_saved::table - .filter( - post_aggregates::post_id - .eq(post_saved::post_id) - .and(post_saved::person_id.eq(person_id)), - ) - .select(post_saved::published.nullable()) - .single_value() - }; - let is_read = |person_id| { exists( post_read::table.filter( @@ -162,14 +150,6 @@ fn queries<'a>() -> Queries< Box::new(false.into_sql::()) }; - let is_saved_selection: Box< - dyn BoxableExpression<_, Pg, SqlType = sql_types::Nullable>, - > = if let Some(person_id) = my_person_id { - Box::new(is_saved(person_id)) - } else { - Box::new(None::>.into_sql::>()) - }; - let is_read_selection: Box> = if let Some(person_id) = my_person_id { Box::new(is_read(person_id)) @@ -237,6 +217,13 @@ fn queries<'a>() -> Queries< .inner_join(person::table) .inner_join(community::table) .inner_join(post::table) + .left_join( + post_saved::table.on( + post_aggregates::post_id + .eq(post_saved::post_id) + .and(post_saved::person_id.eq(my_person_id.unwrap_or(PersonId(-1)))), + ), + ) .select(( post::all_columns, person::all_columns, @@ -247,7 +234,7 @@ fn queries<'a>() -> Queries< creator_is_admin, post_aggregates::all_columns, subscribed_type_selection, - is_saved_selection.is_not_null(), + post_saved::person_id.nullable().is_not_null(), is_read_selection, is_hidden_selection, is_creator_blocked_selection, @@ -426,10 +413,10 @@ fn queries<'a>() -> Queries< }; // If its saved only, then filter, and order by the saved time, not the comment creation time. - if let (true, Some(person_id)) = (options.saved_only, my_person_id) { + if let (true, Some(_person_id)) = (options.saved_only, my_person_id) { query = query - .filter(is_saved(person_id).is_not_null()) - .then_order_by(is_saved(person_id).desc()); + .filter(post_saved::person_id.is_not_null()) + .then_order_by(post_saved::published.desc()); } // Only hide the read posts, if the saved_only is false. Otherwise ppl with the hide_read // setting wont be able to see saved posts. diff --git a/crates/federate/src/lib.rs b/crates/federate/src/lib.rs index 7cae2ebe9..21b9229b5 100644 --- a/crates/federate/src/lib.rs +++ b/crates/federate/src/lib.rs @@ -1,10 +1,7 @@ use crate::{util::CancellableTask, worker::InstanceWorker}; use activitypub_federation::config::FederationConfig; use lemmy_api_common::context::LemmyContext; -use lemmy_db_schema::{ - newtypes::InstanceId, - source::{federation_queue_state::FederationQueueState, instance::Instance}, -}; +use lemmy_db_schema::{newtypes::InstanceId, source::instance::Instance}; use lemmy_utils::error::LemmyResult; use stats::receive_print_stats; use std::{collections::HashMap, time::Duration}; @@ -15,6 +12,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::info; +use util::FederationQueueStateWithDomain; mod stats; mod util; @@ -38,7 +36,7 @@ pub struct SendManager { opts: Opts, workers: HashMap, context: FederationConfig, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, exit_print: JoinHandle<()>, } @@ -197,7 +195,7 @@ mod test { collections::HashSet, sync::{Arc, Mutex}, }; - use tokio::{spawn, time::sleep}; + use tokio::spawn; struct TestData { send_manager: SendManager, diff --git a/crates/federate/src/stats.rs b/crates/federate/src/stats.rs index bb6510263..f927f6ddf 100644 --- a/crates/federate/src/stats.rs +++ b/crates/federate/src/stats.rs @@ -1,15 +1,11 @@ -use crate::util::get_latest_activity_id; +use crate::util::{get_latest_activity_id, FederationQueueStateWithDomain}; use chrono::Local; -use diesel::result::Error::NotFound; use lemmy_api_common::federate_retry_sleep_duration; use lemmy_db_schema::{ newtypes::InstanceId, - source::{federation_queue_state::FederationQueueState, instance::Instance}, utils::{ActualDbPool, DbPool}, }; -use lemmy_utils::{error::LemmyResult, CACHE_DURATION_FEDERATION}; -use moka::future::Cache; -use once_cell::sync::Lazy; +use lemmy_utils::error::LemmyResult; use std::{collections::HashMap, time::Duration}; use tokio::{sync::mpsc::UnboundedReceiver, time::interval}; use tracing::{debug, info, warn}; @@ -18,7 +14,7 @@ use tracing::{debug, info, warn}; /// dropped) pub(crate) async fn receive_print_stats( pool: ActualDbPool, - mut receiver: UnboundedReceiver<(InstanceId, FederationQueueState)>, + mut receiver: UnboundedReceiver, ) { let pool = &mut DbPool::Pool(&pool); let mut printerval = interval(Duration::from_secs(60)); @@ -28,7 +24,7 @@ pub(crate) async fn receive_print_stats( ele = receiver.recv() => { match ele { // update stats for instance - Some((instance_id, ele)) => {stats.insert(instance_id, ele);}, + Some(ele) => {stats.insert(ele.state.instance_id, ele);}, // receiver closed, print stats and exit None => { print_stats(pool, &stats).await; @@ -43,7 +39,10 @@ pub(crate) async fn receive_print_stats( } } -async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap) { +async fn print_stats( + pool: &mut DbPool<'_>, + stats: &HashMap, +) { let res = print_stats_with_error(pool, stats).await; if let Err(e) = res { warn!("Failed to print stats: {e}"); @@ -52,18 +51,8 @@ async fn print_stats(pool: &mut DbPool<'_>, stats: &HashMap, - stats: &HashMap, + stats: &HashMap, ) -> LemmyResult<()> { - static INSTANCE_CACHE: Lazy>> = Lazy::new(|| { - Cache::builder() - .max_capacity(1) - .time_to_live(CACHE_DURATION_FEDERATION) - .build() - }); - let instances = INSTANCE_CACHE - .try_get_with((), async { Instance::read_all(pool).await }) - .await?; - let last_id = get_latest_activity_id(pool).await?; // it's expected that the values are a bit out of date, everything < SAVE_STATE_EVERY should be @@ -72,12 +61,9 @@ async fn print_stats_with_error( // todo: more stats (act/sec, avg http req duration) let mut ok_count = 0; let mut behind_count = 0; - for (instance_id, stat) in stats { - let domain = &instances - .iter() - .find(|i| &i.id == instance_id) - .ok_or(NotFound)? - .domain; + for ele in stats.values() { + let stat = &ele.state; + let domain = &ele.domain; let behind = last_id.0 - stat.last_successful_id.map(|e| e.0).unwrap_or(0); if stat.fail_count > 0 { info!( diff --git a/crates/federate/src/util.rs b/crates/federate/src/util.rs index 08186f436..cd1a4feda 100644 --- a/crates/federate/src/util.rs +++ b/crates/federate/src/util.rs @@ -11,6 +11,7 @@ use lemmy_db_schema::{ source::{ activity::{ActorType, SentActivity}, community::Community, + federation_queue_state::FederationQueueState, person::Person, site::Site, }, @@ -57,7 +58,7 @@ pub struct CancellableTask { impl CancellableTask { /// spawn a task but with graceful shutdown - pub fn spawn( + pub fn spawn( timeout: Duration, task: impl Fn(CancellationToken) -> F + Send + 'static, ) -> CancellableTask @@ -188,3 +189,10 @@ pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result>, stop: CancellationToken, context: Data, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, last_full_communities_fetch: DateTime, last_incremental_communities_fetch: DateTime, state: FederationQueueState, @@ -87,7 +88,7 @@ impl InstanceWorker { instance: Instance, context: Data, stop: CancellationToken, - stats_sender: UnboundedSender<(InstanceId, FederationQueueState)>, + stats_sender: UnboundedSender, ) -> Result<(), anyhow::Error> { let mut pool = context.pool(); let state = FederationQueueState::load(&mut pool, instance.id).await?; @@ -350,9 +351,10 @@ impl InstanceWorker { async fn save_and_send_state(&mut self) -> Result<()> { self.last_state_insert = Utc::now(); FederationQueueState::upsert(&mut self.context.pool(), &self.state).await?; - self - .stats_sender - .send((self.instance.id, self.state.clone()))?; + self.stats_sender.send(FederationQueueStateWithDomain { + state: self.state.clone(), + domain: self.instance.domain.clone(), + })?; Ok(()) } } diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index c690a5f48..493b9c205 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -53,7 +53,7 @@ services: lemmy-ui: # use "image" to pull down an already compiled lemmy-ui. make sure to comment out "build". - image: dessalines/lemmy-ui:0.19.3 + image: dessalines/lemmy-ui:0.19.4-rc.3 # platform: linux/x86_64 # no arm64 support. uncomment platform if using m1. # use "build" to build your local lemmy ui image for development. make sure to comment out "image". # run: docker compose up --build