Setup postgres in CI

This commit is contained in:
Diggory Blake 2021-03-29 23:06:47 +01:00
parent 23010c9512
commit 4992bcf582
6 changed files with 45 additions and 10 deletions

22
.github/actions/postgres/action.yml vendored Normal file
View file

@ -0,0 +1,22 @@
name: "Setup PostgreSQL database"
runs:
using: "composite"
steps:
- name: Set environment variable
shell: bash
run: echo "DATABASE_URL=postgres://postgres:password@localhost/sqlxmq" >> $GITHUB_ENV
- name: Install sqlx-cli
shell: bash
run: cargo install sqlx-cli
- name: Start PostgreSQL on Ubuntu
shell: bash
run: |
sudo systemctl start postgresql.service
pg_isready
sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'password'"
- name: Setup database
shell: bash
run: cargo sqlx database setup

View file

@ -59,6 +59,7 @@ jobs:
profile: minimal profile: minimal
toolchain: stable toolchain: stable
override: true override: true
- uses: ./.github/actions/postgres
- uses: actions-rs/cargo@v1 - uses: actions-rs/cargo@v1
with: with:
command: test command: test
@ -73,6 +74,7 @@ jobs:
profile: minimal profile: minimal
toolchain: nightly toolchain: nightly
override: true override: true
- uses: ./.github/actions/postgres
- uses: actions-rs/cargo@v1 - uses: actions-rs/cargo@v1
with: with:
command: test command: test

View file

@ -1,3 +1,5 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- The UDT for creating messages -- The UDT for creating messages
CREATE TYPE mq_new_t AS ( CREATE TYPE mq_new_t AS (
-- Unique message ID -- Unique message ID

View file

@ -195,7 +195,7 @@
//! # async fn example( //! # async fn example(
//! # pool: sqlx::Pool<sqlx::Postgres> //! # pool: sqlx::Pool<sqlx::Postgres>
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { //! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
//! example_job.new() //! example_job.builder()
//! // This is where we override job configuration //! // This is where we override job configuration
//! .set_channel_name("bar") //! .set_channel_name("bar")
//! .set_json("John")? //! .set_json("John")?
@ -482,8 +482,8 @@ mod tests {
let pool = &*test_pool().await; let pool = &*test_pool().await;
let _runner = named_job_runner(pool).await; let _runner = named_job_runner(pool).await;
example_job1.new().spawn(pool).await.unwrap(); example_job1.builder().spawn(pool).await.unwrap();
example_job2.new().spawn(pool).await.unwrap(); example_job2.builder().spawn(pool).await.unwrap();
pause().await; pause().await;
} }
} }

View file

@ -109,14 +109,14 @@ impl NamedJob {
} }
} }
/// Initialize a job builder with the name and defaults of this job. /// Initialize a job builder with the name and defaults of this job.
pub fn new(&self) -> JobBuilder<'static> { pub fn builder(&self) -> JobBuilder<'static> {
let mut builder = JobBuilder::new(self.name); let mut builder = JobBuilder::new(self.name);
(self.build_fn.0 .0)(&mut builder); (self.build_fn.0 .0)(&mut builder);
builder builder
} }
/// Initialize a job builder with the name and defaults of this job, /// Initialize a job builder with the name and defaults of this job,
/// using the provided job ID. /// using the provided job ID.
pub fn new_with_id(&self, id: Uuid) -> JobBuilder<'static> { pub fn builder_with_id(&self, id: Uuid) -> JobBuilder<'static> {
let mut builder = JobBuilder::new_with_id(id, self.name); let mut builder = JobBuilder::new_with_id(id, self.name);
(self.build_fn.0 .0)(&mut builder); (self.build_fn.0 .0)(&mut builder);
builder builder

View file

@ -33,7 +33,7 @@ struct JobRunner {
} }
/// Type used to checkpoint a running job. /// Type used to checkpoint a running job.
#[derive(Debug, Clone)] #[derive(Debug, Clone, Default)]
pub struct Checkpoint<'a> { pub struct Checkpoint<'a> {
duration: Duration, duration: Duration,
extra_retries: usize, extra_retries: usize,
@ -54,7 +54,7 @@ impl<'a> Checkpoint<'a> {
} }
/// Construct a new checkpoint. /// Construct a new checkpoint.
pub fn new() -> Self { pub fn new() -> Self {
Self::new_keep_alive(Duration::from_secs(0)) Self::default()
} }
/// Add extra retries to the current job. /// Add extra retries to the current job.
pub fn set_extra_retries(&mut self, extra_retries: usize) -> &mut Self { pub fn set_extra_retries(&mut self, extra_retries: usize) -> &mut Self {
@ -272,8 +272,15 @@ async fn start_listener(job_runner: Arc<JobRunner>) -> Result<OwnedHandle, sqlx:
listener.listen("mq").await?; listener.listen("mq").await?;
} }
Ok(OwnedHandle(task::spawn(async move { Ok(OwnedHandle(task::spawn(async move {
while let Ok(_) = listener.recv().await { let mut num_errors = 0;
loop {
if num_errors > 0 || listener.recv().await.is_ok() {
job_runner.notify.notify_one(); job_runner.notify.notify_one();
num_errors = 0;
} else {
tokio::time::sleep(Duration::from_secs(1 << num_errors)).await;
num_errors += 1;
}
} }
}))) })))
} }
@ -327,12 +334,14 @@ async fn poll_and_dispatch(
.await?; .await?;
} }
const MAX_WAIT: Duration = Duration::from_secs(60);
let wait_time = messages let wait_time = messages
.iter() .iter()
.filter_map(|msg| msg.wait_time.clone()) .filter_map(|msg| msg.wait_time.clone())
.map(to_duration) .map(to_duration)
.min() .min()
.unwrap_or(Duration::from_secs(60)); .unwrap_or(MAX_WAIT);
for msg in messages { for msg in messages {
if let PolledMessage { if let PolledMessage {