From 4992bcf582fa60a76809cc526a591ab4177a9d43 Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Mon, 29 Mar 2021 23:06:47 +0100 Subject: [PATCH] Setup postgres in CI --- .github/actions/postgres/action.yml | 22 ++++++++++++++++++++++ .github/workflows/toolchain.yml | 2 ++ migrations/20210316025847_setup.up.sql | 2 ++ src/lib.rs | 6 +++--- src/registry.rs | 4 ++-- src/runner.rs | 19 ++++++++++++++----- 6 files changed, 45 insertions(+), 10 deletions(-) create mode 100644 .github/actions/postgres/action.yml diff --git a/.github/actions/postgres/action.yml b/.github/actions/postgres/action.yml new file mode 100644 index 0000000..03d1c1b --- /dev/null +++ b/.github/actions/postgres/action.yml @@ -0,0 +1,22 @@ +name: "Setup PostgreSQL database" +runs: + using: "composite" + steps: + - name: Set environment variable + shell: bash + run: echo "DATABASE_URL=postgres://postgres:password@localhost/sqlxmq" >> $GITHUB_ENV + + - name: Install sqlx-cli + shell: bash + run: cargo install sqlx-cli + + - name: Start PostgreSQL on Ubuntu + shell: bash + run: | + sudo systemctl start postgresql.service + pg_isready + sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'password'" + + - name: Setup database + shell: bash + run: cargo sqlx database setup diff --git a/.github/workflows/toolchain.yml b/.github/workflows/toolchain.yml index eea4c71..b4d52ed 100644 --- a/.github/workflows/toolchain.yml +++ b/.github/workflows/toolchain.yml @@ -59,6 +59,7 @@ jobs: profile: minimal toolchain: stable override: true + - uses: ./.github/actions/postgres - uses: actions-rs/cargo@v1 with: command: test @@ -73,6 +74,7 @@ jobs: profile: minimal toolchain: nightly override: true + - uses: ./.github/actions/postgres - uses: actions-rs/cargo@v1 with: command: test diff --git a/migrations/20210316025847_setup.up.sql b/migrations/20210316025847_setup.up.sql index 0b528fa..bcb1ab5 100644 --- a/migrations/20210316025847_setup.up.sql +++ b/migrations/20210316025847_setup.up.sql @@ -1,3 +1,5 @@ +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + -- The UDT for creating messages CREATE TYPE mq_new_t AS ( -- Unique message ID diff --git a/src/lib.rs b/src/lib.rs index 38dc14f..f0ee518 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -195,7 +195,7 @@ //! # async fn example( //! # pool: sqlx::Pool //! # ) -> Result<(), Box> { -//! example_job.new() +//! example_job.builder() //! // This is where we override job configuration //! .set_channel_name("bar") //! .set_json("John")? @@ -482,8 +482,8 @@ mod tests { let pool = &*test_pool().await; let _runner = named_job_runner(pool).await; - example_job1.new().spawn(pool).await.unwrap(); - example_job2.new().spawn(pool).await.unwrap(); + example_job1.builder().spawn(pool).await.unwrap(); + example_job2.builder().spawn(pool).await.unwrap(); pause().await; } } diff --git a/src/registry.rs b/src/registry.rs index 51c32e8..01d9aab 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -109,14 +109,14 @@ impl NamedJob { } } /// Initialize a job builder with the name and defaults of this job. - pub fn new(&self) -> JobBuilder<'static> { + pub fn builder(&self) -> JobBuilder<'static> { let mut builder = JobBuilder::new(self.name); (self.build_fn.0 .0)(&mut builder); builder } /// Initialize a job builder with the name and defaults of this job, /// using the provided job ID. - pub fn new_with_id(&self, id: Uuid) -> JobBuilder<'static> { + pub fn builder_with_id(&self, id: Uuid) -> JobBuilder<'static> { let mut builder = JobBuilder::new_with_id(id, self.name); (self.build_fn.0 .0)(&mut builder); builder diff --git a/src/runner.rs b/src/runner.rs index 44b58e6..ea00740 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -33,7 +33,7 @@ struct JobRunner { } /// Type used to checkpoint a running job. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct Checkpoint<'a> { duration: Duration, extra_retries: usize, @@ -54,7 +54,7 @@ impl<'a> Checkpoint<'a> { } /// Construct a new checkpoint. pub fn new() -> Self { - Self::new_keep_alive(Duration::from_secs(0)) + Self::default() } /// Add extra retries to the current job. pub fn set_extra_retries(&mut self, extra_retries: usize) -> &mut Self { @@ -272,8 +272,15 @@ async fn start_listener(job_runner: Arc) -> Result 0 || listener.recv().await.is_ok() { + job_runner.notify.notify_one(); + num_errors = 0; + } else { + tokio::time::sleep(Duration::from_secs(1 << num_errors)).await; + num_errors += 1; + } } }))) } @@ -327,12 +334,14 @@ async fn poll_and_dispatch( .await?; } + const MAX_WAIT: Duration = Duration::from_secs(60); + let wait_time = messages .iter() .filter_map(|msg| msg.wait_time.clone()) .map(to_duration) .min() - .unwrap_or(Duration::from_secs(60)); + .unwrap_or(MAX_WAIT); for msg in messages { if let PolledMessage {