Some refactoring of puller.rs

This commit is contained in:
Felix Ableitner 2020-04-08 14:08:33 +02:00
parent d2bad5f79e
commit edd0ef5991

View file

@ -12,6 +12,7 @@ use diesel::result::Error::NotFound;
use diesel::PgConnection; use diesel::PgConnection;
use failure::Error; use failure::Error;
use isahc::prelude::*; use isahc::prelude::*;
use log::warn;
use serde::Deserialize; use serde::Deserialize;
use std::time::Duration; use std::time::Duration;
@ -26,30 +27,29 @@ fn fetch_node_info(domain: &str) -> Result<NodeInfo, Error> {
} }
fn fetch_communities_from_instance( fn fetch_communities_from_instance(
domain: &str, community_list_url: &str,
conn: &PgConnection, conn: &PgConnection,
) -> Result<Vec<CommunityForm>, Error> { ) -> Result<Vec<Community>, Error> {
let node_info = fetch_node_info(domain)?; fetch_remote_object::<UnorderedCollection>(community_list_url)?
if let Some(community_list_url) = node_info.metadata.community_list_url {
let collection = fetch_remote_object::<UnorderedCollection>(&community_list_url)?;
let object_boxes = collection
.collection_props .collection_props
.get_many_items_base_boxes() .get_many_items_base_boxes()
.unwrap(); .unwrap()
let communities: Result<Vec<CommunityForm>, Error> = object_boxes .map(|b| -> Result<CommunityForm, Error> {
.map(|c| { let group = b.to_owned().to_concrete::<GroupExt>()?;
let group = c.to_owned().to_concrete::<GroupExt>()?; Ok(CommunityForm::from_group(&group, conn)?)
CommunityForm::from_group(&group, conn)
}) })
.collect(); .map(
Ok(communities?) |cf: Result<CommunityForm, Error>| -> Result<Community, Error> {
} else { let cf2 = cf?;
Err(format_err!( let existing = Community::read_from_actor_id(conn, &cf2.actor_id);
"{} is not a Lemmy instance, federation is not supported", match existing {
domain Err(NotFound {}) => Ok(Community::create(conn, &cf2)?),
)) Ok(c) => Ok(Community::update(conn, c.id, &cf2)?),
Err(e) => Err(Error::from(e)),
} }
},
)
.collect()
} }
// TODO: add an optional param last_updated and only fetch if its too old // TODO: add an optional param last_updated and only fetch if its too old
@ -60,7 +60,6 @@ where
if Settings::get().federation.tls_enabled && !uri.starts_with("https://") { if Settings::get().federation.tls_enabled && !uri.starts_with("https://") {
return Err(format_err!("Activitypub uri is insecure: {}", uri)); return Err(format_err!("Activitypub uri is insecure: {}", uri));
} }
// TODO: should cache responses here when we are in production
// TODO: this function should return a future // TODO: this function should return a future
let timeout = Duration::from_secs(60); let timeout = Duration::from_secs(60);
let text = Request::get(uri) let text = Request::get(uri)
@ -76,23 +75,35 @@ where
fn fetch_remote_community_posts( fn fetch_remote_community_posts(
instance: &str, instance: &str,
community: &str, community: &Community,
conn: &PgConnection, conn: &PgConnection,
) -> Result<Vec<PostForm>, Error> { ) -> Result<Vec<Post>, Error> {
let endpoint = format!("http://{}/federation/c/{}", instance, community); let endpoint = format!("http://{}/federation/c/{}", instance, community.name);
let community = fetch_remote_object::<GroupExt>(&endpoint)?; let group = fetch_remote_object::<GroupExt>(&endpoint)?;
let outbox_uri = &community.extension.get_outbox().to_string(); let outbox_uri = &group.extension.get_outbox().to_string();
// TODO: outbox url etc should be stored in local db
let outbox = fetch_remote_object::<OrderedCollection>(outbox_uri)?; let outbox = fetch_remote_object::<OrderedCollection>(outbox_uri)?;
let items = outbox.collection_props.get_many_items_base_boxes(); let items = outbox.collection_props.get_many_items_base_boxes();
let posts = items Ok(
items
.unwrap() .unwrap()
.map(|obox: &BaseBox| { .map(|obox: &BaseBox| -> Result<PostForm, Error> {
let page = obox.clone().to_concrete::<Page>().unwrap(); let page = obox.clone().to_concrete::<Page>()?;
PostForm::from_page(&page, conn) PostForm::from_page(&page, conn)
}) })
.collect::<Result<Vec<PostForm>, Error>>()?; .map(|pf: Result<PostForm, Error>| -> Result<Post, Error> {
Ok(posts) let mut pf2 = pf?;
pf2.community_id = community.id;
let existing = Post::read_from_apub_id(conn, &pf2.ap_id);
match existing {
Err(NotFound {}) => Ok(Post::create(conn, &pf2)?),
Ok(p) => Ok(Post::update(conn, p.id, &pf2)?),
Err(e) => Err(Error::from(e)),
}
})
.collect::<Result<Vec<Post>, Error>>()?,
)
} }
pub fn fetch_remote_user(apub_id: &str, conn: &PgConnection) -> Result<User_, Error> { pub fn fetch_remote_user(apub_id: &str, conn: &PgConnection) -> Result<User_, Error> {
@ -110,25 +121,17 @@ pub fn fetch_remote_user(apub_id: &str, conn: &PgConnection) -> Result<User_, Er
// after that, we should rely in the inbox, and fetch on demand when needed // after that, we should rely in the inbox, and fetch on demand when needed
pub fn fetch_all(conn: &PgConnection) -> Result<(), Error> { pub fn fetch_all(conn: &PgConnection) -> Result<(), Error> {
for instance in &get_following_instances() { for instance in &get_following_instances() {
let communities = fetch_communities_from_instance(instance, conn)?; let node_info = fetch_node_info(instance)?;
if let Some(community_list_url) = node_info.metadata.community_list_url {
for community in &communities { let communities = fetch_communities_from_instance(&community_list_url, conn)?;
let existing = Community::read_from_actor_id(conn, &community.actor_id); for c in communities {
let community_id = match existing { fetch_remote_community_posts(instance, &c, conn)?;
Err(NotFound {}) => Community::create(conn, community)?.id,
Ok(c) => Community::update(conn, c.id, community)?.id,
Err(e) => return Err(Error::from(e)),
};
let mut posts = fetch_remote_community_posts(instance, &community.name, conn)?;
for post_ in &mut posts {
post_.community_id = community_id;
let existing = Post::read_from_apub_id(conn, &post_.ap_id);
match existing {
Err(NotFound {}) => Post::create(conn, post_)?,
Ok(p) => Post::update(conn, p.id, post_)?,
Err(e) => return Err(Error::from(e)),
};
} }
} else {
warn!(
"{} is not a Lemmy instance, federation is not supported",
instance
);
} }
} }
Ok(()) Ok(())