Add local jobs, query connected servers for info

This commit is contained in:
asonix 2020-03-22 22:52:42 -05:00
parent 97b5612717
commit 3bfa2c0e45
9 changed files with 624 additions and 38 deletions

110
src/jobs/instance.rs Normal file
View file

@ -0,0 +1,110 @@
use crate::jobs::JobState;
use activitystreams::primitives::XsdAnyUri;
use anyhow::Error;
use background_jobs::{Job, Processor};
use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct QueryInstance {
listener: XsdAnyUri,
}
impl QueryInstance {
pub fn new(listener: XsdAnyUri) -> Self {
QueryInstance { listener }
}
async fn perform(mut self, state: JobState) -> Result<(), Error> {
let listener = self.listener.clone();
let url = self.listener.as_url_mut();
url.set_fragment(None);
url.set_query(None);
url.set_path("api/v1/instance");
let instance = state
.requests
.fetch::<Instance>(self.listener.as_str())
.await?;
let description = if instance.description.is_empty() {
instance.short_description
} else {
instance.description
};
if let Some(contact) = instance.contact {
state
.node_cache
.set_contact(
listener.clone(),
contact.username,
contact.display_name,
contact.url,
contact.avatar,
)
.await;
}
state
.node_cache
.set_instance(
listener,
instance.title,
description,
instance.version,
instance.registrations,
instance.approval_required,
)
.await;
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct InstanceProcessor;
impl Job for QueryInstance {
type State = JobState;
type Processor = InstanceProcessor;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
fn run(self, state: Self::State) -> Self::Future {
let (tx, rx) = oneshot::channel();
actix::spawn(async move {
let _ = tx.send(self.perform(state).await);
});
Box::pin(async move { rx.await? })
}
}
impl Processor for InstanceProcessor {
type Job = QueryInstance;
const NAME: &'static str = "InstanceProcessor";
const QUEUE: &'static str = "default";
}
#[derive(serde::Deserialize)]
struct Instance {
title: String,
short_description: String,
description: String,
version: String,
registrations: bool,
approval_required: bool,
contact: Option<Contact>,
}
#[derive(serde::Deserialize)]
struct Contact {
username: String,
display_name: String,
url: XsdAnyUri,
avatar: XsdAnyUri,
}

View file

@ -1,66 +1,105 @@
mod deliver;
mod deliver_many;
mod instance;
mod nodeinfo;
mod process_listeners;
mod storage;
pub use self::{deliver::Deliver, deliver_many::DeliverMany};
pub use self::{
deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, nodeinfo::QueryNodeinfo,
};
use crate::{
db::Db,
error::MyError,
jobs::{deliver::DeliverProcessor, deliver_many::DeliverManyProcessor, storage::Storage},
jobs::{
deliver::DeliverProcessor,
deliver_many::DeliverManyProcessor,
instance::InstanceProcessor,
nodeinfo::NodeinfoProcessor,
process_listeners::{Listeners, ListenersProcessor},
storage::Storage,
},
node::NodeCache,
requests::Requests,
state::State,
};
use background_jobs::{Job, QueueHandle, WorkerConfig};
use background_jobs::{memory_storage::Storage as MemoryStorage, Job, QueueHandle, WorkerConfig};
use std::time::Duration;
pub fn create_server(db: Db) -> JobServer {
JobServer::new(background_jobs::create_server(Storage::new(db)))
let local = background_jobs::create_server(MemoryStorage::new());
let shared = background_jobs::create_server(Storage::new(db));
local.every(Duration::from_secs(60 * 5), Listeners);
JobServer::new(shared, local)
}
pub fn create_workers(state: State, job_server: JobServer) {
let queue_handle = job_server.queue_handle();
let state2 = state.clone();
let job_server2 = job_server.clone();
WorkerConfig::new(move || JobState::new(state.requests(), job_server.clone()))
let remote_handle = job_server.remote.clone();
let local_handle = job_server.local.clone();
WorkerConfig::new(move || JobState::new(state.clone(), job_server.clone()))
.register(DeliverProcessor)
.register(DeliverManyProcessor)
.set_processor_count("default", 4)
.start(queue_handle);
.start(remote_handle);
WorkerConfig::new(move || JobState::new(state2.clone(), job_server2.clone()))
.register(NodeinfoProcessor)
.register(InstanceProcessor)
.register(ListenersProcessor)
.set_processor_count("default", 4)
.start(local_handle);
}
#[derive(Clone)]
pub struct JobState {
requests: Requests,
state: State,
node_cache: NodeCache,
job_server: JobServer,
}
#[derive(Clone)]
pub struct JobServer {
inner: QueueHandle,
remote: QueueHandle,
local: QueueHandle,
}
impl JobState {
fn new(requests: Requests, job_server: JobServer) -> Self {
fn new(state: State, job_server: JobServer) -> Self {
JobState {
requests,
requests: state.requests(),
node_cache: state.node_cache(),
state,
job_server,
}
}
}
impl JobServer {
fn new(queue_handle: QueueHandle) -> Self {
fn new(remote_handle: QueueHandle, local_handle: QueueHandle) -> Self {
JobServer {
inner: queue_handle,
remote: remote_handle,
local: local_handle,
}
}
pub fn queue_handle(&self) -> QueueHandle {
self.inner.clone()
}
pub fn queue<J>(&self, job: J) -> Result<(), MyError>
where
J: Job,
{
self.inner.queue(job).map_err(MyError::Queue)
self.remote.queue(job).map_err(MyError::Queue)
}
pub fn queue_local<J>(&self, job: J) -> Result<(), MyError>
where
J: Job,
{
self.local.queue(job).map_err(MyError::Queue)
}
}

170
src/jobs/nodeinfo.rs Normal file
View file

@ -0,0 +1,170 @@
use crate::jobs::JobState;
use activitystreams::primitives::XsdAnyUri;
use anyhow::Error;
use background_jobs::{Job, Processor};
use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct QueryNodeinfo {
listener: XsdAnyUri,
}
impl QueryNodeinfo {
pub fn new(listener: XsdAnyUri) -> Self {
QueryNodeinfo { listener }
}
async fn perform(mut self, state: JobState) -> Result<(), Error> {
let listener = self.listener.clone();
let url = self.listener.as_url_mut();
url.set_fragment(None);
url.set_query(None);
url.set_path(".well-known/nodeinfo");
let well_known = state
.requests
.fetch::<WellKnown>(self.listener.as_str())
.await?;
let href = if let Some(link) = well_known.links.into_iter().next() {
link.href
} else {
return Ok(());
};
let nodeinfo = state.requests.fetch::<Nodeinfo>(&href).await?;
state
.node_cache
.set_info(
listener,
nodeinfo.software.name,
nodeinfo.software.version,
nodeinfo.open_registrations,
)
.await;
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct NodeinfoProcessor;
impl Job for QueryNodeinfo {
type State = JobState;
type Processor = NodeinfoProcessor;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
fn run(self, state: Self::State) -> Self::Future {
let (tx, rx) = oneshot::channel();
actix::spawn(async move {
let _ = tx.send(self.perform(state).await);
});
Box::pin(async move { rx.await? })
}
}
impl Processor for NodeinfoProcessor {
type Job = QueryNodeinfo;
const NAME: &'static str = "NodeinfoProcessor";
const QUEUE: &'static str = "default";
}
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct Nodeinfo {
#[allow(dead_code)]
version: SupportedVersion,
software: Software,
open_registrations: bool,
}
#[derive(serde::Deserialize)]
struct Software {
name: String,
version: String,
}
#[derive(serde::Deserialize)]
struct WellKnown {
links: Vec<Link>,
}
#[derive(serde::Deserialize)]
struct Link {
#[allow(dead_code)]
rel: SupportedNodeinfo,
href: String,
}
struct SupportedVersion;
struct SupportedNodeinfo;
static SUPPORTED_VERSION: &'static str = "2.0";
static SUPPORTED_NODEINFO: &'static str = "http://nodeinfo.diaspora.software/ns/schema/2.0";
struct SupportedVersionVisitor;
struct SupportedNodeinfoVisitor;
impl<'de> serde::de::Visitor<'de> for SupportedVersionVisitor {
type Value = SupportedVersion;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "the string '{}'", SUPPORTED_VERSION)
}
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
if s == SUPPORTED_VERSION {
Ok(SupportedVersion)
} else {
Err(serde::de::Error::custom("Invalid nodeinfo version"))
}
}
}
impl<'de> serde::de::Visitor<'de> for SupportedNodeinfoVisitor {
type Value = SupportedNodeinfo;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "the string '{}'", SUPPORTED_NODEINFO)
}
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
if s == SUPPORTED_NODEINFO {
Ok(SupportedNodeinfo)
} else {
Err(serde::de::Error::custom("Invalid nodeinfo version"))
}
}
}
impl<'de> serde::de::Deserialize<'de> for SupportedVersion {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::de::Deserializer<'de>,
{
deserializer.deserialize_str(SupportedVersionVisitor)
}
}
impl<'de> serde::de::Deserialize<'de> for SupportedNodeinfo {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::de::Deserializer<'de>,
{
deserializer.deserialize_str(SupportedNodeinfoVisitor)
}
}

