diff --git a/migrations/2020-03-26-163641_create-media/down.sql b/migrations/2020-03-26-163641_create-media/down.sql new file mode 100644 index 0000000..a1e615c --- /dev/null +++ b/migrations/2020-03-26-163641_create-media/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE media; diff --git a/migrations/2020-03-26-163641_create-media/up.sql b/migrations/2020-03-26-163641_create-media/up.sql new file mode 100644 index 0000000..8346739 --- /dev/null +++ b/migrations/2020-03-26-163641_create-media/up.sql @@ -0,0 +1,10 @@ +-- Your SQL goes here +CREATE TABLE media ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + media_id UUID UNIQUE NOT NULL, + url TEXT UNIQUE NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +SELECT diesel_manage_updated_at('media'); diff --git a/src/data/actor.rs b/src/data/actor.rs index d69f773..5a0a79b 100644 --- a/src/data/actor.rs +++ b/src/data/actor.rs @@ -92,11 +92,15 @@ impl ActorCache { } pub async fn unfollower(&self, actor: &Actor) -> Result, MyError> { - let conn = self.db.pool().get().await?; - - let row_opt = conn + let row_opt = self + .db + .pool() + .get() + .await? .query_opt( - "DELETE FROM actors WHERE actor_id = $1::TEXT RETURNING listener_id;", + "DELETE FROM actors + WHERE actor_id = $1::TEXT + RETURNING listener_id;", &[&actor.id.as_str()], ) .await?; @@ -109,9 +113,14 @@ impl ActorCache { let listener_id: Uuid = row.try_get(0)?; - let row_opt = conn + let row_opt = self + .db + .pool() + .get() + .await? .query_opt( - "SELECT FROM actors WHERE listener_id = $1::UUID;", + "SELECT FROM actors + WHERE listener_id = $1::UUID;", &[&listener_id], ) .await?; @@ -124,9 +133,11 @@ impl ActorCache { } async fn lookup(&self, id: &XsdAnyUri) -> Result, MyError> { - let conn = self.db.pool().get().await?; - - let row_opt = conn + let row_opt = self + .db + .pool() + .get() + .await? .query_opt( "SELECT listeners.actor_id, actors.public_key, actors.public_key_id FROM listeners @@ -158,9 +169,11 @@ impl ActorCache { } async fn save(&self, actor: Actor) -> Result<(), MyError> { - let conn = self.db.pool().get().await?; - - let row_opt = conn + let row_opt = self + .db + .pool() + .get() + .await? .query_opt( "SELECT id FROM listeners WHERE actor_id = $1::TEXT LIMIT 1;", &[&actor.inbox.as_str()], @@ -175,14 +188,35 @@ impl ActorCache { let listener_id: Uuid = row.try_get(0)?; - conn.execute( - "INSERT INTO actors (actor_id, public_key, public_key_id, listener_id, created_at, updated_at) - VALUES ($1::TEXT, $2::TEXT, $3::TEXT, $4::UUID, 'now', 'now') - ON CONFLICT (actor_id) - DO UPDATE SET public_key = $2::TEXT;", - &[&actor.id.as_str(), &actor.public_key, &actor.public_key_id.as_str(), &listener_id], - ) - .await?; + self.db + .pool() + .get() + .await? + .execute( + "INSERT INTO actors ( + actor_id, + public_key, + public_key_id, + listener_id, + created_at, + updated_at + ) VALUES ( + $1::TEXT, + $2::TEXT, + $3::TEXT, + $4::UUID, + 'now', + 'now' + ) ON CONFLICT (actor_id) + DO UPDATE SET public_key = $2::TEXT;", + &[ + &actor.id.as_str(), + &actor.public_key, + &actor.public_key_id.as_str(), + &listener_id, + ], + ) + .await?; Ok(()) } @@ -192,15 +226,17 @@ impl ActorCache { public_key: &str, public_key_id: &XsdAnyUri, ) -> Result<(), MyError> { - let conn = self.db.pool().get().await?; - - conn.execute( - "UPDATE actors - SET public_key = $2::TEXT, public_key_id = $3::TEXT - WHERE actor_id = $1::TEXT;", - &[&id.as_str(), &public_key, &public_key_id.as_str()], - ) - .await?; + self.db + .pool() + .get() + .await? + .execute( + "UPDATE actors + SET public_key = $2::TEXT, public_key_id = $3::TEXT + WHERE actor_id = $1::TEXT;", + &[&id.as_str(), &public_key, &public_key_id.as_str()], + ) + .await?; Ok(()) } @@ -223,9 +259,13 @@ impl ActorCache { } async fn rehydrate(&self) -> Result<(), MyError> { - let conn = self.db.pool().get().await?; - - let rows = conn.query("SELECT actor_id FROM actors;", &[]).await?; + let rows = self + .db + .pool() + .get() + .await? + .query("SELECT actor_id FROM actors;", &[]) + .await?; let actor_ids = rows .into_iter() diff --git a/src/data/media.rs b/src/data/media.rs index e854ec4..6b9f426 100644 --- a/src/data/media.rs +++ b/src/data/media.rs @@ -1,5 +1,7 @@ +use crate::{db::Db, error::MyError}; use activitystreams::primitives::XsdAnyUri; use bytes::Bytes; +use futures::join; use lru::LruCache; use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::{Mutex, RwLock}; @@ -10,45 +12,154 @@ static MEDIA_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 2); #[derive(Clone)] pub struct Media { + db: Db, inverse: Arc>>, url_cache: Arc>>, byte_cache: Arc>>, } impl Media { - pub fn new() -> Self { + pub fn new(db: Db) -> Self { Media { + db, inverse: Arc::new(Mutex::new(HashMap::new())), url_cache: Arc::new(Mutex::new(LruCache::new(128))), byte_cache: Arc::new(RwLock::new(TtlCache::new(128))), } } - pub async fn get_uuid(&self, url: &XsdAnyUri) -> Option { - let uuid = self.inverse.lock().await.get(url).cloned()?; + pub async fn get_uuid(&self, url: &XsdAnyUri) -> Result, MyError> { + let res = self.inverse.lock().await.get(url).cloned(); + let uuid = match res { + Some(uuid) => uuid, + _ => { + let row_opt = self + .db + .pool() + .get() + .await? + .query_opt( + "SELECT media_id + FROM media + WHERE url = $1::TEXT + LIMIT 1;", + &[&url.as_str()], + ) + .await?; + + if let Some(row) = row_opt { + let uuid: Uuid = row.try_get(0)?; + self.inverse.lock().await.insert(url.clone(), uuid); + uuid + } else { + return Ok(None); + } + } + }; if self.url_cache.lock().await.contains(&uuid) { - return Some(uuid); + return Ok(Some(uuid)); + } + + let row_opt = self + .db + .pool() + .get() + .await? + .query_opt( + "SELECT id + FROM media + WHERE + url = $1::TEXT + AND + media_id = $2::UUID + LIMIT 1;", + &[&url.as_str(), &uuid], + ) + .await?; + + if row_opt.is_some() { + self.url_cache.lock().await.put(uuid, url.clone()); + + return Ok(Some(uuid)); } self.inverse.lock().await.remove(url); - None + Ok(None) } - pub async fn get_url(&self, uuid: Uuid) -> Option { - self.url_cache.lock().await.get(&uuid).cloned() + pub async fn get_url(&self, uuid: Uuid) -> Result, MyError> { + match self.url_cache.lock().await.get(&uuid).cloned() { + Some(url) => return Ok(Some(url)), + _ => (), + } + + let row_opt = self + .db + .pool() + .get() + .await? + .query_opt( + "SELECT url + FROM media + WHERE media_id = $1::UUID + LIMIT 1;", + &[&uuid], + ) + .await?; + + if let Some(row) = row_opt { + let url: String = row.try_get(0)?; + let url: XsdAnyUri = url.parse()?; + return Ok(Some(url)); + } + + Ok(None) } pub async fn get_bytes(&self, uuid: Uuid) -> Option<(String, Bytes)> { self.byte_cache.read().await.get(&uuid).cloned() } - pub async fn store_url(&self, url: &XsdAnyUri) -> Uuid { + pub async fn store_url(&self, url: &XsdAnyUri) -> Result { let uuid = Uuid::new_v4(); - self.inverse.lock().await.insert(url.clone(), uuid); - self.url_cache.lock().await.put(uuid, url.clone()); - uuid + + let (_, _, res) = join!( + async { + self.inverse.lock().await.insert(url.clone(), uuid); + }, + async { + self.url_cache.lock().await.put(uuid, url.clone()); + }, + async { + self.db + .pool() + .get() + .await? + .execute( + "INSERT INTO media ( + media_id, + url, + created_at, + updated_at + ) VALUES ( + $1::UUID, + $2::TEXT, + 'now', + 'now' + ) ON CONFLICT (media_id) + DO UPDATE SET url = $2::TEXT;", + &[&uuid, &url.as_str()], + ) + .await?; + Ok(()) as Result<(), MyError> + } + ); + + res?; + + Ok(uuid) } pub async fn store_bytes(&self, uuid: Uuid, content_type: String, bytes: Bytes) { diff --git a/src/data/node.rs b/src/data/node.rs index b0fc886..c816431 100644 --- a/src/data/node.rs +++ b/src/data/node.rs @@ -118,9 +118,11 @@ impl NodeCache { } async fn do_bust_by_id(&self, id: Uuid) -> Result<(), MyError> { - let conn = self.db.pool().get().await?; - - let row_opt = conn + let row_opt = self + .db + .pool() + .get() + .await? .query_opt( "SELECT ls.actor_id FROM listeners AS ls @@ -140,16 +142,17 @@ impl NodeCache { let listener: String = row.try_get(0)?; let listener: XsdAnyUri = listener.parse()?; - let mut write_guard = self.nodes.write().await; - write_guard.remove(&listener); + self.nodes.write().await.remove(&listener); Ok(()) } async fn do_cache_by_id(&self, id: Uuid) -> Result<(), MyError> { - let conn = self.db.pool().get().await?; - - let row_opt = conn + let row_opt = self + .db + .pool() + .get() + .await? .query_opt( "SELECT ls.actor_id, nd.nodeinfo, nd.instance, nd.contact FROM nodes AS nd @@ -172,19 +175,21 @@ impl NodeCache { let instance: Option> = row.try_get(2)?; let contact: Option> = row.try_get(3)?; - let mut write_guard = self.nodes.write().await; - let node = write_guard - .entry(listener.clone()) - .or_insert(Node::new(listener)); + { + let mut write_guard = self.nodes.write().await; + let node = write_guard + .entry(listener.clone()) + .or_insert(Node::new(listener)); - if let Some(info) = info { - node.info = Some(info.0); - } - if let Some(instance) = instance { - node.instance = Some(instance.0); - } - if let Some(contact) = contact { - node.contact = Some(contact.0); + if let Some(info) = info { + node.info = Some(info.0); + } + if let Some(instance) = instance { + node.instance = Some(instance.0); + } + if let Some(contact) = contact { + node.contact = Some(contact.0); + } } Ok(()) @@ -203,12 +208,15 @@ impl NodeCache { return Ok(()); } - let mut write_guard = self.nodes.write().await; - let node = write_guard - .entry(listener.clone()) - .or_insert(Node::new(listener.clone())); - node.set_info(software, version, reg); - self.save(listener, node).await?; + let node = { + let mut write_guard = self.nodes.write().await; + let node = write_guard + .entry(listener.clone()) + .or_insert(Node::new(listener.clone())); + node.set_info(software, version, reg); + node.clone() + }; + self.save(listener, &node).await?; Ok(()) } @@ -227,12 +235,15 @@ impl NodeCache { return Ok(()); } - let mut write_guard = self.nodes.write().await; - let node = write_guard - .entry(listener.clone()) - .or_insert(Node::new(listener.clone())); - node.set_instance(title, description, version, reg, requires_approval); - self.save(listener, node).await?; + let node = { + let mut write_guard = self.nodes.write().await; + let node = write_guard + .entry(listener.clone()) + .or_insert(Node::new(listener.clone())); + node.set_instance(title, description, version, reg, requires_approval); + node.clone() + }; + self.save(listener, &node).await?; Ok(()) } @@ -250,19 +261,24 @@ impl NodeCache { return Ok(()); } - let mut write_guard = self.nodes.write().await; - let node = write_guard - .entry(listener.clone()) - .or_insert(Node::new(listener.clone())); - node.set_contact(username, display_name, url, avatar); - self.save(listener, node).await?; + let node = { + let mut write_guard = self.nodes.write().await; + let node = write_guard + .entry(listener.clone()) + .or_insert(Node::new(listener.clone())); + node.set_contact(username, display_name, url, avatar); + node.clone() + }; + self.save(listener, &node).await?; Ok(()) } pub async fn save(&self, listener: &XsdAnyUri, node: &Node) -> Result<(), MyError> { - let conn = self.db.pool().get().await?; - - let row_opt = conn + let row_opt = self + .db + .pool() + .get() + .await? .query_opt( "SELECT id FROM listeners WHERE actor_id = $1::TEXT LIMIT 1;", &[&listener.as_str()], @@ -275,8 +291,12 @@ impl NodeCache { return Err(MyError::NotSubscribed(listener.as_str().to_owned())); }; - conn.execute( - "INSERT INTO nodes ( + self.db + .pool() + .get() + .await? + .execute( + "INSERT INTO nodes ( listener_id, nodeinfo, instance, @@ -295,14 +315,14 @@ impl NodeCache { nodeinfo = $2::JSONB, instance = $3::JSONB, contact = $4::JSONB;", - &[ - &id, - &Json(&node.info), - &Json(&node.instance), - &Json(&node.contact), - ], - ) - .await?; + &[ + &id, + &Json(&node.info), + &Json(&node.instance), + &Json(&node.contact), + ], + ) + .await?; Ok(()) } } diff --git a/src/data/state.rs b/src/data/state.rs index 730357d..0f55eb2 100644 --- a/src/data/state.rs +++ b/src/data/state.rs @@ -47,34 +47,29 @@ impl State { } pub async fn bust_whitelist(&self, whitelist: &str) { - let mut write_guard = self.whitelists.write().await; - write_guard.remove(whitelist); + self.whitelists.write().await.remove(whitelist); } pub async fn bust_block(&self, block: &str) { - let mut write_guard = self.blocks.write().await; - write_guard.remove(block); + self.blocks.write().await.remove(block); } pub async fn bust_listener(&self, inbox: &XsdAnyUri) { - let mut write_guard = self.listeners.write().await; - write_guard.remove(inbox); + self.listeners.write().await.remove(inbox); } pub async fn listeners(&self) -> Vec { - let read_guard = self.listeners.read().await; - read_guard.iter().cloned().collect() + self.listeners.read().await.iter().cloned().collect() } pub async fn blocks(&self) -> Vec { - let read_guard = self.blocks.read().await; - read_guard.iter().cloned().collect() + self.blocks.read().await.iter().cloned().collect() } pub async fn listeners_without(&self, inbox: &XsdAnyUri, domain: &str) -> Vec { - let read_guard = self.listeners.read().await; - - read_guard + self.listeners + .read() + .await .iter() .filter_map(|listener| { if let Some(dom) = listener.as_url().domain() { @@ -94,8 +89,7 @@ impl State { } if let Some(host) = actor_id.as_url().host() { - let read_guard = self.whitelists.read().await; - return read_guard.contains(&host.to_string()); + self.whitelists.read().await.contains(&host.to_string()); } false @@ -103,43 +97,34 @@ impl State { pub async fn is_blocked(&self, actor_id: &XsdAnyUri) -> bool { if let Some(host) = actor_id.as_url().host() { - let read_guard = self.blocks.read().await; - return read_guard.contains(&host.to_string()); + self.blocks.read().await.contains(&host.to_string()); } true } pub async fn is_listener(&self, actor_id: &XsdAnyUri) -> bool { - let read_guard = self.listeners.read().await; - read_guard.contains(actor_id) + self.listeners.read().await.contains(actor_id) } pub async fn is_cached(&self, object_id: &XsdAnyUri) -> bool { - let cache = self.actor_id_cache.clone(); - - let read_guard = cache.read().await; - read_guard.contains(object_id) + self.actor_id_cache.read().await.contains(object_id) } pub async fn cache(&self, object_id: XsdAnyUri, actor_id: XsdAnyUri) { - let mut write_guard = self.actor_id_cache.write().await; - write_guard.put(object_id, actor_id); + self.actor_id_cache.write().await.put(object_id, actor_id); } pub async fn cache_block(&self, host: String) { - let mut write_guard = self.blocks.write().await; - write_guard.insert(host); + self.blocks.write().await.insert(host); } pub async fn cache_whitelist(&self, host: String) { - let mut write_guard = self.whitelists.write().await; - write_guard.insert(host); + self.whitelists.write().await.insert(host); } pub async fn cache_listener(&self, listener: XsdAnyUri) { - let mut write_guard = self.listeners.write().await; - write_guard.insert(listener); + self.listeners.write().await.insert(listener); } pub async fn rehydrate(&self, db: &Db) -> Result<(), MyError> { @@ -151,16 +136,13 @@ impl State { join!( async move { - let mut write_guard = self.listeners.write().await; - *write_guard = listeners; + *self.listeners.write().await = listeners; }, async move { - let mut write_guard = self.whitelists.write().await; - *write_guard = whitelists; + *self.whitelists.write().await = whitelists; }, async move { - let mut write_guard = self.blocks.write().await; - *write_guard = blocks; + *self.blocks.write().await = blocks; } ); diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index 8de0185..9f86241 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -45,10 +45,10 @@ impl QueryInstance { }; if let Some(mut contact) = instance.contact { - if let Some(uuid) = state.media.get_uuid(&contact.avatar).await { + if let Some(uuid) = state.media.get_uuid(&contact.avatar).await? { contact.avatar = state.config.generate_url(UrlKind::Media(uuid)).parse()?; } else { - let uuid = state.media.store_url(&contact.avatar).await; + let uuid = state.media.store_url(&contact.avatar).await?; contact.avatar = state.config.generate_url(UrlKind::Media(uuid)).parse()?; } diff --git a/src/main.rs b/src/main.rs index 266c598..88d96b1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -68,7 +68,7 @@ async fn main() -> Result<(), anyhow::Error> { return Ok(()); } - let media = Media::new(); + let media = Media::new(db.clone()); let state = State::hydrate(config.clone(), &db).await?; let actors = ActorCache::new(db.clone()); let job_server = create_server(db.clone()); diff --git a/src/routes/media.rs b/src/routes/media.rs index 30cfc53..c37f503 100644 --- a/src/routes/media.rs +++ b/src/routes/media.rs @@ -13,7 +13,7 @@ pub async fn route( return Ok(HttpResponse::Ok().content_type(content_type).body(bytes)); } - if let Some(url) = media.get_url(uuid).await { + if let Some(url) = media.get_url(uuid).await? { let (content_type, bytes) = requests.fetch_bytes(url.as_str()).await?; media diff --git a/src/schema.rs b/src/schema.rs index c60ab10..e358f4c 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -43,6 +43,16 @@ table! { } } +table! { + media (id) { + id -> Uuid, + media_id -> Uuid, + url -> Text, + created_at -> Timestamp, + updated_at -> Timestamp, + } +} + table! { nodes (id) { id -> Uuid, @@ -82,6 +92,7 @@ allow_tables_to_appear_in_same_query!( blocks, jobs, listeners, + media, nodes, settings, whitelists,