Chapter 11: fault tolerant workflows.

This commit is contained in:
Luca P 2022-03-13 19:36:44 +00:00
parent 5eb7cd2df5
commit 9e6339f729
20 changed files with 890 additions and 84 deletions

1
Cargo.lock generated
View file

@ -3441,6 +3441,7 @@ dependencies = [
"serde",
"serde-aux",
"serde_json",
"serde_urlencoded",
"sqlx",
"thiserror",
"tokio",

View file

@ -52,3 +52,4 @@ fake = "~2.3.0"
wiremock = "0.5"
serde_json = "1.0.61"
linkify = "0.8.0"
serde_urlencoded = "0.7.1"

View file

@ -0,0 +1,14 @@
CREATE TYPE header_pair AS (
name TEXT,
value BYTEA
);
CREATE TABLE idempotency (
user_id uuid NOT NULL REFERENCES users(user_id),
idempotency_key TEXT NOT NULL,
response_status_code SMALLINT NOT NULL,
response_headers header_pair[] NOT NULL,
response_body BYTEA NOT NULL,
created_at timestamptz NOT NULL,
PRIMARY KEY(user_id, idempotency_key)
);

View file

@ -0,0 +1,3 @@
ALTER TABLE idempotency ALTER COLUMN response_status_code DROP NOT NULL;
ALTER TABLE idempotency ALTER COLUMN response_body DROP NOT NULL;
ALTER TABLE idempotency ALTER COLUMN response_headers DROP NOT NULL;

View file

@ -0,0 +1,8 @@
CREATE TABLE newsletter_issues (
newsletter_issue_id uuid NOT NULL,
title TEXT NOT NULL,
text_content TEXT NOT NULL,
html_content TEXT NOT NULL,
published_at TEXT NOT NULL,
PRIMARY KEY(newsletter_issue_id)
);

View file

@ -0,0 +1,5 @@
CREATE TABLE issue_delivery_queue (
newsletter_issue_id uuid NOT NULL REFERENCES newsletter_issues (newsletter_issue_id),
subscriber_email TEXT NOT NULL,
PRIMARY KEY(newsletter_issue_id, subscriber_email)
);

View file

@ -1,5 +1,110 @@
{
"db": "PostgreSQL",
"0029b925e31429d25d23538804511943e2ea1fddc5a2db9a4e219c9b5be53fce": {
"query": "INSERT INTO users (user_id, username, password_hash)\n VALUES ($1, $2, $3)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Text"
]
},
"nullable": []
}
},
"06f83a51e9d2ca842dc0d6947ad39d9be966636700de58d404d8e1471a260c9a": {
"query": "\n SELECT newsletter_issue_id, subscriber_email\n FROM issue_delivery_queue\n FOR UPDATE\n SKIP LOCKED\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "newsletter_issue_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "subscriber_email",
"type_info": "Text"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false
]
}
},
"0b93f6f4f1bc59e7ee597ef6df52bbee1233d98e0a4cf53e29c153ccdae0537b": {
"query": "\n INSERT INTO newsletter_issues (\n newsletter_issue_id, \n title, \n text_content, \n html_content,\n published_at\n )\n VALUES ($1, $2, $3, $4, now())\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Text",
"Text"
]
},
"nullable": []
}
},
"1bb5d1c15161a276262535134c306bc392dda0fa1d7bb7deddcd544583a19fc8": {
"query": "\n INSERT INTO idempotency (\n user_id, \n idempotency_key,\n created_at\n ) \n VALUES ($1, $2, now()) \n ON CONFLICT DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text"
]
},
"nullable": []
}
},
"21f0f4c2ae0e88b99684823b83ce6126c218cec3badc8126492aab8fc7042109": {
"query": "\n UPDATE idempotency\n SET \n response_status_code = $3, \n response_headers = $4,\n response_body = $5\n WHERE\n user_id = $1 AND\n idempotency_key = $2\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Int2",
{
"Custom": {
"name": "_header_pair",
"kind": {
"Array": {
"Custom": {
"name": "header_pair",
"kind": {
"Composite": [
[
"name",
"Text"
],
[
"value",
"Bytea"
]
]
}
}
}
}
}
},
"Bytea"
]
},
"nullable": []
}
},
"2880480077b654e38b63f423ab40680697a500ffe1af1d1b39108910594b581b": {
"query": "\n UPDATE users\n SET password_hash = $1\n WHERE user_id = $2\n ",
"describe": {
@ -33,6 +138,38 @@
]
}
},
"38d1a12165ad4f50d8fbd4fc92376d9cc243dcc344c67b37f7fef13c6589e1eb": {
"query": "\n SELECT title, text_content, html_content\n FROM newsletter_issues\n WHERE\n newsletter_issue_id = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "title",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "text_content",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "html_content",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false,
false
]
}
},
"51c9c995452d3359e3da7e2f2ff8a6e68690f740a36d2a32ec7c40b08931ebdb": {
"query": "\n INSERT INTO subscriptions (id, email, name, subscribed_at, status)\n VALUES ($1, $2, $3, $4, 'pending_confirmation')\n ",
"describe": {
@ -61,24 +198,92 @@
"nullable": []
}
},
"7b57e2776a245ba1602f638121550485e2219a6ccaaa62b5ec3e4683e33a3b5f": {
"query": "\n SELECT email\n FROM subscriptions\n WHERE status = 'confirmed'\n ",
"9ab6536d2bf619381573b3bf13507d53b2e9cf50051e51c803e916f25b51abd2": {
"query": "SELECT email, name, status FROM subscriptions",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "email",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "name",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "status",
"type_info": "Text"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false
]
}
},
"9f103f7d6dfa569bafce4546e6e610f3d31b95fe81f96ea72575b27ddfea796e": {
"query": "\n SELECT \n response_status_code as \"response_status_code!\", \n response_headers as \"response_headers!: Vec<HeaderPairRecord>\",\n response_body as \"response_body!\"\n FROM idempotency\n WHERE \n user_id = $1 AND\n idempotency_key = $2\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "response_status_code!",
"type_info": "Int2"
},
{
"ordinal": 1,
"name": "response_headers!: Vec<HeaderPairRecord>",
"type_info": {
"Custom": {
"name": "_header_pair",
"kind": {
"Array": {
"Custom": {
"name": "header_pair",
"kind": {
"Composite": [
[
"name",
"Text"
],
[
"value",
"Bytea"
]
]
}
}
}
}
}
}
},
{
"ordinal": 2,
"name": "response_body!",
"type_info": "Bytea"
}
],
"parameters": {
"Left": [
"Uuid",
"Text"
]
},
"nullable": [
true,
true,
true
]
}
},
"a71a1932b894572106460ca2e34a63dc0cb8c1ba7a70547add1cddbb68133c2b": {
"query": "UPDATE subscriptions SET status = 'confirmed' WHERE id = $1",
"describe": {
@ -91,6 +296,28 @@
"nullable": []
}
},
"aa682ff5c6485c4faa8168322413294a282ddcc0ef4e38ca3980e6fc7c00c87c": {
"query": "\n INSERT INTO issue_delivery_queue (\n newsletter_issue_id, \n subscriber_email\n )\n SELECT $1, email\n FROM subscriptions\n WHERE status = 'confirmed'\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": []
}
},
"aa6ec2d18c8536eb8340bdf02a833440ff7954c503133ed99ebd6190822edf04": {
"query": "ALTER TABLE subscriptions DROP COLUMN email;",
"describe": {
"columns": [],
"parameters": {
"Left": []
},
"nullable": []
}
},
"acf1b96c82ddf18db02e71a0e297c822b46f10add52c54649cf599b883165e58": {
"query": "\n SELECT user_id, password_hash\n FROM users\n WHERE username = $1\n ",
"describe": {
@ -136,5 +363,18 @@
false
]
}
},
"c00b32b331e0444b4bb0cd823b71a8c7ed3a3c8f2b8db3b12c6fbc434aa4d34b": {
"query": "\n DELETE FROM issue_delivery_queue\n WHERE \n newsletter_issue_id = $1 AND\n subscriber_email = $2 \n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text"
]
},
"nullable": []
}
}
}

