Refactor process_note() function

This commit is contained in:
silverpill 2021-12-05 22:10:35 +00:00
parent 28511eeafa
commit 21499b5fc3
3 changed files with 47 additions and 31 deletions

View file

@ -131,50 +131,66 @@ async fn get_or_fetch_profile_by_actor_id(
pub async fn process_note( pub async fn process_note(
config: &Config, config: &Config,
db_client: &mut impl GenericClient, db_client: &mut impl GenericClient,
object: Object, object_id: String,
object_received: Option<Object>,
) -> Result<Post, HttpError> { ) -> Result<Post, HttpError> {
match get_post_by_object_id(db_client, &object.id).await {
Ok(post) => return Ok(post), // post already exists
Err(DatabaseError::NotFound(_)) => (), // continue processing
Err(other_error) => return Err(other_error.into()),
};
let instance = config.instance(); let instance = config.instance();
let initial_object_id = object.id.clone(); let mut maybe_object_id_to_fetch = Some(object_id);
let mut maybe_parent_object_id = object.in_reply_to.clone(); let mut maybe_object = object_received;
let mut objects = vec![object]; let mut objects = vec![];
let mut posts = vec![]; let mut posts = vec![];
// Fetch ancestors by going through inReplyTo references // Fetch ancestors by going through inReplyTo references
// TODO: fetch replies too // TODO: fetch replies too
#[allow(clippy::while_let_loop)] #[allow(clippy::while_let_loop)]
loop { loop {
let object_id = match maybe_parent_object_id { let object_id = match maybe_object_id_to_fetch {
Some(parent_object_id) => { Some(object_id) => {
if parse_object_id(&instance.url(), &parent_object_id).is_ok() { if parse_object_id(&instance.url(), &object_id).is_ok() {
// Parent object is a local post // Object is a local post
assert!(objects.len() > 0);
break; break;
} }
match get_post_by_object_id(db_client, &parent_object_id).await { match get_post_by_object_id(db_client, &object_id).await {
Ok(_) => { Ok(post) => {
// Parent object has been fetched already // Object already fetched
if objects.len() == 0 {
// Return post corresponding to initial object ID
return Ok(post);
};
break; break;
}, },
Err(DatabaseError::NotFound(_)) => (), Err(DatabaseError::NotFound(_)) => (),
Err(other_error) => return Err(other_error.into()), Err(other_error) => return Err(other_error.into()),
}; };
parent_object_id object_id
}, },
None => { None => {
// Object does not have a parent // No object to fetch
break; break;
}, },
}; };
let object = fetch_object(&instance, &object_id).await let object = match maybe_object {
.map_err(|_| ValidationError("failed to fetch object"))?; Some(object) => object,
maybe_parent_object_id = object.in_reply_to.clone(); None => {
objects.push(object); let object = fetch_object(&instance, &object_id).await
.map_err(|_| ValidationError("failed to fetch object"))?;
log::info!("fetched object {}", object.id);
object
},
};
if object.id != object_id {
// ID of fetched object doesn't match requested ID
maybe_object_id_to_fetch = Some(object.id.clone());
// Don't re-fetch object
maybe_object = Some(object);
} else {
maybe_object_id_to_fetch = object.in_reply_to.clone();
maybe_object = None;
objects.push(object);
};
} }
let initial_object_id = objects[0].id.clone();
// Objects are ordered according to their place in reply tree, // Objects are ordered according to their place in reply tree,
// starting with the root // starting with the root
@ -304,7 +320,7 @@ pub async fn receive_activity(
(CREATE, NOTE) => { (CREATE, NOTE) => {
let object: Object = serde_json::from_value(activity.object) let object: Object = serde_json::from_value(activity.object)
.map_err(|_| ValidationError("invalid object"))?; .map_err(|_| ValidationError("invalid object"))?;
process_note(config, db_client, object).await?; process_note(config, db_client, object.id.clone(), Some(object)).await?;
}, },
(ANNOUNCE, _) => { (ANNOUNCE, _) => {
let author = get_or_fetch_profile_by_actor_id( let author = get_or_fetch_profile_by_actor_id(

View file

@ -60,6 +60,7 @@ pub async fn get_nft_contract(
Ok((web3, token)) Ok((web3, token))
} }
#[allow(dead_code)]
#[derive(Debug)] #[derive(Debug)]
struct TokenTransfer { struct TokenTransfer {
tx_id: Option<H256>, tx_id: Option<H256>,

View file

@ -1,7 +1,7 @@
use regex::Regex; use regex::Regex;
use tokio_postgres::GenericClient; use tokio_postgres::GenericClient;
use crate::activitypub::fetcher::{fetch_object, fetch_profile}; use crate::activitypub::fetcher::fetch_profile;
use crate::activitypub::receiver::process_note; use crate::activitypub::receiver::process_note;
use crate::config::Config; use crate::config::Config;
use crate::errors::{ValidationError, HttpError}; use crate::errors::{ValidationError, HttpError};
@ -82,12 +82,11 @@ async fn search_note(
// Not a valid URL // Not a valid URL
return Ok(None); return Ok(None);
}; };
let instance = config.instance(); let maybe_post = match process_note(
let maybe_post = match fetch_object(&instance, search_query).await { config, db_client,
Ok(object) => { search_query.to_string(), None,
let post = process_note(config, db_client, object).await?; ).await {
Some(post) Ok(post) => Some(post),
},
Err(err) => { Err(err) => {
log::warn!("{}", err); log::warn!("{}", err);
None None