diff --git a/Cargo.lock b/Cargo.lock index 0df2ac6..fcc4f8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3441,6 +3441,7 @@ dependencies = [ "serde", "serde-aux", "serde_json", + "serde_urlencoded", "sqlx", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 537d57e..18db9a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,3 +52,4 @@ fake = "~2.3.0" wiremock = "0.5" serde_json = "1.0.61" linkify = "0.8.0" +serde_urlencoded = "0.7.1" \ No newline at end of file diff --git a/migrations/20220313182312_create_idempotency_table.sql b/migrations/20220313182312_create_idempotency_table.sql new file mode 100644 index 0000000..edea0a8 --- /dev/null +++ b/migrations/20220313182312_create_idempotency_table.sql @@ -0,0 +1,14 @@ +CREATE TYPE header_pair AS ( + name TEXT, + value BYTEA +); + +CREATE TABLE idempotency ( + user_id uuid NOT NULL REFERENCES users(user_id), + idempotency_key TEXT NOT NULL, + response_status_code SMALLINT NOT NULL, + response_headers header_pair[] NOT NULL, + response_body BYTEA NOT NULL, + created_at timestamptz NOT NULL, + PRIMARY KEY(user_id, idempotency_key) +); \ No newline at end of file diff --git a/migrations/20220313184809_relax_null_checks_on_idempotency.sql b/migrations/20220313184809_relax_null_checks_on_idempotency.sql new file mode 100644 index 0000000..ec60b32 --- /dev/null +++ b/migrations/20220313184809_relax_null_checks_on_idempotency.sql @@ -0,0 +1,3 @@ +ALTER TABLE idempotency ALTER COLUMN response_status_code DROP NOT NULL; +ALTER TABLE idempotency ALTER COLUMN response_body DROP NOT NULL; +ALTER TABLE idempotency ALTER COLUMN response_headers DROP NOT NULL; \ No newline at end of file diff --git a/migrations/20220313190920_create_newsletter_issues_table.sql b/migrations/20220313190920_create_newsletter_issues_table.sql new file mode 100644 index 0000000..878fd96 --- /dev/null +++ b/migrations/20220313190920_create_newsletter_issues_table.sql @@ -0,0 +1,8 @@ +CREATE TABLE newsletter_issues ( + newsletter_issue_id uuid NOT NULL, + title TEXT NOT NULL, + text_content TEXT NOT NULL, + html_content TEXT NOT NULL, + published_at TEXT NOT NULL, + PRIMARY KEY(newsletter_issue_id) +); diff --git a/migrations/20220313191254_create_issue_delivery_queue_table.sql b/migrations/20220313191254_create_issue_delivery_queue_table.sql new file mode 100644 index 0000000..75763d8 --- /dev/null +++ b/migrations/20220313191254_create_issue_delivery_queue_table.sql @@ -0,0 +1,5 @@ +CREATE TABLE issue_delivery_queue ( + newsletter_issue_id uuid NOT NULL REFERENCES newsletter_issues (newsletter_issue_id), + subscriber_email TEXT NOT NULL, + PRIMARY KEY(newsletter_issue_id, subscriber_email) +); \ No newline at end of file diff --git a/sqlx-data.json b/sqlx-data.json index 78208ec..0715709 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -1,5 +1,110 @@ { "db": "PostgreSQL", + "0029b925e31429d25d23538804511943e2ea1fddc5a2db9a4e219c9b5be53fce": { + "query": "INSERT INTO users (user_id, username, password_hash)\n VALUES ($1, $2, $3)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Text" + ] + }, + "nullable": [] + } + }, + "06f83a51e9d2ca842dc0d6947ad39d9be966636700de58d404d8e1471a260c9a": { + "query": "\n SELECT newsletter_issue_id, subscriber_email\n FROM issue_delivery_queue\n FOR UPDATE\n SKIP LOCKED\n LIMIT 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "newsletter_issue_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "subscriber_email", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false + ] + } + }, + "0b93f6f4f1bc59e7ee597ef6df52bbee1233d98e0a4cf53e29c153ccdae0537b": { + "query": "\n INSERT INTO newsletter_issues (\n newsletter_issue_id, \n title, \n text_content, \n html_content,\n published_at\n )\n VALUES ($1, $2, $3, $4, now())\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + } + }, + "1bb5d1c15161a276262535134c306bc392dda0fa1d7bb7deddcd544583a19fc8": { + "query": "\n INSERT INTO idempotency (\n user_id, \n idempotency_key,\n created_at\n ) \n VALUES ($1, $2, now()) \n ON CONFLICT DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [] + } + }, + "21f0f4c2ae0e88b99684823b83ce6126c218cec3badc8126492aab8fc7042109": { + "query": "\n UPDATE idempotency\n SET \n response_status_code = $3, \n response_headers = $4,\n response_body = $5\n WHERE\n user_id = $1 AND\n idempotency_key = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Int2", + { + "Custom": { + "name": "_header_pair", + "kind": { + "Array": { + "Custom": { + "name": "header_pair", + "kind": { + "Composite": [ + [ + "name", + "Text" + ], + [ + "value", + "Bytea" + ] + ] + } + } + } + } + } + }, + "Bytea" + ] + }, + "nullable": [] + } + }, "2880480077b654e38b63f423ab40680697a500ffe1af1d1b39108910594b581b": { "query": "\n UPDATE users\n SET password_hash = $1\n WHERE user_id = $2\n ", "describe": { @@ -33,6 +138,38 @@ ] } }, + "38d1a12165ad4f50d8fbd4fc92376d9cc243dcc344c67b37f7fef13c6589e1eb": { + "query": "\n SELECT title, text_content, html_content\n FROM newsletter_issues\n WHERE\n newsletter_issue_id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "title", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "text_content", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "html_content", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false + ] + } + }, "51c9c995452d3359e3da7e2f2ff8a6e68690f740a36d2a32ec7c40b08931ebdb": { "query": "\n INSERT INTO subscriptions (id, email, name, subscribed_at, status)\n VALUES ($1, $2, $3, $4, 'pending_confirmation')\n ", "describe": { @@ -61,24 +198,92 @@ "nullable": [] } }, - "7b57e2776a245ba1602f638121550485e2219a6ccaaa62b5ec3e4683e33a3b5f": { - "query": "\n SELECT email\n FROM subscriptions\n WHERE status = 'confirmed'\n ", + "9ab6536d2bf619381573b3bf13507d53b2e9cf50051e51c803e916f25b51abd2": { + "query": "SELECT email, name, status FROM subscriptions", "describe": { "columns": [ { "ordinal": 0, "name": "email", "type_info": "Text" + }, + { + "ordinal": 1, + "name": "name", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "status", + "type_info": "Text" } ], "parameters": { "Left": [] }, "nullable": [ + false, + false, false ] } }, + "9f103f7d6dfa569bafce4546e6e610f3d31b95fe81f96ea72575b27ddfea796e": { + "query": "\n SELECT \n response_status_code as \"response_status_code!\", \n response_headers as \"response_headers!: Vec\",\n response_body as \"response_body!\"\n FROM idempotency\n WHERE \n user_id = $1 AND\n idempotency_key = $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "response_status_code!", + "type_info": "Int2" + }, + { + "ordinal": 1, + "name": "response_headers!: Vec", + "type_info": { + "Custom": { + "name": "_header_pair", + "kind": { + "Array": { + "Custom": { + "name": "header_pair", + "kind": { + "Composite": [ + [ + "name", + "Text" + ], + [ + "value", + "Bytea" + ] + ] + } + } + } + } + } + } + }, + { + "ordinal": 2, + "name": "response_body!", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true, + true + ] + } + }, "a71a1932b894572106460ca2e34a63dc0cb8c1ba7a70547add1cddbb68133c2b": { "query": "UPDATE subscriptions SET status = 'confirmed' WHERE id = $1", "describe": { @@ -91,6 +296,28 @@ "nullable": [] } }, + "aa682ff5c6485c4faa8168322413294a282ddcc0ef4e38ca3980e6fc7c00c87c": { + "query": "\n INSERT INTO issue_delivery_queue (\n newsletter_issue_id, \n subscriber_email\n )\n SELECT $1, email\n FROM subscriptions\n WHERE status = 'confirmed'\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + } + }, + "aa6ec2d18c8536eb8340bdf02a833440ff7954c503133ed99ebd6190822edf04": { + "query": "ALTER TABLE subscriptions DROP COLUMN email;", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + } + }, "acf1b96c82ddf18db02e71a0e297c822b46f10add52c54649cf599b883165e58": { "query": "\n SELECT user_id, password_hash\n FROM users\n WHERE username = $1\n ", "describe": { @@ -136,5 +363,18 @@ false ] } + }, + "c00b32b331e0444b4bb0cd823b71a8c7ed3a3c8f2b8db3b12c6fbc434aa4d34b": { + "query": "\n DELETE FROM issue_delivery_queue\n WHERE \n newsletter_issue_id = $1 AND\n subscriber_email = $2 \n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [] + } } } \ No newline at end of file diff --git a/src/configuration.rs b/src/configuration.rs index e5dcb85..81ddab4 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,4 +1,5 @@ use crate::domain::SubscriberEmail; +use crate::email_client::EmailClient; use secrecy::{ExposeSecret, Secret}; use serde_aux::field_attributes::deserialize_number_from_string; use sqlx::postgres::{PgConnectOptions, PgSslMode}; @@ -65,6 +66,17 @@ pub struct EmailClientSettings { } impl EmailClientSettings { + pub fn client(self) -> EmailClient { + let sender_email = self.sender().expect("Invalid sender email address."); + let timeout = self.timeout(); + EmailClient::new( + self.base_url, + sender_email, + self.authorization_token, + timeout, + ) + } + pub fn sender(&self) -> Result { SubscriberEmail::parse(self.sender_email.clone()) } diff --git a/src/idempotency/key.rs b/src/idempotency/key.rs new file mode 100644 index 0000000..8f21106 --- /dev/null +++ b/src/idempotency/key.rs @@ -0,0 +1,32 @@ +#[derive(Debug)] +pub struct IdempotencyKey(String); + +impl TryFrom for IdempotencyKey { + type Error = anyhow::Error; + + fn try_from(s: String) -> Result { + if s.is_empty() { + anyhow::bail!("The idempotency key cannot be empty"); + } + let max_length = 50; + if s.len() >= max_length { + anyhow::bail!( + "The idempotency key must be shorter + than {max_length} characters" + ); + } + Ok(Self(s)) + } +} + +impl From for String { + fn from(k: IdempotencyKey) -> Self { + k.0 + } +} + +impl AsRef for IdempotencyKey { + fn as_ref(&self) -> &str { + &self.0 + } +} diff --git a/src/idempotency/mod.rs b/src/idempotency/mod.rs new file mode 100644 index 0000000..0a05ef7 --- /dev/null +++ b/src/idempotency/mod.rs @@ -0,0 +1,6 @@ +mod key; +mod persistence; +pub use key::IdempotencyKey; +pub use persistence::get_saved_response; +pub use persistence::save_response; +pub use persistence::{try_processing, NextAction}; diff --git a/src/idempotency/persistence.rs b/src/idempotency/persistence.rs new file mode 100644 index 0000000..a43ad1c --- /dev/null +++ b/src/idempotency/persistence.rs @@ -0,0 +1,136 @@ +use super::IdempotencyKey; +use actix_web::body::to_bytes; +use actix_web::http::StatusCode; +use actix_web::HttpResponse; +use sqlx::postgres::PgHasArrayType; +use sqlx::PgPool; +use sqlx::{Postgres, Transaction}; +use uuid::Uuid; + +#[derive(Debug, sqlx::Type)] +#[sqlx(type_name = "header_pair")] +struct HeaderPairRecord { + name: String, + value: Vec, +} + +impl PgHasArrayType for HeaderPairRecord { + fn array_type_info() -> sqlx::postgres::PgTypeInfo { + sqlx::postgres::PgTypeInfo::with_name("_header_pair") + } +} + +pub async fn get_saved_response( + pool: &PgPool, + idempotency_key: &IdempotencyKey, + user_id: Uuid, +) -> Result, anyhow::Error> { + let saved_response = sqlx::query!( + r#" + SELECT + response_status_code as "response_status_code!", + response_headers as "response_headers!: Vec", + response_body as "response_body!" + FROM idempotency + WHERE + user_id = $1 AND + idempotency_key = $2 + "#, + user_id, + idempotency_key.as_ref() + ) + .fetch_optional(pool) + .await?; + if let Some(r) = saved_response { + let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?; + let mut response = HttpResponse::build(status_code); + for HeaderPairRecord { name, value } in r.response_headers { + response.append_header((name, value)); + } + Ok(Some(response.body(r.response_body))) + } else { + Ok(None) + } +} + +pub async fn save_response( + mut transaction: Transaction<'static, Postgres>, + idempotency_key: &IdempotencyKey, + user_id: Uuid, + http_response: HttpResponse, +) -> Result { + let (response_head, body) = http_response.into_parts(); + let body = to_bytes(body).await.map_err(|e| anyhow::anyhow!("{}", e))?; + let status_code = response_head.status().as_u16() as i16; + let headers = { + let mut h = Vec::with_capacity(response_head.headers().len()); + for (name, value) in response_head.headers().iter() { + let name = name.as_str().to_owned(); + let value = value.as_bytes().to_owned(); + h.push(HeaderPairRecord { name, value }); + } + h + }; + sqlx::query_unchecked!( + r#" + UPDATE idempotency + SET + response_status_code = $3, + response_headers = $4, + response_body = $5 + WHERE + user_id = $1 AND + idempotency_key = $2 + "#, + user_id, + idempotency_key.as_ref(), + status_code, + headers, + body.as_ref() + ) + .execute(&mut transaction) + .await?; + transaction.commit().await?; + + let http_response = response_head.set_body(body).map_into_boxed_body(); + Ok(http_response) +} + +#[allow(clippy::large_enum_variant)] +pub enum NextAction { + // Return transaction for later usage + StartProcessing(Transaction<'static, Postgres>), + ReturnSavedResponse(HttpResponse), +} + +pub async fn try_processing( + pool: &PgPool, + idempotency_key: &IdempotencyKey, + user_id: Uuid, +) -> Result { + let mut transaction = pool.begin().await?; + let n_inserted_rows = sqlx::query!( + r#" + INSERT INTO idempotency ( + user_id, + idempotency_key, + created_at + ) + VALUES ($1, $2, now()) + ON CONFLICT DO NOTHING + "#, + user_id, + idempotency_key.as_ref() + ) + .execute(&mut transaction) + .await? + .rows_affected(); + if n_inserted_rows > 0 { + Ok(NextAction::StartProcessing(transaction)) + } else { + let saved_response = get_saved_response(pool, idempotency_key, user_id) + .await? + .ok_or_else(|| anyhow::anyhow!("We expected a saved response, we didn't find it"))?; + Ok(NextAction::ReturnSavedResponse(saved_response)) + } +} diff --git a/src/issue_delivery_worker.rs b/src/issue_delivery_worker.rs new file mode 100644 index 0000000..2a7f890 --- /dev/null +++ b/src/issue_delivery_worker.rs @@ -0,0 +1,158 @@ +use crate::{configuration::Settings, startup::get_connection_pool}; +use crate::{domain::SubscriberEmail, email_client::EmailClient}; +use sqlx::{PgPool, Postgres, Transaction}; +use std::time::Duration; +use tracing::{field::display, Span}; +use uuid::Uuid; + +pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> { + let connection_pool = get_connection_pool(&configuration.database); + let email_client = configuration.email_client.client(); + worker_loop(connection_pool, email_client).await +} + +async fn worker_loop(pool: PgPool, email_client: EmailClient) -> Result<(), anyhow::Error> { + loop { + match try_execute_task(&pool, &email_client).await { + Ok(ExecutionOutcome::EmptyQueue) => { + tokio::time::sleep(Duration::from_secs(10)).await; + } + Err(_) => { + tokio::time::sleep(Duration::from_secs(1)).await; + } + Ok(ExecutionOutcome::TaskCompleted) => {} + } + } +} + +pub enum ExecutionOutcome { + TaskCompleted, + EmptyQueue, +} + +#[tracing::instrument( + skip_all, + fields( + newsletter_issue_id=tracing::field::Empty, + subscriber_email=tracing::field::Empty + ), + err +)] +pub async fn try_execute_task( + pool: &PgPool, + email_client: &EmailClient, +) -> Result { + let task = dequeue_task(pool).await?; + if task.is_none() { + return Ok(ExecutionOutcome::EmptyQueue); + } + let (transaction, issue_id, email) = task.unwrap(); + Span::current() + .record("newsletter_issue_id", &display(issue_id)) + .record("subscriber_email", &display(&email)); + match SubscriberEmail::parse(email.clone()) { + Ok(email) => { + let issue = get_issue(pool, issue_id).await?; + if let Err(e) = email_client + .send_email( + &email, + &issue.title, + &issue.html_content, + &issue.text_content, + ) + .await + { + tracing::error!( + error.cause_chain = ?e, + error.message = %e, + "Failed to deliver issue to a confirmed subscriber. \ + Skipping.", + ); + } + } + Err(e) => { + tracing::error!( + error.cause_chain = ?e, + error.message = %e, + "Skipping a confirmed subscriber. \ + Their stored contact details are invalid", + ); + } + } + delete_task(transaction, issue_id, &email).await?; + Ok(ExecutionOutcome::TaskCompleted) +} + +type PgTransaction = Transaction<'static, Postgres>; + +#[tracing::instrument(skip_all)] +async fn dequeue_task( + pool: &PgPool, +) -> Result, anyhow::Error> { + let mut transaction = pool.begin().await?; + let r = sqlx::query!( + r#" + SELECT newsletter_issue_id, subscriber_email + FROM issue_delivery_queue + FOR UPDATE + SKIP LOCKED + LIMIT 1 + "#, + ) + .fetch_optional(&mut transaction) + .await?; + if let Some(r) = r { + Ok(Some(( + transaction, + r.newsletter_issue_id, + r.subscriber_email, + ))) + } else { + Ok(None) + } +} + +#[tracing::instrument(skip_all)] +async fn delete_task( + mut transaction: PgTransaction, + issue_id: Uuid, + email: &str, +) -> Result<(), anyhow::Error> { + sqlx::query!( + r#" + DELETE FROM issue_delivery_queue + WHERE + newsletter_issue_id = $1 AND + subscriber_email = $2 + "#, + issue_id, + email + ) + .execute(&mut transaction) + .await?; + transaction.commit().await?; + Ok(()) +} + +struct NewsletterIssue { + title: String, + text_content: String, + html_content: String, +} + +#[tracing::instrument(skip_all)] +async fn get_issue(pool: &PgPool, issue_id: Uuid) -> Result { + let issue = sqlx::query_as!( + NewsletterIssue, + r#" + SELECT title, text_content, html_content + FROM newsletter_issues + WHERE + newsletter_issue_id = $1 + "#, + issue_id + ) + .fetch_one(pool) + .await?; + Ok(issue) +} diff --git a/src/lib.rs b/src/lib.rs index f469562..d5d0ffe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,8 @@ pub mod authentication; pub mod configuration; pub mod domain; pub mod email_client; +pub mod idempotency; +pub mod issue_delivery_worker; pub mod routes; pub mod session_state; pub mod startup; diff --git a/src/main.rs b/src/main.rs index 786f6c6..63f47bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,7 @@ +use std::fmt::{Debug, Display}; +use tokio::task::JoinError; use zero2prod::configuration::get_configuration; +use zero2prod::issue_delivery_worker::run_worker_until_stopped; use zero2prod::startup::Application; use zero2prod::telemetry::{get_subscriber, init_subscriber}; @@ -8,7 +11,38 @@ async fn main() -> anyhow::Result<()> { init_subscriber(subscriber); let configuration = get_configuration().expect("Failed to read configuration."); - let application = Application::build(configuration).await?; - application.run_until_stopped().await?; + let application = Application::build(configuration.clone()).await?; + let application_task = tokio::spawn(application.run_until_stopped()); + let worker_task = tokio::spawn(run_worker_until_stopped(configuration)); + + tokio::select! { + o = application_task => report_exit("API", o), + o = worker_task => report_exit("Background worker", o), + }; + Ok(()) } + +fn report_exit(task_name: &str, outcome: Result, JoinError>) { + match outcome { + Ok(Ok(())) => { + tracing::info!("{} has exited", task_name) + } + Ok(Err(e)) => { + tracing::error!( + error.cause_chain = ?e, + error.message = %e, + "{} failed", + task_name + ) + } + Err(e) => { + tracing::error!( + error.cause_chain = ?e, + error.message = %e, + "{}' task failed to complete", + task_name + ) + } + } +} diff --git a/src/routes/admin/newsletter/get.rs b/src/routes/admin/newsletter/get.rs index 0213592..9484261 100644 --- a/src/routes/admin/newsletter/get.rs +++ b/src/routes/admin/newsletter/get.rs @@ -10,7 +10,7 @@ pub async fn publish_newsletter_form( for m in flash_messages.iter() { writeln!(msg_html, "

{}

", m.content()).unwrap(); } - + let idempotency_key = uuid::Uuid::new_v4(); Ok(HttpResponse::Ok() .content_type(ContentType::html()) .body(format!( @@ -49,6 +49,7 @@ pub async fn publish_newsletter_form( >
+

<- Back

diff --git a/src/routes/admin/newsletter/post.rs b/src/routes/admin/newsletter/post.rs index eb766eb..9ac0fe0 100644 --- a/src/routes/admin/newsletter/post.rs +++ b/src/routes/admin/newsletter/post.rs @@ -1,83 +1,119 @@ use crate::authentication::UserId; -use crate::domain::SubscriberEmail; -use crate::email_client::EmailClient; +use crate::idempotency::{save_response, try_processing, IdempotencyKey, NextAction}; +use crate::utils::e400; use crate::utils::{e500, see_other}; -use actix_web::web::ReqData; use actix_web::{web, HttpResponse}; use actix_web_flash_messages::FlashMessage; use anyhow::Context; -use sqlx::PgPool; +use sqlx::{PgPool, Postgres, Transaction}; +use uuid::Uuid; #[derive(serde::Deserialize)] pub struct FormData { title: String, text_content: String, html_content: String, + idempotency_key: String, +} + +fn success_message() -> FlashMessage { + FlashMessage::info( + "The newsletter issue has been accepted - \ + emails will go out shortly.", + ) } #[tracing::instrument( name = "Publish a newsletter issue", - skip(form, pool, email_client, user_id), - fields(user_id=%*user_id) + skip_all, + fields(user_id=%&*user_id) )] pub async fn publish_newsletter( form: web::Form, - user_id: ReqData, pool: web::Data, - email_client: web::Data, + user_id: web::ReqData, ) -> Result { - let subscribers = get_confirmed_subscribers(&pool).await.map_err(e500)?; - for subscriber in subscribers { - match subscriber { - Ok(subscriber) => { - email_client - .send_email( - &subscriber.email, - &form.title, - &form.html_content, - &form.text_content, - ) - .await - .with_context(|| { - format!("Failed to send newsletter issue to {}", subscriber.email) - }) - .map_err(e500)?; - } - Err(error) => { - tracing::warn!( - error.cause_chain = ?error, - error.message = %error, - "Skipping a confirmed subscriber. Their stored contact details are invalid", - ); - } + let user_id = user_id.into_inner(); + let FormData { + title, + text_content, + html_content, + idempotency_key, + } = form.0; + let idempotency_key: IdempotencyKey = idempotency_key.try_into().map_err(e400)?; + let mut transaction = match try_processing(&pool, &idempotency_key, *user_id) + .await + .map_err(e500)? + { + NextAction::StartProcessing(t) => t, + NextAction::ReturnSavedResponse(saved_response) => { + success_message().send(); + return Ok(saved_response); } - } - FlashMessage::info("The newsletter issue has been published!").send(); - Ok(see_other("/admin/newsletters")) + }; + let issue_id = insert_newsletter_issue(&mut transaction, &title, &text_content, &html_content) + .await + .context("Failed to store newsletter issue details") + .map_err(e500)?; + enqueue_delivery_tasks(&mut transaction, issue_id) + .await + .context("Failed to enqueue delivery tasks") + .map_err(e500)?; + let response = see_other("/admin/newsletters"); + let response = save_response(transaction, &idempotency_key, *user_id, response) + .await + .map_err(e500)?; + success_message().send(); + Ok(response) } -struct ConfirmedSubscriber { - email: SubscriberEmail, -} - -#[tracing::instrument(name = "Get confirmed subscribers", skip(pool))] -async fn get_confirmed_subscribers( - pool: &PgPool, -) -> Result>, anyhow::Error> { - let confirmed_subscribers = sqlx::query!( +#[tracing::instrument(skip_all)] +async fn insert_newsletter_issue( + transaction: &mut Transaction<'_, Postgres>, + title: &str, + text_content: &str, + html_content: &str, +) -> Result { + let newsletter_issue_id = Uuid::new_v4(); + sqlx::query!( r#" - SELECT email + INSERT INTO newsletter_issues ( + newsletter_issue_id, + title, + text_content, + html_content, + published_at + ) + VALUES ($1, $2, $3, $4, now()) + "#, + newsletter_issue_id, + title, + text_content, + html_content + ) + .execute(transaction) + .await?; + Ok(newsletter_issue_id) +} + +#[tracing::instrument(skip_all)] +async fn enqueue_delivery_tasks( + transaction: &mut Transaction<'_, Postgres>, + newsletter_issue_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query!( + r#" + INSERT INTO issue_delivery_queue ( + newsletter_issue_id, + subscriber_email + ) + SELECT $1, email FROM subscriptions WHERE status = 'confirmed' "#, + newsletter_issue_id, ) - .fetch_all(pool) - .await? - .into_iter() - .map(|r| match SubscriberEmail::parse(r.email) { - Ok(email) => Ok(ConfirmedSubscriber { email }), - Err(error) => Err(anyhow::anyhow!(error)), - }) - .collect(); - Ok(confirmed_subscribers) + .execute(transaction) + .await?; + Ok(()) } diff --git a/src/startup.rs b/src/startup.rs index 25b1951..93da91a 100644 --- a/src/startup.rs +++ b/src/startup.rs @@ -27,21 +27,8 @@ pub struct Application { impl Application { pub async fn build(configuration: Settings) -> Result { - let connection_pool = get_connection_pool(&configuration.database) - .await - .expect("Failed to connect to Postgres."); - - let sender_email = configuration - .email_client - .sender() - .expect("Invalid sender email address."); - let timeout = configuration.email_client.timeout(); - let email_client = EmailClient::new( - configuration.email_client.base_url, - sender_email, - configuration.email_client.authorization_token, - timeout, - ); + let connection_pool = get_connection_pool(&configuration.database); + let email_client = configuration.email_client.client(); let address = format!( "{}:{}", @@ -71,11 +58,10 @@ impl Application { } } -pub async fn get_connection_pool(configuration: &DatabaseSettings) -> Result { +pub fn get_connection_pool(configuration: &DatabaseSettings) -> PgPool { PgPoolOptions::new() .connect_timeout(std::time::Duration::from_secs(2)) - .connect_with(configuration.with_db()) - .await + .connect_lazy_with(configuration.with_db()) } pub struct ApplicationBaseUrl(pub String); diff --git a/src/utils.rs b/src/utils.rs index b606c98..6951176 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -8,6 +8,16 @@ where { actix_web::error::ErrorInternalServerError(e) } + +// Return a 400 with the user-representation of the validation error as body. +// The error root cause is preserved for logging purposes. +pub fn e400(e: T) -> actix_web::Error +where + T: std::fmt::Debug + std::fmt::Display + 'static, +{ + actix_web::error::ErrorBadRequest(e) +} + pub fn see_other(location: &str) -> HttpResponse { HttpResponse::SeeOther() .insert_header((LOCATION, location)) diff --git a/tests/api/helpers.rs b/tests/api/helpers.rs index 2c2dc0d..25ef7be 100644 --- a/tests/api/helpers.rs +++ b/tests/api/helpers.rs @@ -5,6 +5,8 @@ use sqlx::{Connection, Executor, PgConnection, PgPool}; use uuid::Uuid; use wiremock::MockServer; use zero2prod::configuration::{get_configuration, DatabaseSettings}; +use zero2prod::email_client::EmailClient; +use zero2prod::issue_delivery_worker::{try_execute_task, ExecutionOutcome}; use zero2prod::startup::{get_connection_pool, Application}; use zero2prod::telemetry::{get_subscriber, init_subscriber}; @@ -28,6 +30,7 @@ pub struct TestApp { pub email_server: MockServer, pub test_user: TestUser, pub api_client: reqwest::Client, + pub email_client: EmailClient, } /// Confirmation links embedded in the request to the email API. @@ -37,6 +40,18 @@ pub struct ConfirmationLinks { } impl TestApp { + pub async fn dispatch_all_pending_emails(&self) { + loop { + if let ExecutionOutcome::EmptyQueue = + try_execute_task(&self.db_pool, &self.email_client) + .await + .unwrap() + { + break; + } + } + } + pub async fn post_subscriptions(&self, body: String) -> reqwest::Response { self.api_client .post(&format!("{}/subscriptions", &self.address)) @@ -200,12 +215,11 @@ pub async fn spawn_app() -> TestApp { let test_app = TestApp { address: format!("http://localhost:{}", application_port), port: application_port, - db_pool: get_connection_pool(&configuration.database) - .await - .expect("Failed to connect to the database"), + db_pool: get_connection_pool(&configuration.database), email_server, test_user: TestUser::generate(), api_client: client, + email_client: configuration.email_client.client(), }; test_app.test_user.store(&test_app.db_pool).await; diff --git a/tests/api/newsletter.rs b/tests/api/newsletter.rs index 735e1ae..7eac180 100644 --- a/tests/api/newsletter.rs +++ b/tests/api/newsletter.rs @@ -1,9 +1,21 @@ use crate::helpers::{assert_is_redirect_to, spawn_app, ConfirmationLinks, TestApp}; +use fake::faker::internet::en::SafeEmail; +use fake::faker::name::en::Name; +use fake::Fake; +use std::time::Duration; use wiremock::matchers::{any, method, path}; -use wiremock::{Mock, ResponseTemplate}; +use wiremock::{Mock, MockBuilder, ResponseTemplate}; async fn create_unconfirmed_subscriber(app: &TestApp) -> ConfirmationLinks { - let body = "name=le%20guin&email=ursula_le_guin%40gmail.com"; + // We are working with multiple subscribers now, + // their details must be randomised to avoid conflicts! + let name: String = Name().fake(); + let email: String = SafeEmail().fake(); + let body = serde_urlencoded::to_string(&serde_json::json!({ + "name": name, + "email": email + })) + .unwrap(); let _mock_guard = Mock::given(path("/email")) .and(method("POST")) @@ -54,13 +66,18 @@ async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() { "title": "Newsletter title", "text_content": "Newsletter body as plain text", "html_content": "

Newsletter body as HTML

", + "idempotency_key": uuid::Uuid::new_v4().to_string() }); let response = app.post_publish_newsletter(&newsletter_request_body).await; assert_is_redirect_to(&response, "/admin/newsletters"); // Act - Part 2 - Follow the redirect let html_page = app.get_publish_newsletter_html().await; - assert!(html_page.contains("

The newsletter issue has been published!

")); + assert!(html_page.contains( + "

The newsletter issue has been accepted - \ + emails will go out shortly.

" + )); + app.dispatch_all_pending_emails().await; // Mock verifies on Drop that we haven't sent the newsletter email } @@ -83,13 +100,18 @@ async fn newsletters_are_delivered_to_confirmed_subscribers() { "title": "Newsletter title", "text_content": "Newsletter body as plain text", "html_content": "

Newsletter body as HTML

", + "idempotency_key": uuid::Uuid::new_v4().to_string() }); let response = app.post_publish_newsletter(&newsletter_request_body).await; assert_is_redirect_to(&response, "/admin/newsletters"); // Act - Part 2 - Follow the redirect let html_page = app.get_publish_newsletter_html().await; - assert!(html_page.contains("

The newsletter issue has been published!

")); + assert!(html_page.contains( + "

The newsletter issue has been accepted - \ + emails will go out shortly.

" + )); + app.dispatch_all_pending_emails().await; // Mock verifies on Drop that we have sent the newsletter email } @@ -115,9 +137,94 @@ async fn you_must_be_logged_in_to_publish_a_newsletter() { "title": "Newsletter title", "text_content": "Newsletter body as plain text", "html_content": "

Newsletter body as HTML

", + "idempotency_key": uuid::Uuid::new_v4().to_string() }); let response = app.post_publish_newsletter(&newsletter_request_body).await; // Assert assert_is_redirect_to(&response, "/login"); } + +#[tokio::test] +async fn newsletter_creation_is_idempotent() { + // Arrange + let app = spawn_app().await; + create_confirmed_subscriber(&app).await; + app.test_user.login(&app).await; + + Mock::given(path("/email")) + .and(method("POST")) + .respond_with(ResponseTemplate::new(200)) + .expect(1) + .mount(&app.email_server) + .await; + + // Act - Part 1 - Submit newsletter form + let newsletter_request_body = serde_json::json!({ + "title": "Newsletter title", + "text_content": "Newsletter body as plain text", + "html_content": "

Newsletter body as HTML

", + // We expect the idempotency key as part of the + // form data, not as an header + "idempotency_key": uuid::Uuid::new_v4().to_string() + }); + let response = app.post_publish_newsletter(&newsletter_request_body).await; + assert_is_redirect_to(&response, "/admin/newsletters"); + + // Act - Part 2 - Follow the redirect + let html_page = app.get_publish_newsletter_html().await; + assert!(html_page.contains( + "

The newsletter issue has been accepted - \ + emails will go out shortly.

" + )); + + // Act - Part 3 - Submit newsletter form **again** + let response = app.post_publish_newsletter(&newsletter_request_body).await; + assert_is_redirect_to(&response, "/admin/newsletters"); + + // Act - Part 4 - Follow the redirect + let html_page = app.get_publish_newsletter_html().await; + assert!(html_page.contains( + "

The newsletter issue has been accepted - \ + emails will go out shortly.

" + )); + + app.dispatch_all_pending_emails().await; + // Mock verifies on Drop that we have sent the newsletter email **once** +} + +#[tokio::test] +async fn concurrent_form_submission_is_handled_gracefully() { + // Arrange + let app = spawn_app().await; + create_confirmed_subscriber(&app).await; + app.test_user.login(&app).await; + + Mock::given(path("/email")) + .and(method("POST")) + // Setting a long delay to ensure that the second request + // arrives before the first one completes + .respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(2))) + .expect(1) + .mount(&app.email_server) + .await; + + // Act - Submit two newsletter forms concurrently + let newsletter_request_body = serde_json::json!({ + "title": "Newsletter title", + "text_content": "Newsletter body as plain text", + "html_content": "

Newsletter body as HTML

", + "idempotency_key": uuid::Uuid::new_v4().to_string() + }); + let response1 = app.post_publish_newsletter(&newsletter_request_body); + let response2 = app.post_publish_newsletter(&newsletter_request_body); + let (response1, response2) = tokio::join!(response1, response2); + + assert_eq!(response1.status(), response2.status()); + assert_eq!( + response1.text().await.unwrap(), + response2.text().await.unwrap() + ); + app.dispatch_all_pending_emails().await; + // Mock verifies on Drop that we have sent the newsletter email **once** +}