Begin work on spawning processes rather than binding to C libs

Aode (Lion) 2021-08-25 21:46:11 -05:00
parent da60bd1248
commit 7fd707c8df
12 changed files with 503 additions and 347 deletions

Cargo.lock (generated, 589 lines changed)
File diff suppressed because it is too large

Cargo.toml

@@ -14,14 +14,15 @@ edition = "2018"
actix-form-data = "0.6.0-beta.1"
actix-fs = { git = "https://git.asonix.dog/asonix/actix-fs", branch = "asonix/actix-rt-2" }
actix-rt = "2.2.0"
actix-web = { version = "4.0.0-beta.8", default-features = false, features = ["compress-brotli", "compress-gzip", "compress-zstd"] }
actix-web = { version = "4.0.0-beta.8", default-features = false }
anyhow = "1.0"
async-stream = "0.3.0"
awc = { version = "3.0.0-beta.7", default-features = false, features = ["compress-brotli", "compress-gzip", "compress-zstd", "rustls"] }
awc = { version = "3.0.0-beta.7", default-features = false }
base64 = "0.13.0"
futures = "0.3.4"
magick_rust = { version = "0.14.0", git = "https://git.asonix.dog/asonix/magick-rust.git" }
magick_rust = { version = "0.15.0" }
mime = "0.3.1"
num_cpus = "1"
once_cell = "1.4.0"
rand = "0.8.0"
rexiv2 = "0.9.1"
@@ -32,20 +33,18 @@ sled = { version = "0.34.6" }
structopt = "0.3.14"
thiserror = "1.0"
time = { version = "0.2.23", features = ["serde"] }
tokio = { version = "1", default-features = false, features = ["sync", "process"] }
tracing = "0.1.15"
tracing-futures = "0.2.4"
tracing-subscriber = { version = "0.2.5", features = ["fmt", "tracing-log"] }
uuid = { version = "0.8", features = ["v4"] }
[dependencies.ffmpeg-next]
version = "4.3.7"
version = "4.4.0-dev"
default-features = false
features = ["codec", "filter", "device", "format", "resampling", "postprocessing", "software-resampling", "software-scaling"]
git = "https://github.com/jwiesler/rust-ffmpeg"
[dependencies.ffmpeg-sys-next]
version = "4.3.5"
git = "https://github.com/jwiesler/rust-ffmpeg-sys"
branch = "master"
[patch.crates-io]
ffmpeg-sys-next = { git = "https://github.com/jwiesler/rust-ffmpeg-sys", branch = "master" }
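
The new `tokio` dependency enables only the two features the process-spawning modules below rely on: `sync` for the semaphore that caps concurrent child processes, and `process` for `tokio::process::Command`. A minimal sketch of that pattern, with illustrative names that are not part of this commit:

use tokio::process::Command;
use tokio::sync::Semaphore;

// Acquire a permit so only a bounded number of children run at once,
// then spawn a child process and wait for it to exit.
async fn run_limited(limit: &Semaphore) -> std::io::Result<std::process::ExitStatus> {
    let _permit = limit.acquire().await.expect("semaphore closed");
    Command::new("true").status().await
}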


@@ -5,5 +5,5 @@ ARCH=${1:-amd64}
export USER_ID=$(id -u)
export GROUP_ID=$(id -g)
docker-compose build --pull
docker-compose run --service-ports pictrs-$ARCH
sudo docker-compose build --pull
sudo docker-compose run --service-ports pictrs-$ARCH


@@ -28,7 +28,7 @@ require "$REPO" repo
require "$TAG" tag
require "$ARCH" arch
docker build \
sudo docker build \
--pull \
--build-arg TAG=$TAG \
-t $REPO:$ARCH-$TAG \


@@ -25,10 +25,10 @@ function build_image() {
./build-image.sh asonix/pictrs $tag $arch
docker tag asonix/pictrs:$arch-$tag asonix/pictrs:$arch-latest
sudo docker tag asonix/pictrs:$arch-$tag asonix/pictrs:$arch-latest
docker push asonix/pictrs:$arch-$tag
docker push asonix/pictrs:$arch-latest
sudo docker push asonix/pictrs:$arch-$tag
sudo docker push asonix/pictrs:$arch-latest
}
# Creating the new tag
@@ -38,7 +38,7 @@ branch="$2"
require "$new_tag" "tag"
require "$branch" "branch"
if ! docker run --rm -it arm64v8/alpine:3.11 /bin/sh -c 'echo "docker is configured correctly"'
if ! sudo docker run --rm -it arm64v8/alpine:3.11 /bin/sh -c 'echo "docker is configured correctly"'
then
echo "docker is not configured to run on qemu-emulated architectures, fixing will require sudo"
sudo docker run --rm --privileged multiarch/qemu-user-static --reset -p yes


@@ -23,18 +23,18 @@ require "$new_tag" "tag"
set -xe
docker manifest create asonix/pictrs:$new_tag \
sudo docker manifest create asonix/pictrs:$new_tag \
-a asonix/pictrs:arm64v8-$new_tag \
-a asonix/pictrs:arm32v7-$new_tag \
-a asonix/pictrs:amd64-$new_tag
docker manifest annotate asonix/pictrs:$new_tag \
sudo docker manifest annotate asonix/pictrs:$new_tag \
asonix/pictrs:arm64v8-$new_tag --os linux --arch arm64 --variant v8
docker manifest annotate asonix/pictrs:$new_tag \
sudo docker manifest annotate asonix/pictrs:$new_tag \
asonix/pictrs:arm32v7-$new_tag --os linux --arch arm --variant v7
docker manifest annotate asonix/pictrs:$new_tag \
sudo docker manifest annotate asonix/pictrs:$new_tag \
asonix/pictrs:amd64-$new_tag --os linux --arch amd64
docker manifest push asonix/pictrs:$new_tag --purge
sudo docker manifest push asonix/pictrs:$new_tag --purge

src/exiv2.rs (new file, 77 lines)

@@ -0,0 +1,77 @@
#[derive(Debug, thiserror::Error)]
pub(crate) enum FormatError {
#[error("Failed to interface with exiv2")]
IO(#[from] std::io::Error),
#[error("Failed to identify file")]
Status,
#[error("Identify semaphore is closed")]
Closed,
#[error("Requested information is not present")]
Missing,
#[error("Requested information was present, but not supported")]
Unsupported,
}
pub(crate) enum ValidInputType {
Mp4,
Gif,
Png,
Jpeg,
}
static MAX_READS: once_cell::sync::OnceCell<tokio::sync::Semaphore> =
once_cell::sync::OnceCell::new();
fn semaphore() -> &'static tokio::sync::Semaphore {
MAX_READS.get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get() * 4))
}
pub(crate) async fn format<P>(file: P) -> Result<ValidInputType, FormatError>
where
P: AsRef<std::path::Path>,
{
let permit = semaphore().acquire().await?;
let output = tokio::process::Command::new("exiv2")
.args([
&AsRef::<std::ffi::OsStr>::as_ref(&"pr"),
&file.as_ref().as_ref(),
])
.output()
.await?;
drop(permit);
if !output.status.success() {
return Err(FormatError::Status);
}
let s = String::from_utf8_lossy(&output.stdout);
let line = s
.lines()
.find(|line| line.starts_with("MIME"))
.ok_or_else(|| FormatError::Missing)?;
let mut segments = line.rsplit(':');
let mime_type = segments.next().ok_or_else(|| FormatError::Missing)?;
let input_type = match mime_type.trim() {
"video/mp4" => ValidInputType::Mp4,
"image/gif" => ValidInputType::Gif,
"image/png" => ValidInputType::Png,
"image/jpeg" => ValidInputType::Jpeg,
_ => return Err(FormatError::Unsupported),
};
Ok(input_type)
}
impl From<tokio::sync::AcquireError> for FormatError {
fn from(_: tokio::sync::AcquireError) -> FormatError {
FormatError::Closed
}
}
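
A minimal sketch of how the `format` helper above might be used at a call site, mapping the detected type to a file extension; the `target_extension` function below is an illustrative assumption and is not part of this commit:

use crate::exiv2::{format, FormatError, ValidInputType};

// Hypothetical caller: ask exiv2 for the input type and pick an extension.
async fn target_extension(tmpfile: &std::path::Path) -> Result<&'static str, FormatError> {
    Ok(match format(tmpfile).await? {
        ValidInputType::Mp4 => "mp4",
        ValidInputType::Gif => "gif",
        ValidInputType::Png => "png",
        ValidInputType::Jpeg => "jpg",
    })
}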

src/ffmpeg.rs (new file, 106 lines)

@@ -0,0 +1,106 @@
#[derive(Debug, thiserror::Error)]
pub(crate) enum VideoError {
#[error("Failed to interface with transcode process")]
IO(#[from] std::io::Error),
#[error("Failed to convert file")]
Status,
#[error("Transcode semaphore is closed")]
Closed,
}
static MAX_TRANSCODES: once_cell::sync::OnceCell<tokio::sync::Semaphore> =
once_cell::sync::OnceCell::new();
fn semaphore() -> &'static tokio::sync::Semaphore {
MAX_TRANSCODES
.get_or_init(|| tokio::sync::Semaphore::new(num_cpus::get().saturating_sub(1).max(1)))
}
pub(crate) async fn thumbnail_jpeg<P1, P2>(from: P1, to: P2) -> Result<(), VideoError>
where
P1: AsRef<std::path::Path>,
P2: AsRef<std::path::Path>,
{
thumbnail(from, to, "mjpeg").await
}
pub(crate) async fn thumbnail_png<P1, P2>(from: P1, to: P2) -> Result<(), VideoError>
where
P1: AsRef<std::path::Path>,
P2: AsRef<std::path::Path>,
{
thumbnail(from, to, "png").await
}
pub(crate) async fn to_mp4<P1, P2>(from: P1, to: P2) -> Result<(), VideoError>
where
P1: AsRef<std::path::Path>,
P2: AsRef<std::path::Path>,
{
let permit = semaphore().acquire().await?;
let mut child = tokio::process::Command::new("ffmpeg")
.args([
&AsRef::<std::ffi::OsStr>::as_ref(&"-i"),
&from.as_ref().as_ref(),
&"-movflags".as_ref(),
&"faststart".as_ref(),
&"-pix_fmt".as_ref(),
&"yuv420p".as_ref(),
&"-vf".as_ref(),
&"scale=trunc(iw/2)*2:truc(ih/2)*2".as_ref(),
&"-an".as_ref(),
&"-codec".as_ref(),
&"h264".as_ref(),
&to.as_ref().as_ref(),
])
.spawn()?;
let status = child.wait().await?;
drop(permit);
if !status.success() {
return Err(VideoError::Status);
}
Ok(())
}
async fn thumbnail<P1, P2>(from: P1, to: P2, codec: &str) -> Result<(), VideoError>
where
P1: AsRef<std::path::Path>,
P2: AsRef<std::path::Path>,
{
let permit = semaphore().acquire().await?;
let mut child = tokio::process::Command::new("ffmpeg")
.args([
&AsRef::<std::ffi::OsStr>::as_ref(&"-i"),
&from.as_ref().as_ref(),
&"-ss".as_ref(),
&"00:00:01.000".as_ref(),
&"-vframes".as_ref(),
&"1".as_ref(),
&"-codec".as_ref(),
&codec.as_ref(),
&to.as_ref().as_ref(),
])
.spawn()?;
let status = child.wait().await?;
drop(permit);
if !status.success() {
return Err(VideoError::Status);
}
Ok(())
}
impl From<tokio::sync::AcquireError> for VideoError {
fn from(_: tokio::sync::AcquireError) -> VideoError {
VideoError::Closed
}
}
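
A minimal sketch of how these helpers might be chained when an upload turns out to be a video: transcode to mp4, then pull a jpeg thumbnail from the result. The `process_video` function and its paths are illustrative assumptions, not part of this commit:

use crate::ffmpeg::{thumbnail_jpeg, to_mp4, VideoError};

// Hypothetical caller: produce the stored mp4 and a poster frame for it.
async fn process_video(
    tmpfile: &std::path::Path,
    mp4_path: &std::path::Path,
    thumb_path: &std::path::Path,
) -> Result<(), VideoError> {
    to_mp4(tmpfile, mp4_path).await?;
    thumbnail_jpeg(mp4_path, thumb_path).await?;
    Ok(())
}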

src/magick.rs (new file, 28 lines)

@@ -0,0 +1,28 @@
fn thumbnail_args(max_dimension: usize) -> [String; 2] {
[
"-sample".to_string(),
format!("{}x{}>", max_dimension, max_dimension),
]
}
fn resize_args(max_dimension: usize) -> [String; 4] {
[
"-filter".to_string(),
"Lanczos".to_string(),
"-resize".to_string(),
format!("{}x{}>", max_dimension, max_dimension),
]
}
fn crop_args(width: usize, height: usize) -> [String; 4] {
[
"-gravity".to_string(),
"center".to_string(),
"-crop".to_string(),
format!("{}x{}>", width, height),
]
}
fn blur_args(radius: f64) -> [String; 2] {
["-gaussian-blur".to_string(), radius.to_string()]
}
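
magick.rs only builds argument lists so far; the spawning half is not in this commit. A sketch of how `resize_args` might eventually feed a spawned ImageMagick `convert` process, mirroring the exiv2/ffmpeg modules; the function name and error handling below are assumptions:

// Hypothetical sketch: pass the helper-built arguments to a spawned
// `convert` process. Not part of this commit.
async fn resize<P1, P2>(from: P1, to: P2, max_dimension: usize) -> std::io::Result<()>
where
    P1: AsRef<std::path::Path>,
    P2: AsRef<std::path::Path>,
{
    let status = tokio::process::Command::new("convert")
        .arg(from.as_ref())
        .args(resize_args(max_dimension))
        .arg(to.as_ref())
        .status()
        .await?;
    if !status.success() {
        return Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            "convert exited with an error",
        ));
    }
    Ok(())
}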

src/main.rs

@@ -2,7 +2,7 @@ use actix_form_data::{Field, Form, Value};
use actix_web::{
guard,
http::header::{CacheControl, CacheDirective, LastModified, ACCEPT_RANGES},
middleware::{Compress, Logger},
middleware::Logger,
web, App, HttpResponse, HttpResponseBuilder, HttpServer,
};
use awc::Client;
@@ -17,6 +17,8 @@ use tracing_subscriber::EnvFilter;
mod config;
mod error;
mod exiv2;
mod ffmpeg;
mod middleware;
mod migrate;
mod processor;
@@ -564,6 +566,7 @@ async fn ranged_file_resp(
None => {
let stream = actix_fs::read_to_stream(path)
.await?
.faster()
.map_err(UploadError::from);
let stream: Pin<Box<dyn Stream<Item = Result<web::Bytes, UploadError>>>> =
Box::pin(stream);
@@ -747,7 +750,6 @@ async fn main() -> Result<(), anyhow::Error> {
.finish();
App::new()
.wrap(Compress::default())
.wrap(Logger::default())
.wrap(Tracing)
.app_data(web::Data::new(manager.clone()))

src/range.rs

@@ -67,6 +67,7 @@ impl Range {
Ok(Box::pin(
actix_fs::file::read_to_stream(file)
.await?
.faster()
.map_err(UploadError::from),
))
}

src/upload_manager.rs

@@ -681,7 +681,7 @@ impl UploadManager {
let mut hasher = self.inner.hasher.clone();
let file = actix_fs::file::open(tmpfile).await?;
let mut stream = Box::pin(actix_fs::file::read_to_stream(file).await?);
let mut stream = Box::pin(actix_fs::file::read_to_stream(file).await?.faster());
while let Some(res) = stream.next().await {
let bytes = res?;
@@ -913,7 +913,7 @@ where
let file = actix_fs::file::create(to).await?;
actix_fs::file::write_stream(file, stream.map_err(UploadError::from)).await?;
actix_fs::file::write_stream_faster(file, stream.map_err(UploadError::from)).await?;
Ok(())
}