Make pict-rs generic over file storage

This commit is contained in:
Aode (lion) 2021-10-22 23:48:56 -05:00
parent f67aeb92fa
commit 48557bc2ea
18 changed files with 1088 additions and 761 deletions

1
Cargo.lock generated
View file

@ -1155,6 +1155,7 @@ dependencies = [
"actix-server",
"actix-web",
"anyhow",
"async-trait",
"awc",
"base64",
"dashmap",

View file

@ -19,6 +19,7 @@ actix-rt = "2.2.0"
actix-server = "2.0.0-beta.6"
actix-web = { version = "4.0.0-beta.10", default-features = false }
anyhow = "1.0"
async-trait = "0.1.51"
awc = { version = "3.0.0-beta.9", default-features = false, features = ["rustls"] }
base64 = "0.13.0"
dashmap = "4.0.2"

View file

@ -1,6 +1,8 @@
use std::{collections::HashSet, net::SocketAddr, path::PathBuf};
use url::Url;
use crate::magick::ValidInputType;
#[derive(Clone, Debug, structopt::StructOpt)]
pub(crate) struct Config {
#[structopt(
@ -130,7 +132,7 @@ impl Config {
#[error("Invalid format supplied, {0}")]
pub(crate) struct FormatError(String);
#[derive(Clone, Debug)]
#[derive(Copy, Clone, Debug)]
pub(crate) enum Format {
Jpeg,
Png,
@ -153,6 +155,14 @@ impl Format {
Format::Webp => "WEBP",
}
}
pub(crate) fn to_hint(&self) -> Option<ValidInputType> {
match self {
Format::Jpeg => Some(ValidInputType::Jpeg),
Format::Png => Some(ValidInputType::Png),
Format::Webp => Some(ValidInputType::Webp),
}
}
}
impl std::str::FromStr for Format {

View file

@ -69,6 +69,9 @@ pub(crate) enum UploadError {
#[error(transparent)]
StripPrefix(#[from] std::path::StripPrefixError),
#[error(transparent)]
FileStore(#[from] crate::store::file_store::FileError),
#[error("Provided process path is invalid")]
ParsePath,
@ -114,18 +117,12 @@ pub(crate) enum UploadError {
#[error("Tried to save an image with an already-taken name")]
DuplicateAlias,
#[error("Tried to create file, but file already exists")]
FileExists,
#[error("{0}")]
Json(#[from] serde_json::Error),
#[error("Range header not satisfiable")]
Range,
#[error("Command failed")]
Status,
#[error(transparent)]
Limit(#[from] super::LimitError),
}

View file

@ -1,11 +1,9 @@
use crate::{
error::{Error, UploadError},
process::Process,
};
use crate::{error::Error, process::Process, store::Store};
use actix_web::web::Bytes;
use tokio::{io::AsyncRead, process::Command};
use tokio::io::AsyncRead;
use tracing::instrument;
#[derive(Debug)]
pub(crate) enum InputFormat {
Gif,
Mp4,
@ -71,48 +69,29 @@ pub(crate) fn to_mp4_bytes(
Ok(process.bytes_read(input).unwrap())
}
#[instrument(name = "Create video thumbnail", skip(from, to))]
pub(crate) async fn thumbnail<P1, P2>(
from: P1,
to: P2,
#[instrument(name = "Create video thumbnail")]
pub(crate) async fn thumbnail<S: Store>(
store: S,
from: S::Identifier,
input_format: InputFormat,
format: ThumbnailFormat,
) -> Result<(), Error>
where
P1: AsRef<std::path::Path>,
P2: AsRef<std::path::Path>,
{
let command = "ffmpeg";
let first_arg = "-i";
let args = [
"-vframes",
"1",
"-codec",
format.as_codec(),
"-f",
format.as_format(),
];
) -> Result<impl AsyncRead + Unpin, Error> {
let process = Process::run(
"ffmpeg",
&[
"-f",
input_format.as_format(),
"-i",
"pipe:",
"-vframes",
"1",
"-codec",
format.as_codec(),
"-f",
format.as_format(),
"pipe:",
],
)?;
tracing::info!(
"Spawning command: {} {} {:?} {:?} {:?}",
command,
first_arg,
from.as_ref(),
args,
to.as_ref()
);
let mut child = Command::new(command)
.arg(first_arg)
.arg(from.as_ref())
.args(args)
.arg(to.as_ref())
.spawn()?;
let status = child.wait().await?;
if !status.success() {
return Err(UploadError::Status.into());
}
Ok(())
Ok(process.store_read(store, from).unwrap())
}

View file

@ -2,6 +2,7 @@ use crate::{
config::Format,
error::{Error, UploadError},
process::Process,
store::Store,
};
use actix_web::web::Bytes;
use tokio::{
@ -10,6 +11,15 @@ use tokio::{
};
use tracing::instrument;
pub(crate) fn details_hint(filename: &str) -> Option<ValidInputType> {
if filename.ends_with(".mp4") {
Some(ValidInputType::Mp4)
} else {
None
}
}
#[derive(Debug)]
pub(crate) enum ValidInputType {
Mp4,
Gif,
@ -18,6 +28,18 @@ pub(crate) enum ValidInputType {
Webp,
}
impl ValidInputType {
fn to_str(&self) -> &'static str {
match self {
Self::Mp4 => "MP4",
Self::Gif => "GIF",
Self::Png => "PNG",
Self::Jpeg => "JPEG",
Self::Webp => "WEBP",
}
}
}
#[derive(Debug)]
pub(crate) struct Details {
pub(crate) mime_type: mime::Mime,
@ -32,10 +54,19 @@ pub(crate) fn clear_metadata_bytes_read(input: Bytes) -> std::io::Result<impl As
}
#[instrument(name = "Getting details from input bytes", skip(input))]
pub(crate) async fn details_bytes(input: Bytes) -> Result<Details, Error> {
pub(crate) async fn details_bytes(
input: Bytes,
hint: Option<ValidInputType>,
) -> Result<Details, Error> {
let last_arg = if let Some(expected_format) = hint {
format!("{}:-", expected_format.to_str())
} else {
"-".to_owned()
};
let process = Process::run(
"magick",
&["identify", "-ping", "-format", "%w %h | %m\n", "-"],
&["identify", "-ping", "-format", "%w %h | %m\n", &last_arg],
)?;
let mut reader = process.bytes_read(input).unwrap();
@ -64,22 +95,28 @@ pub(crate) fn convert_bytes_read(
Ok(process.bytes_read(input).unwrap())
}
pub(crate) async fn details<P>(file: P) -> Result<Details, Error>
where
P: AsRef<std::path::Path>,
{
let command = "magick";
let args = ["identify", "-ping", "-format", "%w %h | %m\n"];
let last_arg = file.as_ref();
pub(crate) async fn details_store<S: Store>(
store: S,
identifier: S::Identifier,
expected_format: Option<ValidInputType>,
) -> Result<Details, Error> {
let last_arg = if let Some(expected_format) = expected_format {
format!("{}:-", expected_format.to_str())
} else {
"-".to_owned()
};
tracing::info!("Spawning command: {} {:?} {:?}", command, args, last_arg);
let output = Command::new(command)
.args(args)
.arg(last_arg)
.output()
.await?;
let process = Process::run(
"magick",
&["identify", "-ping", "-format", "%w %h | %m\n", &last_arg],
)?;
let s = String::from_utf8_lossy(&output.stdout);
let mut reader = process.store_read(store, identifier).unwrap();
let mut output = Vec::new();
reader.read_to_end(&mut output).await?;
let s = String::from_utf8_lossy(&output);
parse_details(s)
}
@ -137,12 +174,13 @@ fn parse_details(s: std::borrow::Cow<'_, str>) -> Result<Details, Error> {
#[instrument(name = "Getting input type from bytes", skip(input))]
pub(crate) async fn input_type_bytes(input: Bytes) -> Result<ValidInputType, Error> {
details_bytes(input).await?.validate_input()
details_bytes(input, None).await?.validate_input()
}
#[instrument(name = "Spawning process command", skip(input))]
pub(crate) fn process_image_file_read(
input: crate::file::File,
#[instrument(name = "Spawning process command")]
pub(crate) fn process_image_store_read<S: Store>(
store: S,
identifier: S::Identifier,
args: Vec<String>,
format: Format,
) -> std::io::Result<impl AsyncRead + Unpin> {
@ -157,7 +195,7 @@ pub(crate) fn process_image_file_read(
.arg(last_arg),
)?;
Ok(process.file_read(input).unwrap())
Ok(process.store_read(store, identifier).unwrap())
}
impl Details {

View file

@ -21,7 +21,6 @@ use tracing::{debug, error, info, instrument, Span};
use tracing_actix_web::TracingLogger;
use tracing_awc::Propagate;
use tracing_futures::Instrument;
use uuid::Uuid;
mod concurrent_processor;
mod config;
@ -29,25 +28,29 @@ mod either;
mod error;
mod exiftool;
mod ffmpeg;
mod file;
mod init_tracing;
mod magick;
mod map_error;
mod middleware;
mod migrate;
mod process;
mod processor;
mod range;
mod store;
mod upload_manager;
mod validate;
use crate::{magick::details_hint, store::file_store::FileStore};
use self::{
concurrent_processor::CancelSafeProcessor,
config::{Config, Format},
either::Either,
error::{Error, UploadError},
file::CrateError,
init_tracing::init_tracing,
middleware::{Deadline, Internal},
migrate::LatestDb,
store::Store,
upload_manager::{Details, UploadManager, UploadManagerSession},
validate::{image_webp, video_mp4},
};
@ -57,119 +60,19 @@ const MINUTES: u32 = 60;
const HOURS: u32 = 60 * MINUTES;
const DAYS: u32 = 24 * HOURS;
static TMP_DIR: Lazy<PathBuf> = Lazy::new(|| {
let tmp_nonce = Uuid::new_v4();
let mut path = std::env::temp_dir();
path.push(format!("pict-rs-{}", tmp_nonce));
path
});
static CONFIG: Lazy<Config> = Lazy::new(Config::from_args);
static PROCESS_SEMAPHORE: Lazy<Semaphore> =
Lazy::new(|| Semaphore::new(num_cpus::get().saturating_sub(1).max(1)));
// try moving a file
#[instrument(name = "Moving file")]
async fn safe_move_file(from: PathBuf, to: PathBuf) -> Result<(), Error> {
if let Some(path) = to.parent() {
debug!("Creating directory {:?}", path);
tokio::fs::create_dir_all(path).await?;
}
debug!("Checking if {:?} already exists", to);
if let Err(e) = tokio::fs::metadata(&to).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
}
} else {
return Err(UploadError::FileExists.into());
}
debug!("Moving {:?} to {:?}", from, to);
tokio::fs::copy(&from, to).await?;
tokio::fs::remove_file(from).await?;
Ok(())
}
async fn safe_create_parent<P>(path: P) -> Result<(), Error>
where
P: AsRef<std::path::Path>,
{
if let Some(path) = path.as_ref().parent() {
debug!("Creating directory {:?}", path);
tokio::fs::create_dir_all(path).await?;
}
Ok(())
}
// Try writing to a file
#[instrument(name = "Saving file", skip(bytes))]
async fn safe_save_file(path: PathBuf, bytes: web::Bytes) -> Result<(), Error> {
if let Some(path) = path.parent() {
// create the directory for the file
debug!("Creating directory {:?}", path);
tokio::fs::create_dir_all(path).await?;
}
// Only write the file if it doesn't already exist
debug!("Checking if {:?} already exists", path);
if let Err(e) = tokio::fs::metadata(&path).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
}
} else {
return Ok(());
}
// Open the file for writing
debug!("Creating {:?}", path);
let mut file = crate::file::File::create(&path).await?;
// try writing
debug!("Writing to {:?}", path);
if let Err(e) = file.write_from_bytes(bytes).await {
error!("Error writing {:?}, {}", path, e);
// remove file if writing failed before completion
tokio::fs::remove_file(path).await?;
return Err(e.into());
}
debug!("{:?} written", path);
Ok(())
}
pub(crate) fn tmp_file() -> PathBuf {
let s: String = Uuid::new_v4().to_string();
let name = format!("{}.tmp", s);
let mut path = TMP_DIR.clone();
path.push(&name);
path
}
fn to_ext(mime: mime::Mime) -> Result<&'static str, Error> {
if mime == mime::IMAGE_PNG {
Ok(".png")
} else if mime == mime::IMAGE_JPEG {
Ok(".jpg")
} else if mime == video_mp4() {
Ok(".mp4")
} else if mime == image_webp() {
Ok(".webp")
} else {
Err(UploadError::UnsupportedFormat.into())
}
}
/// Handle responding to succesful uploads
#[instrument(name = "Uploaded files", skip(value, manager))]
async fn upload(
value: Value<UploadManagerSession>,
manager: web::Data<UploadManager>,
) -> Result<HttpResponse, Error> {
async fn upload<S: Store>(
value: Value<UploadManagerSession<S>>,
manager: web::Data<UploadManager<S>>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let images = value
.map()
.and_then(|mut m| m.remove("images"))
@ -187,19 +90,23 @@ async fn upload(
let delete_token = image.result.delete_token().await?;
let name = manager.from_alias(alias.to_owned()).await?;
let path = manager.path_from_filename(name.clone()).await?;
let identifier = manager.identifier_from_filename(name.clone()).await?;
let details = manager.variant_details(path.clone(), name.clone()).await?;
let details = manager
.variant_details(identifier.clone(), name.clone())
.await?;
let details = if let Some(details) = details {
debug!("details exist");
details
} else {
debug!("generating new details from {:?}", path);
let new_details = Details::from_path(path.clone()).await?;
debug!("storing details for {:?} {}", path, name);
debug!("generating new details from {:?}", identifier);
let hint = details_hint(&name);
let new_details =
Details::from_store(manager.store().clone(), identifier.clone(), hint).await?;
debug!("storing details for {:?} {}", identifier, name);
manager
.store_variant_details(path, name, &new_details)
.store_variant_details(identifier, name, &new_details)
.await?;
debug!("stored");
new_details
@ -282,11 +189,14 @@ where
/// download an image from a URL
#[instrument(name = "Downloading file", skip(client, manager))]
async fn download(
async fn download<S: Store>(
client: web::Data<Client>,
manager: web::Data<UploadManager>,
manager: web::Data<UploadManager<S>>,
query: web::Query<UrlQuery>,
) -> Result<HttpResponse, Error> {
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let res = client.get(&query.url).propagate().send().await?;
if !res.status().is_success() {
@ -294,7 +204,7 @@ async fn download(
}
let mut stream = Limit::new(
CrateError::new(res),
map_error::map_crate_error(res),
(CONFIG.max_file_size() * MEGABYTES) as u64,
);
@ -308,16 +218,20 @@ async fn download(
let delete_token = session.delete_token().await?;
let name = manager.from_alias(alias.to_owned()).await?;
let path = manager.path_from_filename(name.clone()).await?;
let identifier = manager.identifier_from_filename(name.clone()).await?;
let details = manager.variant_details(path.clone(), name.clone()).await?;
let details = manager
.variant_details(identifier.clone(), name.clone())
.await?;
let details = if let Some(details) = details {
details
} else {
let new_details = Details::from_path(path.clone()).await?;
let hint = details_hint(&name);
let new_details =
Details::from_store(manager.store().clone(), identifier.clone(), hint).await?;
manager
.store_variant_details(path, name, &new_details)
.store_variant_details(identifier, name, &new_details)
.await?;
new_details
};
@ -335,10 +249,13 @@ async fn download(
/// Delete aliases and files
#[instrument(name = "Deleting file", skip(manager))]
async fn delete(
manager: web::Data<UploadManager>,
async fn delete<S: Store>(
manager: web::Data<UploadManager<S>>,
path_entries: web::Path<(String, String)>,
) -> Result<HttpResponse, Error> {
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let (alias, token) = path_entries.into_inner();
manager.delete(token, alias).await?;
@ -348,12 +265,15 @@ async fn delete(
type ProcessQuery = Vec<(String, String)>;
async fn prepare_process(
async fn prepare_process<S: Store>(
query: web::Query<ProcessQuery>,
ext: &str,
manager: &UploadManager,
manager: &UploadManager<S>,
filters: &Option<HashSet<String>>,
) -> Result<(Format, String, PathBuf, Vec<String>), Error> {
) -> Result<(Format, String, PathBuf, Vec<String>), Error>
where
Error: From<S::Error>,
{
let (alias, operations) =
query
.into_inner()
@ -394,21 +314,24 @@ async fn prepare_process(
}
#[instrument(name = "Fetching derived details", skip(manager, filters))]
async fn process_details(
async fn process_details<S: Store>(
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
manager: web::Data<UploadManager>,
manager: web::Data<UploadManager<S>>,
filters: web::Data<Option<HashSet<String>>>,
) -> Result<HttpResponse, Error> {
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let (_, name, thumbnail_path, _) =
prepare_process(query, ext.as_str(), &manager, &filters).await?;
let real_path = manager
.variant_path(&thumbnail_path, &name)
let identifier = manager
.variant_identifier(&thumbnail_path, &name)
.await?
.ok_or(UploadError::MissingAlias)?;
let details = manager.variant_details(real_path, name).await?;
let details = manager.variant_details(identifier, name).await?;
let details = details.ok_or(UploadError::NoFiles)?;
@ -417,55 +340,42 @@ async fn process_details(
/// Process files
#[instrument(name = "Serving processed image", skip(manager, filters))]
async fn process(
async fn process<S: Store + 'static>(
range: Option<range::RangeHeader>,
query: web::Query<ProcessQuery>,
ext: web::Path<String>,
manager: web::Data<UploadManager>,
manager: web::Data<UploadManager<S>>,
filters: web::Data<Option<HashSet<String>>>,
) -> Result<HttpResponse, Error> {
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let (format, name, thumbnail_path, thumbnail_args) =
prepare_process(query, ext.as_str(), &manager, &filters).await?;
let real_path_opt = manager.variant_path(&thumbnail_path, &name).await?;
let identifier_opt = manager.variant_identifier(&thumbnail_path, &name).await?;
// If the thumbnail doesn't exist, we need to create it
let real_path_opt = if let Some(real_path) = real_path_opt {
if let Err(e) = tokio::fs::metadata(&real_path)
.instrument(tracing::info_span!("Get thumbnail metadata"))
.await
{
if e.kind() != std::io::ErrorKind::NotFound {
error!("Error looking up processed image, {}", e);
return Err(e.into());
}
None
} else {
Some(real_path)
}
} else {
None
};
if let Some(real_path) = real_path_opt {
if let Some(identifier) = identifier_opt {
let details_opt = manager
.variant_details(real_path.clone(), name.clone())
.variant_details(identifier.clone(), name.clone())
.await?;
let details = if let Some(details) = details_opt {
details
} else {
let details = Details::from_path(real_path.clone()).await?;
let hint = details_hint(&name);
let details =
Details::from_store(manager.store().clone(), identifier.clone(), hint).await?;
manager
.store_variant_details(real_path.clone(), name, &details)
.store_variant_details(identifier.clone(), name, &details)
.await?;
details
};
return ranged_file_resp(real_path, range, details).await;
return ranged_file_resp(manager.store().clone(), identifier, range, details).await;
}
let original_path = manager.still_path_from_filename(name.clone()).await?;
let identifier = manager.still_identifier_from_filename(name.clone()).await?;
let thumbnail_path2 = thumbnail_path.clone();
let process_fut = async {
@ -473,10 +383,12 @@ async fn process(
let permit = PROCESS_SEMAPHORE.acquire().await?;
let file = crate::file::File::open(original_path.clone()).await?;
let mut processed_reader =
crate::magick::process_image_file_read(file, thumbnail_args, format)?;
let mut processed_reader = crate::magick::process_image_store_read(
manager.store().clone(),
identifier,
thumbnail_args,
format,
)?;
let mut vec = Vec::new();
processed_reader.read_to_end(&mut vec).await?;
@ -484,7 +396,7 @@ async fn process(
drop(permit);
let details = Details::from_bytes(bytes.clone()).await?;
let details = Details::from_bytes(bytes.clone(), format.to_hint()).await?;
let save_span = tracing::info_span!(
parent: None,
@ -497,27 +409,22 @@ async fn process(
let bytes2 = bytes.clone();
actix_rt::spawn(
async move {
let real_path = match manager.next_directory() {
Ok(real_path) => real_path.join(&name),
let identifier = match manager.store().save_bytes(bytes2).await {
Ok(identifier) => identifier,
Err(e) => {
tracing::warn!("Failed to generate directory path: {}", e);
return;
}
};
if let Err(e) = safe_save_file(real_path.clone(), bytes2).await {
tracing::warn!("Error saving thumbnail: {}", e);
return;
}
if let Err(e) = manager
.store_variant_details(real_path.clone(), name.clone(), &details2)
.store_variant_details(identifier.clone(), name.clone(), &details2)
.await
{
tracing::warn!("Error saving variant details: {}", e);
return;
}
if let Err(e) = manager
.store_variant(Some(&thumbnail_path), &real_path, &name)
.store_variant(Some(&thumbnail_path), &identifier, &name)
.await
{
tracing::warn!("Error saving variant info: {}", e);
@ -569,21 +476,28 @@ async fn process(
/// Fetch file details
#[instrument(name = "Fetching details", skip(manager))]
async fn details(
async fn details<S: Store>(
alias: web::Path<String>,
manager: web::Data<UploadManager>,
) -> Result<HttpResponse, Error> {
manager: web::Data<UploadManager<S>>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let name = manager.from_alias(alias.into_inner()).await?;
let path = manager.path_from_filename(name.clone()).await?;
let identifier = manager.identifier_from_filename(name.clone()).await?;
let details = manager.variant_details(path.clone(), name.clone()).await?;
let details = manager
.variant_details(identifier.clone(), name.clone())
.await?;
let details = if let Some(details) = details {
details
} else {
let new_details = Details::from_path(path.clone()).await?;
let hint = details_hint(&name);
let new_details =
Details::from_store(manager.store().clone(), identifier.clone(), hint).await?;
manager
.store_variant_details(path.clone(), name, &new_details)
.store_variant_details(identifier, name, &new_details)
.await?;
new_details
};
@ -593,34 +507,45 @@ async fn details(
/// Serve files
#[instrument(name = "Serving file", skip(manager))]
async fn serve(
async fn serve<S: Store>(
range: Option<range::RangeHeader>,
alias: web::Path<String>,
manager: web::Data<UploadManager>,
) -> Result<HttpResponse, Error> {
manager: web::Data<UploadManager<S>>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let name = manager.from_alias(alias.into_inner()).await?;
let path = manager.path_from_filename(name.clone()).await?;
let identifier = manager.identifier_from_filename(name.clone()).await?;
let details = manager.variant_details(path.clone(), name.clone()).await?;
let details = manager
.variant_details(identifier.clone(), name.clone())
.await?;
let details = if let Some(details) = details {
details
} else {
let details = Details::from_path(path.clone()).await?;
let hint = details_hint(&name);
let details =
Details::from_store(manager.store().clone(), identifier.clone(), hint).await?;
manager
.store_variant_details(path.clone(), name, &details)
.store_variant_details(identifier.clone(), name, &details)
.await?;
details
};
ranged_file_resp(path, range, details).await
ranged_file_resp(manager.store().clone(), identifier, range, details).await
}
async fn ranged_file_resp(
path: PathBuf,
async fn ranged_file_resp<S: Store>(
store: S,
identifier: S::Identifier,
range: Option<range::RangeHeader>,
details: Details,
) -> Result<HttpResponse, Error> {
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let (builder, stream) = match range {
//Range header exists - return as ranged
Some(range_header) => {
@ -631,24 +556,27 @@ async fn ranged_file_resp(
if range_header.is_empty() {
return Err(UploadError::Range.into());
} else if range_header.len() == 1 {
let file = crate::file::File::open(path).await?;
let meta = file.metadata().await?;
let len = store.len(&identifier).await?;
let range = range_header.ranges().next().unwrap();
let mut builder = HttpResponse::PartialContent();
builder.insert_header(range.to_content_range(meta.len()));
builder.insert_header(range.to_content_range(len));
(builder, Either::left(range.chop_file(file).await?))
(
builder,
Either::left(map_error::map_crate_error(
range.chop_store(store, identifier).await?,
)),
)
} else {
return Err(UploadError::Range.into());
}
}
//No Range header in the request - return the entire document
None => {
let file = crate::file::File::open(path).await?;
let stream = file.read_to_stream(None, None).await?;
let stream =
map_error::map_crate_error(store.to_stream(&identifier, None, None).await?);
(HttpResponse::Ok(), Either::right(stream))
}
};
@ -696,10 +624,13 @@ enum FileOrAlias {
}
#[instrument(name = "Purging file", skip(upload_manager))]
async fn purge(
async fn purge<S: Store>(
query: web::Query<FileOrAlias>,
upload_manager: web::Data<UploadManager>,
) -> Result<HttpResponse, Error> {
upload_manager: web::Data<UploadManager<S>>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let aliases = match query.into_inner() {
FileOrAlias::File { file } => upload_manager.aliases_by_filename(file).await?,
FileOrAlias::Alias { alias } => upload_manager.aliases_by_alias(alias).await?,
@ -718,10 +649,13 @@ async fn purge(
}
#[instrument(name = "Fetching aliases", skip(upload_manager))]
async fn aliases(
async fn aliases<S: Store>(
query: web::Query<FileOrAlias>,
upload_manager: web::Data<UploadManager>,
) -> Result<HttpResponse, Error> {
upload_manager: web::Data<UploadManager<S>>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let aliases = match query.into_inner() {
FileOrAlias::File { file } => upload_manager.aliases_by_filename(file).await?,
FileOrAlias::Alias { alias } => upload_manager.aliases_by_alias(alias).await?,
@ -739,10 +673,13 @@ struct ByAlias {
}
#[instrument(name = "Fetching filename", skip(upload_manager))]
async fn filename_by_alias(
async fn filename_by_alias<S: Store>(
query: web::Query<ByAlias>,
upload_manager: web::Data<UploadManager>,
) -> Result<HttpResponse, Error> {
upload_manager: web::Data<UploadManager<S>>,
) -> Result<HttpResponse, Error>
where
Error: From<S::Error>,
{
let filename = upload_manager.from_alias(query.into_inner().alias).await?;
Ok(HttpResponse::Ok().json(&serde_json::json!({
@ -751,12 +688,17 @@ async fn filename_by_alias(
})))
}
#[actix_rt::main]
async fn main() -> anyhow::Result<()> {
let manager = UploadManager::new(CONFIG.data_dir(), CONFIG.format()).await?;
init_tracing("pict-rs", CONFIG.opentelemetry_url())?;
fn transform_error(error: actix_form_data::Error) -> actix_web::Error {
let error: Error = error.into();
let error: actix_web::Error = error.into();
error
}
async fn launch<S: Store>(manager: UploadManager<S>) -> anyhow::Result<()>
where
S::Error: Unpin,
Error: From<S::Error>,
{
// Create a new Multipart Form validator
//
// This form is expecting a single array field, 'images' with at most 10 files in it
@ -764,7 +706,7 @@ async fn main() -> anyhow::Result<()> {
let form = Form::new()
.max_files(10)
.max_file_size(CONFIG.max_file_size() * MEGABYTES)
.transform_error(|e| Error::from(e).into())
.transform_error(transform_error)
.field(
"images",
Field::array(Field::file(move |filename, _, stream| {
@ -775,7 +717,10 @@ async fn main() -> anyhow::Result<()> {
async move {
let permit = PROCESS_SEMAPHORE.acquire().await?;
let res = manager.session().upload(stream).await;
let res = manager
.session()
.upload(map_error::map_crate_error(stream))
.await;
drop(permit);
res
@ -792,7 +737,7 @@ async fn main() -> anyhow::Result<()> {
let import_form = Form::new()
.max_files(10)
.max_file_size(CONFIG.max_file_size() * MEGABYTES)
.transform_error(|e| Error::from(e).into())
.transform_error(transform_error)
.field(
"images",
Field::array(Field::file(move |filename, content_type, stream| {
@ -805,7 +750,12 @@ async fn main() -> anyhow::Result<()> {
let res = manager
.session()
.import(filename, content_type, validate_imports, stream)
.import(
filename,
content_type,
validate_imports,
map_error::map_crate_error(stream),
)
.await;
drop(permit);
@ -832,24 +782,25 @@ async fn main() -> anyhow::Result<()> {
web::resource("")
.guard(guard::Post())
.wrap(form.clone())
.route(web::post().to(upload)),
.route(web::post().to(upload::<S>)),
)
.service(web::resource("/download").route(web::get().to(download)))
.service(web::resource("/download").route(web::get().to(download::<S>)))
.service(
web::resource("/delete/{delete_token}/{filename}")
.route(web::delete().to(delete))
.route(web::get().to(delete)),
.route(web::delete().to(delete::<S>))
.route(web::get().to(delete::<S>)),
)
.service(web::resource("/original/{filename}").route(web::get().to(serve)))
.service(web::resource("/process.{ext}").route(web::get().to(process)))
.service(web::resource("/original/{filename}").route(web::get().to(serve::<S>)))
.service(web::resource("/process.{ext}").route(web::get().to(process::<S>)))
.service(
web::scope("/details")
.service(
web::resource("/original/{filename}").route(web::get().to(details)),
web::resource("/original/{filename}")
.route(web::get().to(details::<S>)),
)
.service(
web::resource("/process.{ext}")
.route(web::get().to(process_details)),
.route(web::get().to(process_details::<S>)),
),
),
)
@ -859,20 +810,34 @@ async fn main() -> anyhow::Result<()> {
.service(
web::resource("/import")
.wrap(import_form.clone())
.route(web::post().to(upload)),
.route(web::post().to(upload::<S>)),
)
.service(web::resource("/purge").route(web::post().to(purge)))
.service(web::resource("/aliases").route(web::get().to(aliases)))
.service(web::resource("/filename").route(web::get().to(filename_by_alias))),
.service(web::resource("/purge").route(web::post().to(purge::<S>)))
.service(web::resource("/aliases").route(web::get().to(aliases::<S>)))
.service(
web::resource("/filename").route(web::get().to(filename_by_alias::<S>)),
),
)
})
.bind(CONFIG.bind_address())?
.run()
.await?;
if tokio::fs::metadata(&*TMP_DIR).await.is_ok() {
tokio::fs::remove_dir_all(&*TMP_DIR).await?;
}
Ok(())
}
#[actix_rt::main]
async fn main() -> anyhow::Result<()> {
init_tracing("pict-rs", CONFIG.opentelemetry_url())?;
let root_dir = CONFIG.data_dir();
let db = LatestDb::exists(root_dir.clone()).migrate()?;
let store = FileStore::build(root_dir, &db)?;
let manager = UploadManager::new(store, db, CONFIG.format()).await?;
// TODO: move restructure to FileStore
manager.restructure().await?;
launch(manager).await
}

43
src/map_error.rs Normal file
View file

@ -0,0 +1,43 @@
use crate::error::Error;
use futures_util::stream::Stream;
use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
pin_project_lite::pin_project! {
pub(super) struct MapError<E, S> {
#[pin]
inner: S,
_error: PhantomData<E>,
}
}
pub(super) fn map_crate_error<S>(inner: S) -> MapError<Error, S> {
map_error(inner)
}
pub(super) fn map_error<S, E>(inner: S) -> MapError<E, S> {
MapError {
inner,
_error: PhantomData,
}
}
impl<T, StreamErr, E, S> Stream for MapError<E, S>
where
S: Stream<Item = Result<T, StreamErr>>,
E: From<StreamErr>,
{
type Item = Result<T, E>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.as_mut().project();
this.inner
.poll_next(cx)
.map(|opt| opt.map(|res| res.map_err(Into::into)))
}
}

View file

@ -1,3 +1,4 @@
use crate::store::Store;
use actix_rt::task::JoinHandle;
use actix_web::web::Bytes;
use std::{
@ -66,7 +67,7 @@ impl Process {
let mut stdin = self.child.stdin.take()?;
let stdout = self.child.stdout.take()?;
let (tx, rx) = channel();
let (tx, rx) = channel::<std::io::Error>();
let span = self.spawn_span();
let mut child = self.child;
@ -102,9 +103,10 @@ impl Process {
})
}
pub(crate) fn file_read(
pub(crate) fn store_read<S: Store>(
mut self,
mut input_file: crate::file::File,
store: S,
identifier: S::Identifier,
) -> Option<impl AsyncRead + Unpin> {
let mut stdin = self.child.stdin.take()?;
let stdout = self.child.stdout.take()?;
@ -115,7 +117,7 @@ impl Process {
let mut child = self.child;
let handle = actix_rt::spawn(
async move {
if let Err(e) = input_file.read_to_async_write(&mut stdin).await {
if let Err(e) = store.read_into(&identifier, &mut stdin).await {
let _ = tx.send(e);
return;
}

View file

@ -14,6 +14,50 @@ pub(crate) trait Processor {
}
pub(crate) struct Identity;
pub(crate) struct Thumbnail(usize);
pub(crate) struct Resize(usize);
pub(crate) struct Crop(usize, usize);
pub(crate) struct Blur(f64);
#[instrument]
pub(crate) fn build_chain(
args: &[(String, String)],
filename: String,
) -> Result<(PathBuf, Vec<String>), Error> {
fn parse<P: Processor>(key: &str, value: &str) -> Result<Option<P>, UploadError> {
if key == P::NAME {
return Ok(Some(P::parse(key, value).ok_or(UploadError::ParsePath)?));
}
Ok(None)
}
macro_rules! parse {
($inner:expr, $x:ident, $k:expr, $v:expr) => {{
if let Some(processor) = parse::<$x>($k, $v)? {
return Ok((processor.path($inner.0), processor.command($inner.1)));
};
}};
}
let (path, args) =
args.into_iter()
.fold(Ok((PathBuf::default(), vec![])), |inner, (name, value)| {
if let Ok(inner) = inner {
parse!(inner, Identity, name, value);
parse!(inner, Thumbnail, name, value);
parse!(inner, Resize, name, value);
parse!(inner, Crop, name, value);
parse!(inner, Blur, name, value);
Err(Error::from(UploadError::ParsePath))
} else {
inner
}
})?;
Ok((path.join(filename), args))
}
impl Processor for Identity {
const NAME: &'static str = "identity";
@ -34,8 +78,6 @@ impl Processor for Identity {
}
}
pub(crate) struct Thumbnail(usize);
impl Processor for Thumbnail {
const NAME: &'static str = "thumbnail";
@ -60,8 +102,6 @@ impl Processor for Thumbnail {
}
}
pub(crate) struct Resize(usize);
impl Processor for Resize {
const NAME: &'static str = "resize";
@ -91,8 +131,6 @@ impl Processor for Resize {
}
}
pub(crate) struct Crop(usize, usize);
impl Processor for Crop {
const NAME: &'static str = "crop";
@ -133,8 +171,6 @@ impl Processor for Crop {
}
}
pub(crate) struct Blur(f64);
impl Processor for Blur {
const NAME: &'static str = "blur";
@ -155,43 +191,3 @@ impl Processor for Blur {
args
}
}
#[instrument]
pub(crate) fn build_chain(
args: &[(String, String)],
filename: String,
) -> Result<(PathBuf, Vec<String>), Error> {
fn parse<P: Processor>(key: &str, value: &str) -> Result<Option<P>, UploadError> {
if key == P::NAME {
return Ok(Some(P::parse(key, value).ok_or(UploadError::ParsePath)?));
}
Ok(None)
}
macro_rules! parse {
($inner:expr, $x:ident, $k:expr, $v:expr) => {{
if let Some(processor) = parse::<$x>($k, $v)? {
return Ok((processor.path($inner.0), processor.command($inner.1)));
};
}};
}
let (path, args) =
args.into_iter()
.fold(Ok((PathBuf::default(), vec![])), |inner, (name, value)| {
if let Ok(inner) = inner {
parse!(inner, Identity, name, value);
parse!(inner, Thumbnail, name, value);
parse!(inner, Resize, name, value);
parse!(inner, Crop, name, value);
parse!(inner, Blur, name, value);
Err(Error::from(UploadError::ParsePath))
} else {
inner
}
})?;
Ok((path.join(filename), args))
}

View file

@ -1,4 +1,7 @@
use crate::error::{Error, UploadError};
use crate::{
error::{Error, UploadError},
store::Store,
};
use actix_web::{
dev::Payload,
http::{
@ -52,17 +55,22 @@ impl Range {
}
}
pub(crate) async fn chop_file(
pub(crate) async fn chop_store<S: Store>(
&self,
file: crate::file::File,
) -> Result<impl Stream<Item = Result<Bytes, Error>>, Error> {
store: S,
identifier: S::Identifier,
) -> Result<impl Stream<Item = std::io::Result<Bytes>>, Error>
where
Error: From<S::Error>,
{
match self {
Range::Start(start) => file.read_to_stream(Some(*start), None).await,
Range::SuffixLength(from_start) => file.read_to_stream(None, Some(*from_start)).await,
Range::Segment(start, end) => {
file.read_to_stream(Some(*start), Some(end.saturating_sub(*start)))
.await
}
Range::Start(start) => Ok(store.to_stream(&identifier, Some(*start), None).await?),
Range::SuffixLength(from_start) => Ok(store
.to_stream(&identifier, None, Some(*from_start))
.await?),
Range::Segment(start, end) => Ok(store
.to_stream(&identifier, Some(*start), Some(end.saturating_sub(*start)))
.await?),
}
}
}

52
src/store.rs Normal file
View file

@ -0,0 +1,52 @@
use std::fmt::Debug;
use actix_web::web::Bytes;
use futures_util::stream::Stream;
use tokio::io::{AsyncRead, AsyncWrite};
pub(crate) mod file_store;
pub(crate) trait Identifier: Send + Sync + Clone + Debug {
type Error: std::error::Error;
fn to_bytes(&self) -> Result<Vec<u8>, Self::Error>;
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error>
where
Self: Sized;
}
#[async_trait::async_trait(?Send)]
pub(crate) trait Store: Send + Sync + Clone + Debug + 'static {
type Error: std::error::Error;
type Identifier: Identifier<Error = Self::Error>;
type Stream: Stream<Item = std::io::Result<Bytes>>;
async fn save_async_read<Reader>(
&self,
reader: &mut Reader,
) -> Result<Self::Identifier, Self::Error>
where
Reader: AsyncRead + Unpin;
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Self::Error>;
async fn to_stream(
&self,
identifier: &Self::Identifier,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<Self::Stream, Self::Error>;
async fn read_into<Writer>(
&self,
identifier: &Self::Identifier,
writer: &mut Writer,
) -> Result<(), std::io::Error>
where
Writer: AsyncWrite + Unpin;
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Self::Error>;
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error>;
}

303
src/store/file_store.rs Normal file
View file

@ -0,0 +1,303 @@
use crate::store::Store;
use actix_web::web::Bytes;
use futures_util::stream::Stream;
use std::{
path::{Path, PathBuf},
pin::Pin,
};
use storage_path_generator::Generator;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, error, instrument};
use uuid::Uuid;
mod file;
mod file_id;
mod restructure;
use file::File;
pub(crate) use file_id::FileId;
// - Settings Tree
// - last-path -> last generated path
// - fs-restructure-01-complete -> bool
const GENERATOR_KEY: &'static [u8] = b"last-path";
#[derive(Debug, thiserror::Error)]
pub(crate) enum FileError {
#[error(transparent)]
Sled(#[from] sled::Error),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
PathGenerator(#[from] storage_path_generator::PathError),
#[error("Error formatting file store identifier")]
IdError,
#[error("Mailformed file store identifier")]
PrefixError,
#[error("Tried to save over existing file")]
FileExists,
}
#[derive(Clone)]
pub(crate) struct FileStore {
path_gen: Generator,
root_dir: PathBuf,
settings_tree: sled::Tree,
}
#[async_trait::async_trait(?Send)]
impl Store for FileStore {
type Error = FileError;
type Identifier = FileId;
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
async fn save_async_read<Reader>(
&self,
reader: &mut Reader,
) -> Result<Self::Identifier, Self::Error>
where
Reader: AsyncRead + Unpin,
{
let path = self.next_file()?;
if let Err(e) = self.safe_save_reader(&path, reader).await {
self.safe_remove_file(&path).await?;
return Err(e);
}
self.file_id_from_path(path)
}
async fn save_bytes(&self, bytes: Bytes) -> Result<Self::Identifier, Self::Error> {
let path = self.next_file()?;
if let Err(e) = self.safe_save_bytes(&path, bytes).await {
self.safe_remove_file(&path).await?;
return Err(e);
}
self.file_id_from_path(path)
}
async fn to_stream(
&self,
identifier: &Self::Identifier,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<Self::Stream, Self::Error> {
let path = self.path_from_file_id(identifier);
let stream = File::open(path)
.await?
.read_to_stream(from_start, len)
.await?;
Ok(Box::pin(stream))
}
async fn read_into<Writer>(
&self,
identifier: &Self::Identifier,
writer: &mut Writer,
) -> Result<(), std::io::Error>
where
Writer: AsyncWrite + Unpin,
{
let path = self.path_from_file_id(identifier);
File::open(&path).await?.read_to_async_write(writer).await?;
Ok(())
}
async fn len(&self, identifier: &Self::Identifier) -> Result<u64, Self::Error> {
let path = self.path_from_file_id(identifier);
let len = tokio::fs::metadata(path).await?.len();
Ok(len)
}
async fn remove(&self, identifier: &Self::Identifier) -> Result<(), Self::Error> {
let path = self.path_from_file_id(identifier);
self.safe_remove_file(path).await?;
Ok(())
}
}
impl FileStore {
pub fn build(root_dir: PathBuf, db: &sled::Db) -> Result<Self, FileError> {
let settings_tree = db.open_tree("settings")?;
let path_gen = init_generator(&settings_tree)?;
Ok(FileStore {
root_dir,
path_gen,
settings_tree,
})
}
fn next_directory(&self) -> Result<PathBuf, FileError> {
let path = self.path_gen.next();
self.settings_tree
.insert(GENERATOR_KEY, path.to_be_bytes())?;
let mut target_path = self.root_dir.join("files");
for dir in path.to_strings() {
target_path.push(dir)
}
Ok(target_path)
}
fn next_file(&self) -> Result<PathBuf, FileError> {
let target_path = self.next_directory()?;
let filename = Uuid::new_v4().to_string();
Ok(target_path.join(filename))
}
async fn safe_remove_file<P: AsRef<Path>>(&self, path: P) -> Result<(), FileError> {
tokio::fs::remove_file(&path).await?;
self.try_remove_parents(path.as_ref()).await;
Ok(())
}
async fn try_remove_parents(&self, mut path: &Path) {
while let Some(parent) = path.parent() {
if parent.ends_with(&self.root_dir) {
return;
}
if tokio::fs::remove_dir(parent).await.is_err() {
return;
}
path = parent;
}
}
// Try writing to a file
#[instrument(name = "Saving file", skip(bytes), fields(path = tracing::field::debug(&path.as_ref())))]
async fn safe_save_bytes<P: AsRef<Path>>(
&self,
path: P,
bytes: Bytes,
) -> Result<(), FileError> {
safe_create_parent(&path).await?;
// Only write the file if it doesn't already exist
debug!("Checking if {:?} already exists", path.as_ref());
if let Err(e) = tokio::fs::metadata(&path).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
}
} else {
return Ok(());
}
// Open the file for writing
debug!("Creating {:?}", path.as_ref());
let mut file = File::create(&path).await?;
// try writing
debug!("Writing to {:?}", path.as_ref());
if let Err(e) = file.write_from_bytes(bytes).await {
error!("Error writing {:?}, {}", path.as_ref(), e);
// remove file if writing failed before completion
self.safe_remove_file(path).await?;
return Err(e.into());
}
debug!("{:?} written", path.as_ref());
Ok(())
}
#[instrument(skip(input), fields(to = tracing::field::debug(&to.as_ref())))]
async fn safe_save_reader<P: AsRef<Path>>(
&self,
to: P,
input: &mut (impl AsyncRead + Unpin + ?Sized),
) -> Result<(), FileError> {
safe_create_parent(&to).await?;
debug!("Checking if {:?} already exists", to.as_ref());
if let Err(e) = tokio::fs::metadata(&to).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
}
} else {
return Err(FileError::FileExists);
}
debug!("Writing stream to {:?}", to.as_ref());
let mut file = File::create(to).await?;
file.write_from_async_read(input).await?;
Ok(())
}
// try moving a file
#[instrument(name = "Moving file", fields(from = tracing::field::debug(&from.as_ref()), to = tracing::field::debug(&to.as_ref())))]
pub(crate) async fn safe_move_file<P: AsRef<Path>, Q: AsRef<Path>>(
&self,
from: P,
to: Q,
) -> Result<(), FileError> {
safe_create_parent(&to).await?;
debug!("Checking if {:?} already exists", to.as_ref());
if let Err(e) = tokio::fs::metadata(&to).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
}
} else {
return Err(FileError::FileExists);
}
debug!("Moving {:?} to {:?}", from.as_ref(), to.as_ref());
tokio::fs::copy(&from, &to).await?;
self.safe_remove_file(from).await?;
Ok(())
}
}
pub(crate) async fn safe_create_parent<P: AsRef<Path>>(path: P) -> Result<(), FileError> {
if let Some(path) = path.as_ref().parent() {
debug!("Creating directory {:?}", path);
tokio::fs::create_dir_all(path).await?;
}
Ok(())
}
fn init_generator(settings: &sled::Tree) -> Result<Generator, FileError> {
if let Some(ivec) = settings.get(GENERATOR_KEY)? {
Ok(Generator::from_existing(
storage_path_generator::Path::from_be_bytes(ivec.to_vec())?,
))
} else {
Ok(Generator::new())
}
}
impl std::fmt::Debug for FileStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FileStore")
.field("path_gen", &self.path_gen)
.field("root_dir", &self.root_dir)
.finish()
}
}

View file

@ -1,50 +1,15 @@
use futures_util::stream::Stream;
use std::{
pin::Pin,
task::{Context, Poll},
};
#[cfg(feature = "io-uring")]
pub(crate) use io_uring::File;
#[cfg(not(feature = "io-uring"))]
pub(crate) use tokio_file::File;
pin_project_lite::pin_project! {
pub(super) struct CrateError<S> {
#[pin]
inner: S
}
}
impl<S> CrateError<S> {
pub(super) fn new(inner: S) -> Self {
CrateError { inner }
}
}
impl<T, E, S> Stream for CrateError<S>
where
S: Stream<Item = Result<T, E>>,
crate::error::Error: From<E>,
{
type Item = Result<T, crate::error::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.as_mut().project();
this.inner
.poll_next(cx)
.map(|opt| opt.map(|res| res.map_err(Into::into)))
}
}
#[cfg(not(feature = "io-uring"))]
mod tokio_file {
use crate::Either;
use crate::{store::file_store::FileError, Either};
use actix_web::web::{Bytes, BytesMut};
use futures_util::stream::Stream;
use std::{fs::Metadata, io::SeekFrom, path::Path};
use std::{io::SeekFrom, path::Path};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use tokio_util::codec::{BytesCodec, FramedRead};
@ -65,20 +30,13 @@ mod tokio_file {
})
}
pub(crate) async fn metadata(&self) -> std::io::Result<Metadata> {
self.inner.metadata().await
}
pub(crate) async fn write_from_bytes<'a>(
&'a mut self,
mut bytes: Bytes,
) -> std::io::Result<()> {
pub(crate) async fn write_from_bytes(&mut self, mut bytes: Bytes) -> std::io::Result<()> {
self.inner.write_all_buf(&mut bytes).await?;
Ok(())
}
pub(crate) async fn write_from_async_read<'a, R>(
&'a mut self,
pub(crate) async fn write_from_async_read<R>(
&mut self,
mut reader: R,
) -> std::io::Result<()>
where
@ -88,12 +46,9 @@ mod tokio_file {
Ok(())
}
pub(crate) async fn read_to_async_write<'a, W>(
&'a mut self,
writer: &'a mut W,
) -> std::io::Result<()>
pub(crate) async fn read_to_async_write<W>(&mut self, writer: &mut W) -> std::io::Result<()>
where
W: AsyncWrite + Unpin,
W: AsyncWrite + Unpin + ?Sized,
{
tokio::io::copy(&mut self.inner, writer).await?;
Ok(())
@ -103,8 +58,7 @@ mod tokio_file {
mut self,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<impl Stream<Item = Result<Bytes, crate::error::Error>>, crate::error::Error>
{
) -> Result<impl Stream<Item = std::io::Result<Bytes>>, FileError> {
let obj = match (from_start, len) {
(Some(lower), Some(upper)) => {
self.inner.seek(SeekFrom::Start(lower)).await?;
@ -118,10 +72,7 @@ mod tokio_file {
(None, None) => Either::right(self.inner),
};
Ok(super::CrateError::new(BytesFreezer::new(FramedRead::new(
obj,
BytesCodec::new(),
))))
Ok(BytesFreezer::new(FramedRead::new(obj, BytesCodec::new())))
}
}
@ -159,6 +110,7 @@ mod tokio_file {
#[cfg(feature = "io-uring")]
mod io_uring {
use crate::store::file_store::FileError;
use actix_web::web::Bytes;
use futures_util::stream::Stream;
use std::{
@ -182,7 +134,7 @@ mod io_uring {
impl File {
pub(crate) async fn open(path: impl AsRef<Path>) -> std::io::Result<Self> {
tracing::info!("Opening io-uring file");
tracing::info!("Opening io-uring file: {:?}", path.as_ref());
Ok(File {
path: path.as_ref().to_owned(),
inner: tokio_uring::fs::File::open(path).await?,
@ -190,21 +142,18 @@ mod io_uring {
}
pub(crate) async fn create(path: impl AsRef<Path>) -> std::io::Result<Self> {
tracing::info!("Creating io-uring file");
tracing::info!("Creating io-uring file: {:?}", path.as_ref());
Ok(File {
path: path.as_ref().to_owned(),
inner: tokio_uring::fs::File::create(path).await?,
})
}
pub(crate) async fn metadata(&self) -> std::io::Result<Metadata> {
async fn metadata(&self) -> std::io::Result<Metadata> {
tokio::fs::metadata(&self.path).await
}
pub(crate) async fn write_from_bytes<'a>(
&'a mut self,
bytes: Bytes,
) -> std::io::Result<()> {
pub(crate) async fn write_from_bytes(&mut self, bytes: Bytes) -> std::io::Result<()> {
let mut buf = bytes.to_vec();
let len: u64 = buf.len().try_into().unwrap();
@ -233,8 +182,8 @@ mod io_uring {
Ok(())
}
pub(crate) async fn write_from_async_read<'a, R>(
&'a mut self,
pub(crate) async fn write_from_async_read<R>(
&mut self,
mut reader: R,
) -> std::io::Result<()>
where
@ -283,12 +232,9 @@ mod io_uring {
Ok(())
}
pub(crate) async fn read_to_async_write<'a, W>(
&'a mut self,
writer: &mut W,
) -> std::io::Result<()>
pub(crate) async fn read_to_async_write<W>(&mut self, writer: &mut W) -> std::io::Result<()>
where
W: AsyncWrite + Unpin,
W: AsyncWrite + Unpin + ?Sized,
{
let metadata = self.metadata().await?;
let size = metadata.len();
@ -323,20 +269,17 @@ mod io_uring {
self,
from_start: Option<u64>,
len: Option<u64>,
) -> Result<impl Stream<Item = Result<Bytes, crate::error::Error>>, crate::error::Error>
{
) -> Result<impl Stream<Item = std::io::Result<Bytes>>, FileError> {
let size = self.metadata().await?.len();
let cursor = from_start.unwrap_or(0);
let size = len.unwrap_or(size - cursor) + cursor;
Ok(super::CrateError {
inner: BytesStream {
state: ReadFileState::File { file: Some(self) },
size,
cursor,
callback: read_file,
},
Ok(BytesStream {
state: ReadFileState::File { file: Some(self) },
size,
cursor,
callback: read_file,
})
}

View file

@ -0,0 +1,48 @@
use crate::store::{
file_store::{FileError, FileStore},
Identifier,
};
use std::path::PathBuf;
#[derive(Clone, Debug)]
pub(crate) struct FileId(PathBuf);
impl Identifier for FileId {
type Error = FileError;
fn to_bytes(&self) -> Result<Vec<u8>, Self::Error> {
let vec = self
.0
.to_str()
.ok_or(FileError::IdError)?
.as_bytes()
.to_vec();
Ok(vec)
}
fn from_bytes(bytes: Vec<u8>) -> Result<Self, Self::Error>
where
Self: Sized,
{
let string = String::from_utf8(bytes).map_err(|_| FileError::IdError)?;
let id = FileId(PathBuf::from(string));
Ok(id)
}
}
impl FileStore {
pub(super) fn file_id_from_path(&self, path: PathBuf) -> Result<FileId, FileError> {
let stripped = path
.strip_prefix(&self.root_dir)
.map_err(|_| FileError::PrefixError)?;
Ok(FileId(stripped.to_path_buf()))
}
pub(super) fn path_from_file_id(&self, file_id: &FileId) -> PathBuf {
self.root_dir.join(&file_id.0)
}
}

View file

@ -1,6 +1,6 @@
use crate::{
error::{Error, UploadError},
safe_move_file,
store::file_store::FileStore,
upload_manager::UploadManager,
};
use std::path::{Path, PathBuf};
@ -8,24 +8,22 @@ use std::path::{Path, PathBuf};
const RESTRUCTURE_COMPLETE: &'static [u8] = b"fs-restructure-01-complete";
const DETAILS: &'static [u8] = b"details";
impl UploadManager {
impl UploadManager<FileStore> {
#[tracing::instrument(skip(self))]
pub(super) async fn restructure(&self) -> Result<(), Error> {
pub(crate) async fn restructure(&self) -> Result<(), Error> {
if self.restructure_complete()? {
return Ok(());
}
for res in self.inner.filename_tree.iter() {
for res in self.inner().filename_tree.iter() {
let (filename, hash) = res?;
let filename = String::from_utf8(filename.to_vec())?;
tracing::info!("Migrating {}", filename);
let mut file_path = self.inner.root_dir.join("files");
file_path.push(filename.clone());
let file_path = self.store().root_dir.join("files").join(&filename);
if tokio::fs::metadata(&file_path).await.is_ok() {
let mut target_path = self.next_directory()?;
target_path.push(filename.clone());
let target_path = self.store().next_directory()?.join(&filename);
let target_path_bytes = self
.generalize_path(&target_path)?
@ -34,24 +32,23 @@ impl UploadManager {
.as_bytes()
.to_vec();
self.inner
.path_tree
self.inner()
.identifier_tree
.insert(filename.as_bytes(), target_path_bytes)?;
safe_move_file(file_path, target_path).await?;
self.store().safe_move_file(file_path, target_path).await?;
}
let (start, end) = variant_key_bounds(&hash);
for res in self.inner.main_tree.range(start..end) {
for res in self.inner().main_tree.range(start..end) {
let (hash_variant_key, variant_path_or_details) = res?;
if !hash_variant_key.ends_with(DETAILS) {
let variant_path =
PathBuf::from(String::from_utf8(variant_path_or_details.to_vec())?);
if tokio::fs::metadata(&variant_path).await.is_ok() {
let mut target_path = self.next_directory()?;
target_path.push(filename.clone());
let target_path = self.store().next_directory()?.join(&filename);
let relative_target_path_bytes = self
.generalize_path(&target_path)?
@ -62,16 +59,18 @@ impl UploadManager {
let variant_key = self.migrate_variant_key(&variant_path, &filename)?;
self.inner
.path_tree
self.inner()
.identifier_tree
.insert(variant_key, relative_target_path_bytes)?;
safe_move_file(variant_path.clone(), target_path).await?;
self.try_remove_parents(&variant_path).await?;
self.store()
.safe_move_file(variant_path.clone(), target_path)
.await?;
self.store().try_remove_parents(&variant_path).await;
}
}
self.inner.main_tree.remove(hash_variant_key)?;
self.inner().main_tree.remove(hash_variant_key)?;
}
}
@ -81,22 +80,22 @@ impl UploadManager {
fn restructure_complete(&self) -> Result<bool, Error> {
Ok(self
.inner
.store()
.settings_tree
.get(RESTRUCTURE_COMPLETE)?
.is_some())
}
fn mark_restructure_complete(&self) -> Result<(), Error> {
self.inner
self.store()
.settings_tree
.insert(RESTRUCTURE_COMPLETE, b"true")?;
Ok(())
}
pub(super) fn generalize_path<'a>(&self, path: &'a Path) -> Result<&'a Path, Error> {
Ok(path.strip_prefix(&self.inner.root_dir)?)
fn generalize_path<'a>(&self, path: &'a Path) -> Result<&'a Path, Error> {
Ok(path.strip_prefix(&self.store().root_dir)?)
}
fn migrate_variant_key(

View file

@ -1,22 +1,22 @@
use crate::{
config::Format,
error::{Error, UploadError},
ffmpeg::ThumbnailFormat,
migrate::{alias_id_key, alias_key, alias_key_bounds, LatestDb},
ffmpeg::{InputFormat, ThumbnailFormat},
magick::{details_hint, ValidInputType},
migrate::{alias_id_key, alias_key, alias_key_bounds},
store::{Identifier, Store},
};
use actix_web::web;
use sha2::Digest;
use std::{
ops::{Deref, DerefMut},
path::PathBuf,
string::FromUtf8Error,
sync::Arc,
};
use storage_path_generator::{Generator, Path};
use tracing::{debug, error, info, instrument, warn, Span};
use tracing_futures::Instrument;
mod hasher;
mod restructure;
mod session;
pub(super) use session::UploadManagerSession;
@ -35,33 +35,26 @@ pub(super) use session::UploadManagerSession;
// - Filename Tree
// - filename -> hash
// - Details Tree
// - filename / relative path -> details
// - filename / S::Identifier -> details
// - Path Tree
// - filename -> relative path
// - filename / relative variant path -> relative variant path
// - filename / motion -> relative motion path
// - Settings Tree
// - last-path -> last generated path
// - fs-restructure-01-complete -> bool
const GENERATOR_KEY: &'static [u8] = b"last-path";
// - filename -> S::Identifier
// - filename / variant path -> S::Identifier
// - filename / motion -> S::Identifier
#[derive(Clone)]
pub struct UploadManager {
pub(crate) struct UploadManager<S> {
inner: Arc<UploadManagerInner>,
store: S,
}
struct UploadManagerInner {
pub(crate) struct UploadManagerInner {
format: Option<Format>,
hasher: sha2::Sha256,
root_dir: PathBuf,
alias_tree: sled::Tree,
filename_tree: sled::Tree,
main_tree: sled::Tree,
pub(crate) alias_tree: sled::Tree,
pub(crate) filename_tree: sled::Tree,
pub(crate) main_tree: sled::Tree,
details_tree: sled::Tree,
path_tree: sled::Tree,
settings_tree: sled::Tree,
path_gen: Generator,
pub(crate) identifier_tree: sled::Tree,
db: sled::Db,
}
@ -82,88 +75,85 @@ struct FilenameIVec {
inner: sled::IVec,
}
impl UploadManager {
impl<S> UploadManager<S>
where
S: Store + 'static,
Error: From<S::Error>,
{
/// Create a new UploadManager
pub(crate) async fn new(root_dir: PathBuf, format: Option<Format>) -> Result<Self, Error> {
let root_clone = root_dir.clone();
// sled automatically creates it's own directories
let db = web::block(move || LatestDb::exists(root_clone).migrate()).await??;
// Ensure file dir exists
tokio::fs::create_dir_all(&root_dir).await?;
let settings_tree = db.open_tree("settings")?;
let path_gen = init_generator(&settings_tree)?;
pub(crate) async fn new(store: S, db: sled::Db, format: Option<Format>) -> Result<Self, Error> {
let manager = UploadManager {
inner: Arc::new(UploadManagerInner {
format,
hasher: sha2::Sha256::new(),
root_dir,
alias_tree: db.open_tree("alias")?,
filename_tree: db.open_tree("filename")?,
details_tree: db.open_tree("details")?,
main_tree: db.open_tree("main")?,
path_tree: db.open_tree("path")?,
settings_tree,
path_gen,
identifier_tree: db.open_tree("path")?,
db,
}),
store,
};
manager.restructure().await?;
Ok(manager)
}
pub(crate) async fn still_path_from_filename(
&self,
filename: String,
) -> Result<PathBuf, Error> {
let path = self.path_from_filename(filename.clone()).await?;
let details =
if let Some(details) = self.variant_details(path.clone(), filename.clone()).await? {
details
} else {
Details::from_path(&path).await?
};
if !details.is_motion() {
return Ok(path);
}
if let Some(motion_path) = self.motion_path(&filename).await? {
return Ok(motion_path);
}
let jpeg_path = self.next_directory()?.join(&filename);
crate::safe_create_parent(&jpeg_path).await?;
let permit = crate::PROCESS_SEMAPHORE.acquire().await;
let res = crate::ffmpeg::thumbnail(&path, &jpeg_path, ThumbnailFormat::Jpeg).await;
drop(permit);
if let Err(e) = res {
error!("transcode error: {:?}", e);
self.remove_path(&jpeg_path).await?;
return Err(e);
}
self.store_motion_path(&filename, &jpeg_path).await?;
Ok(jpeg_path)
pub(crate) fn store(&self) -> &S {
&self.store
}
async fn motion_path(&self, filename: &str) -> Result<Option<PathBuf>, Error> {
let path_tree = self.inner.path_tree.clone();
pub(crate) fn inner(&self) -> &UploadManagerInner {
&self.inner
}
pub(crate) async fn still_identifier_from_filename(
&self,
filename: String,
) -> Result<S::Identifier, Error> {
let identifier = self.identifier_from_filename(filename.clone()).await?;
let details = if let Some(details) = self
.variant_details(identifier.clone(), filename.clone())
.await?
{
details
} else {
let hint = details_hint(&filename);
Details::from_store(self.store.clone(), identifier.clone(), hint).await?
};
if !details.is_motion() {
return Ok(identifier);
}
if let Some(motion_identifier) = self.motion_identifier(&filename).await? {
return Ok(motion_identifier);
}
let permit = crate::PROCESS_SEMAPHORE.acquire().await;
let mut reader = crate::ffmpeg::thumbnail(
self.store.clone(),
identifier,
InputFormat::Mp4,
ThumbnailFormat::Jpeg,
)
.await?;
let motion_identifier = self.store.save_async_read(&mut reader).await?;
drop(permit);
self.store_motion_path(&filename, &motion_identifier)
.await?;
Ok(motion_identifier)
}
async fn motion_identifier(&self, filename: &str) -> Result<Option<S::Identifier>, Error> {
let identifier_tree = self.inner.identifier_tree.clone();
let motion_key = format!("{}/motion", filename);
let opt = web::block(move || path_tree.get(motion_key.as_bytes())).await??;
let opt = web::block(move || identifier_tree.get(motion_key.as_bytes())).await??;
if let Some(ivec) = opt {
return Ok(Some(
self.inner.root_dir.join(String::from_utf8(ivec.to_vec())?),
));
return Ok(Some(S::Identifier::from_bytes(ivec.to_vec())?));
}
Ok(None)
@ -172,59 +162,57 @@ impl UploadManager {
async fn store_motion_path(
&self,
filename: &str,
path: impl AsRef<std::path::Path>,
identifier: &S::Identifier,
) -> Result<(), Error> {
let path_bytes = self
.generalize_path(path.as_ref())?
.to_str()
.ok_or(UploadError::Path)?
.as_bytes()
.to_vec();
let identifier_bytes = identifier.to_bytes()?;
let motion_key = format!("{}/motion", filename);
let path_tree = self.inner.path_tree.clone();
let identifier_tree = self.inner.identifier_tree.clone();
web::block(move || path_tree.insert(motion_key.as_bytes(), path_bytes)).await??;
web::block(move || identifier_tree.insert(motion_key.as_bytes(), identifier_bytes))
.await??;
Ok(())
}
#[instrument(skip(self))]
pub(crate) async fn path_from_filename(&self, filename: String) -> Result<PathBuf, Error> {
let path_tree = self.inner.path_tree.clone();
let path_ivec = web::block(move || path_tree.get(filename.as_bytes()))
pub(crate) async fn identifier_from_filename(
&self,
filename: String,
) -> Result<S::Identifier, Error> {
let identifier_tree = self.inner.identifier_tree.clone();
let path_ivec = web::block(move || identifier_tree.get(filename.as_bytes()))
.await??
.ok_or(UploadError::MissingFile)?;
let relative = PathBuf::from(String::from_utf8(path_ivec.to_vec())?);
let identifier = S::Identifier::from_bytes(path_ivec.to_vec())?;
Ok(self.inner.root_dir.join(relative))
Ok(identifier)
}
#[instrument(skip(self))]
async fn store_path(&self, filename: String, path: &std::path::Path) -> Result<(), Error> {
let path_bytes = self
.generalize_path(path)?
.to_str()
.ok_or(UploadError::Path)?
.as_bytes()
.to_vec();
let path_tree = self.inner.path_tree.clone();
web::block(move || path_tree.insert(filename.as_bytes(), path_bytes)).await??;
async fn store_identifier(
&self,
filename: String,
identifier: &S::Identifier,
) -> Result<(), Error> {
let identifier_bytes = identifier.to_bytes()?;
let identifier_tree = self.inner.identifier_tree.clone();
web::block(move || identifier_tree.insert(filename.as_bytes(), identifier_bytes)).await??;
Ok(())
}
#[instrument(skip(self))]
pub(crate) async fn variant_path(
pub(crate) async fn variant_identifier(
&self,
process_path: &std::path::Path,
filename: &str,
) -> Result<Option<PathBuf>, Error> {
) -> Result<Option<S::Identifier>, Error> {
let key = self.variant_key(process_path, filename)?;
let path_tree = self.inner.path_tree.clone();
let path_opt = web::block(move || path_tree.get(key)).await??;
let identifier_tree = self.inner.identifier_tree.clone();
let path_opt = web::block(move || identifier_tree.get(key)).await??;
if let Some(path_ivec) = path_opt {
let relative = PathBuf::from(String::from_utf8(path_ivec.to_vec())?);
Ok(Some(self.inner.root_dir.join(relative)))
if let Some(ivec) = path_opt {
let identifier = S::Identifier::from_bytes(ivec.to_vec())?;
Ok(Some(identifier))
} else {
Ok(None)
}
@ -235,22 +223,22 @@ impl UploadManager {
pub(crate) async fn store_variant(
&self,
variant_process_path: Option<&std::path::Path>,
real_path: &std::path::Path,
identifier: &S::Identifier,
filename: &str,
) -> Result<(), Error> {
let path_bytes = self
.generalize_path(real_path)?
.to_str()
.ok_or(UploadError::Path)?
.as_bytes()
.to_vec();
let variant_path = variant_process_path.unwrap_or(real_path);
let key = self.variant_key(variant_path, filename)?;
let path_tree = self.inner.path_tree.clone();
let key = if let Some(path) = variant_process_path {
self.variant_key(path, filename)?
} else {
let mut vec = filename.as_bytes().to_vec();
vec.extend(b"/");
vec.extend(&identifier.to_bytes()?);
vec
};
let identifier_tree = self.inner.identifier_tree.clone();
let identifier_bytes = identifier.to_bytes()?;
debug!("Storing variant");
web::block(move || path_tree.insert(key, path_bytes)).await??;
web::block(move || identifier_tree.insert(key, identifier_bytes)).await??;
debug!("Stored variant");
Ok(())
@ -260,10 +248,10 @@ impl UploadManager {
#[instrument(skip(self))]
pub(crate) async fn variant_details(
&self,
path: PathBuf,
identifier: S::Identifier,
filename: String,
) -> Result<Option<Details>, Error> {
let key = self.details_key(&path, &filename)?;
let key = self.details_key(identifier, &filename)?;
let details_tree = self.inner.details_tree.clone();
debug!("Getting details");
@ -282,11 +270,11 @@ impl UploadManager {
#[instrument(skip(self))]
pub(crate) async fn store_variant_details(
&self,
path: PathBuf,
identifier: S::Identifier,
filename: String,
details: &Details,
) -> Result<(), Error> {
let key = self.details_key(&path, &filename)?;
let key = self.details_key(identifier, &filename)?;
let details_tree = self.inner.details_tree.clone();
let details_value = serde_json::to_vec(details)?;
@ -317,21 +305,6 @@ impl UploadManager {
self.aliases_by_hash(&hash).await
}
pub(crate) fn next_directory(&self) -> Result<PathBuf, Error> {
let path = self.inner.path_gen.next();
self.inner
.settings_tree
.insert(GENERATOR_KEY, path.to_be_bytes())?;
let mut target_path = self.inner.root_dir.join("files");
for dir in path.to_strings() {
target_path.push(dir)
}
Ok(target_path)
}
async fn aliases_by_hash(&self, hash: &sled::IVec) -> Result<Vec<String>, Error> {
let (start, end) = alias_key_bounds(hash);
let main_tree = self.inner.main_tree.clone();
@ -386,26 +359,26 @@ impl UploadManager {
debug!("Deleting alias -> delete-token mapping");
let existing_token = alias_tree
.remove(delete_key(&alias2).as_bytes())?
.ok_or_else(|| trans_err(UploadError::MissingAlias))?;
.ok_or_else(|| trans_upload_error(UploadError::MissingAlias))?;
// Bail if invalid token
if existing_token != token {
warn!("Invalid delete token");
return Err(trans_err(UploadError::InvalidToken));
return Err(trans_upload_error(UploadError::InvalidToken));
}
// -- GET ID FOR HASH TREE CLEANUP --
debug!("Deleting alias -> id mapping");
let id = alias_tree
.remove(alias_id_key(&alias2).as_bytes())?
.ok_or_else(|| trans_err(UploadError::MissingAlias))?;
let id = String::from_utf8(id.to_vec()).map_err(trans_err)?;
.ok_or_else(|| trans_upload_error(UploadError::MissingAlias))?;
let id = String::from_utf8(id.to_vec()).map_err(trans_utf8_error)?;
// -- GET HASH FOR HASH TREE CLEANUP --
debug!("Deleting alias -> hash mapping");
let hash = alias_tree
.remove(alias2.as_bytes())?
.ok_or_else(|| trans_err(UploadError::MissingAlias))?;
.ok_or_else(|| trans_upload_error(UploadError::MissingAlias))?;
// -- REMOVE HASH TREE ELEMENT --
debug!("Deleting hash -> alias mapping");
@ -491,7 +464,7 @@ impl UploadManager {
Ok(filename)
}
pub(crate) fn session(&self) -> UploadManagerSession {
pub(crate) fn session(&self) -> UploadManagerSession<S> {
UploadManagerSession::new(self.clone())
}
@ -501,14 +474,14 @@ impl UploadManager {
let filename = filename.inner;
let filename2 = filename.clone();
let path_tree = self.inner.path_tree.clone();
let path = web::block(move || path_tree.remove(filename2)).await??;
let identifier_tree = self.inner.identifier_tree.clone();
let identifier = web::block(move || identifier_tree.remove(filename2)).await??;
let mut errors = Vec::new();
if let Some(path) = path {
let path = self.inner.root_dir.join(String::from_utf8(path.to_vec())?);
debug!("Deleting {:?}", path);
if let Err(e) = self.remove_path(&path).await {
if let Some(identifier) = identifier {
let identifier = S::Identifier::from_bytes(identifier.to_vec())?;
debug!("Deleting {:?}", identifier);
if let Err(e) = self.store.remove(&identifier).await {
errors.push(e.into());
}
}
@ -519,36 +492,34 @@ impl UploadManager {
web::block(move || fname_tree.remove(filename2)).await??;
let path_prefix = filename.clone();
let path_tree = self.inner.path_tree.clone();
let identifier_tree = self.inner.identifier_tree.clone();
debug!("Fetching file variants");
let paths = web::block(move || {
path_tree
let identifiers = web::block(move || {
identifier_tree
.scan_prefix(path_prefix)
.values()
.collect::<Result<Vec<sled::IVec>, sled::Error>>()
})
.await??;
debug!("{} files prepared for deletion", paths.len());
debug!("{} files prepared for deletion", identifiers.len());
for path in paths {
let path = self
.inner
.root_dir
.join(String::from_utf8_lossy(&path).as_ref());
debug!("Deleting {:?}", path);
if let Err(e) = self.remove_path(&path).await {
for id in identifiers {
let identifier = S::Identifier::from_bytes(id.to_vec())?;
debug!("Deleting {:?}", identifier);
if let Err(e) = self.store.remove(&identifier).await {
errors.push(e);
}
}
let path_prefix = filename.clone();
let path_tree = self.inner.path_tree.clone();
let identifier_tree = self.inner.identifier_tree.clone();
debug!("Deleting path info");
web::block(move || {
for res in path_tree.scan_prefix(path_prefix).keys() {
for res in identifier_tree.scan_prefix(path_prefix).keys() {
let key = res?;
path_tree.remove(key)?;
identifier_tree.remove(key)?;
}
Ok(()) as Result<(), Error>
})
@ -560,30 +531,7 @@ impl UploadManager {
Ok(())
}
async fn try_remove_parents(&self, mut path: &std::path::Path) -> Result<(), Error> {
let root = self.inner.root_dir.join("files");
while let Some(parent) = path.parent() {
if parent.ends_with(&root) {
break;
}
if tokio::fs::remove_dir(parent).await.is_err() {
break;
}
path = parent;
}
Ok(())
}
async fn remove_path(&self, path: &std::path::Path) -> Result<(), Error> {
tokio::fs::remove_file(path).await?;
self.try_remove_parents(path).await
}
fn variant_key(
pub(crate) fn variant_key(
&self,
variant_process_path: &std::path::Path,
filename: &str,
@ -597,15 +545,11 @@ impl UploadManager {
Ok(vec)
}
fn details_key(
&self,
variant_path: &std::path::Path,
filename: &str,
) -> Result<Vec<u8>, Error> {
let path = self.generalize_path(variant_path)?;
let path_string = path.to_str().ok_or(UploadError::Path)?.to_string();
fn details_key(&self, identifier: S::Identifier, filename: &str) -> Result<Vec<u8>, Error> {
let mut vec = filename.as_bytes().to_vec();
vec.extend(b"/");
vec.extend(&identifier.to_bytes()?);
let vec = format!("{}/{}", filename, path_string).as_bytes().to_vec();
Ok(vec)
}
}
@ -623,8 +567,11 @@ impl Details {
}
#[tracing::instrument("Details from bytes", skip(input))]
pub(crate) async fn from_bytes(input: web::Bytes) -> Result<Self, Error> {
let details = crate::magick::details_bytes(input).await?;
pub(crate) async fn from_bytes(
input: web::Bytes,
hint: Option<ValidInputType>,
) -> Result<Self, Error> {
let details = crate::magick::details_bytes(input, hint).await?;
Ok(Details::now(
details.width,
@ -633,12 +580,13 @@ impl Details {
))
}
#[tracing::instrument("Details from path", fields(path = &tracing::field::debug(&path.as_ref())))]
pub(crate) async fn from_path<P>(path: P) -> Result<Self, Error>
where
P: AsRef<std::path::Path>,
{
let details = crate::magick::details(&path).await?;
#[tracing::instrument("Details from store")]
pub(crate) async fn from_store<S: Store>(
store: S,
identifier: S::Identifier,
expected_format: Option<ValidInputType>,
) -> Result<Self, Error> {
let details = crate::magick::details_store(store, identifier, expected_format).await?;
Ok(Details::now(
details.width,
@ -671,14 +619,14 @@ impl FilenameIVec {
}
}
fn init_generator(settings: &sled::Tree) -> Result<Generator, Error> {
if let Some(ivec) = settings.get(GENERATOR_KEY)? {
Ok(Generator::from_existing(Path::from_be_bytes(
ivec.to_vec(),
)?))
} else {
Ok(Generator::new())
}
fn trans_upload_error(
upload_error: UploadError,
) -> sled::transaction::ConflictableTransactionError<Error> {
trans_err(upload_error)
}
fn trans_utf8_error(e: FromUtf8Error) -> sled::transaction::ConflictableTransactionError<Error> {
trans_err(e)
}
fn trans_err<E>(e: E) -> sled::transaction::ConflictableTransactionError<Error>
@ -706,7 +654,7 @@ impl<T> DerefMut for Serde<T> {
}
}
impl std::fmt::Debug for UploadManager {
impl<S> std::fmt::Debug for UploadManager<S> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("UploadManager").finish()
}

View file

@ -1,7 +1,7 @@
use crate::{
error::{Error, UploadError},
migrate::{alias_id_key, alias_key},
to_ext,
store::Store,
upload_manager::{
delete_key,
hasher::{Hash, Hasher},
@ -10,20 +10,24 @@ use crate::{
};
use actix_web::web;
use futures_util::stream::{Stream, StreamExt};
use std::path::PathBuf;
use tokio::io::AsyncRead;
use tracing::{debug, instrument, warn, Span};
use tracing_futures::Instrument;
use uuid::Uuid;
pub(crate) struct UploadManagerSession {
manager: UploadManager,
pub(crate) struct UploadManagerSession<S: Store>
where
Error: From<S::Error>,
{
manager: UploadManager<S>,
alias: Option<String>,
finished: bool,
}
impl UploadManagerSession {
pub(super) fn new(manager: UploadManager) -> Self {
impl<S: Store> UploadManagerSession<S>
where
Error: From<S::Error>,
{
pub(super) fn new(manager: UploadManager<S>) -> Self {
UploadManagerSession {
manager,
alias: None,
@ -51,7 +55,10 @@ impl Dup {
}
}
impl Drop for UploadManagerSession {
impl<S: Store> Drop for UploadManagerSession<S>
where
Error: From<S::Error>,
{
fn drop(&mut self) {
if self.finished {
return;
@ -90,7 +97,10 @@ impl Drop for UploadManagerSession {
}
}
impl UploadManagerSession {
impl<S: Store> UploadManagerSession<S>
where
Error: From<S::Error>,
{
/// Generate a delete token for an alias
#[instrument(skip(self))]
pub(crate) async fn delete_token(&self) -> Result<String, Error> {
@ -129,17 +139,13 @@ impl UploadManagerSession {
/// Upload the file while preserving the filename, optionally validating the uploaded image
#[instrument(skip(self, stream))]
pub(crate) async fn import<E>(
pub(crate) async fn import(
mut self,
alias: String,
content_type: mime::Mime,
validate: bool,
mut stream: impl Stream<Item = Result<web::Bytes, E>> + Unpin,
) -> Result<Self, Error>
where
Error: From<E>,
E: Unpin + 'static,
{
mut stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin,
) -> Result<Self, Error> {
let mut bytes_mut = actix_web::web::BytesMut::new();
debug!("Reading stream to memory");
@ -158,8 +164,11 @@ impl UploadManagerSession {
let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone());
let tmpfile = crate::tmp_file();
safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?;
let identifier = self
.manager
.store
.save_async_read(&mut hasher_reader)
.await?;
let hash = hasher_reader.finalize_reset().await?;
debug!("Storing alias");
@ -167,7 +176,7 @@ impl UploadManagerSession {
self.add_existing_alias(&hash, &alias).await?;
debug!("Saving file");
self.save_upload(tmpfile, hash, content_type).await?;
self.save_upload(&identifier, hash, content_type).await?;
// Return alias to file
Ok(self)
@ -175,13 +184,10 @@ impl UploadManagerSession {
/// Upload the file, discarding bytes if it's already present, or saving if it's new
#[instrument(skip(self, stream))]
pub(crate) async fn upload<E>(
pub(crate) async fn upload(
mut self,
mut stream: impl Stream<Item = Result<web::Bytes, E>> + Unpin,
) -> Result<Self, Error>
where
Error: From<E>,
{
mut stream: impl Stream<Item = Result<web::Bytes, Error>> + Unpin,
) -> Result<Self, Error> {
let mut bytes_mut = actix_web::web::BytesMut::new();
debug!("Reading stream to memory");
@ -200,15 +206,18 @@ impl UploadManagerSession {
let mut hasher_reader = Hasher::new(validated_reader, self.manager.inner.hasher.clone());
let tmpfile = crate::tmp_file();
safe_save_reader(tmpfile.clone(), &mut hasher_reader).await?;
let identifier = self
.manager
.store
.save_async_read(&mut hasher_reader)
.await?;
let hash = hasher_reader.finalize_reset().await?;
debug!("Adding alias");
self.add_alias(&hash, content_type.clone()).await?;
debug!("Saving file");
self.save_upload(tmpfile, hash, content_type).await?;
self.save_upload(&identifier, hash, content_type).await?;
// Return alias to file
Ok(self)
@ -217,7 +226,7 @@ impl UploadManagerSession {
// check duplicates & store image if new
async fn save_upload(
&self,
tmpfile: PathBuf,
identifier: &S::Identifier,
hash: Hash,
content_type: mime::Mime,
) -> Result<(), Error> {
@ -225,15 +234,13 @@ impl UploadManagerSession {
// bail early with alias to existing file if this is a duplicate
if dup.exists() {
debug!("Duplicate exists, not saving file");
debug!("Duplicate exists, removing file");
self.manager.store.remove(&identifier).await?;
return Ok(());
}
// -- WRITE NEW FILE --
let real_path = self.manager.next_directory()?.join(&name);
self.manager.store_path(name, &real_path).await?;
crate::safe_move_file(tmpfile, real_path).await?;
self.manager.store_identifier(name, &identifier).await?;
Ok(())
}
@ -248,6 +255,7 @@ impl UploadManagerSession {
let main_tree = self.manager.inner.main_tree.clone();
let filename = self.next_file(content_type).await?;
let filename2 = filename.clone();
let hash2 = hash.as_slice().to_vec();
debug!("Inserting filename for hash");
@ -283,13 +291,11 @@ impl UploadManagerSession {
async fn next_file(&self, content_type: mime::Mime) -> Result<String, Error> {
loop {
debug!("Filename generation loop");
let s: String = Uuid::new_v4().to_string();
let filename = file_name(Uuid::new_v4(), content_type.clone())?;
let filename = file_name(s, content_type.clone())?;
let path_tree = self.manager.inner.path_tree.clone();
let identifier_tree = self.manager.inner.identifier_tree.clone();
let filename2 = filename.clone();
let filename_exists = web::block(move || path_tree.get(filename2.as_bytes()))
let filename_exists = web::block(move || identifier_tree.get(filename2.as_bytes()))
.await??
.is_some();
@ -363,8 +369,7 @@ impl UploadManagerSession {
async fn next_alias(&mut self, hash: &Hash, content_type: mime::Mime) -> Result<String, Error> {
loop {
debug!("Alias gen loop");
let s: String = Uuid::new_v4().to_string();
let alias = file_name(s, content_type.clone())?;
let alias = file_name(Uuid::new_v4(), content_type.clone())?;
self.alias = Some(alias.clone());
let res = self.save_alias_hash_mapping(hash, &alias).await?;
@ -402,31 +407,20 @@ impl UploadManagerSession {
}
}
fn file_name(name: String, content_type: mime::Mime) -> Result<String, Error> {
fn file_name(name: Uuid, content_type: mime::Mime) -> Result<String, Error> {
Ok(format!("{}{}", name, to_ext(content_type)?))
}
#[instrument(skip(input))]
async fn safe_save_reader(to: PathBuf, input: &mut (impl AsyncRead + Unpin)) -> Result<(), Error> {
if let Some(path) = to.parent() {
debug!("Creating directory {:?}", path);
tokio::fs::create_dir_all(path.to_owned()).await?;
}
debug!("Checking if {:?} already exists", to);
if let Err(e) = tokio::fs::metadata(to.clone()).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e.into());
}
fn to_ext(mime: mime::Mime) -> Result<&'static str, Error> {
if mime == mime::IMAGE_PNG {
Ok(".png")
} else if mime == mime::IMAGE_JPEG {
Ok(".jpg")
} else if mime == crate::video_mp4() {
Ok(".mp4")
} else if mime == crate::image_webp() {
Ok(".webp")
} else {
return Err(UploadError::FileExists.into());
Err(UploadError::UnsupportedFormat.into())
}
debug!("Writing stream to {:?}", to);
let mut file = crate::file::File::create(to).await?;
file.write_from_async_read(input).await?;
Ok(())
}