Simplifying transaction call (#5703) (#5726)

* Simplifying transaction call (#5703)

* fmt fix

* changing other transaction calls

* fmt fix
This commit is contained in:
Anton Boitsov 2025-06-03 10:43:02 +03:00 committed by GitHub
parent 0ee3262c58
commit f323da00f8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 66 additions and 68 deletions

View file

@ -1,6 +1,6 @@
use activitypub_federation::config::Data;
use actix_web::web::Json;
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection};
use diesel_async::scoped_futures::ScopedFutureExt;
use lemmy_api_common::{
community::{AddModToCommunity, AddModToCommunityResponse},
context::LemmyContext,
@ -18,7 +18,7 @@ use lemmy_db_schema::{
};
use lemmy_db_views_community_moderator::CommunityModeratorView;
use lemmy_db_views_local_user::LocalUserView;
use lemmy_utils::error::{LemmyError, LemmyResult};
use lemmy_utils::error::LemmyResult;
pub async fn add_mod_to_community(
data: Json<AddModToCommunity>,
@ -56,7 +56,7 @@ pub async fn add_mod_to_community(
let conn = &mut get_conn(pool).await?;
let tx_data = data.clone();
conn
.transaction::<_, LemmyError, _>(|conn| {
.run_transaction(|conn| {
async move {
// Update in local database
let community_moderator_form =

View file

@ -1,6 +1,6 @@
use activitypub_federation::config::Data;
use actix_web::web::Json;
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection};
use diesel_async::scoped_futures::ScopedFutureExt;
use lemmy_api_common::{
community::{BanFromCommunity, BanFromCommunityResponse},
context::LemmyContext,
@ -22,10 +22,7 @@ use lemmy_db_schema::{
};
use lemmy_db_views_local_user::LocalUserView;
use lemmy_db_views_person::PersonView;
use lemmy_utils::{
error::{LemmyError, LemmyResult},
utils::validation::is_valid_body_field,
};
use lemmy_utils::{error::LemmyResult, utils::validation::is_valid_body_field};
pub async fn ban_from_community(
data: Json<BanFromCommunity>,
@ -61,7 +58,7 @@ pub async fn ban_from_community(
let conn = &mut get_conn(pool).await?;
let tx_data = data.clone();
conn
.transaction::<_, LemmyError, _>(|conn| {
.run_transaction(|conn| {
async move {
if tx_data.ban {
CommunityActions::ban(&mut conn.into(), &community_user_ban_form).await?;

View file

@ -1,6 +1,6 @@
use activitypub_federation::config::Data;
use actix_web::web::Json;
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection};
use diesel_async::scoped_futures::ScopedFutureExt;
use lemmy_api_common::{
community::{BlockCommunity, BlockCommunityResponse},
context::LemmyContext,
@ -13,7 +13,7 @@ use lemmy_db_schema::{
};
use lemmy_db_views_community::CommunityView;
use lemmy_db_views_local_user::LocalUserView;
use lemmy_utils::error::{LemmyError, LemmyResult};
use lemmy_utils::error::LemmyResult;
pub async fn user_block_community(
data: Json<BlockCommunity>,
@ -28,7 +28,7 @@ pub async fn user_block_community(
let conn = &mut get_conn(pool).await?;
let tx_data = data.clone();
conn
.transaction::<_, LemmyError, _>(|conn| {
.run_transaction(|conn| {
async move {
if tx_data.block {
CommunityActions::block(&mut conn.into(), &community_block_form).await?;

View file

@ -1,6 +1,6 @@
use actix_web::web::{Data, Json};
use anyhow::Context;
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection};
use diesel_async::scoped_futures::ScopedFutureExt;
use lemmy_api_common::{
community::{GetCommunityResponse, TransferCommunity},
context::LemmyContext,
@ -18,7 +18,7 @@ use lemmy_db_views_community::CommunityView;
use lemmy_db_views_community_moderator::CommunityModeratorView;
use lemmy_db_views_local_user::LocalUserView;
use lemmy_utils::{
error::{LemmyError, LemmyErrorType, LemmyResult},
error::{LemmyErrorType, LemmyResult},
location_info,
};
@ -58,7 +58,7 @@ pub async fn transfer_community(
let conn = &mut get_conn(pool).await?;
let tx_data = data.clone();
conn
.transaction::<_, LemmyError, _>(|conn| {
.run_transaction(|conn| {
async move {
CommunityActions::delete_mods_for_community(&mut conn.into(), community_id).await?;

View file

@ -1,6 +1,6 @@
use activitypub_federation::config::Data;
use actix_web::web::Json;
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection};
use diesel_async::scoped_futures::ScopedFutureExt;
use lemmy_api_common::{
context::LemmyContext,
site::{ApproveRegistrationApplication, RegistrationApplicationResponse},
@ -17,7 +17,7 @@ use lemmy_db_schema::{
use lemmy_db_views_local_user::LocalUserView;
use lemmy_db_views_registration_applications::RegistrationApplicationView;
use lemmy_email::account::{send_application_approved_email, send_application_denied_email};
use lemmy_utils::error::{LemmyError, LemmyResult};
use lemmy_utils::error::LemmyResult;
pub async fn approve_registration_application(
data: Json<ApproveRegistrationApplication>,
@ -33,7 +33,7 @@ pub async fn approve_registration_application(
let conn = &mut get_conn(pool).await?;
let tx_data = data.clone();
let approved_user_id = conn
.transaction::<_, LemmyError, _>(|conn| {
.run_transaction(|conn| {
async move {
// Update the registration with reason, admin_id
let deny_reason = diesel_string_update(tx_data.deny_reason.as_deref());

View file

@ -1,6 +1,6 @@
use activitypub_federation::config::Data;
use actix_web::{web::Json, HttpRequest};
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection, AsyncPgConnection};
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncPgConnection};
use lemmy_api_common::{
claims::Claims,
context::LemmyContext,
@ -133,7 +133,7 @@ pub async fn register(
let tx_data = data.clone();
let tx_context = context.clone();
let user = conn
.transaction::<_, LemmyError, _>(|conn| {
.run_transaction(|conn| {
async move {
let site_view = SiteView::read_local(&mut tx_context.pool()).await?;
// We have to create both a person, and local_user
@ -350,7 +350,7 @@ pub async fn authenticate_with_oauth(
let tx_data = data.clone();
let tx_context = context.clone();
let user = conn
.transaction::<_, LemmyError, _>(|conn| {
.run_transaction(|conn| {
async move {
let site_view = SiteView::read_local(&mut tx_context.pool()).await?;
// make sure the username is provided

View file

@ -19,17 +19,11 @@ use diesel::{
delete,
dsl::{count, exists},
insert_into,
result::Error,
select,
ExpressionMethods,
QueryDsl,
};
use diesel_async::{
scoped_futures::ScopedFutureExt,
AsyncConnection,
AsyncPgConnection,
RunQueryDsl,
};
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncPgConnection, RunQueryDsl};
use lemmy_db_schema_file::schema::{
community_language,
local_site,
@ -76,14 +70,15 @@ impl LocalUserLanguage {
}
conn
.transaction::<_, Error, _>(|conn| {
.run_transaction(|conn| {
async move {
// Delete old languages, not including new languages
delete(local_user_language::table)
.filter(local_user_language::local_user_id.eq(for_local_user_id))
.filter(local_user_language::language_id.ne_all(&lang_ids))
.execute(conn)
.await?;
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateLanguages)?;
let forms = lang_ids
.iter()
@ -103,11 +98,11 @@ impl LocalUserLanguage {
.do_nothing()
.execute(conn)
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateLanguages)
}
.scope_boxed()
})
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateLanguages)
}
}
@ -153,14 +148,15 @@ impl SiteLanguage {
}
conn
.transaction::<_, Error, _>(|conn| {
.run_transaction(|conn| {
async move {
// Delete old languages, not including new languages
delete(site_language::table)
.filter(site_language::site_id.eq(for_site_id))
.filter(site_language::language_id.ne_all(&lang_ids))
.execute(conn)
.await?;
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateLanguages)?;
let forms = lang_ids
.iter()
@ -176,18 +172,16 @@ impl SiteLanguage {
.on_conflict((site_language::site_id, site_language::language_id))
.do_nothing()
.execute(conn)
.await?;
CommunityLanguage::limit_languages(conn, instance_id)
.await
.map_err(|_e| diesel::result::Error::NotFound)?;
.with_lemmy_type(LemmyErrorType::CouldntUpdateLanguages)?;
CommunityLanguage::limit_languages(conn, instance_id).await?;
Ok(())
}
.scope_boxed()
})
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateLanguages)
}
}
@ -289,14 +283,15 @@ impl CommunityLanguage {
.collect::<Vec<_>>();
conn
.transaction::<_, Error, _>(|conn| {
.run_transaction(|conn| {
async move {
// Delete old languages, not including new languages
delete(community_language::table)
.filter(community_language::community_id.eq(for_community_id))
.filter(community_language::language_id.ne_all(&lang_ids))
.execute(conn)
.await?;
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateLanguages)?;
// Insert new languages
insert_into(community_language::table)
@ -308,11 +303,11 @@ impl CommunityLanguage {
.do_nothing()
.execute(conn)
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateLanguages)
}
.scope_boxed()
})
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateLanguages)
}
}

View file

@ -6,13 +6,12 @@ use crate::{
use diesel::{
dsl::exists,
insert_into,
result::Error,
select,
BoolExpressionMethods,
ExpressionMethods,
QueryDsl,
};
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection, RunQueryDsl};
use diesel_async::{scoped_futures::ScopedFutureExt, RunQueryDsl};
use lemmy_db_schema_file::schema::{image_details, local_image, remote_image};
use lemmy_utils::error::{LemmyErrorExt, LemmyErrorType, LemmyResult};
use url::Url;
@ -25,23 +24,21 @@ impl LocalImage {
) -> LemmyResult<Self> {
let conn = &mut get_conn(pool).await?;
conn
.transaction::<_, Error, _>(|conn| {
.run_transaction(|conn| {
async move {
let local_insert = insert_into(local_image::table)
.values(form)
.get_result::<Self>(conn)
.await;
ImageDetails::create(&mut conn.into(), image_details_form)
.await
.map_err(|_e| diesel::result::Error::NotFound)?;
.with_lemmy_type(LemmyErrorType::CouldntCreateImage);
ImageDetails::create(&mut conn.into(), image_details_form).await?;
local_insert
}
.scope_boxed()
})
.await
.with_lemmy_type(LemmyErrorType::CouldntCreateImage)
}
pub async fn validate_by_alias_and_user(

View file

@ -3,8 +3,8 @@ use crate::{
source::keyword_block::{LocalUserKeywordBlock, LocalUserKeywordBlockForm},
utils::{get_conn, DbPool},
};
use diesel::{delete, insert_into, result::Error, ExpressionMethods, QueryDsl};
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection, RunQueryDsl};
use diesel::{delete, insert_into, ExpressionMethods, QueryDsl};
use diesel_async::{scoped_futures::ScopedFutureExt, RunQueryDsl};
use lemmy_db_schema_file::schema::local_user_keyword_block;
use lemmy_utils::error::{LemmyErrorExt, LemmyErrorType, LemmyResult};
@ -30,13 +30,14 @@ impl LocalUserKeywordBlock {
let conn = &mut get_conn(pool).await?;
// No need to update if keywords unchanged
conn
.transaction::<_, Error, _>(|conn| {
.run_transaction(|conn| {
async move {
delete(local_user_keyword_block::table)
.filter(local_user_keyword_block::local_user_id.eq(for_local_user_id))
.filter(local_user_keyword_block::keyword.ne_all(&blocking_keywords))
.execute(conn)
.await?;
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateKeywords)?;
let forms = blocking_keywords
.into_iter()
.map(|k| LocalUserKeywordBlockForm {
@ -49,10 +50,10 @@ impl LocalUserKeywordBlock {
.on_conflict_do_nothing()
.execute(conn)
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateKeywords)
}
.scope_boxed()
})
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateKeywords)
}
}

View file

@ -2,13 +2,8 @@ use crate::{
source::local_site_url_blocklist::{LocalSiteUrlBlocklist, LocalSiteUrlBlocklistForm},
utils::{get_conn, DbPool},
};
use diesel::{dsl::insert_into, result::Error};
use diesel_async::{
scoped_futures::ScopedFutureExt,
AsyncConnection,
AsyncPgConnection,
RunQueryDsl,
};
use diesel::dsl::insert_into;
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncPgConnection, RunQueryDsl};
use lemmy_db_schema_file::schema::local_site_url_blocklist;
use lemmy_utils::error::{LemmyErrorExt, LemmyErrorType, LemmyResult};
@ -17,11 +12,9 @@ impl LocalSiteUrlBlocklist {
let conn = &mut get_conn(pool).await?;
conn
.transaction::<_, Error, _>(|conn| {
.run_transaction(|conn| {
async move {
Self::clear(conn)
.await
.map_err(|_e| diesel::result::Error::NotFound)?;
Self::clear(conn).await?;
let forms = url_blocklist
.into_iter()
@ -32,11 +25,11 @@ impl LocalSiteUrlBlocklist {
.values(forms)
.execute(conn)
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateLocalSiteUrlBlocklist)
}
.scope_boxed()
})
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdateLocalSiteUrlBlocklist)
}
async fn clear(conn: &mut AsyncPgConnection) -> LemmyResult<usize> {

View file

@ -27,13 +27,14 @@ use diesel_async::{
AsyncDieselConnectionManager,
ManagerConfig,
},
scoped_futures::ScopedBoxFuture,
AsyncConnection,
};
use futures_util::{future::BoxFuture, FutureExt};
use i_love_jesus::{CursorKey, PaginatedQueryBuilder, SortDirection};
use lemmy_db_schema_file::schema_setup;
use lemmy_utils::{
error::{LemmyErrorExt, LemmyErrorType, LemmyResult},
error::{LemmyError, LemmyErrorExt, LemmyErrorType, LemmyResult},
settings::SETTINGS,
utils::validation::clean_url,
};
@ -90,6 +91,21 @@ pub async fn get_conn<'a, 'b: 'a>(pool: &'a mut DbPool<'b>) -> Result<DbConn<'a>
})
}
impl DbConn<'_> {
pub async fn run_transaction<'a, R, F>(&mut self, callback: F) -> LemmyResult<R>
where
F: for<'r> FnOnce(&'r mut AsyncPgConnection) -> ScopedBoxFuture<'a, 'r, LemmyResult<R>>
+ Send
+ 'a,
R: Send + 'a,
{
self
.deref_mut()
.transaction::<_, LemmyError, _>(callback)
.await
}
}
impl Deref for DbConn<'_> {
type Target = AsyncPgConnection;
@ -610,7 +626,6 @@ pub fn paginate<Q, C>(
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;