mirror of
https://git.asonix.dog/asonix/pict-rs.git
synced 2024-06-08 00:09:31 +00:00
Compare commits
43 commits
Author | SHA1 | Date | |
---|---|---|---|
dd01548309 | |||
f05eed2e36 | |||
69470c8a38 | |||
4e75764110 | |||
6ef9dc404f | |||
af3a6a260a | |||
286bc8b97a | |||
8cf8b2bc05 | |||
1c4e343d9d | |||
d03cc63d2b | |||
260f9a158a | |||
a7c78cd54e | |||
d7dc2e506d | |||
e48f60a6c6 | |||
9d01aeb82c | |||
bddfb3c9d0 | |||
7ae3c0c776 | |||
983e9ce151 | |||
9302062b26 | |||
33e72266f5 | |||
39da69b1aa | |||
64b8635059 | |||
d45e3fa386 | |||
bfd4fd4689 | |||
89f3c447a8 | |||
46cfbf99a5 | |||
58529a2eb2 | |||
700aeb90e0 | |||
ff39c30cc8 | |||
9561c578dc | |||
dc7bdf7eeb | |||
33ba045ee1 | |||
f082e48ed8 | |||
97159e0030 | |||
6d40fbee47 | |||
c4e99ef539 | |||
3428c31f16 | |||
4bb3bad703 | |||
4021458be8 | |||
eca3697410 | |||
d41fca5b6c | |||
e3183c923f | |||
d97cfe2a64 |
1261
Cargo.lock
generated
1261
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
29
Cargo.toml
29
Cargo.toml
|
@ -1,7 +1,7 @@
|
|||
[package]
|
||||
name = "pict-rs"
|
||||
description = "A simple image hosting service"
|
||||
version = "0.5.11"
|
||||
version = "0.5.15"
|
||||
authors = ["asonix <asonix@asonix.dog>"]
|
||||
license = "AGPL-3.0"
|
||||
readme = "README.md"
|
||||
|
@ -20,10 +20,10 @@ random-errors = ["dep:nanorand"]
|
|||
|
||||
[dependencies]
|
||||
actix-form-data = "0.7.0-beta.7"
|
||||
actix-web = { version = "4.0.0", default-features = false, features = ["rustls-0_22"] }
|
||||
actix-web = { version = "4.6.0", default-features = false, features = ["rustls-0_23"] }
|
||||
async-trait = "0.1.51"
|
||||
barrel = { version = "0.7.0", features = ["pg"] }
|
||||
base64 = "0.21.0"
|
||||
base64 = "0.22.0"
|
||||
bb8 = "0.8.3"
|
||||
blurhash-update = "0.1.0"
|
||||
clap = { version = "4.0.2", features = ["derive"] }
|
||||
|
@ -34,12 +34,11 @@ dashmap = "5.1.0"
|
|||
diesel = { version = "2.1.1", features = ["postgres_backend", "serde_json", "time", "uuid"] }
|
||||
diesel-async = { version = "0.4.1", features = ["bb8", "postgres"] }
|
||||
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
|
||||
flume = "0.11.0"
|
||||
futures-core = "0.3"
|
||||
hex = "0.4.3"
|
||||
md-5 = "0.10.5"
|
||||
metrics = "0.22.0"
|
||||
metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = ["http-listener"] }
|
||||
metrics-exporter-prometheus = { version = "0.14.0", default-features = false, features = ["http-listener"] }
|
||||
mime = "0.3.1"
|
||||
nanorand = { version = "0.7", optional = true }
|
||||
opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"] }
|
||||
|
@ -47,13 +46,14 @@ opentelemetry = "0.22"
|
|||
opentelemetry-otlp = "0.15"
|
||||
pin-project-lite = "0.2.7"
|
||||
refinery = { version = "0.8.10", features = ["tokio-postgres", "postgres"] }
|
||||
reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls", "stream"] }
|
||||
reqwest-middleware = "0.2.2"
|
||||
reqwest-tracing = "0.4.5"
|
||||
# pinned to tokio-postgres-rustls
|
||||
rustls = "0.22.0"
|
||||
reqwest = { version = "0.12.0", default-features = false, features = ["json", "rustls-tls", "stream"] }
|
||||
reqwest-middleware = "0.3.0"
|
||||
reqwest-tracing = "0.5.0"
|
||||
# pinned to tokio-postgres-generic-rustls
|
||||
# pinned to actix-web
|
||||
rustls = "0.23"
|
||||
# pinned to rustls
|
||||
rustls-channel-resolver = "0.2.0"
|
||||
rustls-channel-resolver = "0.3.0"
|
||||
# pinned to rustls
|
||||
rustls-pemfile = "2.0.0"
|
||||
rusty-s3 = "0.5.0"
|
||||
|
@ -69,7 +69,7 @@ thiserror = "1.0"
|
|||
time = { version = "0.3.0", features = ["serde", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["full", "tracing"] }
|
||||
tokio-postgres = { version = "0.7.10", features = ["with-uuid-1", "with-time-0_3", "with-serde_json-1"] }
|
||||
tokio-postgres-rustls = "0.11.0"
|
||||
tokio-postgres-generic-rustls = { version = "0.1.0", default-features = false, features = ["aws-lc-rs"] }
|
||||
tokio-uring = { version = "0.4", optional = true, features = ["bytes"] }
|
||||
tokio-util = { version = "0.7", default-features = false, features = [
|
||||
"codec",
|
||||
|
@ -96,7 +96,4 @@ webpki-roots = "0.26.0"
|
|||
[dependencies.tracing-actix-web]
|
||||
version = "0.7.10"
|
||||
default-features = false
|
||||
features = ["emit_event_on_error", "opentelemetry_0_22"]
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-uring = { version = "0.4", features = ["bytes"] }
|
||||
features = ["opentelemetry_0_22"]
|
||||
|
|
|
@ -16,6 +16,8 @@ concurrency = 32
|
|||
format = "normal"
|
||||
targets = "info"
|
||||
log_spans = false
|
||||
no_ansi = false
|
||||
log_requests = false
|
||||
|
||||
[tracing.console]
|
||||
buffer_capacity = 102400
|
||||
|
|
12
flake.lock
12
flake.lock
|
@ -5,11 +5,11 @@
|
|||
"systems": "systems"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1705309234,
|
||||
"narHash": "sha256-uNRRNRKmJyCRC/8y1RqBkqWBLM034y4qN7EprSdmgyA=",
|
||||
"lastModified": 1710146030,
|
||||
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"rev": "1ef2e671c3b0c19053962c07dbda38332dcebf26",
|
||||
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
|
@ -20,11 +20,11 @@
|
|||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1705133751,
|
||||
"narHash": "sha256-rCIsyE80jgiOU78gCWN3A0wE0tR2GI5nH6MlS+HaaSQ=",
|
||||
"lastModified": 1717196966,
|
||||
"narHash": "sha256-yZKhxVIKd2lsbOqYd5iDoUIwsRZFqE87smE2Vzf6Ck0=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "9b19f5e77dd906cb52dade0b7bd280339d2a1f3d",
|
||||
"rev": "57610d2f8f0937f39dbd72251e9614b1561942d8",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
|
|
20
flake.nix
20
flake.nix
|
@ -15,13 +15,29 @@
|
|||
in
|
||||
{
|
||||
packages = rec {
|
||||
imagemagick7_pict-rs = pkgs.callPackage ./nix/pkgs/imagemagick_pict-rs {};
|
||||
ffmpeg6_pict-rs = pkgs.callPackage ./nix/pkgs/ffmpeg_pict-rs {};
|
||||
|
||||
pict-rs = pkgs.callPackage ./pict-rs.nix {
|
||||
inherit (pkgs.darwin.apple_sdk.frameworks) Security;
|
||||
inherit imagemagick7_pict-rs ffmpeg6_pict-rs;
|
||||
};
|
||||
|
||||
default = pict-rs;
|
||||
};
|
||||
|
||||
docker = pkgs.dockerTools.buildLayeredImage {
|
||||
name = "pict-rs";
|
||||
tag = "latest";
|
||||
|
||||
contents = [ pkgs.tini self.packages.${system}.pict-rs pkgs.bash ];
|
||||
|
||||
config = {
|
||||
Entrypoint = [ "/bin/tini" "--" "/bin/pict-rs" ];
|
||||
Cmd = [ "run" ];
|
||||
};
|
||||
};
|
||||
|
||||
apps = rec {
|
||||
dev = flake-utils.lib.mkApp { drv = self.packages.${system}.pict-rs; };
|
||||
default = dev;
|
||||
|
@ -36,9 +52,9 @@
|
|||
curl
|
||||
diesel-cli
|
||||
exiftool
|
||||
ffmpeg_6-full
|
||||
garage
|
||||
imagemagick
|
||||
self.packages.${system}.imagemagick7_pict-rs
|
||||
self.packages.${system}.ffmpeg6_pict-rs
|
||||
jq
|
||||
minio-client
|
||||
rust-analyzer
|
||||
|
|
5
nix/pkgs/ffmpeg_pict-rs/default.nix
Normal file
5
nix/pkgs/ffmpeg_pict-rs/default.nix
Normal file
|
@ -0,0 +1,5 @@
|
|||
{ ffmpeg_6-headless }:
|
||||
|
||||
ffmpeg_6-headless.override {
|
||||
withWebp = true;
|
||||
}
|
23
nix/pkgs/imagemagick_pict-rs/default.nix
Normal file
23
nix/pkgs/imagemagick_pict-rs/default.nix
Normal file
|
@ -0,0 +1,23 @@
|
|||
{ imagemagick7 }:
|
||||
|
||||
imagemagick7.override {
|
||||
bzip2Support = true;
|
||||
zlibSupport = true;
|
||||
libX11Support = false;
|
||||
libXtSupport = false;
|
||||
fontconfigSupport = false;
|
||||
freetypeSupport = false;
|
||||
libjpegSupport = true;
|
||||
djvulibreSupport = false;
|
||||
lcms2Support = false;
|
||||
openexrSupport = false;
|
||||
libjxlSupport = true;
|
||||
libpngSupport = true;
|
||||
liblqr1Support = false;
|
||||
librsvgSupport = false;
|
||||
libtiffSupport = false;
|
||||
libxml2Support = false;
|
||||
openjpegSupport = true;
|
||||
libwebpSupport = true;
|
||||
libheifSupport = true;
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
{ exiftool
|
||||
, ffmpeg_6-full
|
||||
, imagemagick
|
||||
, ffmpeg6_pict-rs
|
||||
, imagemagick7_pict-rs
|
||||
, lib
|
||||
, makeWrapper
|
||||
, nixosTests
|
||||
|
@ -11,7 +11,7 @@
|
|||
|
||||
rustPlatform.buildRustPackage {
|
||||
pname = "pict-rs";
|
||||
version = "0.5.11";
|
||||
version = "0.5.15";
|
||||
src = ./.;
|
||||
|
||||
cargoLock = {
|
||||
|
@ -27,7 +27,7 @@ rustPlatform.buildRustPackage {
|
|||
|
||||
postInstall = ''
|
||||
wrapProgram $out/bin/pict-rs \
|
||||
--prefix PATH : "${lib.makeBinPath [ imagemagick ffmpeg_6-full exiftool ]}"
|
||||
--prefix PATH : "${lib.makeBinPath [ imagemagick7_pict-rs ffmpeg6_pict-rs exiftool ]}"
|
||||
'';
|
||||
|
||||
passthru.tests = { inherit (nixosTests) pict-rs; };
|
||||
|
|
10
pict-rs.toml
10
pict-rs.toml
|
@ -98,6 +98,16 @@ targets = 'info'
|
|||
# default: false
|
||||
log_spans = false
|
||||
|
||||
## Optional: whether to disable colorized log output
|
||||
# environment variable: PICTRS__TRACING__LOGGING__NO_ANSI
|
||||
# default: false
|
||||
no_ansi = false
|
||||
|
||||
## Optional: whether to log upon request completion
|
||||
# environment variable: PICTRS__TRACING__LOGGING__LOG_REQUESTS
|
||||
# default: false
|
||||
log_requests = false
|
||||
|
||||
|
||||
## Console configuration
|
||||
[tracing.console]
|
||||
|
|
46
releases/0.5.12.md
Normal file
46
releases/0.5.12.md
Normal file
|
@ -0,0 +1,46 @@
|
|||
# pict-rs 0.5.12
|
||||
|
||||
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
|
||||
animations, and videos, as well as providing basic image processing functionality.
|
||||
|
||||
## Overview
|
||||
|
||||
pict-rs 0.5.12 is a bugfix release to remove two issues that, when compounded, would cause pict-rs
|
||||
to fail to process media.
|
||||
|
||||
### Fixes
|
||||
|
||||
- [Panic Handling in Background Jobs](#panic-handling-in-background-jobs)
|
||||
- [BytesStream Divide-by-Zero](#bytesstream-divide-by-zero)
|
||||
|
||||
|
||||
## Upgrade Notes
|
||||
|
||||
There are no significant differences from 0.5.11. Upgrading should be as simple as pulling a new
|
||||
version of pict-rs.
|
||||
|
||||
|
||||
## Descriptions
|
||||
|
||||
### Panic Handling in Background Jobs
|
||||
|
||||
pict-rs makes an effort to never use explicitly panicking code, but since there's no static way to
|
||||
guarantee that a given function won't panic, pict-rs needs to be able to deal with that. pict-rs
|
||||
0.5.12 now wraps invocations of jobs in spawned tasks, which can catch and report panics that happen
|
||||
in background jobs.
|
||||
|
||||
Previously, a panic in a background job would bring down that thread's job processor, which resulted
|
||||
in future jobs never being processed. Now job processing should properly continue after panics
|
||||
occur.
|
||||
|
||||
|
||||
### BytesStream Divide-by-Zero
|
||||
|
||||
Part of my rework of BytesStream recently included adding debug logs around how many bytes chunks
|
||||
were in a given stream, and their average length. Unfortunately, if there were no bytes in the
|
||||
stream, this would cause the "average chunk length" calculation to divide by 0. In previous versions
|
||||
of pict-rs, this would generally result in a failed request for processed media, but in pict-rs
|
||||
0.5.11 this would end up killing the background jobs processor.
|
||||
|
||||
This specific panic has been fixed by ensuring we divide by the number of chunks or 1, whichever is
|
||||
greater.
|
62
releases/0.5.13.md
Normal file
62
releases/0.5.13.md
Normal file
|
@ -0,0 +1,62 @@
|
|||
# pict-rs 0.5.13
|
||||
|
||||
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
|
||||
animations, and videos, as well as providing basic image processing functionality.
|
||||
|
||||
## Overview
|
||||
|
||||
pict-rs 0.5.13 is a maintenance release aiming to enable better logging in some scenarios.
|
||||
|
||||
### Features
|
||||
|
||||
- [Colorless Logging](#colorless-logging)
|
||||
|
||||
|
||||
### Changes
|
||||
|
||||
- [Remove Flume](#remove-flume)
|
||||
|
||||
|
||||
## Upgrade Notes
|
||||
|
||||
There are no significant changes from 0.5.12. Upgrading should be as simple as pulling a new version
|
||||
of pict-rs.
|
||||
|
||||
|
||||
## Descriptions
|
||||
|
||||
### Colorless Logging
|
||||
|
||||
When opting to use the `json` logger, the tracing subscriber automatically disables colored output.
|
||||
This didn't remove colors from errors, though, and pict-rs hasn't had a way to disable colors while
|
||||
using other log formats. pict-rs 0.5.13 introduces a new configuration value to remove colored
|
||||
output from all logs regardless of logging format.
|
||||
|
||||
With pict-rs.toml
|
||||
```toml
|
||||
[tracing.logging]
|
||||
no_ansi = true
|
||||
```
|
||||
|
||||
With environment variables
|
||||
```bash
|
||||
PICTRS__TRACING__LOGGING__NO_ANSI=true
|
||||
```
|
||||
|
||||
With commandline flags
|
||||
```bash
|
||||
pict-rs --no-log-ansi run
|
||||
```
|
||||
|
||||
Colors in logs can be useful, so I imagine this option won't be used much. There has been a request
|
||||
for this functionality though and it's little cost to maintain.
|
||||
|
||||
|
||||
### Remove Flume
|
||||
|
||||
Recently I've been debugging a memory usage issue in another project of mine. I wasn't able to fully
|
||||
track down the cause, but I did notice that removing the
|
||||
[flume channel library](https://github.com/zesterer/flume) seemed to make the leak go away. Since I
|
||||
also use flume in pict-rs, I'm opting to replace it with tokio's native channel implementation. This
|
||||
may or may not improve memory usage, but it does reduce the dependency count and therefore build time
|
||||
for pict-rs.
|
28
releases/0.5.14.md
Normal file
28
releases/0.5.14.md
Normal file
|
@ -0,0 +1,28 @@
|
|||
# pict-rs 0.5.14
|
||||
|
||||
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
|
||||
animations, and videos, as well as providing basic image processing functionality.
|
||||
|
||||
## Overview
|
||||
|
||||
pict-rs 0.5.14 includes a bugfix for identifying certain MOV videos, as well as updated dependencies.
|
||||
|
||||
### Fixes
|
||||
|
||||
- [Empty Stream Parsing](#empty-stream-parsing)
|
||||
|
||||
|
||||
## Upgrade Notes
|
||||
|
||||
There are no significant changes from 0.5.13. Upgrading should be as simple as pulling a new version
|
||||
of pict-rs.
|
||||
|
||||
|
||||
## Descriptions
|
||||
|
||||
### Empty Stream Parsing
|
||||
|
||||
Certain videos, when identified with ffprobe, contain stream json objects with no fields. This would
|
||||
cause pict-rs to fail to parse the information for these videos, as it expects streams to at least
|
||||
contain a codec field. In pict-rs 0.5.14, empty streams are now considered valid and are simply
|
||||
ignored.
|
58
releases/0.5.15.md
Normal file
58
releases/0.5.15.md
Normal file
|
@ -0,0 +1,58 @@
|
|||
# pict-rs 0.5.15
|
||||
|
||||
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
|
||||
animations, and videos, as well as providing basic image processing functionality.
|
||||
|
||||
## Overview
|
||||
|
||||
pict-rs 0.5.15 includes a bugfix for cleaning proxied media, updated dependencies, and a new option
|
||||
to log requests.
|
||||
|
||||
### Fixes
|
||||
|
||||
- [Proxied Media Cleanup](#proxied-media-cleanup)
|
||||
|
||||
|
||||
### Additions
|
||||
|
||||
- [Request Logging](#request-logging)
|
||||
|
||||
|
||||
## Upgrade Notes
|
||||
|
||||
There are no significant changes from 0.5.14. Upgrading should be as simple as pulling a new version
|
||||
of pict-rs.
|
||||
|
||||
|
||||
## Descriptions
|
||||
|
||||
### Proxied Media Cleanup
|
||||
|
||||
At some point, the cleanup logic for proxied media got flipped around to try removing the internal
|
||||
alias before removing the proxy record. This works fine with a sled backend, but not with a
|
||||
postgres backend, and postgres would complain about invalidating a foreign key relationship.
|
||||
pict-rs 0.5.15 fixes this by ensuring that the related proxy record is cleaned first.
|
||||
|
||||
|
||||
### Request Logging
|
||||
|
||||
A new configuration option has been added to pict-rs as an option to get more information about
|
||||
what pict-rs is doing. By default, pict-rs only logs what it considers to be errors, but when
|
||||
`log_requests` is enabled, it will also log information about successful requests. This can help
|
||||
with debugging without enabling full debug logs or resorting to logging spans.
|
||||
|
||||
It can be configured via toml
|
||||
```toml
|
||||
[tracing.logging]
|
||||
log_requests = true
|
||||
```
|
||||
|
||||
via environment variables
|
||||
```bash
|
||||
PICTRS__TRACING__LOGGING__LOG_REQUESTS=true
|
||||
```
|
||||
|
||||
or via the commandline
|
||||
```bash
|
||||
pict-rs --log-requests run
|
||||
```
|
|
@ -35,7 +35,7 @@ impl BytesStream {
|
|||
tracing::debug!(
|
||||
"BytesStream with {} chunks, avg length {}",
|
||||
bs.chunks_len(),
|
||||
bs.len() / bs.chunks_len()
|
||||
bs.len() / bs.chunks_len().max(1)
|
||||
);
|
||||
|
||||
Ok(bs)
|
||||
|
|
|
@ -18,6 +18,8 @@ impl Args {
|
|||
log_format,
|
||||
log_targets,
|
||||
log_spans,
|
||||
log_requests,
|
||||
no_log_ansi,
|
||||
console_address,
|
||||
console_buffer_capacity,
|
||||
opentelemetry_url,
|
||||
|
@ -38,6 +40,8 @@ impl Args {
|
|||
format: log_format,
|
||||
targets: log_targets.map(Serde::new),
|
||||
log_spans,
|
||||
no_ansi: no_log_ansi,
|
||||
log_requests,
|
||||
},
|
||||
console: Console {
|
||||
address: console_address,
|
||||
|
@ -581,6 +585,10 @@ struct Logging {
|
|||
targets: Option<Serde<Targets>>,
|
||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||
log_spans: bool,
|
||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||
log_requests: bool,
|
||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||
no_ansi: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, serde::Serialize)]
|
||||
|
@ -924,6 +932,13 @@ pub(super) struct Args {
|
|||
/// Whether to log opening and closing of tracing spans to stdout
|
||||
#[arg(long)]
|
||||
log_spans: bool,
|
||||
/// Whether to log request completions at an INFO level
|
||||
#[arg(long)]
|
||||
log_requests: bool,
|
||||
|
||||
#[arg(long)]
|
||||
/// Whether to disable color-codes in log output
|
||||
no_log_ansi: bool,
|
||||
|
||||
/// Address and port to expose tokio-console metrics
|
||||
#[arg(long)]
|
||||
|
|
|
@ -55,6 +55,8 @@ struct LoggingDefaults {
|
|||
format: LogFormat,
|
||||
targets: Serde<Targets>,
|
||||
log_spans: bool,
|
||||
log_requests: bool,
|
||||
no_ansi: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Serialize)]
|
||||
|
@ -235,6 +237,8 @@ impl Default for LoggingDefaults {
|
|||
format: LogFormat::Normal,
|
||||
targets: "info".parse().expect("Valid targets string"),
|
||||
log_spans: false,
|
||||
log_requests: false,
|
||||
no_ansi: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -163,6 +163,10 @@ pub(crate) struct Logging {
|
|||
pub(crate) targets: Serde<Targets>,
|
||||
|
||||
pub(crate) log_spans: bool,
|
||||
|
||||
pub(crate) no_ansi: bool,
|
||||
|
||||
pub(crate) log_requests: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||
|
|
|
@ -53,6 +53,7 @@ impl FfMpegStreams {
|
|||
FfMpegStream::Unknown { codec_name } => {
|
||||
tracing::info!("Encountered unknown stream {codec_name}");
|
||||
}
|
||||
FfMpegStream::Empty {} => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -135,6 +136,7 @@ enum FfMpegStream {
|
|||
Audio(FfMpegAudioStream),
|
||||
Video(FfMpegVideoStream),
|
||||
Unknown { codec_name: String },
|
||||
Empty {},
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
|
|
35
src/discover/ffmpeg/ffprobe_6_0_mov_details.json
Normal file
35
src/discover/ffmpeg/ffprobe_6_0_mov_details.json
Normal file
|
@ -0,0 +1,35 @@
|
|||
{
|
||||
"programs": [
|
||||
|
||||
],
|
||||
"streams": [
|
||||
{
|
||||
"codec_name": "hevc",
|
||||
"width": 1920,
|
||||
"height": 1080,
|
||||
"pix_fmt": "yuv420p10le",
|
||||
"nb_read_frames": "187",
|
||||
"side_data_list": [
|
||||
{
|
||||
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"codec_name": "aac",
|
||||
"nb_read_frames": "135"
|
||||
},
|
||||
{
|
||||
|
||||
},
|
||||
{
|
||||
|
||||
},
|
||||
{
|
||||
|
||||
}
|
||||
],
|
||||
"format": {
|
||||
"format_name": "mov,mp4,m4a,3gp,3g2,mj2"
|
||||
}
|
||||
}
|
|
@ -1,11 +1,11 @@
|
|||
use crate::formats::{
|
||||
AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat, Mp4Codec,
|
||||
WebmAlphaCodec, WebmCodec,
|
||||
AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat,
|
||||
Mp4AudioCodec, Mp4Codec, WebmAlphaCodec, WebmCodec,
|
||||
};
|
||||
|
||||
use super::{Discovery, FfMpegDiscovery, PixelFormatOutput};
|
||||
|
||||
fn details_tests() -> [(&'static str, Option<Discovery>); 13] {
|
||||
fn details_tests() -> [(&'static str, Option<Discovery>); 14] {
|
||||
[
|
||||
(
|
||||
"animated_webp",
|
||||
|
@ -151,6 +151,18 @@ fn details_tests() -> [(&'static str, Option<Discovery>); 13] {
|
|||
frames: None,
|
||||
}),
|
||||
),
|
||||
(
|
||||
"mov",
|
||||
Some(Discovery {
|
||||
input: InputFile::Video(InputVideoFormat::Mp4 {
|
||||
video_codec: Mp4Codec::H265,
|
||||
audio_codec: Some(Mp4AudioCodec::Aac),
|
||||
}),
|
||||
width: 1920,
|
||||
height: 1080,
|
||||
frames: Some(187),
|
||||
}),
|
||||
),
|
||||
]
|
||||
}
|
||||
|
||||
|
|
3
src/http1.rs
Normal file
3
src/http1.rs
Normal file
|
@ -0,0 +1,3 @@
|
|||
pub(crate) fn to_actix_status(status: reqwest::StatusCode) -> actix_web::http::StatusCode {
|
||||
actix_web::http::StatusCode::from_u16(status.as_u16()).expect("status codes are always valid")
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
use crate::config::{LogFormat, OpenTelemetry, Tracing};
|
||||
use color_eyre::config::Theme;
|
||||
use console_subscriber::ConsoleLayer;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
|
@ -11,7 +12,15 @@ use tracing_subscriber::{
|
|||
};
|
||||
|
||||
pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> {
|
||||
color_eyre::install()?;
|
||||
let eyre_theme = if tracing.logging.no_ansi {
|
||||
Theme::new()
|
||||
} else {
|
||||
Theme::dark()
|
||||
};
|
||||
|
||||
color_eyre::config::HookBuilder::new()
|
||||
.theme(eyre_theme)
|
||||
.install()?;
|
||||
|
||||
LogTracer::init()?;
|
||||
|
||||
|
@ -23,7 +32,9 @@ pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> {
|
|||
FmtSpan::NONE
|
||||
};
|
||||
|
||||
let format_layer = tracing_subscriber::fmt::layer().with_span_events(fmt_span);
|
||||
let format_layer = tracing_subscriber::fmt::layer()
|
||||
.with_span_events(fmt_span)
|
||||
.with_ansi(!tracing.logging.no_ansi);
|
||||
|
||||
match tracing.logging.format {
|
||||
LogFormat::Compact => with_format(format_layer.compact(), tracing),
|
||||
|
|
59
src/lib.rs
59
src/lib.rs
|
@ -14,6 +14,7 @@ mod file_path;
|
|||
mod formats;
|
||||
mod future;
|
||||
mod generate;
|
||||
mod http1;
|
||||
mod ingest;
|
||||
mod init_metrics;
|
||||
mod init_tracing;
|
||||
|
@ -41,18 +42,12 @@ use actix_web::{
|
|||
http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES},
|
||||
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
|
||||
};
|
||||
use details::{ApiDetails, HumanDate};
|
||||
use future::{WithPollTimer, WithTimeout};
|
||||
use futures_core::Stream;
|
||||
use magick::ArcPolicyDir;
|
||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||
use middleware::{Metrics, Payload};
|
||||
use repo::ArcRepo;
|
||||
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
|
||||
use reqwest_tracing::TracingMiddleware;
|
||||
use rustls_channel_resolver::ChannelSender;
|
||||
use rusty_s3::UrlStyle;
|
||||
use state::State;
|
||||
use std::{
|
||||
marker::PhantomData,
|
||||
path::Path,
|
||||
|
@ -61,8 +56,6 @@ use std::{
|
|||
time::{Duration, SystemTime},
|
||||
};
|
||||
use streem::IntoStreamer;
|
||||
use sync::DropHandle;
|
||||
use tmp_file::{ArcTmpDir, TmpDir};
|
||||
use tokio::sync::Semaphore;
|
||||
use tracing::Instrument;
|
||||
use tracing_actix_web::TracingLogger;
|
||||
|
@ -70,20 +63,25 @@ use tracing_actix_web::TracingLogger;
|
|||
use self::{
|
||||
backgrounded::Backgrounded,
|
||||
config::{Configuration, Operation},
|
||||
details::Details,
|
||||
details::{ApiDetails, Details, HumanDate},
|
||||
either::Either,
|
||||
error::{Error, UploadError},
|
||||
formats::InputProcessableFormat,
|
||||
future::{WithPollTimer, WithTimeout},
|
||||
ingest::Session,
|
||||
init_tracing::init_tracing,
|
||||
middleware::{Deadline, Internal},
|
||||
magick::ArcPolicyDir,
|
||||
middleware::{Deadline, Internal, Log, Metrics, Payload},
|
||||
migrate_store::migrate_store,
|
||||
queue::queue_generate,
|
||||
repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult},
|
||||
repo::{sled::SledRepo, Alias, ArcRepo, DeleteToken, Hash, Repo, UploadId, UploadResult},
|
||||
serde_str::Serde,
|
||||
state::State,
|
||||
store::{file_store::FileStore, object_store::ObjectStore, Store},
|
||||
stream::empty,
|
||||
sync::DropHandle,
|
||||
tls::Tls,
|
||||
tmp_file::{ArcTmpDir, TmpDir},
|
||||
};
|
||||
|
||||
pub use self::config::{ConfigSource, PictRsConfiguration};
|
||||
|
@ -521,7 +519,7 @@ struct UrlQuery {
|
|||
url: String,
|
||||
|
||||
#[serde(default)]
|
||||
backgrounded: bool,
|
||||
backgrounded: Serde<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
|
@ -562,7 +560,7 @@ async fn download<S: Store + 'static>(
|
|||
|
||||
let stream = download_stream(&url_query.url, &state).await?;
|
||||
|
||||
if url_query.backgrounded {
|
||||
if *url_query.backgrounded {
|
||||
do_download_backgrounded(stream, state, upload_query).await
|
||||
} else {
|
||||
do_download_inline(stream, &state, &upload_query).await
|
||||
|
@ -580,7 +578,7 @@ async fn download_stream<S>(
|
|||
let res = state.client.get(url).send().await?;
|
||||
|
||||
if !res.status().is_success() {
|
||||
return Err(UploadError::Download(res.status()).into());
|
||||
return Err(UploadError::Download(http1::to_actix_status(res.status())).into());
|
||||
}
|
||||
|
||||
let stream = crate::stream::limit(
|
||||
|
@ -1743,6 +1741,7 @@ async fn launch<
|
|||
spawn_workers(state.clone());
|
||||
|
||||
App::new()
|
||||
.wrap(Log::new(state.config.tracing.logging.log_requests))
|
||||
.wrap(TracingLogger::default())
|
||||
.wrap(Deadline)
|
||||
.wrap(Metrics)
|
||||
|
@ -1763,7 +1762,7 @@ async fn launch<
|
|||
|
||||
tracing::info!("Starting pict-rs with TLS on {address}");
|
||||
|
||||
server.bind_rustls_0_22(address, config)?.run().await?;
|
||||
server.bind_rustls_0_23(address, config)?.run().await?;
|
||||
|
||||
handle.abort();
|
||||
let _ = handle.await;
|
||||
|
@ -1868,7 +1867,8 @@ impl<P: AsRef<Path>, T: serde::Serialize> ConfigSource<P, T> {
|
|||
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let configuration = pict_rs::ConfigSource::memory(serde_json::json!({
|
||||
/// "server": {
|
||||
/// "address": "127.0.0.1:8080"
|
||||
/// "address": "127.0.0.1:8080",
|
||||
/// "temporary_directory": "/tmp/t1"
|
||||
/// },
|
||||
/// "repo": {
|
||||
/// "type": "sled",
|
||||
|
@ -1936,6 +1936,19 @@ impl PictRsConfiguration {
|
|||
Ok(self)
|
||||
}
|
||||
|
||||
/// Install aws-lc-rs as the default crypto provider
|
||||
///
|
||||
/// This would happen automatically anyway unless rustls crate features get mixed up
|
||||
pub fn install_crypto_provider(self) -> Self {
|
||||
if rustls::crypto::aws_lc_rs::default_provider()
|
||||
.install_default()
|
||||
.is_err()
|
||||
{
|
||||
tracing::info!("rustls crypto provider already installed");
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Run the pict-rs application on a tokio `LocalSet`
|
||||
///
|
||||
/// This must be called from within `tokio::main` directly
|
||||
|
@ -1945,13 +1958,16 @@ impl PictRsConfiguration {
|
|||
/// #[tokio::main]
|
||||
/// async fn main() -> color_eyre::Result<()> {
|
||||
/// let pict_rs_server = pict_rs::ConfigSource::memory(serde_json::json!({
|
||||
/// "server": {
|
||||
/// "temporary_directory": "/tmp/t2"
|
||||
/// },
|
||||
/// "repo": {
|
||||
/// "type": "sled",
|
||||
/// "path": "/tmp/pict-rs/run-on-localset/sled-repo",
|
||||
/// "path": "/tmp/pict-rs-run-on-localset/sled-repo",
|
||||
/// },
|
||||
/// "store": {
|
||||
/// "type": "filesystem",
|
||||
/// "path": "/tmp/pict-rs/run-on-localset/files",
|
||||
/// "path": "/tmp/pict-rs-run-on-localset/files",
|
||||
/// },
|
||||
/// }))
|
||||
/// .init::<&str>(None)?
|
||||
|
@ -1976,13 +1992,16 @@ impl PictRsConfiguration {
|
|||
/// fn main() -> color_eyre::Result<()> {
|
||||
/// actix_web::rt::System::new().block_on(async move {
|
||||
/// let pict_rs_server = pict_rs::ConfigSource::memory(serde_json::json!({
|
||||
/// "server": {
|
||||
/// "temporary_directory": "/tmp/t3"
|
||||
/// },
|
||||
/// "repo": {
|
||||
/// "type": "sled",
|
||||
/// "path": "/tmp/pict-rs/run/sled-repo",
|
||||
/// "path": "/tmp/pict-rs-run/sled-repo",
|
||||
/// },
|
||||
/// "store": {
|
||||
/// "type": "filesystem",
|
||||
/// "path": "/tmp/pict-rs/run/files",
|
||||
/// "path": "/tmp/pict-rs-run/files",
|
||||
/// },
|
||||
/// }))
|
||||
/// .init::<&str>(None)?
|
||||
|
|
|
@ -4,6 +4,7 @@ fn main() -> color_eyre::Result<()> {
|
|||
pict_rs::PictRsConfiguration::build_default()?
|
||||
.install_tracing()?
|
||||
.install_metrics()?
|
||||
.install_crypto_provider()
|
||||
.run()
|
||||
.await
|
||||
})
|
||||
|
@ -18,6 +19,7 @@ fn main() -> color_eyre::Result<()> {
|
|||
pict_rs::PictRsConfiguration::build_default()?
|
||||
.install_tracing()?
|
||||
.install_metrics()?
|
||||
.install_crypto_provider()
|
||||
.run_on_localset()
|
||||
.await
|
||||
})
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
mod deadline;
|
||||
mod internal;
|
||||
mod log;
|
||||
mod metrics;
|
||||
mod payload;
|
||||
|
||||
pub(crate) use self::deadline::Deadline;
|
||||
pub(crate) use self::internal::Internal;
|
||||
pub(crate) use self::log::Log;
|
||||
pub(crate) use self::metrics::Metrics;
|
||||
pub(crate) use self::payload::Payload;
|
||||
|
|
223
src/middleware/log.rs
Normal file
223
src/middleware/log.rs
Normal file
|
@ -0,0 +1,223 @@
|
|||
use std::future::{ready, Future, Ready};
|
||||
|
||||
use actix_web::{
|
||||
body::MessageBody,
|
||||
dev::{Service, ServiceRequest, ServiceResponse, Transform},
|
||||
http::StatusCode,
|
||||
ResponseError,
|
||||
};
|
||||
|
||||
pub(crate) struct Log {
|
||||
info: bool,
|
||||
}
|
||||
pub(crate) struct LogMiddleware<S> {
|
||||
info: bool,
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl Log {
|
||||
pub(crate) fn new(info: bool) -> Self {
|
||||
Self { info }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct LogError {
|
||||
info: bool,
|
||||
error: actix_web::Error,
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
pub(crate) struct LogFuture<F> {
|
||||
info: bool,
|
||||
|
||||
#[pin]
|
||||
inner: F,
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
pub(crate) struct LogBody<B> {
|
||||
info: bool,
|
||||
|
||||
status: Option<StatusCode>,
|
||||
|
||||
#[pin]
|
||||
inner: B,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, B> Transform<S, ServiceRequest> for Log
|
||||
where
|
||||
B: MessageBody,
|
||||
S: Service<ServiceRequest, Response = ServiceResponse<B>>,
|
||||
S::Future: 'static,
|
||||
S::Error: Into<actix_web::Error>,
|
||||
{
|
||||
type Response = ServiceResponse<LogBody<B>>;
|
||||
type Error = actix_web::Error;
|
||||
type InitError = ();
|
||||
type Transform = LogMiddleware<S>;
|
||||
type Future = Ready<Result<Self::Transform, Self::InitError>>;
|
||||
|
||||
fn new_transform(&self, service: S) -> Self::Future {
|
||||
ready(Ok(LogMiddleware {
|
||||
info: self.info,
|
||||
inner: service,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, B> Service<ServiceRequest> for LogMiddleware<S>
|
||||
where
|
||||
B: MessageBody,
|
||||
S: Service<ServiceRequest, Response = ServiceResponse<B>>,
|
||||
S::Future: 'static,
|
||||
S::Error: Into<actix_web::Error>,
|
||||
{
|
||||
type Response = ServiceResponse<LogBody<B>>;
|
||||
type Error = actix_web::Error;
|
||||
type Future = LogFuture<S::Future>;
|
||||
|
||||
fn poll_ready(
|
||||
&self,
|
||||
ctx: &mut core::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready(ctx).map(|res| {
|
||||
res.map_err(|e| {
|
||||
LogError {
|
||||
info: self.info,
|
||||
error: e.into(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
fn call(&self, req: ServiceRequest) -> Self::Future {
|
||||
LogFuture {
|
||||
info: self.info,
|
||||
inner: self.inner.call(req),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, B, E> Future for LogFuture<F>
|
||||
where
|
||||
B: MessageBody,
|
||||
F: Future<Output = Result<ServiceResponse<B>, E>>,
|
||||
E: Into<actix_web::Error>,
|
||||
{
|
||||
type Output = Result<ServiceResponse<LogBody<B>>, actix_web::Error>;
|
||||
|
||||
fn poll(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Self::Output> {
|
||||
let info = self.info;
|
||||
let this = self.project();
|
||||
|
||||
std::task::Poll::Ready(match std::task::ready!(this.inner.poll(cx)) {
|
||||
Ok(response) => {
|
||||
let status = response.status();
|
||||
|
||||
let status = if response.response().body().size().is_eof() {
|
||||
emit(status, info);
|
||||
None
|
||||
} else {
|
||||
Some(status)
|
||||
};
|
||||
|
||||
Ok(response.map_body(|_, inner| LogBody {
|
||||
info,
|
||||
status,
|
||||
inner,
|
||||
}))
|
||||
}
|
||||
Err(e) => Err(LogError {
|
||||
info,
|
||||
error: e.into(),
|
||||
}
|
||||
.into()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> MessageBody for LogBody<B>
|
||||
where
|
||||
B: MessageBody,
|
||||
{
|
||||
type Error = B::Error;
|
||||
|
||||
fn size(&self) -> actix_web::body::BodySize {
|
||||
self.inner.size()
|
||||
}
|
||||
|
||||
fn poll_next(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Result<actix_web::web::Bytes, Self::Error>>> {
|
||||
let this = self.project();
|
||||
|
||||
let opt = std::task::ready!(this.inner.poll_next(cx));
|
||||
|
||||
if opt.is_none() {
|
||||
if let Some(status) = this.status.take() {
|
||||
emit(status, *this.info);
|
||||
}
|
||||
}
|
||||
|
||||
std::task::Poll::Ready(opt)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for LogError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.error.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for LogError {
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
self.error.source()
|
||||
}
|
||||
}
|
||||
|
||||
impl ResponseError for LogError {
|
||||
fn status_code(&self) -> actix_web::http::StatusCode {
|
||||
self.error.as_response_error().status_code()
|
||||
}
|
||||
|
||||
fn error_response(&self) -> actix_web::HttpResponse<actix_web::body::BoxBody> {
|
||||
let response = self.error.error_response();
|
||||
let status = response.status();
|
||||
|
||||
if response.body().size().is_eof() {
|
||||
emit(status, self.info);
|
||||
response
|
||||
} else {
|
||||
response.map_body(|_, inner| {
|
||||
LogBody {
|
||||
info: self.info,
|
||||
status: Some(status),
|
||||
inner,
|
||||
}
|
||||
.boxed()
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn emit(status: StatusCode, info: bool) {
|
||||
if status.is_server_error() {
|
||||
tracing::error!("server error");
|
||||
} else if status.is_client_error() {
|
||||
tracing::warn!("client error");
|
||||
} else if status.is_redirection() {
|
||||
tracing::info!("redirected");
|
||||
} else if info {
|
||||
tracing::info!("completed");
|
||||
} else {
|
||||
tracing::debug!("completed");
|
||||
}
|
||||
}
|
|
@ -45,10 +45,10 @@ impl Drop for MetricsGuard {
|
|||
}
|
||||
}
|
||||
|
||||
async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
|
||||
async fn drain(mut rx: tokio::sync::mpsc::Receiver<actix_web::dev::Payload>) {
|
||||
let mut set = JoinSet::new();
|
||||
|
||||
while let Ok(payload) = rx.recv_async().await {
|
||||
while let Some(payload) = rx.recv().await {
|
||||
tracing::trace!("drain: looping");
|
||||
|
||||
// draining a payload is a best-effort task - if we can't collect in 2 minutes we bail
|
||||
|
@ -94,18 +94,18 @@ async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
|
|||
struct DrainHandle(Option<Rc<tokio::task::JoinHandle<()>>>);
|
||||
|
||||
pub(crate) struct Payload {
|
||||
sender: flume::Sender<actix_web::dev::Payload>,
|
||||
sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
|
||||
handle: DrainHandle,
|
||||
}
|
||||
pub(crate) struct PayloadMiddleware<S> {
|
||||
inner: S,
|
||||
sender: flume::Sender<actix_web::dev::Payload>,
|
||||
sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
|
||||
_handle: DrainHandle,
|
||||
}
|
||||
|
||||
pub(crate) struct PayloadStream {
|
||||
inner: Option<actix_web::dev::Payload>,
|
||||
sender: flume::Sender<actix_web::dev::Payload>,
|
||||
sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
|
||||
}
|
||||
|
||||
impl DrainHandle {
|
||||
|
|
|
@ -59,9 +59,6 @@ impl Drop for MetricsGuard {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct StatusError(ExitStatus);
|
||||
|
||||
pub(crate) struct Process {
|
||||
command: Arc<str>,
|
||||
child: Child,
|
||||
|
@ -487,11 +484,3 @@ impl ProcessRead {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for StatusError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "Command failed with bad status: {}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for StatusError {}
|
||||
|
|
76
src/queue.rs
76
src/queue.rs
|
@ -11,9 +11,11 @@ use crate::{
|
|||
|
||||
use std::{
|
||||
ops::Deref,
|
||||
rc::Rc,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::task::JoinError;
|
||||
use tracing::Instrument;
|
||||
|
||||
pub(crate) mod cleanup;
|
||||
|
@ -297,54 +299,66 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
fn job_result(result: &JobResult) -> crate::repo::JobResult {
|
||||
fn job_result(result: &Result<JobResult, JoinError>) -> crate::repo::JobResult {
|
||||
match result {
|
||||
Ok(()) => crate::repo::JobResult::Success,
|
||||
Err(JobError::Retry(_)) => crate::repo::JobResult::Failure,
|
||||
Err(JobError::Abort(_)) => crate::repo::JobResult::Aborted,
|
||||
Ok(Ok(())) => crate::repo::JobResult::Success,
|
||||
Ok(Err(JobError::Retry(_))) => crate::repo::JobResult::Failure,
|
||||
Ok(Err(JobError::Abort(_))) => crate::repo::JobResult::Aborted,
|
||||
Err(_) => crate::repo::JobResult::Aborted,
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
|
||||
where
|
||||
S: Store,
|
||||
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
|
||||
S: Store + 'static,
|
||||
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
|
||||
{
|
||||
let worker_id = uuid::Uuid::new_v4();
|
||||
let state = Rc::new(state);
|
||||
|
||||
loop {
|
||||
tracing::trace!("process_jobs: looping");
|
||||
|
||||
crate::sync::cooperate().await;
|
||||
|
||||
let res = job_loop(&state, worker_id, queue, callback)
|
||||
.with_poll_timer("job-loop")
|
||||
.await;
|
||||
// add a panic boundary by spawning a task
|
||||
let res = crate::sync::spawn(
|
||||
"job-loop",
|
||||
job_loop(state.clone(), worker_id, queue, callback),
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Err(e) = res {
|
||||
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
||||
tracing::warn!("{}", format!("{e:?}"));
|
||||
match res {
|
||||
// clean exit
|
||||
Ok(Ok(())) => break,
|
||||
|
||||
if e.is_disconnected() {
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
// job error
|
||||
Ok(Err(e)) => {
|
||||
tracing::warn!("Error processing jobs: {}", format!("{e}"));
|
||||
tracing::warn!("{}", format!("{e:?}"));
|
||||
|
||||
if e.is_disconnected() {
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
// job panic
|
||||
Err(_) => {
|
||||
tracing::warn!("Panic while processing jobs");
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
async fn job_loop<S, F>(
|
||||
state: &State<S>,
|
||||
state: Rc<State<S>>,
|
||||
worker_id: uuid::Uuid,
|
||||
queue: &'static str,
|
||||
callback: F,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: Store,
|
||||
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
|
||||
S: Store + 'static,
|
||||
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
|
||||
{
|
||||
loop {
|
||||
tracing::trace!("job_loop: looping");
|
||||
|
@ -360,14 +374,18 @@ where
|
|||
|
||||
let guard = MetricsGuard::guard(worker_id, queue);
|
||||
|
||||
let res = heartbeat(
|
||||
&state.repo,
|
||||
queue,
|
||||
worker_id,
|
||||
job_id,
|
||||
(callback)(state, job),
|
||||
)
|
||||
.with_poll_timer("job-and-heartbeat")
|
||||
let state2 = state.clone();
|
||||
let res = crate::sync::spawn("job-and-heartbeat", async move {
|
||||
let state = state2;
|
||||
heartbeat(
|
||||
&state.repo,
|
||||
queue,
|
||||
worker_id,
|
||||
job_id,
|
||||
(callback)(&state, job),
|
||||
)
|
||||
.await
|
||||
})
|
||||
.await;
|
||||
|
||||
state
|
||||
|
@ -376,7 +394,7 @@ where
|
|||
.with_poll_timer("job-complete")
|
||||
.await?;
|
||||
|
||||
res?;
|
||||
res.map_err(|_| UploadError::Canceled)??;
|
||||
|
||||
guard.disarm();
|
||||
|
||||
|
|
|
@ -147,15 +147,24 @@ async fn hash(repo: &ArcRepo, hash: Hash) -> JobResult {
|
|||
pub(crate) async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> JobResult {
|
||||
let saved_delete_token = repo.delete_token(&alias).await.retry()?;
|
||||
|
||||
if saved_delete_token.is_none() {
|
||||
let hash = repo.hash(&alias).await.retry()?;
|
||||
|
||||
// already deleted
|
||||
if hash.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if !saved_delete_token.is_some_and(|t| t.ct_eq(&token)) {
|
||||
return Err(UploadError::InvalidToken).abort();
|
||||
}
|
||||
|
||||
let hash = repo.hash(&alias).await.retry()?;
|
||||
|
||||
repo.cleanup_alias(&alias).await.retry()?;
|
||||
repo.remove_relation(alias.clone()).await.retry()?;
|
||||
repo.remove_alias_access(alias.clone()).await.retry()?;
|
||||
repo.cleanup_alias(&alias).await.retry()?;
|
||||
|
||||
let hash = hash.ok_or(UploadError::MissingAlias).abort()?;
|
||||
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use time::Instant;
|
||||
use tracing::{Instrument, Span};
|
||||
|
||||
use crate::{
|
||||
|
@ -13,7 +12,7 @@ use crate::{
|
|||
store::Store,
|
||||
UploadQuery,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use std::{sync::Arc, time::Instant};
|
||||
|
||||
use super::{JobContext, JobFuture, JobResult};
|
||||
|
||||
|
@ -90,7 +89,7 @@ impl UploadGuard {
|
|||
impl Drop for UploadGuard {
|
||||
fn drop(&mut self) {
|
||||
metrics::counter!(crate::init_metrics::BACKGROUND_UPLOAD_INGEST, "completed" => (!self.armed).to_string()).increment(1);
|
||||
metrics::histogram!(crate::init_metrics::BACKGROUND_UPLOAD_INGEST_DURATION, "completed" => (!self.armed).to_string()).record(self.start.elapsed().as_seconds_f64());
|
||||
metrics::histogram!(crate::init_metrics::BACKGROUND_UPLOAD_INGEST_DURATION, "completed" => (!self.armed).to_string()).record(self.start.elapsed().as_secs_f64());
|
||||
|
||||
if self.armed {
|
||||
tracing::warn!(
|
||||
|
|
|
@ -444,7 +444,6 @@ where
|
|||
pub(crate) trait SettingsRepo: BaseRepo {
|
||||
async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError>;
|
||||
async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError>;
|
||||
async fn remove(&self, key: &'static str) -> Result<(), RepoError>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
|
@ -459,10 +458,6 @@ where
|
|||
async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
|
||||
T::get(self, key).await
|
||||
}
|
||||
|
||||
async fn remove(&self, key: &'static str) -> Result<(), RepoError> {
|
||||
T::remove(self, key).await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
|
|
|
@ -1,10 +1,7 @@
|
|||
use dashmap::DashMap;
|
||||
use dashmap::{mapref::entry::Entry, DashMap};
|
||||
use std::{
|
||||
future::Future,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, Weak,
|
||||
},
|
||||
sync::{Arc, Weak},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::sync::Notify;
|
||||
|
@ -26,7 +23,6 @@ struct NotificationEntryInner {
|
|||
key: Arc<str>,
|
||||
map: Map,
|
||||
notify: Notify,
|
||||
armed: AtomicBool,
|
||||
}
|
||||
|
||||
impl NotificationMap {
|
||||
|
@ -37,30 +33,34 @@ impl NotificationMap {
|
|||
}
|
||||
|
||||
pub(super) fn register_interest(&self, key: Arc<str>) -> NotificationEntry {
|
||||
let new_entry = Arc::new(NotificationEntryInner {
|
||||
key: key.clone(),
|
||||
map: self.map.clone(),
|
||||
notify: crate::sync::bare_notify(),
|
||||
armed: AtomicBool::new(false),
|
||||
});
|
||||
match self.map.entry(key.clone()) {
|
||||
Entry::Occupied(mut occupied) => {
|
||||
if let Some(inner) = occupied.get().upgrade() {
|
||||
NotificationEntry { inner }
|
||||
} else {
|
||||
let inner = Arc::new(NotificationEntryInner {
|
||||
key,
|
||||
map: self.map.clone(),
|
||||
notify: crate::sync::bare_notify(),
|
||||
});
|
||||
|
||||
let mut key_entry = self
|
||||
.map
|
||||
.entry(key)
|
||||
.or_insert_with(|| Arc::downgrade(&new_entry));
|
||||
occupied.insert(Arc::downgrade(&inner));
|
||||
|
||||
let upgraded_entry = key_entry.value().upgrade();
|
||||
NotificationEntry { inner }
|
||||
}
|
||||
}
|
||||
Entry::Vacant(vacant) => {
|
||||
let inner = Arc::new(NotificationEntryInner {
|
||||
key,
|
||||
map: self.map.clone(),
|
||||
notify: crate::sync::bare_notify(),
|
||||
});
|
||||
|
||||
let inner = if let Some(entry) = upgraded_entry {
|
||||
entry
|
||||
} else {
|
||||
*key_entry.value_mut() = Arc::downgrade(&new_entry);
|
||||
new_entry
|
||||
};
|
||||
vacant.insert(Arc::downgrade(&inner));
|
||||
|
||||
inner.armed.store(true, Ordering::Release);
|
||||
|
||||
NotificationEntry { inner }
|
||||
NotificationEntry { inner }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn notify(&self, key: &str) {
|
||||
|
@ -87,8 +87,6 @@ impl Default for NotificationMap {
|
|||
|
||||
impl Drop for NotificationEntryInner {
|
||||
fn drop(&mut self) {
|
||||
if self.armed.load(Ordering::Acquire) {
|
||||
self.map.remove(&self.key);
|
||||
}
|
||||
self.map.remove(&self.key);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ use diesel_async::{
|
|||
use futures_core::Stream;
|
||||
use tokio::sync::Notify;
|
||||
use tokio_postgres::{AsyncMessage, Connection, NoTls, Notification, Socket};
|
||||
use tokio_postgres_rustls::MakeRustlsConnect;
|
||||
use tokio_postgres_generic_rustls::{AwsLcRsDigest, MakeRustlsConnect};
|
||||
use tracing::Instrument;
|
||||
use url::Url;
|
||||
use uuid::Uuid;
|
||||
|
@ -173,7 +173,7 @@ impl PostgresError {
|
|||
|
||||
async fn build_tls_connector(
|
||||
certificate_file: Option<PathBuf>,
|
||||
) -> Result<MakeRustlsConnect, TlsError> {
|
||||
) -> Result<MakeRustlsConnect<AwsLcRsDigest>, TlsError> {
|
||||
let mut cert_store = rustls::RootCertStore {
|
||||
roots: Vec::from(webpki_roots::TLS_SERVER_ROOTS),
|
||||
};
|
||||
|
@ -199,14 +199,14 @@ async fn build_tls_connector(
|
|||
.with_root_certificates(cert_store)
|
||||
.with_no_client_auth();
|
||||
|
||||
let tls = MakeRustlsConnect::new(config);
|
||||
let tls = MakeRustlsConnect::new(config, AwsLcRsDigest);
|
||||
|
||||
Ok(tls)
|
||||
}
|
||||
|
||||
async fn connect_for_migrations(
|
||||
postgres_url: &Url,
|
||||
tls_connector: Option<MakeRustlsConnect>,
|
||||
tls_connector: Option<MakeRustlsConnect<AwsLcRsDigest>>,
|
||||
) -> Result<
|
||||
(
|
||||
tokio_postgres::Client,
|
||||
|
@ -265,8 +265,8 @@ where
|
|||
|
||||
async fn build_pool(
|
||||
postgres_url: &Url,
|
||||
tx: flume::Sender<Notification>,
|
||||
connector: Option<MakeRustlsConnect>,
|
||||
tx: tokio::sync::mpsc::Sender<Notification>,
|
||||
connector: Option<MakeRustlsConnect<AwsLcRsDigest>>,
|
||||
max_size: u32,
|
||||
) -> Result<Pool<AsyncPgConnection>, ConnectPostgresError> {
|
||||
let mut config = ManagerConfig::default();
|
||||
|
@ -319,7 +319,7 @@ impl PostgresRepo {
|
|||
.map(|u| u.into())
|
||||
.unwrap_or(1_usize);
|
||||
|
||||
let (tx, rx) = flume::bounded(10);
|
||||
let (tx, rx) = crate::sync::channel(10);
|
||||
|
||||
let pool = build_pool(
|
||||
&postgres_url,
|
||||
|
@ -621,7 +621,7 @@ type ConfigFn =
|
|||
Box<dyn Fn(&str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> + Send + Sync + 'static>;
|
||||
|
||||
async fn delegate_notifications(
|
||||
receiver: flume::Receiver<Notification>,
|
||||
mut receiver: tokio::sync::mpsc::Receiver<Notification>,
|
||||
inner: Arc<Inner>,
|
||||
capacity: usize,
|
||||
) {
|
||||
|
@ -636,7 +636,7 @@ async fn delegate_notifications(
|
|||
|
||||
let keyed_notifier_state = KeyedNotifierState { inner: &inner };
|
||||
|
||||
while let Ok(notification) = receiver.recv_async().await {
|
||||
while let Some(notification) = receiver.recv().await {
|
||||
tracing::trace!("delegate_notifications: looping");
|
||||
metrics::counter!(crate::init_metrics::POSTGRES_NOTIFICATION).increment(1);
|
||||
|
||||
|
@ -666,8 +666,8 @@ async fn delegate_notifications(
|
|||
}
|
||||
|
||||
fn build_handler(
|
||||
sender: flume::Sender<Notification>,
|
||||
connector: Option<MakeRustlsConnect>,
|
||||
sender: tokio::sync::mpsc::Sender<Notification>,
|
||||
connector: Option<MakeRustlsConnect<AwsLcRsDigest>>,
|
||||
) -> ConfigFn {
|
||||
Box::new(
|
||||
move |config: &str| -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> {
|
||||
|
@ -708,7 +708,7 @@ fn build_handler(
|
|||
}
|
||||
|
||||
fn spawn_db_notification_task<S>(
|
||||
sender: flume::Sender<Notification>,
|
||||
sender: tokio::sync::mpsc::Sender<Notification>,
|
||||
mut conn: Connection<Socket, S>,
|
||||
) where
|
||||
S: tokio_postgres::tls::TlsStream + Send + Unpin + 'static,
|
||||
|
@ -729,7 +729,7 @@ fn spawn_db_notification_task<S>(
|
|||
tracing::warn!("Database Notice {e:?}");
|
||||
}
|
||||
Ok(AsyncMessage::Notification(notification)) => {
|
||||
if sender.send_async(notification).await.is_err() {
|
||||
if sender.send(notification).await.is_err() {
|
||||
tracing::warn!("Missed notification. Are we shutting down?");
|
||||
}
|
||||
}
|
||||
|
@ -1412,24 +1412,6 @@ impl SettingsRepo for PostgresRepo {
|
|||
|
||||
Ok(opt)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn remove(&self, input_key: &'static str) -> Result<(), RepoError> {
|
||||
use schema::settings::dsl::*;
|
||||
|
||||
let mut conn = self.get_connection().await?;
|
||||
|
||||
diesel::delete(settings)
|
||||
.filter(key.eq(input_key))
|
||||
.execute(&mut conn)
|
||||
.with_metrics(crate::init_metrics::POSTGRES_SETTINGS_REMOVE)
|
||||
.with_timeout(Duration::from_secs(5))
|
||||
.await
|
||||
.map_err(|_| PostgresError::DbTimeout)?
|
||||
.map_err(PostgresError::Diesel)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait(?Send)]
|
||||
|
|
|
@ -807,7 +807,7 @@ impl QueueRepo for SledRepo {
|
|||
.read()
|
||||
.unwrap()
|
||||
.get(&queue_name)
|
||||
.map(Arc::clone);
|
||||
.cloned();
|
||||
|
||||
let notify = if let Some(notify) = opt {
|
||||
notify
|
||||
|
@ -945,13 +945,6 @@ impl SettingsRepo for SledRepo {
|
|||
|
||||
Ok(opt.map(|ivec| Arc::from(ivec.to_vec())))
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self))]
|
||||
async fn remove(&self, key: &'static str) -> Result<(), RepoError> {
|
||||
b!(self.settings, settings.remove(key));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn variant_access_key(hash: &[u8], variant: &str) -> Vec<u8> {
|
||||
|
|
|
@ -72,7 +72,7 @@ impl From<crate::store::object_store::ObjectError> for StoreError {
|
|||
fn from(value: crate::store::object_store::ObjectError) -> Self {
|
||||
match value {
|
||||
e @ crate::store::object_store::ObjectError::Status(
|
||||
actix_web::http::StatusCode::NOT_FOUND,
|
||||
reqwest::StatusCode::NOT_FOUND,
|
||||
_,
|
||||
_,
|
||||
) => Self::ObjectNotFound(e),
|
||||
|
|
|
@ -4,16 +4,16 @@ use crate::{
|
|||
};
|
||||
use actix_web::{
|
||||
error::BlockingError,
|
||||
http::{
|
||||
header::{ByteRangeSpec, Range, CONTENT_LENGTH},
|
||||
StatusCode,
|
||||
},
|
||||
http::header::{ByteRangeSpec, Range},
|
||||
rt::task::JoinError,
|
||||
web::Bytes,
|
||||
};
|
||||
use base64::{prelude::BASE64_STANDARD, Engine};
|
||||
use futures_core::Stream;
|
||||
use reqwest::{header::RANGE, Body, Response};
|
||||
use reqwest::{
|
||||
header::{CONTENT_LENGTH, RANGE},
|
||||
Body, Response, StatusCode,
|
||||
};
|
||||
use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
|
||||
use rusty_s3::{
|
||||
actions::{CreateMultipartUpload, S3Action},
|
||||
|
|
|
@ -91,7 +91,7 @@ where
|
|||
S: Stream + 'static,
|
||||
S::Item: Send + Sync,
|
||||
{
|
||||
let (tx, rx) = crate::sync::channel(1);
|
||||
let (tx, mut rx) = crate::sync::channel(1);
|
||||
|
||||
let handle = crate::sync::abort_on_drop(crate::sync::spawn("send-stream", async move {
|
||||
let stream = std::pin::pin!(stream);
|
||||
|
@ -100,16 +100,14 @@ where
|
|||
while let Some(res) = streamer.next().await {
|
||||
tracing::trace!("make send tx: looping");
|
||||
|
||||
if tx.send_async(res).await.is_err() {
|
||||
if tx.send(res).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
||||
streem::from_fn(|yiedler| async move {
|
||||
let mut stream = rx.into_stream().into_streamer();
|
||||
|
||||
while let Some(res) = stream.next().await {
|
||||
while let Some(res) = rx.recv().await {
|
||||
tracing::trace!("make send rx: looping");
|
||||
|
||||
yiedler.yield_(res).await;
|
||||
|
@ -124,20 +122,18 @@ where
|
|||
I: IntoIterator + Send + 'static,
|
||||
I::Item: Send + Sync,
|
||||
{
|
||||
let (tx, rx) = crate::sync::channel(buffer);
|
||||
let (tx, mut rx) = crate::sync::channel(buffer);
|
||||
|
||||
let handle = crate::sync::spawn_blocking("blocking-iterator", move || {
|
||||
for value in iterator {
|
||||
if tx.send(value).is_err() {
|
||||
if tx.blocking_send(value).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
streem::from_fn(|yielder| async move {
|
||||
let mut stream = rx.into_stream().into_streamer();
|
||||
|
||||
while let Some(res) = stream.next().await {
|
||||
while let Some(res) = rx.recv().await {
|
||||
tracing::trace!("from_iterator: looping");
|
||||
|
||||
yielder.yield_(res).await;
|
||||
|
|
|
@ -39,11 +39,13 @@ impl<T> std::future::Future for DropHandle<T> {
|
|||
}
|
||||
|
||||
#[track_caller]
|
||||
pub(crate) fn channel<T>(bound: usize) -> (flume::Sender<T>, flume::Receiver<T>) {
|
||||
pub(crate) fn channel<T>(
|
||||
bound: usize,
|
||||
) -> (tokio::sync::mpsc::Sender<T>, tokio::sync::mpsc::Receiver<T>) {
|
||||
let span = tracing::trace_span!(parent: None, "make channel");
|
||||
let guard = span.enter();
|
||||
|
||||
let channel = flume::bounded(bound);
|
||||
let channel = tokio::sync::mpsc::channel(bound);
|
||||
|
||||
drop(guard);
|
||||
channel
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::path::PathBuf;
|
||||
|
||||
use rustls::{crypto::ring::sign::any_supported_type, sign::CertifiedKey, Error};
|
||||
use rustls::{crypto::aws_lc_rs::sign::any_supported_type, sign::CertifiedKey, Error};
|
||||
|
||||
pub(super) struct Tls {
|
||||
certificate: PathBuf,
|
||||
|
|
Loading…
Reference in a new issue