diff --git a/src/activitypub/handlers/mod.rs b/src/activitypub/handlers/mod.rs index 10a3f06..7f7996a 100644 --- a/src/activitypub/handlers/mod.rs +++ b/src/activitypub/handlers/mod.rs @@ -1,3 +1,8 @@ +use super::fetcher::helpers::ImportError; +// Handlers should return object type if activity has been accepted +// or None if it has been ignored +pub type HandlerResult = Result, ImportError>; + pub mod create_note; pub mod update_note; pub mod update_person; diff --git a/src/activitypub/handlers/update_note.rs b/src/activitypub/handlers/update_note.rs index 63d4748..a182070 100644 --- a/src/activitypub/handlers/update_note.rs +++ b/src/activitypub/handlers/update_note.rs @@ -2,28 +2,29 @@ use chrono::Utc; use tokio_postgres::GenericClient; use crate::activitypub::activity::Object; -use crate::activitypub::fetcher::helpers::ImportError; use crate::activitypub::receiver::parse_object_id; +use crate::activitypub::vocabulary::NOTE; use crate::errors::DatabaseError; use crate::models::posts::queries::{ get_post_by_object_id, update_post, }; use crate::models::posts::types::PostUpdateData; +use super::HandlerResult; use super::create_note::get_note_content; pub async fn handle_update_note( db_client: &mut impl GenericClient, instance_url: &str, object: Object, -) -> Result<(), ImportError> { +) -> HandlerResult { let post_id = match parse_object_id(instance_url, &object.id) { Ok(post_id) => post_id, Err(_) => { let post = match get_post_by_object_id(db_client, &object.id).await { Ok(post) => post, // Ignore Update if post is not found locally - Err(DatabaseError::NotFound(_)) => return Ok(()), + Err(DatabaseError::NotFound(_)) => return Ok(None), Err(other_error) => return Err(other_error.into()), }; post.id @@ -33,5 +34,5 @@ pub async fn handle_update_note( let updated_at = object.updated.unwrap_or(Utc::now()); let post_data = PostUpdateData { content, updated_at }; update_post(db_client, &post_id, post_data).await?; - Ok(()) + Ok(Some(NOTE)) } diff --git a/src/activitypub/handlers/update_person.rs b/src/activitypub/handlers/update_person.rs index dfcd25c..4cec8e7 100644 --- a/src/activitypub/handlers/update_person.rs +++ b/src/activitypub/handlers/update_person.rs @@ -7,6 +7,7 @@ use crate::activitypub::{ actor::Actor, fetcher::fetchers::fetch_avatar_and_banner, fetcher::helpers::ImportError, + vocabulary::PERSON, }; use crate::errors::ValidationError; use crate::models::profiles::queries::{ @@ -14,18 +15,20 @@ use crate::models::profiles::queries::{ update_profile, }; use crate::models::profiles::types::ProfileUpdateData; +use super::HandlerResult; pub async fn handle_update_person( db_client: &impl GenericClient, media_dir: &Path, activity: Activity, -) -> Result<(), ImportError> { +) -> HandlerResult { let actor: Actor = serde_json::from_value(activity.object) .map_err(|_| ValidationError("invalid actor data"))?; if actor.id != activity.actor { return Err(ValidationError("actor ID mismatch").into()); }; - update_actor(db_client, media_dir, actor).await + update_actor(db_client, media_dir, actor).await?; + Ok(Some(PERSON)) } pub async fn update_actor( diff --git a/src/activitypub/receiver.rs b/src/activitypub/receiver.rs index 1a289a6..994bef6 100644 --- a/src/activitypub/receiver.rs +++ b/src/activitypub/receiver.rs @@ -194,7 +194,7 @@ pub async fn receive_activity( return Err(HttpError::ValidationError("instance is blocked".into())); }; - let object_type = match (activity_type.as_str(), maybe_object_type) { + let maybe_object_type = match (activity_type.as_str(), maybe_object_type) { (ACCEPT, FOLLOW) => { require_actor_signature(&activity.actor, &signer_id)?; let actor_profile = get_profile_by_actor_id(db_client, &activity.actor).await?; @@ -205,7 +205,7 @@ pub async fn receive_activity( return Err(HttpError::ValidationError("actor is not a target".into())); }; follow_request_accepted(db_client, &follow_request_id).await?; - FOLLOW + Some(FOLLOW) }, (REJECT, FOLLOW) => { require_actor_signature(&activity.actor, &signer_id)?; @@ -217,7 +217,7 @@ pub async fn receive_activity( return Err(HttpError::ValidationError("actor is not a target".into())); }; follow_request_rejected(db_client, &follow_request_id).await?; - FOLLOW + Some(FOLLOW) }, (CREATE, NOTE | QUESTION | PAGE) => { let object: Object = serde_json::from_value(activity.object) @@ -230,7 +230,7 @@ pub async fn receive_activity( None }; import_post(config, db_client, object_id, object_received).await?; - NOTE + Some(NOTE) }, (ANNOUNCE, _) => { require_actor_signature(&activity.actor, &signer_id)?; @@ -261,7 +261,7 @@ pub async fn receive_activity( ..Default::default() }; create_post(db_client, &author.id, repost_data).await?; - NOTE + Some(NOTE) }, (DELETE, _) => { if signer_id != activity.actor { @@ -289,7 +289,7 @@ pub async fn receive_activity( actix_rt::spawn(async move { deletion_queue.process(&config).await; }); - NOTE + Some(NOTE) }, (EMOJI_REACT | LIKE, _) => { require_actor_signature(&activity.actor, &signer_id)?; @@ -323,7 +323,7 @@ pub async fn receive_activity( Err(DatabaseError::AlreadyExists(_)) => return Ok(()), Err(other_error) => return Err(other_error.into()), }; - NOTE + Some(NOTE) }, (FOLLOW, _) => { require_actor_signature(&activity.actor, &signer_id)?; @@ -354,7 +354,7 @@ pub async fn receive_activity( ); let recipients = vec![source_actor]; deliver_activity(config, &target_user, new_activity, recipients); - PERSON + Some(PERSON) }, (UNDO, FOLLOW) => { require_actor_signature(&activity.actor, &signer_id)?; @@ -371,7 +371,7 @@ pub async fn receive_activity( Err(DatabaseError::NotFound(_)) => return Ok(()), Err(other_error) => return Err(other_error.into()), }; - FOLLOW + Some(FOLLOW) }, (UNDO, _) => { require_actor_signature(&activity.actor, &signer_id)?; @@ -388,7 +388,7 @@ pub async fn receive_activity( &reaction.author_id, &reaction.post_id, ).await?; - LIKE + Some(LIKE) }, Err(DatabaseError::NotFound(_)) => { // Undo(Announce) @@ -407,7 +407,7 @@ pub async fn receive_activity( // Can't undo regular post None => return Err(HttpError::ValidationError("object is not a repost".into())), }; - ANNOUNCE + Some(ANNOUNCE) }, Err(other_error) => return Err(other_error.into()), } @@ -416,8 +416,7 @@ pub async fn receive_activity( require_actor_signature(&activity.actor, &signer_id)?; let object: Object = serde_json::from_value(activity.object) .map_err(|_| ValidationError("invalid object"))?; - handle_update_note(db_client, &config.instance_url(), object).await?; - NOTE + handle_update_note(db_client, &config.instance_url(), object).await? }, (UPDATE, PERSON) => { require_actor_signature(&activity.actor, &signer_id)?; @@ -425,20 +424,21 @@ pub async fn receive_activity( db_client, &config.media_dir(), activity, - ).await?; - PERSON + ).await? }, _ => { log::warn!("activity type is not supported: {}", activity_raw); return Ok(()); }, }; - log::info!( - "processed {}({}) from {}", - activity_type, - object_type, - activity_actor, - ); + if let Some(object_type) = maybe_object_type { + log::info!( + "processed {}({}) from {}", + activity_type, + object_type, + activity_actor, + ); + }; Ok(()) }