[main] Fixing active counts slow queries. (#5907) (#5917)

* [0.19] Fixing active counts slow queries. (#5907)

* Fixing active counts slow queries.

* Simplify back to str tuple

* Batch site and community updates

* Using update from temp table

* Making aggs temp table use interval name.

* Make dev setup use optimized postgres.

* Addressing PR comments.

* Use ref

* Removing system custom info from customPostgresql.conf

* Forgot to remove old scheduled tasks.

* Making sure migrations aren't missing anything from release/v0.19

Checked using git diff --diff-filter D --stat release/v0.19 migrations

* Rename all migrations to come after release/v0.19 migrations

* Add liked_at is not null check.
Dessalines 2025-08-10 03:57:38 +08:00 committed by GitHub
parent 85cb47a464
commit 644a448aa9
143 changed files with 194 additions and 28 deletions
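At the heart of the change is a temp-table-plus-batches pattern: the per-community activity counts are materialized once, then applied to the community table in keyset-paginated batches. A minimal SQL sketch of one pass for the '1 day' interval, using the names from the diff below (the Rust code binds the 0 and 1000 literals as parameters):

    -- Materialize the per-community activity counts once.
    CREATE TEMP TABLE community_aggregates_temp_table_day AS
    SELECT * FROM r.community_aggregates_activity('1 day');

    -- One batch: update the next 1000 communities past the last-seen id.
    UPDATE community a
    SET users_active_day = b.count_
    FROM (
        SELECT count_, community_id_
        FROM community_aggregates_temp_table_day
        WHERE community_id_ > 0 -- last community_id_ from the previous batch
        ORDER BY community_id_
        LIMIT 1000
    ) AS b
    WHERE a.id = b.community_id_
    RETURNING a.id; -- the highest returned id seeds the next batch

    DROP TABLE IF EXISTS community_aggregates_temp_table_day;

Batching keeps each UPDATE short, so row locks on community are held briefly instead of for the whole recount.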

@@ -6,7 +6,7 @@ use diesel::{
dsl::{count, exists, not, update, IntervalDsl},
query_builder::AsQuery,
sql_query,
sql_types::{BigInt, Timestamptz},
sql_types::{BigInt, Integer, Timestamptz},
BoolExpressionMethods,
ExpressionMethods,
NullableExpressionMethods,
@@ -82,12 +82,15 @@ pub async fn setup(context: Data<LemmyContext>) -> LemmyResult<()> {
});
let context_1 = context.clone();
// Update active counts expired bans and unpublished posts every hour
// Hourly tasks:
// - Update active daily counts
// - Expired bans
// - Expired instance blocks
scheduler.every(CTimeUnits::hour(1)).run(move || {
let context = context_1.clone();
async move {
active_counts(&mut context.pool())
active_counts(&mut context.pool(), ONE_DAY)
.await
.inspect_err(|e| warn!("Failed to update active counts: {e}"))
.ok();
@@ -104,6 +107,8 @@ pub async fn setup(context: Data<LemmyContext>) -> LemmyResult<()> {
let context_1 = context.reset_request_count();
// Daily tasks:
// - Update site and community activity counts
// - Update local user count
// - Overwrite deleted & removed posts and comments every day
// - Delete old denied users
// - Update instance software
@@ -112,6 +117,14 @@ pub async fn setup(context: Data<LemmyContext>) -> LemmyResult<()> {
let context = context_1.reset_request_count();
async move {
all_active_counts(&mut context.pool(), ALL_ACTIVE_INTERVALS)
.await
.inspect_err(|e| warn!("Failed to update active counts: {e}"))
.ok();
update_local_user_count(&mut context.pool())
.await
.inspect_err(|e| warn!("Failed to update local user count: {e}"))
.ok();
overwrite_deleted_posts_and_comments(&mut context.pool())
.await
.inspect_err(|e| warn!("Failed to overwrite deleted posts/comments: {e}"))
@@ -335,33 +348,147 @@ async fn overwrite_deleted_posts_and_comments(pool: &mut DbPool<'_>) -> LemmyRes
Ok(())
}
/// Re-calculate the site, community active counts and local user count
async fn active_counts(pool: &mut DbPool<'_>) -> LemmyResult<()> {
info!("Updating active site and community aggregates ...");
const ONE_DAY: (&str, &str) = ("1 day", "day");
const ONE_WEEK: (&str, &str) = ("1 week", "week");
const ONE_MONTH: (&str, &str) = ("1 month", "month");
const SIX_MONTHS: (&str, &str) = ("6 months", "half_year");
const ALL_ACTIVE_INTERVALS: [(&str, &str); 4] = [ONE_DAY, ONE_WEEK, ONE_MONTH, SIX_MONTHS];
#[derive(QueryableByName)]
struct SiteActivitySelectResult {
#[diesel(sql_type = Integer)]
site_aggregates_activity: i32,
}
#[derive(QueryableByName)]
struct CommunityAggregatesUpdateResult {
#[diesel(sql_type = Integer)]
community_id: i32,
}
/// Re-calculate the site and community active counts for a given interval
async fn active_counts(pool: &mut DbPool<'_>, interval: (&str, &str)) -> LemmyResult<()> {
info!(
"Updating active site and community aggregates for {}...",
interval.0
);
let conn = &mut get_conn(pool).await?;
process_site_aggregates(conn, interval).await?;
process_community_aggregates(conn, interval).await?;
let intervals = vec![
("1 day", "day"),
("1 week", "week"),
("1 month", "month"),
("6 months", "half_year"),
];
Ok(())
}
for (full_form, abbr) in &intervals {
let update_site_stmt = format!(
"update local_site set users_active_{} = (select r.site_aggregates_activity('{}')) where site_id = 1",
abbr, full_form
);
sql_query(update_site_stmt).execute(conn).await?;
/// Re-calculate all the active counts
async fn all_active_counts(pool: &mut DbPool<'_>, intervals: [(&str, &str); 4]) -> LemmyResult<()> {
for i in intervals {
active_counts(pool, i).await?;
}
Ok(())
}
let update_community_stmt = format!("update community ca set users_active_{} = mv.count_ from r.community_aggregates_activity('{}') mv where ca.id = mv.community_id_", abbr, full_form);
sql_query(update_community_stmt).execute(conn).await?;
async fn process_site_aggregates(
conn: &mut AsyncPgConnection,
interval: (&str, &str),
) -> LemmyResult<()> {
// Select the site count result first
let site_activity = sql_query(format!(
"select * from r.site_aggregates_activity('{}')",
interval.0
))
.get_result::<SiteActivitySelectResult>(conn)
.await
.inspect_err(|e| warn!("Failed to fetch site activity: {e}"))?;
let processed_rows = site_activity.site_aggregates_activity;
// Update the site count
sql_query(format!(
"update local_site set users_active_{} = $1",
interval.1,
))
.bind::<Integer, _>(processed_rows)
.execute(conn)
.await
.inspect_err(|e| warn!("Failed to update site stats: {e}"))
.ok();
info!(
"Finished site_aggregates active_{} (processed {} rows)",
interval.1, processed_rows
);
Ok(())
}
async fn process_community_aggregates(
conn: &mut AsyncPgConnection,
interval: (&str, &str),
) -> LemmyResult<()> {
// Select the community count results into a temp table.
let caggs_temp_table = &format!("community_aggregates_temp_table_{}", interval.1);
// Drop temp table before and after, just in case
let drop_caggs_temp_table = &format!("DROP TABLE IF EXISTS {caggs_temp_table}");
sql_query(drop_caggs_temp_table).execute(conn).await.ok();
sql_query(format!(
"CREATE TEMP TABLE {caggs_temp_table} AS SELECT * FROM r.community_aggregates_activity('{}')",
interval.0
))
.execute(conn)
.await
.inspect_err(|e| warn!("Failed to create temp community_aggregates table: {e}"))?;
// Split up into 1000 community transaction batches
let update_batch_size = 1000;
let mut processed_rows_count = 0;
let mut prev_community_id_res = Some(0);
while let Some(prev_community_id) = prev_community_id_res {
let updated_rows = sql_query(format!(
"UPDATE community a
SET users_active_{} = b.count_
FROM (
SELECT count_, community_id_
FROM {caggs_temp_table}
WHERE community_id_ > $1
ORDER BY community_id_
LIMIT $2
) AS b
WHERE a.id = b.community_id_
RETURNING a.id
",
interval.1
))
.bind::<Integer, _>(prev_community_id)
.bind::<Integer, _>(update_batch_size)
.get_results::<CommunityAggregatesUpdateResult>(conn)
.await
.inspect_err(|e| warn!("Failed to update community stats: {e}"))?;
processed_rows_count += updated_rows.len();
prev_community_id_res = updated_rows.last().map(|row| row.community_id);
}
let update_interactions_stmt = "update community ca set interactions_month = mv.count_ from r.community_aggregates_interactions('1 month') mv where ca.id = mv.community_id_";
sql_query(update_interactions_stmt).execute(conn).await?;
// Drop the temp table just in case
sql_query(drop_caggs_temp_table).execute(conn).await.ok();
info!(
"Finished community_aggregates active_{} (processed {} rows)",
interval.1, processed_rows_count
);
info!("Done.");
Ok(())
}
async fn update_local_user_count(pool: &mut DbPool<'_>) -> LemmyResult<()> {
info!("Updating the local user count...");
let conn = &mut get_conn(pool).await?;
let user_count = local_user::table
.inner_join(
person::table.left_join(
@@ -599,7 +726,9 @@ mod tests {
let context = LemmyContext::init_test_context().await;
let data = TestData::create(&mut context.pool()).await?;
active_counts(&mut context.pool()).await?;
active_counts(&mut context.pool(), ONE_DAY).await?;
all_active_counts(&mut context.pool(), ALL_ACTIVE_INTERVALS).await?;
update_local_user_count(&mut context.pool()).await?;
update_hot_ranks(&mut context.pool()).await?;
update_banned_when_expired(&mut context.pool()).await?;
delete_instance_block_when_expired(&mut context.pool()).await?;
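The site-level counterpart is a single row, so the new code selects the activity count and binds it straight into an update, with no temp table needed. In plain SQL, for the day interval, with 42 standing in for the selected value:

    SELECT * FROM r.site_aggregates_activity('1 day');
    -- one row, one integer column: site_aggregates_activity

    UPDATE local_site
    SET users_active_day = 42; -- placeholder for the value selected above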

@@ -0,0 +1,6 @@
DROP INDEX idx_post_read_post;
DROP INDEX idx_post_hide_post;
DROP INDEX idx_post_saved_post;

@@ -0,0 +1,6 @@
CREATE INDEX idx_post_read_post ON post_read (post_id);
CREATE INDEX idx_post_hide_post ON post_hide (post_id);
CREATE INDEX idx_post_saved_post ON post_saved (post_id);
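These three indexes presumably exist to make lookups by post_id on the action tables cheap. One way to sanity-check that the planner picks them up on a test database (the id value is a placeholder):

    ANALYZE post_read;
    EXPLAIN SELECT * FROM post_read WHERE post_id = 1; -- placeholder id
    -- Expect a plan line like: Index Scan using idx_post_read_post on post_read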

@@ -0,0 +1,2 @@
DROP INDEX idx_post_published, idx_post_like_published, idx_comment_like_published;

@@ -0,0 +1,7 @@
-- These actually increased query time, but they prevent more postgres workers from being launched, and so should free up locks.
CREATE INDEX idx_post_published ON post (published);
CREATE INDEX idx_post_like_published ON post_like (published);
CREATE INDEX idx_comment_like_published ON comment_like (published);
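The trade-off described in that comment can be checked directly: without idx_post_like_published, a recent-range scan of post_like typically plans as a parallel sequential scan, launching extra worker processes. A sketch of the check on a test database:

    EXPLAIN (ANALYZE)
    SELECT count(*) FROM post_like
    WHERE published > now() - interval '1 week';
    -- Without the index: "Parallel Seq Scan on post_like" with "Workers Launched: N";
    -- with it, typically a bitmap or index scan and no extra workers.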

@@ -305,6 +305,16 @@ CREATE INDEX IF NOT EXISTS idx_post_like_post ON post_like (post_id);
CREATE INDEX idx_comment_like_comment ON comment_like (comment_id);
CREATE INDEX idx_post_hide_post ON post_hide (post_id);
CREATE INDEX idx_post_read_post ON post_read (post_id);
CREATE INDEX idx_post_saved_post ON post_saved (post_id);
CREATE INDEX idx_post_like_published ON post_like (published);
CREATE INDEX idx_comment_like_published ON comment_like (published);
DROP INDEX idx_person_actions_person, idx_person_actions_target, idx_post_actions_person, idx_post_actions_post;
-- Drop `NOT NULL` indexes of columns that still exist

@@ -223,8 +223,6 @@ DROP INDEX idx_post_featured_local_published;
DROP INDEX idx_post_featured_local_published_asc;
DROP INDEX idx_post_published;
DROP INDEX idx_post_published_asc;
DROP INDEX idx_search_combined_score;

@@ -212,8 +212,6 @@ CREATE INDEX idx_post_featured_local_score ON post USING btree (featured_local D
CREATE INDEX idx_post_nonzero_hotrank ON post USING btree (published DESC)
WHERE ((hot_rank <> (0)::double precision) OR (hot_rank_active <> (0)::double precision));
CREATE INDEX idx_post_published ON post USING btree (published DESC);
CREATE INDEX idx_post_published_asc ON post USING btree (reverse_timestamp_sort (published) DESC);
-- merge community_aggregates into community table

@@ -301,7 +301,7 @@ CREATE INDEX idx_post_language ON post USING btree (language_id);
CREATE INDEX idx_post_nonzero_hotrank ON post USING btree (published DESC)
WHERE ((hot_rank <> (0)::double precision) OR (hot_rank_active <> (0)::double precision));
CREATE INDEX idx_post_published ON post USING btree (published DESC);
CREATE INDEX idx_post_published ON post USING btree (published);
CREATE INDEX idx_post_published_asc ON post USING btree (reverse_timestamp_sort (published) DESC);
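Dropping DESC from the single-column idx_post_published in this last hunk costs nothing: a PostgreSQL btree can be scanned in either direction, so both orderings below can use the index. Single-column sort direction mainly matters in multi-column indexes that mix ASC and DESC.

    SELECT id FROM post ORDER BY published DESC LIMIT 10;
    SELECT id FROM post ORDER BY published ASC LIMIT 10;
    -- Either ordering can walk idx_post_published (forward or backward scan).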

Some files were not shown because too many files have changed in this diff.