View file

@ -0,0 +1,47 @@
use crate::jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState};
use anyhow::Error;
use background_jobs::{Job, Processor};
use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Listeners;
#[derive(Clone, Debug)]
pub struct ListenersProcessor;
impl Listeners {
async fn perform(self, state: JobState) -> Result<(), Error> {
for listener in state.state.listeners().await {
state
.job_server
.queue_local(QueryInstance::new(listener.clone()))?;
state.job_server.queue_local(QueryNodeinfo::new(listener))?;
}
Ok(())
}
}
impl Job for Listeners {
type State = JobState;
type Processor = ListenersProcessor;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
fn run(self, state: Self::State) -> Self::Future {
let (tx, rx) = oneshot::channel();
actix::spawn(async move {
let _ = tx.send(self.perform(state).await);
});
Box::pin(async move { rx.await? })
}
}
impl Processor for ListenersProcessor {
type Job = Listeners;
const NAME: &'static str = "ProcessListenersProcessor";
const QUEUE: &'static str = "default";
}

View file

@ -18,6 +18,7 @@ mod db;
mod error;
mod inbox;
mod jobs;
mod node;
mod nodeinfo;
mod notify;
mod rehydrate;
@ -42,11 +43,11 @@ async fn index(
state: web::Data<State>,
config: web::Data<Config>,
) -> Result<HttpResponse, MyError> {
let listeners = state.listeners().await;
let nodes = state.node_cache().nodes().await;
let mut buf = BufWriter::new(Vec::new());
templates::index(&mut buf, &listeners, &config)?;
templates::index(&mut buf, &nodes, &config)?;
let buf = buf.into_inner().map_err(|e| {
error!("Error rendering template, {}", e.error());
MyError::FlushBuffer
@ -111,11 +112,10 @@ async fn main() -> Result<(), anyhow::Error> {
}
let state = State::hydrate(config.clone(), &db).await?;
let job_server = create_server(db.clone());
rehydrate::spawn(db.clone(), state.clone());
notify::spawn(state.clone(), &config)?;
let job_server = create_server(db.clone());
notify::spawn(state.clone(), job_server.clone(), &config)?;
if args.jobs_only() {
for _ in 0..num_cpus::get() {

194
src/node.rs Normal file
View file

@ -0,0 +1,194 @@
use activitystreams::primitives::XsdAnyUri;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::sync::RwLock;
pub type ListenersCache = Arc<RwLock<HashSet<XsdAnyUri>>>;
#[derive(Clone)]
pub struct NodeCache {
listeners: ListenersCache,
nodes: Arc<RwLock<HashMap<XsdAnyUri, Node>>>,
}
impl NodeCache {
pub fn new(listeners: ListenersCache) -> Self {
NodeCache {
listeners,
nodes: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn nodes(&self) -> Vec<Node> {
let listeners: HashSet<_> = self.listeners.read().await.clone();
self.nodes
.read()
.await
.iter()
.filter_map(|(k, v)| {
if listeners.contains(k) {
Some(v.clone())
} else {
None
}
})
.collect()
}
pub async fn set_info(
&self,
listener: XsdAnyUri,
software: String,
version: String,
reg: bool,
) {
if !self.listeners.read().await.contains(&listener) {
let mut nodes = self.nodes.write().await;
nodes.remove(&listener);
return;
}
let mut write_guard = self.nodes.write().await;
let node = write_guard
.entry(listener.clone())
.or_insert(Node::new(listener));
node.set_info(software, version, reg);
}
pub async fn set_instance(
&self,
listener: XsdAnyUri,
title: String,
description: String,
version: String,
reg: bool,
requires_approval: bool,
) {
if !self.listeners.read().await.contains(&listener) {
let mut nodes = self.nodes.write().await;
nodes.remove(&listener);
return;
}
let mut write_guard = self.nodes.write().await;
let node = write_guard
.entry(listener.clone())
.or_insert(Node::new(listener));
node.set_instance(title, description, version, reg, requires_approval);
}
pub async fn set_contact(
&self,
listener: XsdAnyUri,
username: String,
display_name: String,
url: XsdAnyUri,
avatar: XsdAnyUri,
) {
if !self.listeners.read().await.contains(&listener) {
let mut nodes = self.nodes.write().await;
nodes.remove(&listener);
return;
}
let mut write_guard = self.nodes.write().await;
let node = write_guard
.entry(listener.clone())
.or_insert(Node::new(listener));
node.set_contact(username, display_name, url, avatar);
}
}
#[derive(Clone, Debug)]
pub struct Node {
pub base: XsdAnyUri,
pub info: Option<Info>,
pub instance: Option<Instance>,
pub contact: Option<Contact>,
}
impl Node {
pub fn new(mut uri: XsdAnyUri) -> Self {
let url = uri.as_mut();
url.set_fragment(None);
url.set_query(None);
url.set_path("");
Node {
base: uri,
info: None,
instance: None,
contact: None,
}
}
fn set_info(&mut self, software: String, version: String, reg: bool) -> &mut Self {
self.info = Some(Info {
software,
version,
reg,
});
self
}
fn set_instance(
&mut self,
title: String,
description: String,
version: String,
reg: bool,
requires_approval: bool,
) -> &mut Self {
self.instance = Some(Instance {
title,
description,
version,
reg,
requires_approval,
});
self
}
fn set_contact(
&mut self,
username: String,
display_name: String,
url: XsdAnyUri,
avatar: XsdAnyUri,
) -> &mut Self {
self.contact = Some(Contact {
username,
display_name,
url,
avatar,
});
self
}
}
#[derive(Clone, Debug)]
pub struct Info {
pub software: String,
pub version: String,
pub reg: bool,
}
#[derive(Clone, Debug)]
pub struct Instance {
pub title: String,
pub description: String,
pub version: String,
pub reg: bool,
pub requires_approval: bool,
}
#[derive(Clone, Debug)]
pub struct Contact {
pub username: String,
pub display_name: String,
pub url: XsdAnyUri,
pub avatar: XsdAnyUri,
}

View file

@ -1,4 +1,9 @@
use crate::{db::listen, error::MyError, state::State};
use crate::{
db::listen,
error::MyError,
jobs::{JobServer, QueryInstance, QueryNodeinfo},
state::State,
};
use activitystreams::primitives::XsdAnyUri;
use actix::clock::{delay_for, Duration};
use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config, Notification};
@ -9,7 +14,7 @@ use futures::{
use log::{debug, error, info, warn};
use std::sync::Arc;
async fn handle_notification(state: State, notif: Notification) {
async fn handle_notification(state: State, job_server: JobServer, notif: Notification) {
match notif.channel() {
"new_blocks" => {
info!("Caching block of {}", notif.payload());
@ -22,7 +27,9 @@ async fn handle_notification(state: State, notif: Notification) {
"new_listeners" => {
if let Ok(uri) = notif.payload().parse::<XsdAnyUri>() {
info!("Caching listener {}", uri);
state.cache_listener(uri).await;
state.cache_listener(uri.clone()).await;
let _ = job_server.queue_local(QueryInstance::new(uri.clone()));
let _ = job_server.queue_local(QueryNodeinfo::new(uri));
}
}
"rm_blocks" => {
@ -43,12 +50,14 @@ async fn handle_notification(state: State, notif: Notification) {
};
}
pub fn spawn(state: State, config: &crate::config::Config) -> Result<(), MyError> {
pub fn spawn(
state: State,
job_server: JobServer,
config: &crate::config::Config,
) -> Result<(), MyError> {
let config: Config = config.database_url().parse()?;
actix::spawn(async move {
let mut client;
loop {
let (new_client, mut conn) = match config.connect(NoTls).await {
Ok((client, conn)) => (client, conn),
@ -59,7 +68,7 @@ pub fn spawn(state: State, config: &crate::config::Config) -> Result<(), MyError
}
};
client = Arc::new(new_client);
let client = Arc::new(new_client);
let new_client = client.clone();
actix::spawn(async move {
@ -88,7 +97,7 @@ pub fn spawn(state: State, config: &crate::config::Config) -> Result<(), MyError
});
while let Some(n) = stream.next().await {
actix::spawn(handle_notification(state.clone(), n));
actix::spawn(handle_notification(state.clone(), job_server.clone(), n));
}
drop(client);

View file

@ -3,6 +3,7 @@ use crate::{
config::{Config, UrlKind},
db::Db,
error::MyError,
node::NodeCache,
requests::Requests,
};
use activitystreams::primitives::XsdAnyUri;
@ -28,9 +29,14 @@ pub struct State {
blocks: Arc<RwLock<HashSet<String>>>,
whitelists: Arc<RwLock<HashSet<String>>>,
listeners: Arc<RwLock<HashSet<XsdAnyUri>>>,
node_cache: NodeCache,
}
impl State {
pub fn node_cache(&self) -> NodeCache {
self.node_cache.clone()
}
pub fn requests(&self) -> Requests {
Requests::new(
self.config.generate_url(UrlKind::MainKey),
@ -191,6 +197,7 @@ impl State {
let (blocks, whitelists, listeners, private_key) = try_join!(f1, f2, f3, f4)?;
let public_key = private_key.to_public_key();
let listeners = Arc::new(RwLock::new(listeners));
Ok(State {
public_key,
@ -200,7 +207,8 @@ impl State {
actor_id_cache: Arc::new(RwLock::new(LruCache::new(1024 * 8))),
blocks: Arc::new(RwLock::new(blocks)),
whitelists: Arc::new(RwLock::new(whitelists)),
listeners: Arc::new(RwLock::new(listeners)),
listeners: listeners.clone(),
node_cache: NodeCache::new(listeners),
})
}
}

View file

@ -1,7 +1,6 @@
@use crate::{config::{Config, UrlKind}, templates::statics::index_css};
@use activitystreams::primitives::XsdAnyUri;
@use crate::{config::{Config, UrlKind}, templates::statics::index_css, node::Node};
@(listeners: &[XsdAnyUri], config: &Config)
@(nodes: &[Node], config: &Config)
<!doctype html>
<html>
@ -17,13 +16,23 @@
<main>
<section>
<h3>Connected Servers:</h3>
@if listeners.is_empty() {
@if nodes.is_empty() {
<p>There are no connected servers at this time.</p>
} else {
<ul>
@for listener in listeners {
@if let Some(domain) = listener.as_url().domain() {
<li>@domain</li>
@for node in nodes {
@if let Some(domain) = node.base.as_url().domain() {
<li>
<p><a href="@node.base">@domain</a></p>
@if let Some(info) = node.info.as_ref() {
<p>Running @info.software version @info.version</p>
@if info.reg {
<p>Registration is open</p>
} else {
<p>Registration is closed</p>
}
}
</li>
}
}
</ul>