Change actor status to "unreachable" if delivery to inbox fails

This commit is contained in:
silverpill 2022-12-03 21:06:15 +00:00
parent bd158b0a1f
commit 498be66d8b
8 changed files with 101 additions and 7 deletions

View file

@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Added `/api/v1/accounts/lookup` Mastodon API endpoint.
- Implemented activity delivery queue.
- Started to keep track of unreachable actors.
### Changed

View file

@ -0,0 +1 @@
ALTER TABLE actor_profile ADD COLUMN unreachable_since TIMESTAMP WITH TIME ZONE;

View file

@ -32,6 +32,7 @@ CREATE TABLE actor_profile (
actor_id VARCHAR(200) UNIQUE GENERATED ALWAYS AS (actor_json ->> 'id') STORED,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
unreachable_since TIMESTAMP WITH TIME ZONE,
CHECK ((hostname IS NULL) = (actor_json IS NULL))
);

View file

@ -10,7 +10,7 @@ use tokio::time::sleep;
use tokio_postgres::GenericClient;
use crate::config::Instance;
use crate::database::DatabaseError;
use crate::database::{get_database_client, DatabaseError, DbPool};
use crate::http_signatures::create::{
create_http_signature,
HttpSignatureError,
@ -20,7 +20,10 @@ use crate::json_signatures::create::{
sign_object,
JsonSignatureError,
};
use crate::models::users::types::User;
use crate::models::{
profiles::queries::set_reachability_status,
users::types::User,
};
use crate::utils::crypto_rsa::deserialize_private_key;
use super::actors::types::Actor;
use super::constants::{AP_MEDIA_TYPE, ACTOR_KEY_SUFFIX};
@ -46,6 +49,9 @@ pub enum DelivererError {
#[error("http error {0:?}")]
HttpError(reqwest::StatusCode),
#[error(transparent)]
DatabaseError(#[from] DatabaseError),
}
fn build_client(instance: &Instance) -> reqwest::Result<Client> {
@ -117,6 +123,7 @@ pub struct Recipient {
}
async fn deliver_activity_worker(
maybe_db_pool: Option<DbPool>,
instance: Instance,
sender: User,
activity: Value,
@ -185,6 +192,18 @@ async fn deliver_activity_worker(
};
retry_count += 1;
};
if let Some(ref db_pool) = maybe_db_pool {
// Get connection from pool only after finishing delivery
let db_client = &**get_database_client(db_pool).await?;
for (recipient, is_delivered) in queue {
set_reachability_status(
db_client,
&recipient.id,
is_delivered,
).await?;
};
};
Ok(())
}
@ -224,6 +243,7 @@ impl OutgoingActivity {
pub async fn deliver(self) -> Result<(), DelivererError> {
deliver_activity_worker(
None,
self.instance,
self.sender,
self.activity,
@ -243,6 +263,23 @@ impl OutgoingActivity {
});
}
pub fn spawn_deliver_with_tracking(
self,
db_pool: DbPool,
) -> () {
tokio::spawn(async move {
deliver_activity_worker(
Some(db_pool),
self.instance,
self.sender,
self.activity,
self.recipients,
).await.unwrap_or_else(|err| {
log::error!("{}", err);
});
});
}
pub async fn enqueue(
self,
db_client: &impl GenericClient,

View file

@ -5,7 +5,12 @@ use tokio_postgres::GenericClient;
use uuid::Uuid;
use crate::config::Config;
use crate::database::{DatabaseError, DatabaseTypeError};
use crate::database::{
get_database_client,
DatabaseError,
DatabaseTypeError,
DbPool,
};
use crate::models::{
background_jobs::queries::{
enqueue_job,
@ -122,8 +127,9 @@ impl OutgoingActivityJobData {
pub async fn process_queued_outgoing_activities(
config: &Config,
db_client: &impl GenericClient,
db_pool: &DbPool,
) -> Result<(), DatabaseError> {
let db_client = &**get_database_client(db_pool).await?;
let batch_size = 1;
let batch = get_job_batch(
db_client,
@ -141,7 +147,7 @@ pub async fn process_queued_outgoing_activities(
activity: job_data.activity,
recipients: job_data.recipients,
};
outgoing_activity.spawn_deliver();
outgoing_activity.spawn_deliver_with_tracking(db_pool.clone());
delete_job_from_queue(db_client, &job.id).await?;
};
Ok(())

View file

@ -130,8 +130,7 @@ async fn outgoing_activity_queue_task(
config: &Config,
db_pool: &DbPool,
) -> Result<(), Error> {
let db_client = &**get_database_client(db_pool).await?;
process_queued_outgoing_activities(config, db_client).await?;
process_queued_outgoing_activities(config, db_pool).await?;
Ok(())
}

View file

@ -576,6 +576,36 @@ pub async fn update_post_count(
Ok(profile)
}
// Doesn't return error if profile doesn't exist
pub async fn set_reachability_status(
db_client: &impl GenericClient,
actor_id: &str,
is_reachable: bool,
) -> Result<(), DatabaseError> {
if !is_reachable {
// Don't update profile if unreachable_since is already set
db_client.execute(
"
UPDATE actor_profile
SET unreachable_since = CURRENT_TIMESTAMP
WHERE actor_id = $1 AND unreachable_since IS NULL
",
&[&actor_id],
).await?;
} else {
// Remove status (if set)
db_client.execute(
"
UPDATE actor_profile
SET unreachable_since = NULL
WHERE actor_id = $1
",
&[&actor_id],
).await?;
};
Ok(())
}
/// Finds all empty remote profiles
/// (without any posts, reactions, relationships)
/// updated before the specified date
@ -812,6 +842,23 @@ mod tests {
assert_eq!(profiles[0].id, profile.id);
}
#[tokio::test]
#[serial]
async fn test_set_reachability_status() {
let db_client = &create_test_database().await;
let actor_id = "https://example.com/users/test";
let profile_data = ProfileCreateData {
username: "test".to_string(),
hostname: Some("example.com".to_string()),
actor_json: Some(create_test_actor(actor_id)),
..Default::default()
};
let profile = create_profile(db_client, profile_data).await.unwrap();
set_reachability_status(db_client, actor_id, false).await.unwrap();
let profile = get_profile_by_id(db_client, &profile.id).await.unwrap();
assert_eq!(profile.unreachable_since.is_some(), true);
}
#[tokio::test]
#[serial]
async fn test_find_empty_profiles() {

View file

@ -302,6 +302,7 @@ pub struct DbActorProfile {
pub actor_json: Option<Actor>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub unreachable_since: Option<DateTime<Utc>>,
// auto-generated database fields
pub acct: String,
@ -381,6 +382,7 @@ impl Default for DbActorProfile {
actor_id: None,
created_at: now,
updated_at: now,
unreachable_since: None,
}
}
}