diff --git a/CHANGELOG.md b/CHANGELOG.md index 19c5067..eb36830 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Support Monero Wallet RPC authentication. - Added `create-user` command. +- Added `read-outbox` command. ### Changed diff --git a/mitra-cli/src/cli.rs b/mitra-cli/src/cli.rs index 46bb89d..e80483d 100644 --- a/mitra-cli/src/cli.rs +++ b/mitra-cli/src/cli.rs @@ -7,6 +7,7 @@ use mitra::activitypub::{ builders::delete_note::prepare_delete_note, builders::delete_person::prepare_delete_person, fetcher::fetchers::fetch_actor, + fetcher::helpers::import_from_outbox, }; use mitra::admin::roles::{role_from_str, ALLOWED_ROLES}; use mitra::ethereum::{ @@ -86,6 +87,7 @@ pub enum SubCommand { SetPassword(SetPassword), SetRole(SetRole), RefetchActor(RefetchActor), + ReadOutbox(ReadOutbox), DeleteProfile(DeleteProfile), DeletePost(DeletePost), DeleteEmoji(DeleteEmoji), @@ -277,6 +279,27 @@ impl RefetchActor { } } +/// Pull activities from actor's outbox +#[derive(Parser)] +pub struct ReadOutbox { + actor_id: String, +} + +impl ReadOutbox { + pub async fn execute( + &self, + config: &Config, + db_client: &mut impl DatabaseClient, + ) -> Result<(), Error> { + import_from_outbox( + config, + db_client, + &self.actor_id, + ).await?; + Ok(()) + } +} + /// Delete profile #[derive(Parser)] pub struct DeleteProfile { diff --git a/mitra-cli/src/main.rs b/mitra-cli/src/main.rs index 4be67b0..e3672a7 100644 --- a/mitra-cli/src/main.rs +++ b/mitra-cli/src/main.rs @@ -35,6 +35,7 @@ async fn main() { SubCommand::SetPassword(cmd) => cmd.execute(db_client).await.unwrap(), SubCommand::SetRole(cmd) => cmd.execute(db_client).await.unwrap(), SubCommand::RefetchActor(cmd) => cmd.execute(&config, db_client).await.unwrap(), + SubCommand::ReadOutbox(cmd) => cmd.execute(&config, db_client).await.unwrap(), SubCommand::DeleteProfile(cmd) => cmd.execute(&config, db_client).await.unwrap(), SubCommand::DeletePost(cmd) => cmd.execute(&config, db_client).await.unwrap(), SubCommand::DeleteEmoji(cmd) => cmd.execute(&config, db_client).await.unwrap(), diff --git a/src/activitypub/fetcher/fetchers.rs b/src/activitypub/fetcher/fetchers.rs index c35c96d..9da8afd 100644 --- a/src/activitypub/fetcher/fetchers.rs +++ b/src/activitypub/fetcher/fetchers.rs @@ -1,7 +1,8 @@ use std::path::Path; use reqwest::{Client, Method, RequestBuilder}; -use serde_json::Value; +use serde::Deserialize; +use serde_json::{Value as JsonValue}; use mitra_config::Instance; use mitra_utils::{ @@ -222,7 +223,31 @@ pub async fn fetch_object( object_url: &str, ) -> Result { let object_json = send_request(instance, object_url).await?; - let object_value: Value = serde_json::from_str(&object_json)?; + let object_value: JsonValue = serde_json::from_str(&object_json)?; let object: Object = serde_json::from_value(object_value)?; Ok(object) } + +const OUTBOX_PAGE_SIZE_LIMIT: usize = 5; + +pub async fn fetch_outbox( + instance: &Instance, + outbox_url: &str, +) -> Result, FetchError> { + #[derive(Deserialize)] + struct Collection { + first: String, + } + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct CollectionPage { + ordered_items: Vec, + } + let collection_json = send_request(instance, outbox_url).await?; + let collection: Collection = serde_json::from_str(&collection_json)?; + let page_json = send_request(instance, &collection.first).await?; + let page: CollectionPage = serde_json::from_str(&page_json)?; + let activities = page.ordered_items.into_iter() + .take(OUTBOX_PAGE_SIZE_LIMIT).collect(); + Ok(activities) +} diff --git a/src/activitypub/fetcher/helpers.rs b/src/activitypub/fetcher/helpers.rs index 111271b..a67a8fd 100644 --- a/src/activitypub/fetcher/helpers.rs +++ b/src/activitypub/fetcher/helpers.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use mitra_config::Instance; +use mitra_config::{Config, Instance}; use mitra_models::{ database::{DatabaseClient, DatabaseError}, posts::helpers::get_local_post_by_id, @@ -17,7 +17,7 @@ use crate::activitypub::{ actors::helpers::{create_remote_profile, update_remote_profile}, handlers::create::{get_object_links, handle_note}, identifiers::parse_local_object_id, - receiver::HandlerError, + receiver::{handle_activity, HandlerError}, types::Object, }; use crate::errors::ValidationError; @@ -26,6 +26,7 @@ use crate::webfinger::types::ActorAddress; use super::fetchers::{ fetch_actor, fetch_object, + fetch_outbox, perform_webfinger_query, FetchError, }; @@ -301,3 +302,29 @@ pub async fn import_post( .unwrap(); Ok(initial_post) } + +pub async fn import_from_outbox( + config: &Config, + db_client: &mut impl DatabaseClient, + actor_id: &str, +) -> Result<(), HandlerError> { + let instance = config.instance(); + let actor = fetch_actor(&instance, actor_id).await?; + let activities = fetch_outbox(&instance, &actor.outbox).await?; + log::info!("fetched {} activities", activities.len()); + for activity in activities { + let activity_actor = activity["actor"].as_str() + .ok_or(ValidationError("actor property is missing"))?; + if activity_actor != actor.id { + log::warn!("activity doesn't belong to outbox owner"); + continue; + }; + handle_activity( + config, + db_client, + &activity, + true, // is authenticated + ).await?; + }; + Ok(()) +}