View file

@ -1,4 +1,5 @@
use crate::domain::SubscriberEmail;
use crate::email_client::EmailClient;
use secrecy::{ExposeSecret, Secret};
use serde_aux::field_attributes::deserialize_number_from_string;
use sqlx::postgres::{PgConnectOptions, PgSslMode};
@ -65,6 +66,17 @@ pub struct EmailClientSettings {
}
impl EmailClientSettings {
pub fn client(self) -> EmailClient {
let sender_email = self.sender().expect("Invalid sender email address.");
let timeout = self.timeout();
EmailClient::new(
self.base_url,
sender_email,
self.authorization_token,
timeout,
)
}
pub fn sender(&self) -> Result<SubscriberEmail, String> {
SubscriberEmail::parse(self.sender_email.clone())
}

32
src/idempotency/key.rs Normal file
View file

@ -0,0 +1,32 @@
#[derive(Debug)]
pub struct IdempotencyKey(String);
impl TryFrom<String> for IdempotencyKey {
type Error = anyhow::Error;
fn try_from(s: String) -> Result<Self, Self::Error> {
if s.is_empty() {
anyhow::bail!("The idempotency key cannot be empty");
}
let max_length = 50;
if s.len() >= max_length {
anyhow::bail!(
"The idempotency key must be shorter
than {max_length} characters"
);
}
Ok(Self(s))
}
}
impl From<IdempotencyKey> for String {
fn from(k: IdempotencyKey) -> Self {
k.0
}
}
impl AsRef<str> for IdempotencyKey {
fn as_ref(&self) -> &str {
&self.0
}
}

6
src/idempotency/mod.rs Normal file
View file

@ -0,0 +1,6 @@
mod key;
mod persistence;
pub use key::IdempotencyKey;
pub use persistence::get_saved_response;
pub use persistence::save_response;
pub use persistence::{try_processing, NextAction};

View file

@ -0,0 +1,136 @@
use super::IdempotencyKey;
use actix_web::body::to_bytes;
use actix_web::http::StatusCode;
use actix_web::HttpResponse;
use sqlx::postgres::PgHasArrayType;
use sqlx::PgPool;
use sqlx::{Postgres, Transaction};
use uuid::Uuid;
#[derive(Debug, sqlx::Type)]
#[sqlx(type_name = "header_pair")]
struct HeaderPairRecord {
name: String,
value: Vec<u8>,
}
impl PgHasArrayType for HeaderPairRecord {
fn array_type_info() -> sqlx::postgres::PgTypeInfo {
sqlx::postgres::PgTypeInfo::with_name("_header_pair")
}
}
pub async fn get_saved_response(
pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
) -> Result<Option<HttpResponse>, anyhow::Error> {
let saved_response = sqlx::query!(
r#"
SELECT
response_status_code as "response_status_code!",
response_headers as "response_headers!: Vec<HeaderPairRecord>",
response_body as "response_body!"
FROM idempotency
WHERE
user_id = $1 AND
idempotency_key = $2
"#,
user_id,
idempotency_key.as_ref()
)
.fetch_optional(pool)
.await?;
if let Some(r) = saved_response {
let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?;
let mut response = HttpResponse::build(status_code);
for HeaderPairRecord { name, value } in r.response_headers {
response.append_header((name, value));
}
Ok(Some(response.body(r.response_body)))
} else {
Ok(None)
}
}
pub async fn save_response(
mut transaction: Transaction<'static, Postgres>,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
http_response: HttpResponse,
) -> Result<HttpResponse, anyhow::Error> {
let (response_head, body) = http_response.into_parts();
let body = to_bytes(body).await.map_err(|e| anyhow::anyhow!("{}", e))?;
let status_code = response_head.status().as_u16() as i16;
let headers = {
let mut h = Vec::with_capacity(response_head.headers().len());
for (name, value) in response_head.headers().iter() {
let name = name.as_str().to_owned();
let value = value.as_bytes().to_owned();
h.push(HeaderPairRecord { name, value });
}
h
};
sqlx::query_unchecked!(
r#"
UPDATE idempotency
SET
response_status_code = $3,
response_headers = $4,
response_body = $5
WHERE
user_id = $1 AND
idempotency_key = $2
"#,
user_id,
idempotency_key.as_ref(),
status_code,
headers,
body.as_ref()
)
.execute(&mut transaction)
.await?;
transaction.commit().await?;
let http_response = response_head.set_body(body).map_into_boxed_body();
Ok(http_response)
}
#[allow(clippy::large_enum_variant)]
pub enum NextAction {
// Return transaction for later usage
StartProcessing(Transaction<'static, Postgres>),
ReturnSavedResponse(HttpResponse),
}
pub async fn try_processing(
pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
) -> Result<NextAction, anyhow::Error> {
let mut transaction = pool.begin().await?;
let n_inserted_rows = sqlx::query!(
r#"
INSERT INTO idempotency (
user_id,
idempotency_key,
created_at
)
VALUES ($1, $2, now())
ON CONFLICT DO NOTHING
"#,
user_id,
idempotency_key.as_ref()
)
.execute(&mut transaction)
.await?
.rows_affected();
if n_inserted_rows > 0 {
Ok(NextAction::StartProcessing(transaction))
} else {
let saved_response = get_saved_response(pool, idempotency_key, user_id)
.await?
.ok_or_else(|| anyhow::anyhow!("We expected a saved response, we didn't find it"))?;
Ok(NextAction::ReturnSavedResponse(saved_response))
}
}

View file

@ -0,0 +1,158 @@
use crate::{configuration::Settings, startup::get_connection_pool};
use crate::{domain::SubscriberEmail, email_client::EmailClient};
use sqlx::{PgPool, Postgres, Transaction};
use std::time::Duration;
use tracing::{field::display, Span};
use uuid::Uuid;
pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> {
let connection_pool = get_connection_pool(&configuration.database);
let email_client = configuration.email_client.client();
worker_loop(connection_pool, email_client).await
}
async fn worker_loop(pool: PgPool, email_client: EmailClient) -> Result<(), anyhow::Error> {
loop {
match try_execute_task(&pool, &email_client).await {
Ok(ExecutionOutcome::EmptyQueue) => {
tokio::time::sleep(Duration::from_secs(10)).await;
}
Err(_) => {
tokio::time::sleep(Duration::from_secs(1)).await;
}
Ok(ExecutionOutcome::TaskCompleted) => {}
}
}
}
pub enum ExecutionOutcome {
TaskCompleted,
EmptyQueue,
}
#[tracing::instrument(
skip_all,
fields(
newsletter_issue_id=tracing::field::Empty,
subscriber_email=tracing::field::Empty
),
err
)]
pub async fn try_execute_task(
pool: &PgPool,
email_client: &EmailClient,
) -> Result<ExecutionOutcome, anyhow::Error> {
let task = dequeue_task(pool).await?;
if task.is_none() {
return Ok(ExecutionOutcome::EmptyQueue);
}
let (transaction, issue_id, email) = task.unwrap();
Span::current()
.record("newsletter_issue_id", &display(issue_id))
.record("subscriber_email", &display(&email));
match SubscriberEmail::parse(email.clone()) {
Ok(email) => {
let issue = get_issue(pool, issue_id).await?;
if let Err(e) = email_client
.send_email(
&email,
&issue.title,
&issue.html_content,
&issue.text_content,
)
.await
{
tracing::error!(
error.cause_chain = ?e,
error.message = %e,
"Failed to deliver issue to a confirmed subscriber. \
Skipping.",
);
}
}
Err(e) => {
tracing::error!(
error.cause_chain = ?e,
error.message = %e,
"Skipping a confirmed subscriber. \
Their stored contact details are invalid",
);
}
}
delete_task(transaction, issue_id, &email).await?;
Ok(ExecutionOutcome::TaskCompleted)
}
type PgTransaction = Transaction<'static, Postgres>;
#[tracing::instrument(skip_all)]
async fn dequeue_task(
pool: &PgPool,
) -> Result<Option<(PgTransaction, Uuid, String)>, anyhow::Error> {
let mut transaction = pool.begin().await?;
let r = sqlx::query!(
r#"
SELECT newsletter_issue_id, subscriber_email
FROM issue_delivery_queue
FOR UPDATE
SKIP LOCKED
LIMIT 1
"#,
)
.fetch_optional(&mut transaction)
.await?;
if let Some(r) = r {
Ok(Some((
transaction,
r.newsletter_issue_id,
r.subscriber_email,
)))
} else {
Ok(None)
}
}
#[tracing::instrument(skip_all)]
async fn delete_task(
mut transaction: PgTransaction,
issue_id: Uuid,
email: &str,
) -> Result<(), anyhow::Error> {
sqlx::query!(
r#"
DELETE FROM issue_delivery_queue
WHERE
newsletter_issue_id = $1 AND
subscriber_email = $2
"#,
issue_id,
email
)
.execute(&mut transaction)
.await?;
transaction.commit().await?;
Ok(())
}
struct NewsletterIssue {
title: String,
text_content: String,
html_content: String,
}
#[tracing::instrument(skip_all)]
async fn get_issue(pool: &PgPool, issue_id: Uuid) -> Result<NewsletterIssue, anyhow::Error> {
let issue = sqlx::query_as!(
NewsletterIssue,
r#"
SELECT title, text_content, html_content
FROM newsletter_issues
WHERE
newsletter_issue_id = $1
"#,
issue_id
)
.fetch_one(pool)
.await?;
Ok(issue)
}

