diff --git a/crates/apub/src/activities/community/announce.rs b/crates/apub/src/activities/community/announce.rs index 0ece90b76..5bfcb9317 100644 --- a/crates/apub/src/activities/community/announce.rs +++ b/crates/apub/src/activities/community/announce.rs @@ -14,6 +14,7 @@ use lemmy_apub_lib::{ }; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; +use tracing::debug; #[async_trait::async_trait(?Send)] pub(crate) trait GetCommunity { @@ -113,7 +114,15 @@ impl ActivityHandler for AnnounceActivity { let object_value = serde_json::to_value(&self.object)?; let object_data: ActivityCommonFields = serde_json::from_value(object_value.to_owned())?; - insert_activity(&object_data.id, object_value, false, true, context.pool()).await?; + let insert = + insert_activity(&object_data.id, object_value, false, true, context.pool()).await?; + if !insert { + debug!( + "Received duplicate activity in announce {}", + object_data.id.to_string() + ); + return Ok(()); + } } } self.object.receive(context, request_counter).await diff --git a/crates/apub/src/http/mod.rs b/crates/apub/src/http/mod.rs index 3328e509d..477357e04 100644 --- a/crates/apub/src/http/mod.rs +++ b/crates/apub/src/http/mod.rs @@ -28,7 +28,7 @@ use lemmy_utils::{location_info, LemmyError}; use lemmy_websocket::LemmyContext; use serde::{Deserialize, Serialize}; use std::{fmt::Debug, io::Read}; -use tracing::info; +use tracing::{debug, info}; use url::Url; mod comment; @@ -108,7 +108,15 @@ where // Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen // if we receive the same activity twice in very quick succession. let object_value = serde_json::to_value(&activity)?; - insert_activity(&activity_data.id, object_value, false, true, context.pool()).await?; + let insert = + insert_activity(&activity_data.id, object_value, false, true, context.pool()).await?; + if !insert { + debug!( + "Received duplicate activity {}", + activity_data.id.to_string() + ); + return Ok(HttpResponse::BadRequest().finish()); + } info!("Receiving activity {}", activity_data.id.to_string()); activity diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index b540ac2ba..7a66e8aa2 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -201,11 +201,12 @@ async fn insert_activity( local: bool, sensitive: bool, pool: &DbPool, -) -> Result<(), LemmyError> { +) -> Result { let ap_id = ap_id.to_owned().into(); - blocking(pool, move |conn| { - Activity::insert(conn, ap_id, activity, local, sensitive) - }) - .await??; - Ok(()) + Ok( + blocking(pool, move |conn| { + Activity::insert(conn, ap_id, activity, local, sensitive) + }) + .await??, + ) } diff --git a/crates/db_schema/src/impls/activity.rs b/crates/db_schema/src/impls/activity.rs index d946d628f..261a89cbe 100644 --- a/crates/db_schema/src/impls/activity.rs +++ b/crates/db_schema/src/impls/activity.rs @@ -1,7 +1,10 @@ use crate::{newtypes::DbUrl, source::activity::*, traits::Crud}; -use diesel::{dsl::*, result::Error, *}; +use diesel::{ + dsl::*, + result::{DatabaseErrorKind, Error}, + *, +}; use serde_json::Value; -use std::io::{Error as IoError, ErrorKind}; impl Crud for Activity { type Form = ActivityForm; @@ -35,13 +38,14 @@ impl Crud for Activity { } impl Activity { + /// Returns true if the insert was successful pub fn insert( conn: &PgConnection, ap_id: DbUrl, data: Value, local: bool, sensitive: bool, - ) -> Result { + ) -> Result { let activity_form = ActivityForm { ap_id, data, @@ -49,13 +53,14 @@ impl Activity { sensitive, updated: None, }; - let result = Activity::create(conn, &activity_form); - match result { - Ok(s) => Ok(s), - Err(e) => Err(IoError::new( - ErrorKind::Other, - format!("Failed to insert activity into database: {}", e), - )), + match Activity::create(conn, &activity_form) { + Ok(_) => Ok(true), + Err(e) => { + if let Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) = e { + return Ok(false); + } + Err(e) + } } }