diff --git a/CHANGELOG.md b/CHANGELOG.md index b50874a..4236bfa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Added `/api/v1/accounts/lookup` Mastodon API endpoint. - Implemented activity delivery queue. +- Started to keep track of unreachable actors. ### Changed diff --git a/migrations/V0037__actor_profile__add_unreachable_since.sql b/migrations/V0037__actor_profile__add_unreachable_since.sql new file mode 100644 index 0000000..d3b7054 --- /dev/null +++ b/migrations/V0037__actor_profile__add_unreachable_since.sql @@ -0,0 +1 @@ +ALTER TABLE actor_profile ADD COLUMN unreachable_since TIMESTAMP WITH TIME ZONE; diff --git a/migrations/schema.sql b/migrations/schema.sql index 10bc695..08d9629 100644 --- a/migrations/schema.sql +++ b/migrations/schema.sql @@ -32,6 +32,7 @@ CREATE TABLE actor_profile ( actor_id VARCHAR(200) UNIQUE GENERATED ALWAYS AS (actor_json ->> 'id') STORED, created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, + unreachable_since TIMESTAMP WITH TIME ZONE, CHECK ((hostname IS NULL) = (actor_json IS NULL)) ); diff --git a/src/activitypub/deliverer.rs b/src/activitypub/deliverer.rs index 76300fe..61e5f45 100644 --- a/src/activitypub/deliverer.rs +++ b/src/activitypub/deliverer.rs @@ -10,7 +10,7 @@ use tokio::time::sleep; use tokio_postgres::GenericClient; use crate::config::Instance; -use crate::database::DatabaseError; +use crate::database::{get_database_client, DatabaseError, DbPool}; use crate::http_signatures::create::{ create_http_signature, HttpSignatureError, @@ -20,7 +20,10 @@ use crate::json_signatures::create::{ sign_object, JsonSignatureError, }; -use crate::models::users::types::User; +use crate::models::{ + profiles::queries::set_reachability_status, + users::types::User, +}; use crate::utils::crypto_rsa::deserialize_private_key; use super::actors::types::Actor; use super::constants::{AP_MEDIA_TYPE, ACTOR_KEY_SUFFIX}; @@ -46,6 +49,9 @@ pub enum DelivererError { #[error("http error {0:?}")] HttpError(reqwest::StatusCode), + + #[error(transparent)] + DatabaseError(#[from] DatabaseError), } fn build_client(instance: &Instance) -> reqwest::Result { @@ -117,6 +123,7 @@ pub struct Recipient { } async fn deliver_activity_worker( + maybe_db_pool: Option, instance: Instance, sender: User, activity: Value, @@ -185,6 +192,18 @@ async fn deliver_activity_worker( }; retry_count += 1; }; + + if let Some(ref db_pool) = maybe_db_pool { + // Get connection from pool only after finishing delivery + let db_client = &**get_database_client(db_pool).await?; + for (recipient, is_delivered) in queue { + set_reachability_status( + db_client, + &recipient.id, + is_delivered, + ).await?; + }; + }; Ok(()) } @@ -224,6 +243,7 @@ impl OutgoingActivity { pub async fn deliver(self) -> Result<(), DelivererError> { deliver_activity_worker( + None, self.instance, self.sender, self.activity, @@ -243,6 +263,23 @@ impl OutgoingActivity { }); } + pub fn spawn_deliver_with_tracking( + self, + db_pool: DbPool, + ) -> () { + tokio::spawn(async move { + deliver_activity_worker( + Some(db_pool), + self.instance, + self.sender, + self.activity, + self.recipients, + ).await.unwrap_or_else(|err| { + log::error!("{}", err); + }); + }); + } + pub async fn enqueue( self, db_client: &impl GenericClient, diff --git a/src/activitypub/queues.rs b/src/activitypub/queues.rs index 3abdbc8..2b27dbc 100644 --- a/src/activitypub/queues.rs +++ b/src/activitypub/queues.rs @@ -5,7 +5,12 @@ use tokio_postgres::GenericClient; use uuid::Uuid; use crate::config::Config; -use crate::database::{DatabaseError, DatabaseTypeError}; +use crate::database::{ + get_database_client, + DatabaseError, + DatabaseTypeError, + DbPool, +}; use crate::models::{ background_jobs::queries::{ enqueue_job, @@ -122,8 +127,9 @@ impl OutgoingActivityJobData { pub async fn process_queued_outgoing_activities( config: &Config, - db_client: &impl GenericClient, + db_pool: &DbPool, ) -> Result<(), DatabaseError> { + let db_client = &**get_database_client(db_pool).await?; let batch_size = 1; let batch = get_job_batch( db_client, @@ -141,7 +147,7 @@ pub async fn process_queued_outgoing_activities( activity: job_data.activity, recipients: job_data.recipients, }; - outgoing_activity.spawn_deliver(); + outgoing_activity.spawn_deliver_with_tracking(db_pool.clone()); delete_job_from_queue(db_client, &job.id).await?; }; Ok(()) diff --git a/src/job_queue/scheduler.rs b/src/job_queue/scheduler.rs index 2a184af..078defd 100644 --- a/src/job_queue/scheduler.rs +++ b/src/job_queue/scheduler.rs @@ -130,8 +130,7 @@ async fn outgoing_activity_queue_task( config: &Config, db_pool: &DbPool, ) -> Result<(), Error> { - let db_client = &**get_database_client(db_pool).await?; - process_queued_outgoing_activities(config, db_client).await?; + process_queued_outgoing_activities(config, db_pool).await?; Ok(()) } diff --git a/src/models/profiles/queries.rs b/src/models/profiles/queries.rs index e4efd93..85f88ce 100644 --- a/src/models/profiles/queries.rs +++ b/src/models/profiles/queries.rs @@ -576,6 +576,36 @@ pub async fn update_post_count( Ok(profile) } +// Doesn't return error if profile doesn't exist +pub async fn set_reachability_status( + db_client: &impl GenericClient, + actor_id: &str, + is_reachable: bool, +) -> Result<(), DatabaseError> { + if !is_reachable { + // Don't update profile if unreachable_since is already set + db_client.execute( + " + UPDATE actor_profile + SET unreachable_since = CURRENT_TIMESTAMP + WHERE actor_id = $1 AND unreachable_since IS NULL + ", + &[&actor_id], + ).await?; + } else { + // Remove status (if set) + db_client.execute( + " + UPDATE actor_profile + SET unreachable_since = NULL + WHERE actor_id = $1 + ", + &[&actor_id], + ).await?; + }; + Ok(()) +} + /// Finds all empty remote profiles /// (without any posts, reactions, relationships) /// updated before the specified date @@ -812,6 +842,23 @@ mod tests { assert_eq!(profiles[0].id, profile.id); } + #[tokio::test] + #[serial] + async fn test_set_reachability_status() { + let db_client = &create_test_database().await; + let actor_id = "https://example.com/users/test"; + let profile_data = ProfileCreateData { + username: "test".to_string(), + hostname: Some("example.com".to_string()), + actor_json: Some(create_test_actor(actor_id)), + ..Default::default() + }; + let profile = create_profile(db_client, profile_data).await.unwrap(); + set_reachability_status(db_client, actor_id, false).await.unwrap(); + let profile = get_profile_by_id(db_client, &profile.id).await.unwrap(); + assert_eq!(profile.unreachable_since.is_some(), true); + } + #[tokio::test] #[serial] async fn test_find_empty_profiles() { diff --git a/src/models/profiles/types.rs b/src/models/profiles/types.rs index 35af338..190ff26 100644 --- a/src/models/profiles/types.rs +++ b/src/models/profiles/types.rs @@ -302,6 +302,7 @@ pub struct DbActorProfile { pub actor_json: Option, pub created_at: DateTime, pub updated_at: DateTime, + pub unreachable_since: Option>, // auto-generated database fields pub acct: String, @@ -381,6 +382,7 @@ impl Default for DbActorProfile { actor_id: None, created_at: now, updated_at: now, + unreachable_since: None, } } }