View file

@ -2,6 +2,8 @@ pub mod authentication;
pub mod configuration;
pub mod domain;
pub mod email_client;
pub mod idempotency;
pub mod issue_delivery_worker;
pub mod routes;
pub mod session_state;
pub mod startup;

View file

@ -1,4 +1,7 @@
use std::fmt::{Debug, Display};
use tokio::task::JoinError;
use zero2prod::configuration::get_configuration;
use zero2prod::issue_delivery_worker::run_worker_until_stopped;
use zero2prod::startup::Application;
use zero2prod::telemetry::{get_subscriber, init_subscriber};
@ -8,7 +11,38 @@ async fn main() -> anyhow::Result<()> {
init_subscriber(subscriber);
let configuration = get_configuration().expect("Failed to read configuration.");
let application = Application::build(configuration).await?;
application.run_until_stopped().await?;
let application = Application::build(configuration.clone()).await?;
let application_task = tokio::spawn(application.run_until_stopped());
let worker_task = tokio::spawn(run_worker_until_stopped(configuration));
tokio::select! {
o = application_task => report_exit("API", o),
o = worker_task => report_exit("Background worker", o),
};
Ok(())
}
fn report_exit(task_name: &str, outcome: Result<Result<(), impl Debug + Display>, JoinError>) {
match outcome {
Ok(Ok(())) => {
tracing::info!("{} has exited", task_name)
}
Ok(Err(e)) => {
tracing::error!(
error.cause_chain = ?e,
error.message = %e,
"{} failed",
task_name
)
}
Err(e) => {
tracing::error!(
error.cause_chain = ?e,
error.message = %e,
"{}' task failed to complete",
task_name
)
}
}
}

