From 4ba6221e04ab3e186669aeaa890d23b1e3f3d1a9 Mon Sep 17 00:00:00 2001 From: dullbananas Date: Wed, 17 Apr 2024 17:58:44 -0700 Subject: [PATCH] Move SQL triggers from migrations into reusable sql file (#4333) * stuff * stuff including batch_upsert function * stuff * do things * stuff * different timestamps * stuff * Revert changes to comment.rs * Update comment.rs * Update comment.rs * Update post_view.rs * Update utils.rs * Update up.sql * Update up.sql * Update down.sql * Update up.sql * Update main.rs * use anyhow macro * Create down.sql * Create up.sql * Create replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update utils.rs * Update .woodpecker.yml * Update sql_format_check.sh * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Create dump_schema.sh * Update start_dev_db.sh * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * Update replaceable_schema.sql * stuff * Update replaceable_schema.sql * Update .pg_format * fmt * stuff * stuff (#21) * Update replaceable_schema.sql * Update up.sql * Update replaceable_schema.sql * fmt * update cargo.lock * stuff * Update replaceable_schema.sql * Remove truncate trigger because truncate is already restricted by foreign keys * Update replaceable_schema.sql * fix some things * Update replaceable_schema.sql * Update replaceable_schema.sql * Update .woodpecker.yml * stuff * fix TG_OP * Psql env vars * try to fix combine_transition_tables parse error * Revert "try to fix combine_transition_tables parse error" This reverts commit 75d00a46266fbb49b7fab1b149c79fa1c31ee84a. * refactor combine_transition_tables * try to fix create_triggers * fix some things * try to fix combined_transition_tables * fix sql errors * update comment count in post trigger * fmt * Revert "fmt" This reverts commit a5bcd0834bb91a63b66bf63a848caa078f193940. * Revert "update comment count in post trigger" This reverts commit 0066a4b42b3472c088eed945605a2cf0bfcc1362. * fix everything * Update replaceable_schema.sql * actually fix everything * refactor create_triggers * fix * add semicolons * add is_counted function and fix incorrect bool operator in update_comment_count_from_post * refactor comment trigger * refactor post trigger * fix * Delete crates/db_schema/src/utils/series.rs * subscribers_local * edit migrations * move migrations * remove utils::series module declaration * fix everything * stuff * Move sql to schema_setup dir * utils.sql * delete .pg_format * Update .woodpecker.yml * Update sql_format_check.sh * Update .woodpecker.yml * Merge remote-tracking branch 'upstream/main' into bliss * fmt * Create main.rs * Update lib.rs * Update main.rs * Update .woodpecker.yml * Update main.rs * Update Cargo.toml * Update .woodpecker.yml * Update .woodpecker.yml * Update triggers.sql * YAY * Update mod.rs * Update Cargo.toml * a * Update Cargo.toml * Update Cargo.toml * Delete crates/db_schema/src/main.rs * Update Cargo.toml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update utils.sql * Update utils.sql * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update down.sql * Update up.sql * Update triggers.sql * Update .woodpecker.yml * Update .woodpecker.yml * Update triggers.sql * Update down.sql * Update .woodpecker.yml * Update Cargo.toml * Update .woodpecker.yml * Update Cargo.toml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update .woodpecker.yml * Update mod.rs * Update Cargo.toml * Update mod.rs * make dump_schema.sh executable * fix dump_schema.sh * defer * diff dumps * fmt * Update utils.sql * Update .woodpecker.yml * use correct version for pg_dump * Update .woodpecker.yml * Update .woodpecker.yml * change migration date * atomic site_aggregates insert * temporarily repeat tests in CI * drop r schema in CI migration check * show ReceivedActivity::create error * move check_diesel_migration CI step * Update .woodpecker.yml * Update scheduled_tasks.rs * Update scheduled_tasks.rs * update cargo.lock * move sql files * move rank functions * filter post_aggregates update * fmt * cargo fmt * replace post_id with id * update cargo.lock * avoid locking rows that need no change in up.sql * only run replaceable_schema if migrations were run * debug ci test failure * make replaceable_schema work in CI * Update .woodpecker.yml * remove println * Use migration revert and git checkout * Update schema_setup.rs * Fix * Update schema_setup.rs * Update schema_setup.rs * Update .woodpecker.yml --------- Co-authored-by: Nutomic Co-authored-by: Dessalines --- .woodpecker.yml | 48 +- .../db_schema/replaceable_schema/triggers.sql | 476 ++++++++ crates/db_schema/replaceable_schema/utils.sql | 146 +++ crates/db_schema/src/impls/activity.rs | 9 +- crates/db_schema/src/lib.rs | 3 + crates/db_schema/src/schema_setup.rs | 64 + crates/db_schema/src/utils.rs | 33 +- .../down.sql | 1062 +++++++++++++++++ .../up.sql | 81 ++ scripts/dump_schema.sh | 16 + scripts/sql_format_check.sh | 6 +- scripts/start_dev_db.sh | 6 +- src/lib.rs | 20 +- src/scheduled_tasks.rs | 10 +- 14 files changed, 1919 insertions(+), 61 deletions(-) create mode 100644 crates/db_schema/replaceable_schema/triggers.sql create mode 100644 crates/db_schema/replaceable_schema/utils.sql create mode 100644 crates/db_schema/src/schema_setup.rs create mode 100644 migrations/2024-02-24-034523_replaceable-schema/down.sql create mode 100644 migrations/2024-02-24-034523_replaceable-schema/up.sql create mode 100755 scripts/dump_schema.sh diff --git a/.woodpecker.yml b/.woodpecker.yml index 4c9af5441..bca554733 100644 --- a/.woodpecker.yml +++ b/.woodpecker.yml @@ -143,16 +143,6 @@ steps: - diff tmp.schema crates/db_schema/src/schema.rs when: *slow_check_paths - check_diesel_migration_revertable: - image: willsquire/diesel-cli - environment: - CARGO_HOME: .cargo_home - DATABASE_URL: postgres://lemmy:password@database:5432/lemmy - commands: - - diesel migration run - - diesel migration redo - when: *slow_check_paths - check_db_perf_tool: image: *rust_image environment: @@ -194,6 +184,44 @@ steps: - cargo test --workspace --no-fail-fast when: *slow_check_paths + check_diesel_migration: + # TODO: use willsquire/diesel-cli image when shared libraries become optional in lemmy_server + image: *rust_image + environment: + LEMMY_DATABASE_URL: postgres://lemmy:password@database:5432/lemmy + RUST_BACKTRACE: "1" + CARGO_HOME: .cargo_home + DATABASE_URL: postgres://lemmy:password@database:5432/lemmy + PGUSER: lemmy + PGPASSWORD: password + PGHOST: database + PGDATABASE: lemmy + commands: + - cargo install diesel_cli + - export PATH="$CARGO_HOME/bin:$PATH" + # Run all migrations + - diesel migration run + # Dump schema to before.sqldump (PostgreSQL apt repo is used to prevent pg_dump version mismatch error) + - apt update && apt install -y lsb-release + - sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' + - wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - + - apt update && apt install -y postgresql-client-16 + - psql -c "DROP SCHEMA IF EXISTS r CASCADE;" + - pg_dump --no-owner --no-privileges --no-table-access-method --schema-only --no-sync -f before.sqldump + # Make sure that the newest migration is revertable without the `r` schema + - diesel migration redo + # Run schema setup twice, which fails on the 2nd time if `DROP SCHEMA IF EXISTS r CASCADE` drops the wrong things + - alias lemmy_schema_setup="target/lemmy_server --disable-scheduled-tasks --disable-http-server --disable-activity-sending" + - lemmy_schema_setup + - lemmy_schema_setup + # Make sure that the newest migration is revertable with the `r` schema + - diesel migration redo + # Check for changes in the schema, which would be caused by an incorrect migration + - psql -c "DROP SCHEMA IF EXISTS r CASCADE;" + - pg_dump --no-owner --no-privileges --no-table-access-method --schema-only --no-sync -f after.sqldump + - diff before.sqldump after.sqldump + when: *slow_check_paths + run_federation_tests: image: node:20-bookworm-slim environment: diff --git a/crates/db_schema/replaceable_schema/triggers.sql b/crates/db_schema/replaceable_schema/triggers.sql new file mode 100644 index 000000000..d869a5e1e --- /dev/null +++ b/crates/db_schema/replaceable_schema/triggers.sql @@ -0,0 +1,476 @@ +-- A trigger is associated with a table instead of a schema, so they can't be in the `r` schema. This is +-- okay if the function specified after `EXECUTE FUNCTION` is in `r`, since dropping the function drops the trigger. +-- +-- Tables that are updated by triggers should not have foreign keys that aren't set to `INITIALLY DEFERRED` +-- (even if only other columns are updated) because triggers can run after the deletion of referenced rows and +-- before the automatic deletion of the row that references it. This is not a problem for insert or delete. +-- +-- +-- +-- Create triggers for both post and comments +CREATE FUNCTION r.creator_id_from_post_aggregates (agg post_aggregates) + RETURNS int IMMUTABLE PARALLEL SAFE RETURN agg.creator_id; + +CREATE FUNCTION r.creator_id_from_comment_aggregates (agg comment_aggregates) + RETURNS int IMMUTABLE PARALLEL SAFE RETURN ( + SELECT + creator_id + FROM + comment + WHERE + comment.id = agg.comment_id LIMIT 1 +); + +CREATE PROCEDURE r.post_or_comment (table_name text) +LANGUAGE plpgsql +AS $a$ +BEGIN + EXECUTE replace($b$ + -- When a thing gets a vote, update its aggregates and its creator's aggregates + CALL r.create_triggers ('thing_like', $$ + BEGIN + WITH thing_diff AS ( UPDATE + thing_aggregates AS a + SET + score = a.score + diff.upvotes - diff.downvotes, upvotes = a.upvotes + diff.upvotes, downvotes = a.downvotes + diff.downvotes, controversy_rank = r.controversy_rank ((a.upvotes + diff.upvotes)::numeric, (a.downvotes + diff.downvotes)::numeric) + FROM ( + SELECT + (thing_like).thing_id, coalesce(sum(count_diff) FILTER (WHERE (thing_like).score = 1), 0) AS upvotes, coalesce(sum(count_diff) FILTER (WHERE (thing_like).score != 1), 0) AS downvotes FROM select_old_and_new_rows AS old_and_new_rows GROUP BY (thing_like).thing_id) AS diff + WHERE + a.thing_id = diff.thing_id + RETURNING + r.creator_id_from_thing_aggregates (a.*) AS creator_id, diff.upvotes - diff.downvotes AS score) + UPDATE + person_aggregates AS a + SET + thing_score = a.thing_score + diff.score FROM ( + SELECT + creator_id, sum(score) AS score FROM thing_diff GROUP BY creator_id) AS diff + WHERE + a.person_id = diff.creator_id; + RETURN NULL; + END; + $$); + $b$, + 'thing', + table_name); +END; +$a$; + +CALL r.post_or_comment ('post'); + +CALL r.post_or_comment ('comment'); + +-- Create triggers that update counts in parent aggregates +CALL r.create_triggers ('comment', $$ +BEGIN + UPDATE + person_aggregates AS a + SET + comment_count = a.comment_count + diff.comment_count + FROM ( + SELECT + (comment).creator_id, coalesce(sum(count_diff), 0) AS comment_count + FROM select_old_and_new_rows AS old_and_new_rows + WHERE + r.is_counted (comment) + GROUP BY (comment).creator_id) AS diff +WHERE + a.person_id = diff.creator_id; + +UPDATE + site_aggregates AS a +SET + comments = a.comments + diff.comments +FROM ( + SELECT + coalesce(sum(count_diff), 0) AS comments + FROM + select_old_and_new_rows AS old_and_new_rows + WHERE + r.is_counted (comment) + AND (comment).local) AS diff; + +WITH post_diff AS ( + UPDATE + post_aggregates AS a + SET + comments = a.comments + diff.comments, + newest_comment_time = GREATEST (a.newest_comment_time, ( + SELECT + published + FROM select_new_rows AS new_comment + WHERE + a.post_id = new_comment.post_id ORDER BY published DESC LIMIT 1)), + newest_comment_time_necro = GREATEST (a.newest_comment_time_necro, ( + SELECT + published + FROM select_new_rows AS new_comment + WHERE + a.post_id = new_comment.post_id + -- Ignore comments from the post's creator + AND a.creator_id != new_comment.creator_id + -- Ignore comments on old posts + AND a.published > (new_comment.published - '2 days'::interval) + ORDER BY published DESC LIMIT 1)) + FROM ( + SELECT + (comment).post_id, + coalesce(sum(count_diff), 0) AS comments + FROM + select_old_and_new_rows AS old_and_new_rows + WHERE + r.is_counted (comment) + GROUP BY + (comment).post_id) AS diff + LEFT JOIN post ON post.id = diff.post_id + WHERE + a.post_id = diff.post_id + RETURNING + a.community_id, + diff.comments, + r.is_counted (post.*) AS include_in_community_aggregates) +UPDATE + community_aggregates AS a +SET + comments = a.comments + diff.comments +FROM ( + SELECT + community_id, + sum(comments) AS comments + FROM + post_diff + WHERE + post_diff.include_in_community_aggregates + GROUP BY + community_id) AS diff +WHERE + a.community_id = diff.community_id; + +RETURN NULL; + +END; + +$$); + +CALL r.create_triggers ('post', $$ +BEGIN + UPDATE + person_aggregates AS a + SET + post_count = a.post_count + diff.post_count + FROM ( + SELECT + (post).creator_id, coalesce(sum(count_diff), 0) AS post_count + FROM select_old_and_new_rows AS old_and_new_rows + WHERE + r.is_counted (post) + GROUP BY (post).creator_id) AS diff +WHERE + a.person_id = diff.creator_id; + +UPDATE + site_aggregates AS a +SET + posts = a.posts + diff.posts +FROM ( + SELECT + coalesce(sum(count_diff), 0) AS posts + FROM + select_old_and_new_rows AS old_and_new_rows + WHERE + r.is_counted (post) + AND (post).local) AS diff; + +UPDATE + community_aggregates AS a +SET + posts = a.posts + diff.posts +FROM ( + SELECT + (post).community_id, + coalesce(sum(count_diff), 0) AS posts + FROM + select_old_and_new_rows AS old_and_new_rows + WHERE + r.is_counted (post) + GROUP BY + (post).community_id) AS diff +WHERE + a.community_id = diff.community_id; + +RETURN NULL; + +END; + +$$); + +CALL r.create_triggers ('community', $$ +BEGIN + UPDATE + site_aggregates AS a + SET + communities = a.communities + diff.communities + FROM ( + SELECT + coalesce(sum(count_diff), 0) AS communities + FROM select_old_and_new_rows AS old_and_new_rows + WHERE + r.is_counted (community) + AND (community).local) AS diff; + +RETURN NULL; + +END; + +$$); + +CALL r.create_triggers ('person', $$ +BEGIN + UPDATE + site_aggregates AS a + SET + users = a.users + diff.users + FROM ( + SELECT + coalesce(sum(count_diff), 0) AS users + FROM select_old_and_new_rows AS old_and_new_rows + WHERE (person).local) AS diff; + +RETURN NULL; + +END; + +$$); + +-- For community_aggregates.comments, don't include comments of deleted or removed posts +CREATE FUNCTION r.update_comment_count_from_post () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE + community_aggregates AS a + SET + comments = a.comments + diff.comments + FROM ( + SELECT + old_post.community_id, + sum(( + CASE WHEN r.is_counted (new_post.*) THEN + 1 + ELSE + -1 + END) * post_aggregates.comments) AS comments + FROM + new_post + INNER JOIN old_post ON new_post.id = old_post.id + AND (r.is_counted (new_post.*) != r.is_counted (old_post.*)) + INNER JOIN post_aggregates ON post_aggregates.post_id = new_post.id + GROUP BY + old_post.community_id) AS diff +WHERE + a.community_id = diff.community_id; + RETURN NULL; +END; +$$; + +CREATE TRIGGER comment_count + AFTER UPDATE ON post REFERENCING OLD TABLE AS old_post NEW TABLE AS new_post + FOR EACH STATEMENT + EXECUTE FUNCTION r.update_comment_count_from_post (); + +-- Count subscribers for communities. +-- subscribers should be updated only when a local community is followed by a local or remote person. +-- subscribers_local should be updated only when a local person follows a local or remote community. +CALL r.create_triggers ('community_follower', $$ +BEGIN + UPDATE + community_aggregates AS a + SET + subscribers = a.subscribers + diff.subscribers, subscribers_local = a.subscribers_local + diff.subscribers_local + FROM ( + SELECT + (community_follower).community_id, coalesce(sum(count_diff) FILTER (WHERE community.local), 0) AS subscribers, coalesce(sum(count_diff) FILTER (WHERE person.local), 0) AS subscribers_local + FROM select_old_and_new_rows AS old_and_new_rows + LEFT JOIN community ON community.id = (community_follower).community_id + LEFT JOIN person ON person.id = (community_follower).person_id GROUP BY (community_follower).community_id) AS diff +WHERE + a.community_id = diff.community_id; + +RETURN NULL; + +END; + +$$); + +-- These triggers create and update rows in each aggregates table to match its associated table's rows. +-- Deleting rows and updating IDs are already handled by `CASCADE` in foreign key constraints. +CREATE FUNCTION r.comment_aggregates_from_comment () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + INSERT INTO comment_aggregates (comment_id, published) + SELECT + id, + published + FROM + new_comment; + RETURN NULL; +END; +$$; + +CREATE TRIGGER aggregates + AFTER INSERT ON comment REFERENCING NEW TABLE AS new_comment + FOR EACH STATEMENT + EXECUTE FUNCTION r.comment_aggregates_from_comment (); + +CREATE FUNCTION r.community_aggregates_from_community () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + INSERT INTO community_aggregates (community_id, published) + SELECT + id, + published + FROM + new_community; + RETURN NULL; +END; +$$; + +CREATE TRIGGER aggregates + AFTER INSERT ON community REFERENCING NEW TABLE AS new_community + FOR EACH STATEMENT + EXECUTE FUNCTION r.community_aggregates_from_community (); + +CREATE FUNCTION r.person_aggregates_from_person () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + INSERT INTO person_aggregates (person_id) + SELECT + id + FROM + new_person; + RETURN NULL; +END; +$$; + +CREATE TRIGGER aggregates + AFTER INSERT ON person REFERENCING NEW TABLE AS new_person + FOR EACH STATEMENT + EXECUTE FUNCTION r.person_aggregates_from_person (); + +CREATE FUNCTION r.post_aggregates_from_post () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + INSERT INTO post_aggregates (post_id, published, newest_comment_time, newest_comment_time_necro, community_id, creator_id, instance_id, featured_community, featured_local) + SELECT + new_post.id, + new_post.published, + new_post.published, + new_post.published, + new_post.community_id, + new_post.creator_id, + community.instance_id, + new_post.featured_community, + new_post.featured_local + FROM + new_post + INNER JOIN community ON community.id = new_post.community_id; + RETURN NULL; +END; +$$; + +CREATE TRIGGER aggregates + AFTER INSERT ON post REFERENCING NEW TABLE AS new_post + FOR EACH STATEMENT + EXECUTE FUNCTION r.post_aggregates_from_post (); + +CREATE FUNCTION r.post_aggregates_from_post_update () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE + post_aggregates + SET + featured_community = new_post.featured_community, + featured_local = new_post.featured_local + FROM + new_post + INNER JOIN old_post ON old_post.id = new_post.id + AND (old_post.featured_community, + old_post.featured_local) != (new_post.featured_community, + old_post.featured_local) + WHERE + post_aggregates.post_id = new_post.id; + RETURN NULL; +END; +$$; + +CREATE TRIGGER aggregates_update + AFTER UPDATE ON post REFERENCING OLD TABLE AS old_post NEW TABLE AS new_post + FOR EACH STATEMENT + EXECUTE FUNCTION r.post_aggregates_from_post_update (); + +CREATE FUNCTION r.site_aggregates_from_site () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + -- only 1 row can be in site_aggregates because of the index idx_site_aggregates_1_row_only. + -- we only ever want to have a single value in site_aggregate because the site_aggregate triggers update all rows in that table. + -- a cleaner check would be to insert it for the local_site but that would break assumptions at least in the tests + INSERT INTO site_aggregates (site_id) + VALUES (NEW.id) + ON CONFLICT ((TRUE)) + DO NOTHING; + RETURN NULL; +END; +$$; + +CREATE TRIGGER aggregates + AFTER INSERT ON site + FOR EACH ROW + EXECUTE FUNCTION r.site_aggregates_from_site (); + +-- Change the order of some cascading deletions to make deletion triggers run before the deletion of rows that the triggers need to read +CREATE FUNCTION r.delete_comments_before_post () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + DELETE FROM comment AS c + WHERE c.post_id = OLD.id; + RETURN OLD; +END; +$$; + +CREATE TRIGGER delete_comments + BEFORE DELETE ON post + FOR EACH ROW + EXECUTE FUNCTION r.delete_comments_before_post (); + +CREATE FUNCTION r.delete_follow_before_person () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + DELETE FROM community_follower AS c + WHERE c.person_id = OLD.id; + RETURN OLD; +END; +$$; + +CREATE TRIGGER delete_follow + BEFORE DELETE ON person + FOR EACH ROW + EXECUTE FUNCTION r.delete_follow_before_person (); + diff --git a/crates/db_schema/replaceable_schema/utils.sql b/crates/db_schema/replaceable_schema/utils.sql new file mode 100644 index 000000000..f236c5387 --- /dev/null +++ b/crates/db_schema/replaceable_schema/utils.sql @@ -0,0 +1,146 @@ +-- Each calculation used in triggers should be a single SQL language +-- expression so it can be inlined in migrations. +CREATE FUNCTION r.controversy_rank (upvotes numeric, downvotes numeric) + RETURNS float + LANGUAGE sql + IMMUTABLE PARALLEL SAFE RETURN CASE WHEN downvotes <= 0 + OR upvotes <= 0 THEN + 0 + ELSE + ( + upvotes + downvotes) * CASE WHEN upvotes > downvotes THEN + downvotes::float / upvotes::float + ELSE + upvotes::float / downvotes::float + END + END; + +CREATE FUNCTION r.hot_rank (score numeric, published timestamp with time zone) + RETURNS double precision + LANGUAGE sql + IMMUTABLE PARALLEL SAFE RETURN + -- after a week, it will default to 0. + CASE WHEN ( +now() - published) > '0 days' + AND ( +now() - published) < '7 days' THEN + -- Use greatest(2,score), so that the hot_rank will be positive and not ignored. + log ( + greatest (2, score + 2)) / power (((EXTRACT(EPOCH FROM (now() - published)) / 3600) + 2), 1.8) + ELSE + -- if the post is from the future, set hot score to 0. otherwise you can game the post to + -- always be on top even with only 1 vote by setting it to the future + 0.0 + END; + +CREATE FUNCTION r.scaled_rank (score numeric, published timestamp with time zone, users_active_month numeric) + RETURNS double precision + LANGUAGE sql + IMMUTABLE PARALLEL SAFE + -- Add 2 to avoid divide by zero errors + -- Default for score = 1, active users = 1, and now, is (0.1728 / log(2 + 1)) = 0.3621 + -- There may need to be a scale factor multiplied to users_active_month, to make + -- the log curve less pronounced. This can be tuned in the future. + RETURN ( + r.hot_rank (score, published) / log(2 + users_active_month) +); + +-- For tables with `deleted` and `removed` columns, this function determines which rows to include in a count. +CREATE FUNCTION r.is_counted (item record) + RETURNS bool + LANGUAGE plpgsql + IMMUTABLE PARALLEL SAFE + AS $$ +BEGIN + RETURN COALESCE(NOT (item.deleted + OR item.removed), FALSE); +END; +$$; + +-- This function creates statement-level triggers for all operation types. It's designed this way +-- because of these limitations: +-- * A trigger that uses transition tables can only handle 1 operation type. +-- * Transition tables must be relevant for the operation type (for example, `NEW TABLE` is +-- not allowed for a `DELETE` trigger) +-- * Transition tables are only provided to the trigger function, not to functions that it calls. +-- +-- This function can only be called once per table. The trigger function body given as the 2nd argument +-- and can contain these names, which are replaced with a `SELECT` statement in parenthesis if needed: +-- * `select_old_rows` +-- * `select_new_rows` +-- * `select_old_and_new_rows` with 2 columns: +-- 1. `count_diff`: `-1` for old rows and `1` for new rows, which can be used with `sum` to get the number +-- to add to a count +-- 2. (same name as the trigger's table): the old or new row as a composite value +CREATE PROCEDURE r.create_triggers (table_name text, function_body text) +LANGUAGE plpgsql +AS $a$ +DECLARE + defs text := $$ + -- Delete + CREATE FUNCTION r.thing_delete_statement () + RETURNS TRIGGER + LANGUAGE plpgsql + AS function_body_delete; + CREATE TRIGGER delete_statement + AFTER DELETE ON thing REFERENCING OLD TABLE AS select_old_rows + FOR EACH STATEMENT + EXECUTE FUNCTION r.thing_delete_statement ( ); + -- Insert + CREATE FUNCTION r.thing_insert_statement ( ) + RETURNS TRIGGER + LANGUAGE plpgsql + AS function_body_insert; + CREATE TRIGGER insert_statement + AFTER INSERT ON thing REFERENCING NEW TABLE AS select_new_rows + FOR EACH STATEMENT + EXECUTE FUNCTION r.thing_insert_statement ( ); + -- Update + CREATE FUNCTION r.thing_update_statement ( ) + RETURNS TRIGGER + LANGUAGE plpgsql + AS function_body_update; + CREATE TRIGGER update_statement + AFTER UPDATE ON thing REFERENCING OLD TABLE AS select_old_rows NEW TABLE AS select_new_rows + FOR EACH STATEMENT + EXECUTE FUNCTION r.thing_update_statement ( ); + $$; + select_old_and_new_rows text := $$ ( + SELECT + -1 AS count_diff, + old_table::thing AS thing + FROM + select_old_rows AS old_table + UNION ALL + SELECT + 1 AS count_diff, + new_table::thing AS thing + FROM + select_new_rows AS new_table) $$; + empty_select_new_rows text := $$ ( + SELECT + * + FROM + -- Real transition table + select_old_rows + WHERE + FALSE) $$; + empty_select_old_rows text := $$ ( + SELECT + * + FROM + -- Real transition table + select_new_rows + WHERE + FALSE) $$; + BEGIN + function_body := replace(function_body, 'select_old_and_new_rows', select_old_and_new_rows); + -- `select_old_rows` and `select_new_rows` are made available as empty tables if they don't already exist + defs := replace(defs, 'function_body_delete', quote_literal(replace(function_body, 'select_new_rows', empty_select_new_rows))); + defs := replace(defs, 'function_body_insert', quote_literal(replace(function_body, 'select_old_rows', empty_select_old_rows))); + defs := replace(defs, 'function_body_update', quote_literal(function_body)); + defs := replace(defs, 'thing', table_name); + EXECUTE defs; +END; +$a$; + diff --git a/crates/db_schema/src/impls/activity.rs b/crates/db_schema/src/impls/activity.rs index adcdf8ad5..9391d55bc 100644 --- a/crates/db_schema/src/impls/activity.rs +++ b/crates/db_schema/src/impls/activity.rs @@ -85,12 +85,9 @@ mod tests { .unwrap() .into(); - // inserting activity for first time - let res = ReceivedActivity::create(pool, &ap_id).await; - assert!(res.is_ok()); - - let res = ReceivedActivity::create(pool, &ap_id).await; - assert!(res.is_err()); + // inserting activity should only work once + ReceivedActivity::create(pool, &ap_id).await.unwrap(); + ReceivedActivity::create(pool, &ap_id).await.unwrap_err(); } #[tokio::test] diff --git a/crates/db_schema/src/lib.rs b/crates/db_schema/src/lib.rs index 05663ff3e..37c44e263 100644 --- a/crates/db_schema/src/lib.rs +++ b/crates/db_schema/src/lib.rs @@ -43,6 +43,9 @@ pub mod traits; #[cfg(feature = "full")] pub mod utils; +#[cfg(feature = "full")] +mod schema_setup; + use serde::{Deserialize, Serialize}; use strum_macros::{Display, EnumString}; #[cfg(feature = "full")] diff --git a/crates/db_schema/src/schema_setup.rs b/crates/db_schema/src/schema_setup.rs new file mode 100644 index 000000000..dc8cdf387 --- /dev/null +++ b/crates/db_schema/src/schema_setup.rs @@ -0,0 +1,64 @@ +use anyhow::Context; +use diesel::{connection::SimpleConnection, Connection, PgConnection}; +use diesel_migrations::{EmbeddedMigrations, MigrationHarness}; +use lemmy_utils::error::LemmyError; +use tracing::info; + +const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); + +/// This SQL code sets up the `r` schema, which contains things that can be safely dropped and replaced +/// instead of being changed using migrations. It may not create or modify things outside of the `r` schema +/// (indicated by `r.` before the name), unless a comment says otherwise. +/// +/// Currently, this code is only run after the server starts and there's at least 1 pending migration +/// to run. This means every time you change something here, you must also create a migration (a blank +/// up.sql file works fine). This behavior will be removed when we implement a better way to avoid +/// useless schema updates and locks. +/// +/// If you add something that depends on something (such as a table) created in a new migration, then down.sql +/// must use `CASCADE` when dropping it. This doesn't need to be fixed in old migrations because the +/// "replaceable-schema" migration runs `DROP SCHEMA IF EXISTS r CASCADE` in down.sql. +const REPLACEABLE_SCHEMA: &[&str] = &[ + "DROP SCHEMA IF EXISTS r CASCADE;", + "CREATE SCHEMA r;", + include_str!("../replaceable_schema/utils.sql"), + include_str!("../replaceable_schema/triggers.sql"), +]; + +pub fn run(db_url: &str) -> Result<(), LemmyError> { + // Migrations don't support async connection + let mut conn = PgConnection::establish(db_url).with_context(|| "Error connecting to database")?; + + // Run all pending migrations except for the newest one, then run the newest one in the same transaction + // as `REPLACEABLE_SCHEMA`. This code will be becone less hacky when the conditional setup of things in + // `REPLACEABLE_SCHEMA` is done without using the number of pending migrations. + info!("Running Database migrations (This may take a long time)..."); + let migrations = conn + .pending_migrations(MIGRATIONS) + .map_err(|e| anyhow::anyhow!("Couldn't determine pending migrations: {e}"))?; + for migration in migrations.iter().rev().skip(1).rev() { + conn + .run_migration(migration) + .map_err(|e| anyhow::anyhow!("Couldn't run DB Migrations: {e}"))?; + } + conn.transaction::<_, LemmyError, _>(|conn| { + if let Some(migration) = migrations.last() { + // Migration is run with a savepoint since there's already a transaction + conn + .run_migration(migration) + .map_err(|e| anyhow::anyhow!("Couldn't run DB Migrations: {e}"))?; + } else if !cfg!(debug_assertions) { + // In production, skip running `REPLACEABLE_SCHEMA` to avoid locking things in the schema. In + // CI, always run it because `diesel migration` commands would otherwise prevent it. + return Ok(()); + } + conn + .batch_execute(&REPLACEABLE_SCHEMA.join("\n")) + .context("Couldn't run SQL files in crates/db_schema/replaceable_schema")?; + + Ok(()) + })?; + info!("Database migrations complete."); + + Ok(()) +} diff --git a/crates/db_schema/src/utils.rs b/crates/db_schema/src/utils.rs index dcd208881..9c711be2a 100644 --- a/crates/db_schema/src/utils.rs +++ b/crates/db_schema/src/utils.rs @@ -1,11 +1,4 @@ -use crate::{ - diesel::Connection, - diesel_migrations::MigrationHarness, - newtypes::DbUrl, - CommentSortType, - SortType, -}; -use anyhow::Context; +use crate::{newtypes::DbUrl, CommentSortType, SortType}; use chrono::{DateTime, TimeDelta, Utc}; use deadpool::Runtime; use diesel::{ @@ -21,7 +14,6 @@ use diesel::{ sql_types::{self, Timestamptz}, IntoSql, OptionalExtension, - PgConnection, }; use diesel_async::{ pg::AsyncPgConnection, @@ -32,7 +24,6 @@ use diesel_async::{ }, SimpleAsyncConnection, }; -use diesel_migrations::EmbeddedMigrations; use futures_util::{future::BoxFuture, Future, FutureExt}; use i_love_jesus::CursorKey; use lemmy_utils::{ @@ -50,7 +41,7 @@ use std::{ sync::Arc, time::{Duration, SystemTime}, }; -use tracing::{error, info}; +use tracing::error; use url::Url; const FETCH_LIMIT_DEFAULT: i64 = 10; @@ -364,21 +355,6 @@ impl ServerCertVerifier for NoCertVerifier { } } -pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); - -fn run_migrations(db_url: &str) -> LemmyResult<()> { - // Needs to be a sync connection - let mut conn = PgConnection::establish(db_url).with_context(|| "Error connecting to database")?; - - info!("Running Database migrations (This may take a long time)..."); - conn - .run_pending_migrations(MIGRATIONS) - .map_err(|e| anyhow::anyhow!("Couldn't run DB Migrations: {e}"))?; - info!("Database migrations complete."); - - Ok(()) -} - pub async fn build_db_pool() -> LemmyResult { let db_url = SETTINGS.get_database_url(); // We only support TLS with sslmode=require currently @@ -407,7 +383,7 @@ pub async fn build_db_pool() -> LemmyResult { })) .build()?; - run_migrations(&db_url)?; + crate::schema_setup::run(&db_url)?; Ok(pool) } @@ -449,14 +425,17 @@ pub mod functions { use diesel::sql_types::{BigInt, Text, Timestamptz}; sql_function! { + #[sql_name = "r.hot_rank"] fn hot_rank(score: BigInt, time: Timestamptz) -> Double; } sql_function! { + #[sql_name = "r.scaled_rank"] fn scaled_rank(score: BigInt, time: Timestamptz, users_active_month: BigInt) -> Double; } sql_function! { + #[sql_name = "r.controversy_rank"] fn controversy_rank(upvotes: BigInt, downvotes: BigInt, score: BigInt) -> Double; } diff --git a/migrations/2024-02-24-034523_replaceable-schema/down.sql b/migrations/2024-02-24-034523_replaceable-schema/down.sql new file mode 100644 index 000000000..db8feacdf --- /dev/null +++ b/migrations/2024-02-24-034523_replaceable-schema/down.sql @@ -0,0 +1,1062 @@ +DROP SCHEMA IF EXISTS r CASCADE; + +DROP INDEX idx_site_aggregates_1_row_only; + +CREATE FUNCTION comment_aggregates_comment () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (TG_OP = 'INSERT') THEN + INSERT INTO comment_aggregates (comment_id, published) + VALUES (NEW.id, NEW.published); + ELSIF (TG_OP = 'DELETE') THEN + DELETE FROM comment_aggregates + WHERE comment_id = OLD.id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION comment_aggregates_score () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (TG_OP = 'INSERT') THEN + UPDATE + comment_aggregates ca + SET + score = score + NEW.score, + upvotes = CASE WHEN NEW.score = 1 THEN + upvotes + 1 + ELSE + upvotes + END, + downvotes = CASE WHEN NEW.score = - 1 THEN + downvotes + 1 + ELSE + downvotes + END, + controversy_rank = controversy_rank (ca.upvotes + CASE WHEN NEW.score = 1 THEN + 1 + ELSE + 0 + END::numeric, ca.downvotes + CASE WHEN NEW.score = - 1 THEN + 1 + ELSE + 0 + END::numeric) + WHERE + ca.comment_id = NEW.comment_id; + ELSIF (TG_OP = 'DELETE') THEN + -- Join to comment because that comment may not exist anymore + UPDATE + comment_aggregates ca + SET + score = score - OLD.score, + upvotes = CASE WHEN OLD.score = 1 THEN + upvotes - 1 + ELSE + upvotes + END, + downvotes = CASE WHEN OLD.score = - 1 THEN + downvotes - 1 + ELSE + downvotes + END, + controversy_rank = controversy_rank (ca.upvotes + CASE WHEN NEW.score = 1 THEN + 1 + ELSE + 0 + END::numeric, ca.downvotes + CASE WHEN NEW.score = - 1 THEN + 1 + ELSE + 0 + END::numeric) + FROM + comment c + WHERE + ca.comment_id = c.id + AND ca.comment_id = OLD.comment_id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION community_aggregates_comment_count () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (was_restored_or_created (TG_OP, OLD, NEW)) THEN + UPDATE + community_aggregates ca + SET + comments = comments + 1 + FROM + post p + WHERE + p.id = NEW.post_id + AND ca.community_id = p.community_id; + ELSIF (was_removed_or_deleted (TG_OP, OLD, NEW)) THEN + UPDATE + community_aggregates ca + SET + comments = comments - 1 + FROM + post p + WHERE + p.id = OLD.post_id + AND ca.community_id = p.community_id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION community_aggregates_community () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (TG_OP = 'INSERT') THEN + INSERT INTO community_aggregates (community_id, published) + VALUES (NEW.id, NEW.published); + ELSIF (TG_OP = 'DELETE') THEN + DELETE FROM community_aggregates + WHERE community_id = OLD.id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION community_aggregates_post_count () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (was_restored_or_created (TG_OP, OLD, NEW)) THEN + UPDATE + community_aggregates + SET + posts = posts + 1 + WHERE + community_id = NEW.community_id; + IF (TG_OP = 'UPDATE') THEN + -- Post was restored, so restore comment counts as well + UPDATE + community_aggregates ca + SET + posts = coalesce(cd.posts, 0), + comments = coalesce(cd.comments, 0) + FROM ( + SELECT + c.id, + count(DISTINCT p.id) AS posts, + count(DISTINCT ct.id) AS comments + FROM + community c + LEFT JOIN post p ON c.id = p.community_id + AND p.deleted = 'f' + AND p.removed = 'f' + LEFT JOIN comment ct ON p.id = ct.post_id + AND ct.deleted = 'f' + AND ct.removed = 'f' + WHERE + c.id = NEW.community_id + GROUP BY + c.id) cd + WHERE + ca.community_id = NEW.community_id; + END IF; + ELSIF (was_removed_or_deleted (TG_OP, OLD, NEW)) THEN + UPDATE + community_aggregates + SET + posts = posts - 1 + WHERE + community_id = OLD.community_id; + -- Update the counts if the post got deleted + UPDATE + community_aggregates ca + SET + posts = coalesce(cd.posts, 0), + comments = coalesce(cd.comments, 0) + FROM ( + SELECT + c.id, + count(DISTINCT p.id) AS posts, + count(DISTINCT ct.id) AS comments + FROM + community c + LEFT JOIN post p ON c.id = p.community_id + AND p.deleted = 'f' + AND p.removed = 'f' + LEFT JOIN comment ct ON p.id = ct.post_id + AND ct.deleted = 'f' + AND ct.removed = 'f' + WHERE + c.id = OLD.community_id + GROUP BY + c.id) cd + WHERE + ca.community_id = OLD.community_id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION community_aggregates_post_count_insert () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE + community_aggregates + SET + posts = posts + post_group.count + FROM ( + SELECT + community_id, + count(*) + FROM + new_post + GROUP BY + community_id) post_group +WHERE + community_aggregates.community_id = post_group.community_id; + RETURN NULL; +END +$$; + +CREATE FUNCTION community_aggregates_subscriber_count () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (TG_OP = 'INSERT') THEN + UPDATE + community_aggregates ca + SET + subscribers = subscribers + community.local::int, + subscribers_local = subscribers_local + person.local::int + FROM + community + LEFT JOIN person ON person.id = NEW.person_id + WHERE + community.id = NEW.community_id + AND community.id = ca.community_id + AND person.local IS NOT NULL; + ELSIF (TG_OP = 'DELETE') THEN + UPDATE + community_aggregates ca + SET + subscribers = subscribers - community.local::int, + subscribers_local = subscribers_local - person.local::int + FROM + community + LEFT JOIN person ON person.id = OLD.person_id + WHERE + community.id = OLD.community_id + AND community.id = ca.community_id + AND person.local IS NOT NULL; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION delete_follow_before_person () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + DELETE FROM community_follower AS c + WHERE c.person_id = OLD.id; + RETURN OLD; +END; +$$; + +CREATE FUNCTION person_aggregates_comment_count () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (was_restored_or_created (TG_OP, OLD, NEW)) THEN + UPDATE + person_aggregates + SET + comment_count = comment_count + 1 + WHERE + person_id = NEW.creator_id; + ELSIF (was_removed_or_deleted (TG_OP, OLD, NEW)) THEN + UPDATE + person_aggregates + SET + comment_count = comment_count - 1 + WHERE + person_id = OLD.creator_id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION person_aggregates_comment_score () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (TG_OP = 'INSERT') THEN + -- Need to get the post creator, not the voter + UPDATE + person_aggregates ua + SET + comment_score = comment_score + NEW.score + FROM + comment c + WHERE + ua.person_id = c.creator_id + AND c.id = NEW.comment_id; + ELSIF (TG_OP = 'DELETE') THEN + UPDATE + person_aggregates ua + SET + comment_score = comment_score - OLD.score + FROM + comment c + WHERE + ua.person_id = c.creator_id + AND c.id = OLD.comment_id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION person_aggregates_person () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (TG_OP = 'INSERT') THEN + INSERT INTO person_aggregates (person_id) + VALUES (NEW.id); + ELSIF (TG_OP = 'DELETE') THEN + DELETE FROM person_aggregates + WHERE person_id = OLD.id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION person_aggregates_post_count () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (was_restored_or_created (TG_OP, OLD, NEW)) THEN + UPDATE + person_aggregates + SET + post_count = post_count + 1 + WHERE + person_id = NEW.creator_id; + ELSIF (was_removed_or_deleted (TG_OP, OLD, NEW)) THEN + UPDATE + person_aggregates + SET + post_count = post_count - 1 + WHERE + person_id = OLD.creator_id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION person_aggregates_post_insert () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE + person_aggregates + SET + post_count = post_count + post_group.count + FROM ( + SELECT + creator_id, + count(*) + FROM + new_post + GROUP BY + creator_id) post_group +WHERE + person_aggregates.person_id = post_group.creator_id; + RETURN NULL; +END +$$; + +CREATE FUNCTION person_aggregates_post_score () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (TG_OP = 'INSERT') THEN + -- Need to get the post creator, not the voter + UPDATE + person_aggregates ua + SET + post_score = post_score + NEW.score + FROM + post p + WHERE + ua.person_id = p.creator_id + AND p.id = NEW.post_id; + ELSIF (TG_OP = 'DELETE') THEN + UPDATE + person_aggregates ua + SET + post_score = post_score - OLD.score + FROM + post p + WHERE + ua.person_id = p.creator_id + AND p.id = OLD.post_id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION post_aggregates_comment_count () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + -- Check for post existence - it may not exist anymore + IF TG_OP = 'INSERT' OR EXISTS ( + SELECT + 1 + FROM + post p + WHERE + p.id = OLD.post_id) THEN + IF (was_restored_or_created (TG_OP, OLD, NEW)) THEN + UPDATE + post_aggregates pa + SET + comments = comments + 1 + WHERE + pa.post_id = NEW.post_id; + ELSIF (was_removed_or_deleted (TG_OP, OLD, NEW)) THEN + UPDATE + post_aggregates pa + SET + comments = comments - 1 + WHERE + pa.post_id = OLD.post_id; + END IF; + END IF; + IF TG_OP = 'INSERT' THEN + UPDATE + post_aggregates pa + SET + newest_comment_time = NEW.published + WHERE + pa.post_id = NEW.post_id; + -- A 2 day necro-bump limit + UPDATE + post_aggregates pa + SET + newest_comment_time_necro = NEW.published + FROM + post p + WHERE + pa.post_id = p.id + AND pa.post_id = NEW.post_id + -- Fix issue with being able to necro-bump your own post + AND NEW.creator_id != p.creator_id + AND pa.published > ('now'::timestamp - '2 days'::interval); + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION post_aggregates_featured_community () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE + post_aggregates pa + SET + featured_community = NEW.featured_community + WHERE + pa.post_id = NEW.id; + RETURN NULL; +END +$$; + +CREATE FUNCTION post_aggregates_featured_local () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE + post_aggregates pa + SET + featured_local = NEW.featured_local + WHERE + pa.post_id = NEW.id; + RETURN NULL; +END +$$; + +CREATE FUNCTION post_aggregates_post () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + INSERT INTO post_aggregates (post_id, published, newest_comment_time, newest_comment_time_necro, community_id, creator_id, instance_id) + SELECT + id, + published, + published, + published, + community_id, + creator_id, + ( + SELECT + community.instance_id + FROM + community + WHERE + community.id = community_id + LIMIT 1) +FROM + new_post; + RETURN NULL; +END +$$; + +CREATE FUNCTION post_aggregates_score () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (TG_OP = 'INSERT') THEN + UPDATE + post_aggregates pa + SET + score = score + NEW.score, + upvotes = CASE WHEN NEW.score = 1 THEN + upvotes + 1 + ELSE + upvotes + END, + downvotes = CASE WHEN NEW.score = - 1 THEN + downvotes + 1 + ELSE + downvotes + END, + controversy_rank = controversy_rank (pa.upvotes + CASE WHEN NEW.score = 1 THEN + 1 + ELSE + 0 + END::numeric, pa.downvotes + CASE WHEN NEW.score = - 1 THEN + 1 + ELSE + 0 + END::numeric) + WHERE + pa.post_id = NEW.post_id; + ELSIF (TG_OP = 'DELETE') THEN + -- Join to post because that post may not exist anymore + UPDATE + post_aggregates pa + SET + score = score - OLD.score, + upvotes = CASE WHEN OLD.score = 1 THEN + upvotes - 1 + ELSE + upvotes + END, + downvotes = CASE WHEN OLD.score = - 1 THEN + downvotes - 1 + ELSE + downvotes + END, + controversy_rank = controversy_rank (pa.upvotes + CASE WHEN NEW.score = 1 THEN + 1 + ELSE + 0 + END::numeric, pa.downvotes + CASE WHEN NEW.score = - 1 THEN + 1 + ELSE + 0 + END::numeric) + FROM + post p + WHERE + pa.post_id = p.id + AND pa.post_id = OLD.post_id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION site_aggregates_comment_delete () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (was_removed_or_deleted (TG_OP, OLD, NEW)) THEN + UPDATE + site_aggregates sa + SET + comments = comments - 1 + FROM + site s + WHERE + sa.site_id = s.id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION site_aggregates_comment_insert () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (was_restored_or_created (TG_OP, OLD, NEW)) THEN + UPDATE + site_aggregates sa + SET + comments = comments + 1 + FROM + site s + WHERE + sa.site_id = s.id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION site_aggregates_community_delete () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (was_removed_or_deleted (TG_OP, OLD, NEW)) THEN + UPDATE + site_aggregates sa + SET + communities = communities - 1 + FROM + site s + WHERE + sa.site_id = s.id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION site_aggregates_community_insert () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (was_restored_or_created (TG_OP, OLD, NEW)) THEN + UPDATE + site_aggregates sa + SET + communities = communities + 1 + FROM + site s + WHERE + sa.site_id = s.id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION site_aggregates_person_delete () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + -- Join to site since the creator might not be there anymore + UPDATE + site_aggregates sa + SET + users = users - 1 + FROM + site s + WHERE + sa.site_id = s.id; + RETURN NULL; +END +$$; + +CREATE FUNCTION site_aggregates_person_insert () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE + site_aggregates + SET + users = users + 1; + RETURN NULL; +END +$$; + +CREATE FUNCTION site_aggregates_post_delete () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (was_removed_or_deleted (TG_OP, OLD, NEW)) THEN + UPDATE + site_aggregates sa + SET + posts = posts - 1 + FROM + site s + WHERE + sa.site_id = s.id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION site_aggregates_post_insert () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE + site_aggregates sa + SET + posts = posts + ( + SELECT + count(*) + FROM + new_post) + FROM + site s + WHERE + sa.site_id = s.id; + RETURN NULL; +END +$$; + +CREATE FUNCTION site_aggregates_post_update () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + IF (was_restored_or_created (TG_OP, OLD, NEW)) THEN + UPDATE + site_aggregates sa + SET + posts = posts + 1 + FROM + site s + WHERE + sa.site_id = s.id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION site_aggregates_site () + RETURNS TRIGGER + LANGUAGE plpgsql + AS $$ +BEGIN + -- we only ever want to have a single value in site_aggregate because the site_aggregate triggers update all rows in that table. + -- a cleaner check would be to insert it for the local_site but that would break assumptions at least in the tests + IF (TG_OP = 'INSERT') AND NOT EXISTS ( + SELECT + * + FROM + site_aggregates + LIMIT 1) THEN + INSERT INTO site_aggregates (site_id) + VALUES (NEW.id); + ELSIF (TG_OP = 'DELETE') THEN + DELETE FROM site_aggregates + WHERE site_id = OLD.id; + END IF; + RETURN NULL; +END +$$; + +CREATE FUNCTION was_removed_or_deleted (tg_op text, old record, new record) + RETURNS boolean + LANGUAGE plpgsql + AS $$ +BEGIN + IF (TG_OP = 'INSERT') THEN + RETURN FALSE; + END IF; + IF (TG_OP = 'DELETE' AND OLD.deleted = 'f' AND OLD.removed = 'f') THEN + RETURN TRUE; + END IF; + RETURN TG_OP = 'UPDATE' + AND OLD.deleted = 'f' + AND OLD.removed = 'f' + AND (NEW.deleted = 't' + OR NEW.removed = 't'); +END +$$; + +CREATE FUNCTION was_restored_or_created (tg_op text, old record, new record) + RETURNS boolean + LANGUAGE plpgsql + AS $$ +BEGIN + IF (TG_OP = 'DELETE') THEN + RETURN FALSE; + END IF; + IF (TG_OP = 'INSERT') THEN + RETURN TRUE; + END IF; + RETURN TG_OP = 'UPDATE' + AND NEW.deleted = 'f' + AND NEW.removed = 'f' + AND (OLD.deleted = 't' + OR OLD.removed = 't'); +END +$$; + +CREATE TRIGGER comment_aggregates_comment + AFTER INSERT OR DELETE ON comment + FOR EACH ROW + EXECUTE FUNCTION comment_aggregates_comment (); + +CREATE TRIGGER comment_aggregates_score + AFTER INSERT OR DELETE ON comment_like + FOR EACH ROW + EXECUTE FUNCTION comment_aggregates_score (); + +CREATE TRIGGER community_aggregates_comment_count + AFTER INSERT OR DELETE OR UPDATE OF removed, + deleted ON comment + FOR EACH ROW + EXECUTE FUNCTION community_aggregates_comment_count (); + +CREATE TRIGGER community_aggregates_community + AFTER INSERT OR DELETE ON community + FOR EACH ROW + EXECUTE FUNCTION community_aggregates_community (); + +CREATE TRIGGER community_aggregates_post_count + AFTER DELETE OR UPDATE OF removed, + deleted ON post + FOR EACH ROW + EXECUTE FUNCTION community_aggregates_post_count (); + +CREATE TRIGGER community_aggregates_post_count_insert + AFTER INSERT ON post REFERENCING NEW TABLE AS new_post + FOR EACH STATEMENT + EXECUTE FUNCTION community_aggregates_post_count_insert (); + +CREATE TRIGGER community_aggregates_subscriber_count + AFTER INSERT OR DELETE ON community_follower + FOR EACH ROW + EXECUTE FUNCTION community_aggregates_subscriber_count (); + +CREATE TRIGGER delete_follow_before_person + BEFORE DELETE ON person + FOR EACH ROW + EXECUTE FUNCTION delete_follow_before_person (); + +CREATE TRIGGER person_aggregates_comment_count + AFTER INSERT OR DELETE OR UPDATE OF removed, + deleted ON comment + FOR EACH ROW + EXECUTE FUNCTION person_aggregates_comment_count (); + +CREATE TRIGGER person_aggregates_comment_score + AFTER INSERT OR DELETE ON comment_like + FOR EACH ROW + EXECUTE FUNCTION person_aggregates_comment_score (); + +CREATE TRIGGER person_aggregates_person + AFTER INSERT OR DELETE ON person + FOR EACH ROW + EXECUTE FUNCTION person_aggregates_person (); + +CREATE TRIGGER person_aggregates_post_count + AFTER DELETE OR UPDATE OF removed, + deleted ON post + FOR EACH ROW + EXECUTE FUNCTION person_aggregates_post_count (); + +CREATE TRIGGER person_aggregates_post_insert + AFTER INSERT ON post REFERENCING NEW TABLE AS new_post + FOR EACH STATEMENT + EXECUTE FUNCTION person_aggregates_post_insert (); + +CREATE TRIGGER person_aggregates_post_score + AFTER INSERT OR DELETE ON post_like + FOR EACH ROW + EXECUTE FUNCTION person_aggregates_post_score (); + +CREATE TRIGGER post_aggregates_comment_count + AFTER INSERT OR DELETE OR UPDATE OF removed, + deleted ON comment + FOR EACH ROW + EXECUTE FUNCTION post_aggregates_comment_count (); + +CREATE TRIGGER post_aggregates_featured_community + AFTER UPDATE ON post + FOR EACH ROW + WHEN ((old.featured_community IS DISTINCT FROM new.featured_community)) + EXECUTE FUNCTION post_aggregates_featured_community (); + +CREATE TRIGGER post_aggregates_featured_local + AFTER UPDATE ON post + FOR EACH ROW + WHEN ((old.featured_local IS DISTINCT FROM new.featured_local)) + EXECUTE FUNCTION post_aggregates_featured_local (); + +CREATE TRIGGER post_aggregates_post + AFTER INSERT ON post REFERENCING NEW TABLE AS new_post + FOR EACH STATEMENT + EXECUTE FUNCTION post_aggregates_post (); + +CREATE TRIGGER post_aggregates_score + AFTER INSERT OR DELETE ON post_like + FOR EACH ROW + EXECUTE FUNCTION post_aggregates_score (); + +CREATE TRIGGER site_aggregates_comment_delete + AFTER DELETE OR UPDATE OF removed, + deleted ON comment + FOR EACH ROW + WHEN ((old.local = TRUE)) + EXECUTE FUNCTION site_aggregates_comment_delete (); + +CREATE TRIGGER site_aggregates_comment_insert + AFTER INSERT OR UPDATE OF removed, + deleted ON comment + FOR EACH ROW + WHEN ((new.local = TRUE)) + EXECUTE FUNCTION site_aggregates_comment_insert (); + +CREATE TRIGGER site_aggregates_community_insert + AFTER INSERT OR UPDATE OF removed, + deleted ON community + FOR EACH ROW + WHEN ((new.local = TRUE)) + EXECUTE FUNCTION site_aggregates_community_insert (); + +CREATE TRIGGER site_aggregates_person_delete + AFTER DELETE ON person + FOR EACH ROW + WHEN ((old.local = TRUE)) + EXECUTE FUNCTION site_aggregates_person_delete (); + +CREATE TRIGGER site_aggregates_person_insert + AFTER INSERT ON person + FOR EACH ROW + WHEN ((new.local = TRUE)) + EXECUTE FUNCTION site_aggregates_person_insert (); + +CREATE TRIGGER site_aggregates_post_delete + AFTER DELETE OR UPDATE OF removed, + deleted ON post + FOR EACH ROW + WHEN ((old.local = TRUE)) + EXECUTE FUNCTION site_aggregates_post_delete (); + +CREATE TRIGGER site_aggregates_post_insert + AFTER INSERT ON post REFERENCING NEW TABLE AS new_post + FOR EACH STATEMENT + EXECUTE FUNCTION site_aggregates_post_insert (); + +CREATE TRIGGER site_aggregates_post_update + AFTER UPDATE OF removed, + deleted ON post + FOR EACH ROW + WHEN ((new.local = TRUE)) + EXECUTE FUNCTION site_aggregates_post_update (); + +CREATE TRIGGER site_aggregates_site + AFTER INSERT OR DELETE ON site + FOR EACH ROW + EXECUTE FUNCTION site_aggregates_site (); + +-- Rank functions +CREATE FUNCTION controversy_rank (upvotes numeric, downvotes numeric) + RETURNS double precision + LANGUAGE plpgsql + IMMUTABLE + AS $$ +BEGIN + IF downvotes <= 0 OR upvotes <= 0 THEN + RETURN 0; + ELSE + RETURN (upvotes + downvotes) * CASE WHEN upvotes > downvotes THEN + downvotes::float / upvotes::float + ELSE + upvotes::float / downvotes::float + END; + END IF; +END; +$$; + +CREATE FUNCTION hot_rank (score numeric, published timestamp with time zone) + RETURNS double precision + LANGUAGE plpgsql + IMMUTABLE PARALLEL SAFE + AS $$ +DECLARE + hours_diff numeric := EXTRACT(EPOCH FROM (now() - published)) / 3600; +BEGIN + -- 24 * 7 = 168, so after a week, it will default to 0. + IF (hours_diff > 0 AND hours_diff < 168) THEN + -- Use greatest(2,score), so that the hot_rank will be positive and not ignored. + RETURN log(greatest (2, score + 2)) / power((hours_diff + 2), 1.8); + ELSE + -- if the post is from the future, set hot score to 0. otherwise you can game the post to + -- always be on top even with only 1 vote by setting it to the future + RETURN 0.0; + END IF; +END; +$$; + +CREATE FUNCTION scaled_rank (score numeric, published timestamp with time zone, users_active_month numeric) + RETURNS double precision + LANGUAGE plpgsql + IMMUTABLE PARALLEL SAFE + AS $$ +BEGIN + -- Add 2 to avoid divide by zero errors + -- Default for score = 1, active users = 1, and now, is (0.1728 / log(2 + 1)) = 0.3621 + -- There may need to be a scale factor multiplied to users_active_month, to make + -- the log curve less pronounced. This can be tuned in the future. + RETURN (hot_rank (score, published) / log(2 + users_active_month)); +END; +$$; + +-- Don't defer constraints +ALTER TABLE comment_aggregates + ALTER CONSTRAINT comment_aggregates_comment_id_fkey NOT DEFERRABLE; + +ALTER TABLE community_aggregates + ALTER CONSTRAINT community_aggregates_community_id_fkey NOT DEFERRABLE; + +ALTER TABLE person_aggregates + ALTER CONSTRAINT person_aggregates_person_id_fkey NOT DEFERRABLE; + +ALTER TABLE post_aggregates + ALTER CONSTRAINT post_aggregates_community_id_fkey NOT DEFERRABLE, + ALTER CONSTRAINT post_aggregates_creator_id_fkey NOT DEFERRABLE, + ALTER CONSTRAINT post_aggregates_instance_id_fkey NOT DEFERRABLE, + ALTER CONSTRAINT post_aggregates_post_id_fkey NOT DEFERRABLE; + +ALTER TABLE site_aggregates + ALTER CONSTRAINT site_aggregates_site_id_fkey NOT DEFERRABLE; + diff --git a/migrations/2024-02-24-034523_replaceable-schema/up.sql b/migrations/2024-02-24-034523_replaceable-schema/up.sql new file mode 100644 index 000000000..05e57afc0 --- /dev/null +++ b/migrations/2024-02-24-034523_replaceable-schema/up.sql @@ -0,0 +1,81 @@ +CREATE UNIQUE INDEX idx_site_aggregates_1_row_only ON site_aggregates ((TRUE)); + +-- Drop functions and use `CASCADE` to drop the triggers that use them +DROP FUNCTION comment_aggregates_comment, comment_aggregates_score, community_aggregates_comment_count, community_aggregates_community, community_aggregates_post_count, community_aggregates_post_count_insert, community_aggregates_subscriber_count, delete_follow_before_person, person_aggregates_comment_count, person_aggregates_comment_score, person_aggregates_person, person_aggregates_post_count, person_aggregates_post_insert, person_aggregates_post_score, post_aggregates_comment_count, post_aggregates_featured_community, post_aggregates_featured_local, post_aggregates_post, post_aggregates_score, site_aggregates_comment_delete, site_aggregates_comment_insert, site_aggregates_community_delete, site_aggregates_community_insert, site_aggregates_person_delete, site_aggregates_person_insert, site_aggregates_post_delete, site_aggregates_post_insert, site_aggregates_post_update, site_aggregates_site, was_removed_or_deleted, was_restored_or_created CASCADE; + +-- Drop rank functions +DROP FUNCTION controversy_rank, scaled_rank, hot_rank; + +-- Defer constraints +ALTER TABLE comment_aggregates + ALTER CONSTRAINT comment_aggregates_comment_id_fkey INITIALLY DEFERRED; + +ALTER TABLE community_aggregates + ALTER CONSTRAINT community_aggregates_community_id_fkey INITIALLY DEFERRED; + +ALTER TABLE person_aggregates + ALTER CONSTRAINT person_aggregates_person_id_fkey INITIALLY DEFERRED; + +ALTER TABLE post_aggregates + ALTER CONSTRAINT post_aggregates_community_id_fkey INITIALLY DEFERRED, + ALTER CONSTRAINT post_aggregates_creator_id_fkey INITIALLY DEFERRED, + ALTER CONSTRAINT post_aggregates_instance_id_fkey INITIALLY DEFERRED, + ALTER CONSTRAINT post_aggregates_post_id_fkey INITIALLY DEFERRED; + +ALTER TABLE site_aggregates + ALTER CONSTRAINT site_aggregates_site_id_fkey INITIALLY DEFERRED; + +-- Fix values that might be incorrect because of the old triggers +UPDATE + post_aggregates +SET + featured_local = post.featured_local, + featured_community = post.featured_community +FROM + post +WHERE + post_aggregates.post_id = post.id + AND (post_aggregates.featured_local, + post_aggregates.featured_community) != (post.featured_local, + post.featured_community); + +UPDATE + community_aggregates +SET + comments = counted.comments +FROM ( + SELECT + community_id, + count(*) AS comments + FROM + comment, + LATERAL ( + SELECT + * + FROM + post + WHERE + post.id = comment.post_id + LIMIT 1) AS post + WHERE + NOT (comment.deleted + OR comment.removed + OR post.deleted + OR post.removed) + GROUP BY + community_id) AS counted +WHERE + community_aggregates.community_id = counted.community_id + AND community_aggregates.comments != counted.comments; + +UPDATE + site_aggregates +SET + communities = ( + SELECT + count(*) + FROM + community + WHERE + local); + diff --git a/scripts/dump_schema.sh b/scripts/dump_schema.sh new file mode 100755 index 000000000..f783be26b --- /dev/null +++ b/scripts/dump_schema.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +set -e + +# Dumps database schema, not including things that are added outside of migrations + +CWD="$(cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd -P)" + +cd $CWD/../ + +source scripts/start_dev_db.sh + +diesel migration run +pg_dump --no-owner --no-privileges --no-table-access-method --schema-only --no-sync -f schema.sqldump + +pg_ctl stop +rm -rf $PGDATA diff --git a/scripts/sql_format_check.sh b/scripts/sql_format_check.sh index a75425da8..fabc3b3ed 100755 --- a/scripts/sql_format_check.sh +++ b/scripts/sql_format_check.sh @@ -9,10 +9,12 @@ cd $CWD/../ # Copy the files to a temp dir TMP_DIR=$(mktemp -d) -cp -a migrations/. $TMP_DIR +cp -a migrations/. $TMP_DIR/migrations +cp -a crates/db_schema/replaceable_schema/. $TMP_DIR/replaceable_schema # Format the new files find $TMP_DIR -type f -name '*.sql' -exec pg_format -i {} + # Diff the directories -diff -r migrations $TMP_DIR +diff -r migrations $TMP_DIR/migrations +diff -r crates/db_schema/replaceable_schema $TMP_DIR/replaceable_schema diff --git a/scripts/start_dev_db.sh b/scripts/start_dev_db.sh index 8ea4a294e..5965316ba 100644 --- a/scripts/start_dev_db.sh +++ b/scripts/start_dev_db.sh @@ -2,8 +2,10 @@ export PGDATA="$PWD/dev_pgdata" export PGHOST=$PWD +export PGUSER=postgres export DATABASE_URL="postgresql://lemmy:password@/lemmy?host=$PWD" export LEMMY_DATABASE_URL=$DATABASE_URL +export PGDATABASE=lemmy # If cluster exists, stop the server and delete the cluster if [[ -d $PGDATA ]] @@ -44,5 +46,5 @@ pg_ctl init --silent --options="--username=postgres --auth=trust --no-instructio pg_ctl start --silent --options="${config_args[*]}" # Setup database -psql --quiet -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" -U postgres -psql --quiet -c "CREATE DATABASE lemmy WITH OWNER lemmy;" -U postgres +PGDATABASE=postgres psql --quiet -c "CREATE USER lemmy WITH PASSWORD 'password' SUPERUSER;" +PGDATABASE=postgres psql --quiet -c "CREATE DATABASE lemmy WITH OWNER lemmy;" diff --git a/src/lib.rs b/src/lib.rs index 12a0f4a7d..633fd5313 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -218,15 +218,17 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> { let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())?; - tokio::select! { - _ = tokio::signal::ctrl_c() => { - tracing::warn!("Received ctrl-c, shutting down gracefully..."); - } - _ = interrupt.recv() => { - tracing::warn!("Received interrupt, shutting down gracefully..."); - } - _ = terminate.recv() => { - tracing::warn!("Received terminate, shutting down gracefully..."); + if server.is_some() || federate.is_some() { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::warn!("Received ctrl-c, shutting down gracefully..."); + } + _ = interrupt.recv() => { + tracing::warn!("Received interrupt, shutting down gracefully..."); + } + _ = terminate.recv() => { + tracing::warn!("Received terminate, shutting down gracefully..."); + } } } if let Some(server) = server { diff --git a/src/scheduled_tasks.rs b/src/scheduled_tasks.rs index ccbe00267..f7904104f 100644 --- a/src/scheduled_tasks.rs +++ b/src/scheduled_tasks.rs @@ -131,7 +131,7 @@ async fn update_hot_ranks(pool: &mut DbPool<'_>) { &mut conn, "comment", "a.hot_rank != 0", - "SET hot_rank = hot_rank(a.score, a.published)", + "SET hot_rank = r.hot_rank(a.score, a.published)", ) .await; @@ -139,7 +139,7 @@ async fn update_hot_ranks(pool: &mut DbPool<'_>) { &mut conn, "community", "a.hot_rank != 0", - "SET hot_rank = hot_rank(a.subscribers, a.published)", + "SET hot_rank = r.hot_rank(a.subscribers, a.published)", ) .await; @@ -236,9 +236,9 @@ async fn process_post_aggregates_ranks_in_batches(conn: &mut AsyncPgConnection) LIMIT $2 FOR UPDATE SKIP LOCKED) UPDATE post_aggregates pa - SET hot_rank = hot_rank(pa.score, pa.published), - hot_rank_active = hot_rank(pa.score, pa.newest_comment_time_necro), - scaled_rank = scaled_rank(pa.score, pa.published, ca.users_active_month) + SET hot_rank = r.hot_rank(pa.score, pa.published), + hot_rank_active = r.hot_rank(pa.score, pa.newest_comment_time_necro), + scaled_rank = r.scaled_rank(pa.score, pa.published, ca.users_active_month) FROM batch, community_aggregates ca WHERE pa.post_id = batch.post_id and pa.community_id = ca.community_id RETURNING pa.published; "#,