Remove media caching, just proxy

asonix 2022-11-17 22:39:26 -06:00
parent 094331a447
commit e9303ad9f6
8 changed files with 19 additions and 209 deletions
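
In short: MediaCache no longer stores media bytes or metadata in sled, the CacheMedia background job is gone, and Requests::fetch_bytes is replaced by fetch_response, which hands the caller the unread upstream response. The media route keeps the existing url-to-uuid mapping and now streams the remote body straight through. Below is a minimal sketch of the resulting handler, assembled from the hunks in this commit; the function name proxy_media and the extractor wiring are illustrative, the real handler is the route function changed at the end of this commit.

use actix_web::{body::BodyStream, web, HttpResponse};
use uuid::Uuid;

use crate::{data::MediaCache, error::Error, requests::Requests};

// Sketch only: serve a media UUID by proxying the original URL.
// The sled url <-> uuid mapping is kept; nothing is written back to the db.
async fn proxy_media(
    media: web::Data<MediaCache>,
    requests: web::Data<Requests>,
    uuid: web::Path<Uuid>,
) -> Result<HttpResponse, Error> {
    if let Some(url) = media.get_url(uuid.into_inner()).await? {
        // Signed GET against the origin; the body is not read here.
        let upstream = requests.fetch_response(url).await?;

        // Mirror the upstream status and headers, minus the hop-by-hop
        // connection header, then stream the body chunks through.
        let mut response = HttpResponse::build(upstream.status());
        for (name, value) in upstream.headers().iter().filter(|(h, _)| *h != "connection") {
            response.insert_header((name.clone(), value.clone()));
        }
        return Ok(response.body(BodyStream::new(upstream)));
    }

    Ok(HttpResponse::NotFound().finish())
}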

View file

@@ -1,14 +1,7 @@
use crate::{
db::{Db, MediaMeta},
error::Error,
};
use crate::{db::Db, error::Error};
use activitystreams::iri_string::types::IriString;
use actix_web::web::Bytes;
use std::time::{Duration, SystemTime};
use uuid::Uuid;
static MEDIA_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 2);
#[derive(Clone, Debug)]
pub struct MediaCache {
db: Db,
@@ -29,32 +22,6 @@ impl MediaCache {
self.db.media_url(uuid).await
}
#[tracing::instrument(level = "debug", name = "Is media outdated", skip(self))]
pub(crate) async fn is_outdated(&self, uuid: Uuid) -> Result<bool, Error> {
if let Some(meta) = self.db.media_meta(uuid).await? {
if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
return Ok(false);
}
}
Ok(true)
}
#[tracing::instrument(level = "debug", name = "Get media bytes", skip(self))]
pub(crate) async fn get_bytes(&self, uuid: Uuid) -> Result<Option<(String, Bytes)>, Error> {
if let Some(meta) = self.db.media_meta(uuid).await? {
if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
return self
.db
.media_bytes(uuid)
.await
.map(|opt| opt.map(|bytes| (meta.media_type, bytes)));
}
}
Ok(None)
}
#[tracing::instrument(name = "Store media url", skip_all, fields(url = url.to_string().as_str()))]
pub(crate) async fn store_url(&self, url: IriString) -> Result<Uuid, Error> {
let uuid = Uuid::new_v4();
@@ -63,23 +30,4 @@ impl MediaCache {
Ok(uuid)
}
#[tracing::instrument(name = "store media bytes", skip(self, bytes))]
pub(crate) async fn store_bytes(
&self,
uuid: Uuid,
media_type: String,
bytes: Bytes,
) -> Result<(), Error> {
self.db
.save_bytes(
uuid,
MediaMeta {
media_type,
saved_at: SystemTime::now(),
},
bytes,
)
.await
}
}

View file

@@ -3,7 +3,6 @@ use crate::{
error::{Error, ErrorKind},
};
use activitystreams::iri_string::types::IriString;
use actix_web::web::Bytes;
use rsa::{
pkcs8::{DecodePrivateKey, EncodePrivateKey},
RsaPrivateKey,
@@ -26,8 +25,6 @@ struct Inner {
settings: Tree,
media_url_media_id: Tree,
media_id_media_url: Tree,
media_id_media_bytes: Tree,
media_id_media_meta: Tree,
actor_id_info: Tree,
actor_id_instance: Tree,
actor_id_contact: Tree,
@@ -63,12 +60,6 @@ impl std::fmt::Debug for Actor {
}
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct MediaMeta {
pub(crate) media_type: String,
pub(crate) saved_at: SystemTime,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Info {
pub(crate) software: String,
@@ -253,8 +244,6 @@ impl Db {
settings: db.open_tree("settings")?,
media_url_media_id: db.open_tree("media-url-media-id")?,
media_id_media_url: db.open_tree("media-id-media-url")?,
media_id_media_bytes: db.open_tree("media-id-media-bytes")?,
media_id_media_meta: db.open_tree("media-id-media-meta")?,
actor_id_info: db.open_tree("actor-id-info")?,
actor_id_instance: db.open_tree("actor-id-instance")?,
actor_id_contact: db.open_tree("actor-id-contact")?,
@@ -392,25 +381,6 @@ impl Db {
.await
}
pub(crate) async fn save_bytes(
&self,
id: Uuid,
meta: MediaMeta,
bytes: Bytes,
) -> Result<(), Error> {
self.unblock(move |inner| {
let vec = serde_json::to_vec(&meta)?;
inner
.media_id_media_bytes
.insert(id.as_bytes(), bytes.as_ref())?;
inner.media_id_media_meta.insert(id.as_bytes(), vec)?;
Ok(())
})
.await
}
pub(crate) async fn media_id(&self, url: IriString) -> Result<Option<Uuid>, Error> {
self.unblock(move |inner| {
if let Some(ivec) = inner.media_url_media_id.get(url.as_str().as_bytes())? {
@@ -433,29 +403,6 @@ impl Db {
.await
}
pub(crate) async fn media_bytes(&self, id: Uuid) -> Result<Option<Bytes>, Error> {
self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_bytes.get(id.as_bytes())? {
Ok(Some(Bytes::copy_from_slice(&ivec)))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn media_meta(&self, id: Uuid) -> Result<Option<MediaMeta>, Error> {
self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_meta.get(id.as_bytes())? {
let meta = serde_json::from_slice(&ivec)?;
Ok(Some(meta))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn blocks(&self) -> Result<Vec<String>, Error> {
self.unblock(|inner| Ok(inner.blocks().collect())).await
}

View file

@@ -130,9 +130,6 @@ pub(crate) enum ErrorKind {
#[error("{0}")]
HostMismatch(#[from] CheckError),
#[error("Invalid or missing content type")]
ContentType,
#[error("Couldn't flush buffer")]
FlushBuffer,

View file

@@ -1,44 +0,0 @@
use crate::{error::Error, jobs::JobState};
use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
use uuid::Uuid;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct CacheMedia {
uuid: Uuid,
}
impl CacheMedia {
pub(crate) fn new(uuid: Uuid) -> Self {
CacheMedia { uuid }
}
#[tracing::instrument(name = "Cache media", skip(state))]
async fn perform(self, state: JobState) -> Result<(), Error> {
if !state.media.is_outdated(self.uuid).await? {
return Ok(());
}
if let Some(url) = state.media.get_url(self.uuid).await? {
let (content_type, bytes) = state.requests.fetch_bytes(url.as_str()).await?;
state
.media
.store_bytes(self.uuid, content_type, bytes)
.await?;
}
Ok(())
}
}
impl ActixJob for CacheMedia {
type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::CacheMedia";
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View file

@@ -1,7 +1,7 @@
use crate::{
config::UrlKind,
error::{Error, ErrorKind},
jobs::{cache_media::CacheMedia, Boolish, JobState},
jobs::{Boolish, JobState},
};
use activitystreams::{iri, iri_string::types::IriString};
use background_jobs::ActixJob;
@@ -75,8 +75,6 @@ impl QueryInstance {
let avatar = state.config.generate_url(UrlKind::Media(uuid));
state.job_server.queue(CacheMedia::new(uuid)).await?;
state
.node_cache
.set_contact(

View file

@@ -1,5 +1,4 @@
pub mod apub;
mod cache_media;
mod contact;
mod deliver;
mod deliver_many;
@@ -8,7 +7,7 @@ mod nodeinfo;
mod process_listeners;
pub(crate) use self::{
cache_media::CacheMedia, contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany,
contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany,
instance::QueryInstance, nodeinfo::QueryNodeinfo,
};
@@ -59,7 +58,6 @@ pub(crate) fn create_workers(
.register::<QueryNodeinfo>()
.register::<QueryInstance>()
.register::<Listeners>()
.register::<CacheMedia>()
.register::<QueryContact>()
.register::<apub::Announce>()
.register::<apub::Follow>()

View file

@@ -1,6 +1,6 @@
use crate::error::{Error, ErrorKind};
use activitystreams::iri_string::types::IriString;
use actix_web::{http::header::Date, web::Bytes};
use actix_web::http::header::Date;
use awc::{error::SendRequestError, Client, ClientResponse};
use dashmap::DashMap;
use http_signature_normalization_actix::prelude::*;
@@ -294,11 +294,9 @@ impl Requests {
Ok(serde_json::from_slice(body.as_ref())?)
}
#[tracing::instrument(name = "Fetch Bytes", skip(self), fields(signing_string))]
pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> {
let parsed_url = url.parse::<IriString>()?;
if !self.breakers.should_try(&parsed_url) {
#[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))]
pub(crate) async fn fetch_response(&self, url: IriString) -> Result<ClientResponse, Error> {
if !self.breakers.should_try(&url) {
return Err(ErrorKind::Breaker.into());
}
@@ -307,9 +305,10 @@
let client: Client = self.client.borrow().clone();
let res = client
.get(url)
.get(url.as_str())
.insert_header(("Accept", "*/*"))
.insert_header(Date(SystemTime::now().into()))
.no_decompress()
.signature(
self.config.clone(),
self.key_id.clone(),
@@ -322,26 +321,9 @@
.send()
.await;
let mut res = self.check_response(&parsed_url, res).await?;
let res = self.check_response(&url, res).await?;
let content_type = if let Some(content_type) = res.headers().get("content-type") {
if let Ok(s) = content_type.to_str() {
s.to_owned()
} else {
return Err(ErrorKind::ContentType.into());
}
} else {
return Err(ErrorKind::ContentType.into());
};
let bytes = match res.body().limit(1024 * 1024 * 4).await {
Err(e) => {
return Err(ErrorKind::ReceiveResponse(url.to_string(), e.to_string()).into());
}
Ok(bytes) => bytes,
};
Ok((content_type, bytes))
Ok(res)
}
#[tracing::instrument(

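The caller-visible change in this file is the signature: fetch_bytes took a &str, collected the body into memory with a 4 MiB limit, and returned the content type and bytes, while fetch_response takes an IriString and returns the awc ClientResponse unread, with no_decompress() leaving the upstream Content-Encoding alone so the bytes can be forwarded verbatim. A rough sketch of the call shape, assuming a Requests value from this file; the URL and function name are placeholders, not from the commit:

// Sketch only.
async fn example(requests: &Requests) -> Result<(), Error> {
    // Before this commit the whole body was buffered and the content type
    // extracted by hand:
    //     let (content_type, bytes) = requests.fetch_bytes(url_str).await?;

    // After: the caller receives the response itself and decides how to
    // consume it; the media route wraps it in a BodyStream instead of
    // buffering it here.
    let url: IriString = "https://media.example/image.png".parse()?;
    let _upstream = requests.fetch_response(url).await?;

    Ok(())
}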
View file

@@ -1,8 +1,5 @@
use crate::{data::MediaCache, error::Error, requests::Requests};
use actix_web::{
http::header::{CacheControl, CacheDirective},
web, HttpResponse,
};
use actix_web::{body::BodyStream, web, HttpResponse};
use uuid::Uuid;
#[tracing::instrument(name = "Media", skip(media, requests))]
@@ -13,30 +10,17 @@ pub(crate) async fn route(
) -> Result<HttpResponse, Error> {
let uuid = uuid.into_inner();
if let Some((content_type, bytes)) = media.get_bytes(uuid).await? {
return Ok(cached(content_type, bytes));
}
if let Some(url) = media.get_url(uuid).await? {
let (content_type, bytes) = requests.fetch_bytes(url.as_str()).await?;
let res = requests.fetch_response(url).await?;
media
.store_bytes(uuid, content_type.clone(), bytes.clone())
.await?;
let mut response = HttpResponse::build(res.status());
return Ok(cached(content_type, bytes));
for (name, value) in res.headers().iter().filter(|(h, _)| *h != "connection") {
response.insert_header((name.clone(), value.clone()));
}
return Ok(response.body(BodyStream::new(res)));
}
Ok(HttpResponse::NotFound().finish())
}
fn cached(content_type: String, bytes: web::Bytes) -> HttpResponse {
HttpResponse::Ok()
.insert_header(CacheControl(vec![
CacheDirective::Public,
CacheDirective::MaxAge(60 * 60 * 24),
CacheDirective::Extension("immutable".to_owned(), None),
]))
.content_type(content_type)
.body(bytes)
}
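
One note on the new route body: Connection is a hop-by-hop header, which is why it is the one header deliberately not copied onto the proxied response; the upstream status, Content-Type, and any caching headers are forwarded as-is. That is also what allows the ContentType error variant and the hand-built Cache-Control response above to be dropped, since the relay no longer decides those values itself.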