finish new implementation of schema_setup::run (not including revert, test, etc.)

This commit is contained in:
Dull Bananas 2024-05-11 03:36:49 +00:00
parent 1aab92cbfa
commit e8e354c9b2
4 changed files with 112 additions and 33 deletions

View file

@ -801,7 +801,8 @@ diesel::table! {
} }
diesel::table! { diesel::table! {
previously_run_sql (content) { previously_run_sql (id) {
id -> Bool,
content -> Text, content -> Text,
} }
} }

View file

@ -1,7 +1,18 @@
use crate::{schema::previously_run_sql};
use anyhow::Context; use anyhow::Context;
use diesel::{connection::SimpleConnection, Connection, PgConnection}; use diesel::{
connection::SimpleConnection,
select,
update,
Connection,
ExpressionMethods,
PgConnection,
QueryDsl,
RunQueryDsl,NullableExpressionMethods
};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness}; use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use lemmy_utils::error::LemmyError; use lemmy_utils::error::LemmyError;
use std::time::Instant;
use tracing::info; use tracing::info;
const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
@ -10,53 +21,119 @@ const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
/// instead of being changed using migrations. It may not create or modify things outside of the `r` schema /// instead of being changed using migrations. It may not create or modify things outside of the `r` schema
/// (indicated by `r.` before the name), unless a comment says otherwise. /// (indicated by `r.` before the name), unless a comment says otherwise.
const REPLACEABLE_SCHEMA: &[&str] = &[ const REPLACEABLE_SCHEMA: &[&str] = &[
"DROP SCHEMA IF EXISTS r CASCADE;",
"CREATE SCHEMA r;", "CREATE SCHEMA r;",
include_str!("../replaceable_schema/utils.sql"), include_str!("../replaceable_schema/utils.sql"),
include_str!("../replaceable_schema/triggers.sql"), include_str!("../replaceable_schema/triggers.sql"),
]; ];
const REVERT_REPLACEABLE_SCHEMA: &str = "DROP SCHEMA IF EXISTS r CASCADE;";
// TODO use full names
const FORBID_DIESEL_CLI_MIGRATION_VERSION: &str = "0000000000000";
const CUSTOM_MIGRATION_RUNNER_MIGRATION_VERSION: &str = "2024-04-29-012113";
pub fn run(db_url: &str) -> Result<(), LemmyError> { pub fn run(db_url: &str) -> Result<(), LemmyError> {
// Migrations don't support async connection
let mut conn = PgConnection::establish(db_url).with_context(|| "Error connecting to database")?;
let test_enabled = std::env::var("LEMMY_TEST_MIGRATIONS") let test_enabled = std::env::var("LEMMY_TEST_MIGRATIONS")
.map(|s| !s.is_empty()) .map(|s| !s.is_empty())
.unwrap_or(false); .unwrap_or(false);
// Migrations don't support async connection let new_sql = REPLACEABLE_SCHEMA.join("\n");
let mut conn = PgConnection::establish(db_url).with_context(|| "Error connecting to database")?;
info!("Running Database migrations (This may take a long time)..."); // Early return should be as fast as possible and not do any locks in the database, because this case
// is reached whenever a lemmy_server process is started, which can happen frequently on a production server
let unfiltered_migrations = conn // with a horizontally scaled setup.
let unfiltered_pending_migrations = conn
.pending_migrations(MIGRATIONS) .pending_migrations(MIGRATIONS)
.map_err(|e| anyhow::anyhow!("Couldn't determine pending migrations: {e}"))?; .map_err(|e| anyhow::anyhow!("Couldn't determine pending migrations: {e}"))?;
// Does not include the "forbid_diesel_cli" migration // Check len first so this doesn't run without the previously_run_sql table existing
let migrations = unfiltered_migrations.iter().filter(|m| m.name().version() != "000000000000000".into()); if unfiltered_pending_migrations.len() == 1 {
let sql_unchanged: bool = select(
previously_run_sql::table
.select(previously_run_sql::content)
.single_value()
.assume_not_null()
.eq(&new_sql),
)
.get_result(&mut conn)?;
conn.transaction::<_, LemmyError, _>(|conn|) // left off here if sql_unchanged {
debug_assert_eq!(
for migration in migrations.clone() { unfiltered_pending_migrations
conn .get(0)
.run_migration(migration) .map(|m| m.name().version()),
.map_err(|e| anyhow::anyhow!("Couldn't run DB Migrations: {e}"))?; Some(FORBID_DIESEL_CLI_MIGRATION_VERSION.into())
} );
conn.transaction::<_, LemmyError, _>(|conn| {
if let Some(migration) = migrations.last() {
// Migration is run with a savepoint since there's already a transaction
conn
.run_migration(migration)
.map_err(|e| anyhow::anyhow!("Couldn't run DB Migrations: {e}"))?;
} else if !cfg!(debug_assertions) {
// In production, skip running `REPLACEABLE_SCHEMA` to avoid locking things in the schema. In
// CI, always run it because `diesel migration` commands would otherwise prevent it.
return Ok(()); return Ok(());
} }
conn }
.batch_execute(&REPLACEABLE_SCHEMA.join("\n"))
conn.transaction::<_, LemmyError, _>(|conn| {
// Use the table created by `MigrationHarness::pending_migrations` as a lock target to prevent multiple
// lemmy_server processes from running this transaction concurrently. This lock does not block
// `MigrationHarness::pending_migrations` (`SELECT`) or `MigrationHarness::run_migration` (`INSERT`).
info!("Waiting for lock...");
conn.batch_execute("LOCK __diesel_schema_migrations IN SHARE UPDATE EXCLUSIVE MODE;")?;
info!("Running Database migrations (This may take a long time)...");
// Check pending migrations again after locking
let unfiltered_pending_migrations = conn.pending_migrations(MIGRATIONS).map_err(|e| anyhow::anyhow!("Couldn't determine pending migrations: {e}"))?;
// Does not include the "forbid_diesel_cli" migration
let pending_migrations = unfiltered_pending_migrations.get(1..).expect(
"original pending migrations length should be at least 1 because of the forbid_diesel_cli migration",
);
// Check migration version constants in debug mode
debug_assert_eq!(
unfiltered_pending_migrations
.get(0)
.map(|m| m.name().version()),
Some(FORBID_DIESEL_CLI_MIGRATION_VERSION.into())
);
debug_assert_eq!(
pending_migrations
.iter()
.filter(|m| m.name().version() == FORBID_DIESEL_CLI_MIGRATION_VERSION.into())
.count(),
0
);
/*TODO maybe do this for all migrations not just pending
debug_assert_eq!(
pending_migrations
.iter()
.filter(|m| m.name().version() == CUSTOM_MIGRATION_RUNNER_MIGRATION_VERSION.into())
.count(),
1
);*/
// Run migrations, without stuff from replaceable_schema
conn.batch_execute(REVERT_REPLACEABLE_SCHEMA).context("Couldn't drop schema `r`")?;
for migration in pending_migrations {
let name = migration.name();
// TODO measure time on database
let start_time = Instant::now();
conn.run_migration(migration)
.map_err(|e| anyhow::anyhow!("Couldn't run migration {name}: {e}"))?;
let duration = start_time.elapsed().as_millis();
info!("{duration}ms {name}");
}
// Run replaceable_schema
conn.batch_execute(&new_sql)
.context("Couldn't run SQL files in crates/db_schema/replaceable_schema")?; .context("Couldn't run SQL files in crates/db_schema/replaceable_schema")?;
let num_rows_updated = update(previously_run_sql::table)
.set(previously_run_sql::content.eq(new_sql))
.execute(conn)?;
debug_assert_eq!(num_rows_updated, 1);
Ok(()) Ok(())
})?; })?;
info!("Database migrations complete."); info!("Database migrations complete.");
Ok(()) Ok(())

View file

@ -1,4 +1,5 @@
drop schema if exists r cascade; drop schema if exists r cascade;
create table previously_run_sql (content text primary key); -- `content` can't be used as primary key because of size limit
insert into previously_run_sql (content) values (''); create table previously_run_sql (id boolean primary key, content text);
insert into previously_run_sql (id, content) values (true, '');