View file

@ -10,7 +10,7 @@ pub async fn publish_newsletter_form(
for m in flash_messages.iter() {
writeln!(msg_html, "<p><i>{}</i></p>", m.content()).unwrap();
}
let idempotency_key = uuid::Uuid::new_v4();
Ok(HttpResponse::Ok()
.content_type(ContentType::html())
.body(format!(
@ -49,6 +49,7 @@ pub async fn publish_newsletter_form(
></textarea>
</label>
<br>
<input hidden type="text" name="idempotency_key" value="{idempotency_key}">
<button type="submit">Publish</button>
</form>
<p><a href="/admin/dashboard">&lt;- Back</a></p>

View file

@ -1,83 +1,119 @@
use crate::authentication::UserId;
use crate::domain::SubscriberEmail;
use crate::email_client::EmailClient;
use crate::idempotency::{save_response, try_processing, IdempotencyKey, NextAction};
use crate::utils::e400;
use crate::utils::{e500, see_other};
use actix_web::web::ReqData;
use actix_web::{web, HttpResponse};
use actix_web_flash_messages::FlashMessage;
use anyhow::Context;
use sqlx::PgPool;
use sqlx::{PgPool, Postgres, Transaction};
use uuid::Uuid;
#[derive(serde::Deserialize)]
pub struct FormData {
title: String,
text_content: String,
html_content: String,
idempotency_key: String,
}
fn success_message() -> FlashMessage {
FlashMessage::info(
"The newsletter issue has been accepted - \
emails will go out shortly.",
)
}
#[tracing::instrument(
name = "Publish a newsletter issue",
skip(form, pool, email_client, user_id),
fields(user_id=%*user_id)
skip_all,
fields(user_id=%&*user_id)
)]
pub async fn publish_newsletter(
form: web::Form<FormData>,
user_id: ReqData<UserId>,
pool: web::Data<PgPool>,
email_client: web::Data<EmailClient>,
user_id: web::ReqData<UserId>,
) -> Result<HttpResponse, actix_web::Error> {
let subscribers = get_confirmed_subscribers(&pool).await.map_err(e500)?;
for subscriber in subscribers {
match subscriber {
Ok(subscriber) => {
email_client
.send_email(
&subscriber.email,
&form.title,
&form.html_content,
&form.text_content,
)
.await
.with_context(|| {
format!("Failed to send newsletter issue to {}", subscriber.email)
})
.map_err(e500)?;
}
Err(error) => {
tracing::warn!(
error.cause_chain = ?error,
error.message = %error,
"Skipping a confirmed subscriber. Their stored contact details are invalid",
);
}
let user_id = user_id.into_inner();
let FormData {
title,
text_content,
html_content,
idempotency_key,
} = form.0;
let idempotency_key: IdempotencyKey = idempotency_key.try_into().map_err(e400)?;
let mut transaction = match try_processing(&pool, &idempotency_key, *user_id)
.await
.map_err(e500)?
{
NextAction::StartProcessing(t) => t,
NextAction::ReturnSavedResponse(saved_response) => {
success_message().send();
return Ok(saved_response);
}
}
FlashMessage::info("The newsletter issue has been published!").send();
Ok(see_other("/admin/newsletters"))
};
let issue_id = insert_newsletter_issue(&mut transaction, &title, &text_content, &html_content)
.await
.context("Failed to store newsletter issue details")
.map_err(e500)?;
enqueue_delivery_tasks(&mut transaction, issue_id)
.await
.context("Failed to enqueue delivery tasks")
.map_err(e500)?;
let response = see_other("/admin/newsletters");
let response = save_response(transaction, &idempotency_key, *user_id, response)
.await
.map_err(e500)?;
success_message().send();
Ok(response)
}
struct ConfirmedSubscriber {
email: SubscriberEmail,
}
#[tracing::instrument(name = "Get confirmed subscribers", skip(pool))]
async fn get_confirmed_subscribers(
pool: &PgPool,
) -> Result<Vec<Result<ConfirmedSubscriber, anyhow::Error>>, anyhow::Error> {
let confirmed_subscribers = sqlx::query!(
#[tracing::instrument(skip_all)]
async fn insert_newsletter_issue(
transaction: &mut Transaction<'_, Postgres>,
title: &str,
text_content: &str,
html_content: &str,
) -> Result<Uuid, sqlx::Error> {
let newsletter_issue_id = Uuid::new_v4();
sqlx::query!(
r#"
SELECT email
INSERT INTO newsletter_issues (
newsletter_issue_id,
title,
text_content,
html_content,
published_at
)
VALUES ($1, $2, $3, $4, now())
"#,
newsletter_issue_id,
title,
text_content,
html_content
)
.execute(transaction)
.await?;
Ok(newsletter_issue_id)
}
#[tracing::instrument(skip_all)]
async fn enqueue_delivery_tasks(
transaction: &mut Transaction<'_, Postgres>,
newsletter_issue_id: Uuid,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO issue_delivery_queue (
newsletter_issue_id,
subscriber_email
)
SELECT $1, email
FROM subscriptions
WHERE status = 'confirmed'
"#,
newsletter_issue_id,
)
.fetch_all(pool)
.await?
.into_iter()
.map(|r| match SubscriberEmail::parse(r.email) {
Ok(email) => Ok(ConfirmedSubscriber { email }),
Err(error) => Err(anyhow::anyhow!(error)),
})
.collect();
Ok(confirmed_subscribers)
.execute(transaction)
.await?;
Ok(())
}

View file

@ -27,21 +27,8 @@ pub struct Application {
impl Application {
pub async fn build(configuration: Settings) -> Result<Self, anyhow::Error> {
let connection_pool = get_connection_pool(&configuration.database)
.await
.expect("Failed to connect to Postgres.");
let sender_email = configuration
.email_client
.sender()
.expect("Invalid sender email address.");
let timeout = configuration.email_client.timeout();
let email_client = EmailClient::new(
configuration.email_client.base_url,
sender_email,
configuration.email_client.authorization_token,
timeout,
);
let connection_pool = get_connection_pool(&configuration.database);
let email_client = configuration.email_client.client();
let address = format!(
"{}:{}",
@ -71,11 +58,10 @@ impl Application {
}
}
pub async fn get_connection_pool(configuration: &DatabaseSettings) -> Result<PgPool, sqlx::Error> {
pub fn get_connection_pool(configuration: &DatabaseSettings) -> PgPool {
PgPoolOptions::new()
.connect_timeout(std::time::Duration::from_secs(2))
.connect_with(configuration.with_db())
.await
.connect_lazy_with(configuration.with_db())
}
pub struct ApplicationBaseUrl(pub String);

View file

@ -8,6 +8,16 @@ where
{
actix_web::error::ErrorInternalServerError(e)
}
// Return a 400 with the user-representation of the validation error as body.
// The error root cause is preserved for logging purposes.
pub fn e400<T: std::fmt::Debug + std::fmt::Display>(e: T) -> actix_web::Error
where
T: std::fmt::Debug + std::fmt::Display + 'static,
{
actix_web::error::ErrorBadRequest(e)
}
pub fn see_other(location: &str) -> HttpResponse {
HttpResponse::SeeOther()
.insert_header((LOCATION, location))

View file

@ -5,6 +5,8 @@ use sqlx::{Connection, Executor, PgConnection, PgPool};
use uuid::Uuid;
use wiremock::MockServer;
use zero2prod::configuration::{get_configuration, DatabaseSettings};
use zero2prod::email_client::EmailClient;
use zero2prod::issue_delivery_worker::{try_execute_task, ExecutionOutcome};
use zero2prod::startup::{get_connection_pool, Application};
use zero2prod::telemetry::{get_subscriber, init_subscriber};
@ -28,6 +30,7 @@ pub struct TestApp {
pub email_server: MockServer,
pub test_user: TestUser,
pub api_client: reqwest::Client,
pub email_client: EmailClient,
}
/// Confirmation links embedded in the request to the email API.
@ -37,6 +40,18 @@ pub struct ConfirmationLinks {
}
impl TestApp {
pub async fn dispatch_all_pending_emails(&self) {
loop {
if let ExecutionOutcome::EmptyQueue =
try_execute_task(&self.db_pool, &self.email_client)
.await
.unwrap()
{
break;
}
}
}
pub async fn post_subscriptions(&self, body: String) -> reqwest::Response {
self.api_client
.post(&format!("{}/subscriptions", &self.address))
@ -200,12 +215,11 @@ pub async fn spawn_app() -> TestApp {
let test_app = TestApp {
address: format!("http://localhost:{}", application_port),
port: application_port,
db_pool: get_connection_pool(&configuration.database)
.await
.expect("Failed to connect to the database"),
db_pool: get_connection_pool(&configuration.database),
email_server,
test_user: TestUser::generate(),
api_client: client,
email_client: configuration.email_client.client(),
};
test_app.test_user.store(&test_app.db_pool).await;

View file

@ -1,9 +1,21 @@
use crate::helpers::{assert_is_redirect_to, spawn_app, ConfirmationLinks, TestApp};
use fake::faker::internet::en::SafeEmail;
use fake::faker::name::en::Name;
use fake::Fake;
use std::time::Duration;
use wiremock::matchers::{any, method, path};
use wiremock::{Mock, ResponseTemplate};
use wiremock::{Mock, MockBuilder, ResponseTemplate};
async fn create_unconfirmed_subscriber(app: &TestApp) -> ConfirmationLinks {
let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";
// We are working with multiple subscribers now,
// their details must be randomised to avoid conflicts!
let name: String = Name().fake();
let email: String = SafeEmail().fake();
let body = serde_urlencoded::to_string(&serde_json::json!({
"name": name,
"email": email
}))
.unwrap();
let _mock_guard = Mock::given(path("/email"))
.and(method("POST"))
@ -54,13 +66,18 @@ async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() {
"title": "Newsletter title",
"text_content": "Newsletter body as plain text",
"html_content": "<p>Newsletter body as HTML</p>",
"idempotency_key": uuid::Uuid::new_v4().to_string()
});
let response = app.post_publish_newsletter(&newsletter_request_body).await;
assert_is_redirect_to(&response, "/admin/newsletters");
// Act - Part 2 - Follow the redirect
let html_page = app.get_publish_newsletter_html().await;
assert!(html_page.contains("<p><i>The newsletter issue has been published!</i></p>"));
assert!(html_page.contains(
"<p><i>The newsletter issue has been accepted - \
emails will go out shortly.</i></p>"
));
app.dispatch_all_pending_emails().await;
// Mock verifies on Drop that we haven't sent the newsletter email
}
@ -83,13 +100,18 @@ async fn newsletters_are_delivered_to_confirmed_subscribers() {
"title": "Newsletter title",
"text_content": "Newsletter body as plain text",
"html_content": "<p>Newsletter body as HTML</p>",
"idempotency_key": uuid::Uuid::new_v4().to_string()
});
let response = app.post_publish_newsletter(&newsletter_request_body).await;
assert_is_redirect_to(&response, "/admin/newsletters");
// Act - Part 2 - Follow the redirect
let html_page = app.get_publish_newsletter_html().await;
assert!(html_page.contains("<p><i>The newsletter issue has been published!</i></p>"));
assert!(html_page.contains(
"<p><i>The newsletter issue has been accepted - \
emails will go out shortly.</i></p>"
));
app.dispatch_all_pending_emails().await;
// Mock verifies on Drop that we have sent the newsletter email
}
@ -115,9 +137,94 @@ async fn you_must_be_logged_in_to_publish_a_newsletter() {
"title": "Newsletter title",
"text_content": "Newsletter body as plain text",
"html_content": "<p>Newsletter body as HTML</p>",
"idempotency_key": uuid::Uuid::new_v4().to_string()
});
let response = app.post_publish_newsletter(&newsletter_request_body).await;
// Assert
assert_is_redirect_to(&response, "/login");
}
#[tokio::test]
async fn newsletter_creation_is_idempotent() {
// Arrange
let app = spawn_app().await;
create_confirmed_subscriber(&app).await;
app.test_user.login(&app).await;
Mock::given(path("/email"))
.and(method("POST"))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&app.email_server)
.await;
// Act - Part 1 - Submit newsletter form
let newsletter_request_body = serde_json::json!({
"title": "Newsletter title",
"text_content": "Newsletter body as plain text",
"html_content": "<p>Newsletter body as HTML</p>",
// We expect the idempotency key as part of the
// form data, not as an header
"idempotency_key": uuid::Uuid::new_v4().to_string()
});
let response = app.post_publish_newsletter(&newsletter_request_body).await;
assert_is_redirect_to(&response, "/admin/newsletters");
// Act - Part 2 - Follow the redirect
let html_page = app.get_publish_newsletter_html().await;
assert!(html_page.contains(
"<p><i>The newsletter issue has been accepted - \
emails will go out shortly.</i></p>"
));
// Act - Part 3 - Submit newsletter form **again**
let response = app.post_publish_newsletter(&newsletter_request_body).await;
assert_is_redirect_to(&response, "/admin/newsletters");
// Act - Part 4 - Follow the redirect
let html_page = app.get_publish_newsletter_html().await;
assert!(html_page.contains(
"<p><i>The newsletter issue has been accepted - \
emails will go out shortly.</i></p>"
));
app.dispatch_all_pending_emails().await;
// Mock verifies on Drop that we have sent the newsletter email **once**
}
#[tokio::test]
async fn concurrent_form_submission_is_handled_gracefully() {
// Arrange
let app = spawn_app().await;
create_confirmed_subscriber(&app).await;
app.test_user.login(&app).await;
Mock::given(path("/email"))
.and(method("POST"))
// Setting a long delay to ensure that the second request
// arrives before the first one completes
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(2)))
.expect(1)
.mount(&app.email_server)
.await;
// Act - Submit two newsletter forms concurrently
let newsletter_request_body = serde_json::json!({
"title": "Newsletter title",
"text_content": "Newsletter body as plain text",
"html_content": "<p>Newsletter body as HTML</p>",
"idempotency_key": uuid::Uuid::new_v4().to_string()
});
let response1 = app.post_publish_newsletter(&newsletter_request_body);
let response2 = app.post_publish_newsletter(&newsletter_request_body);
let (response1, response2) = tokio::join!(response1, response2);
assert_eq!(response1.status(), response2.status());
assert_eq!(
response1.text().await.unwrap(),
response2.text().await.unwrap()
);
app.dispatch_all_pending_emails().await;
// Mock verifies on Drop that we have sent the newsletter email **once**
}