diff --git a/crates/routes/src/utils/scheduled_tasks.rs b/crates/routes/src/utils/scheduled_tasks.rs index 800ddc135..157b14ee4 100644 --- a/crates/routes/src/utils/scheduled_tasks.rs +++ b/crates/routes/src/utils/scheduled_tasks.rs @@ -117,7 +117,7 @@ pub async fn setup(context: Data) -> LemmyResult<()> { let context = context_1.reset_request_count(); async move { - all_active_counts(&mut context.pool(), ALL_ACTIVE_INTERVALS) + all_active_counts(&mut context.pool()) .await .inspect_err(|e| warn!("Failed to update active counts: {e}")) .ok(); @@ -376,16 +376,30 @@ async fn active_counts(pool: &mut DbPool<'_>, interval: (&str, &str)) -> LemmyRe let conn = &mut get_conn(pool).await?; process_site_aggregates(conn, interval).await?; - process_community_aggregates(conn, interval).await?; + process_community_aggregates( + conn, + interval, + "users_active", + "community_aggregates_activity", + ) + .await?; Ok(()) } /// Re-calculate all the active counts -async fn all_active_counts(pool: &mut DbPool<'_>, intervals: [(&str, &str); 4]) -> LemmyResult<()> { - for i in intervals { +async fn all_active_counts(pool: &mut DbPool<'_>) -> LemmyResult<()> { + for i in ALL_ACTIVE_INTERVALS { active_counts(pool, i).await?; } + let conn = &mut get_conn(pool).await?; + process_community_aggregates( + conn, + ONE_MONTH, + "interactions", + "community_aggregates_interactions", + ) + .await?; Ok(()) } @@ -426,6 +440,8 @@ async fn process_site_aggregates( async fn process_community_aggregates( conn: &mut AsyncPgConnection, interval: (&str, &str), + field_name_prefix: &str, + function_name: &str, ) -> LemmyResult<()> { // Select the community count results into a temp table. let caggs_temp_table = &format!("community_aggregates_temp_table_{}", interval.1); @@ -435,7 +451,7 @@ async fn process_community_aggregates( sql_query(drop_caggs_temp_table).execute(conn).await.ok(); sql_query(format!( - "CREATE TEMP TABLE {caggs_temp_table} AS SELECT * FROM r.community_aggregates_activity('{}')", + "CREATE TEMP TABLE {caggs_temp_table} AS SELECT * FROM r.{function_name}('{}')", interval.0 )) .execute(conn) @@ -450,7 +466,7 @@ async fn process_community_aggregates( while let Some(prev_community_id) = prev_community_id_res { let updated_rows = sql_query(format!( "UPDATE community a - SET users_active_{} = b.count_ + SET {field_name_prefix}_{} = b.count_ FROM ( SELECT count_, community_id_ FROM {caggs_temp_table} @@ -459,7 +475,7 @@ async fn process_community_aggregates( LIMIT $2 ) AS b WHERE a.id = b.community_id_ - RETURNING a.id + RETURNING a.id AS community_id ", interval.1 )) @@ -477,7 +493,7 @@ async fn process_community_aggregates( sql_query(drop_caggs_temp_table).execute(conn).await.ok(); info!( - "Finished community_aggregates active_{} (processed {} rows)", + "Finished community_aggregates {field_name_prefix}_{} (processed {} rows)", interval.1, processed_rows_count ); @@ -691,7 +707,15 @@ mod tests { use super::*; use lemmy_api_utils::request::client_builder; - use lemmy_db_schema::test_data::TestData; + use lemmy_db_schema::{ + source::{ + community::{Community, CommunityInsertForm}, + person::{Person, PersonInsertForm}, + post::{Post, PostActions, PostInsertForm, PostLikeForm}, + }, + test_data::TestData, + traits::{Crud, Likeable}, + }; use lemmy_utils::{ error::{LemmyErrorType, LemmyResult}, settings::structs::Settings, @@ -722,23 +746,61 @@ mod tests { #[tokio::test] #[serial] - async fn test_scheduled_tasks_no_errors() -> LemmyResult<()> { + async fn test_scheduled_tasks() -> LemmyResult<()> { let context = LemmyContext::init_test_context().await; - let data = TestData::create(&mut context.pool()).await?; + let pool = &mut context.pool(); - active_counts(&mut context.pool(), ONE_DAY).await?; - all_active_counts(&mut context.pool(), ALL_ACTIVE_INTERVALS).await?; - update_local_user_count(&mut context.pool()).await?; - update_hot_ranks(&mut context.pool()).await?; - update_banned_when_expired(&mut context.pool()).await?; - delete_instance_block_when_expired(&mut context.pool()).await?; - clear_old_activities(&mut context.pool()).await?; - overwrite_deleted_posts_and_comments(&mut context.pool()).await?; - delete_old_denied_users(&mut context.pool()).await?; - update_instance_software(&mut context.pool(), context.client()).await?; - delete_expired_captcha_answers(&mut context.pool()).await?; + let data = TestData::create(pool).await?; + let community = Community::create( + pool, + &CommunityInsertForm::new( + data.instance.id, + "name".to_owned(), + "title".to_owned(), + "pubkey".to_owned(), + ), + ) + .await?; + let person = Person::create( + pool, + &PersonInsertForm::new("felicity".to_owned(), "pubkey".to_owned(), data.instance.id), + ) + .await?; + let post = Post::create( + pool, + &PostInsertForm::new("i am grrreat".to_owned(), person.id, community.id), + ) + .await?; + PostActions::like(pool, &PostLikeForm::new(post.id, person.id, 1)).await?; + + active_counts(pool, ONE_DAY).await?; + all_active_counts(pool).await?; + update_local_user_count(pool).await?; + update_hot_ranks(pool).await?; + update_banned_when_expired(pool).await?; + delete_instance_block_when_expired(pool).await?; + clear_old_activities(pool).await?; + overwrite_deleted_posts_and_comments(pool).await?; + delete_old_denied_users(pool).await?; + update_instance_software(pool, context.client()).await?; + delete_expired_captcha_answers(pool).await?; publish_scheduled_posts(&context).await?; - data.delete(&mut context.pool()).await?; + + let community_after = Community::read(pool, community.id).await?; + assert_eq!( + community_after, + Community { + posts: 1, + users_active_day: 1, + users_active_week: 1, + users_active_month: 1, + users_active_half_year: 1, + interactions_month: 1, + ..community_after.clone() + } + ); + + data.delete(pool).await?; Ok(()) } }