Compare commits

...

54 commits

Author SHA1 Message Date
asonix 1c4e343d9d Prepare 0.5.14 2024-05-20 22:23:08 -05:00
asonix d03cc63d2b ffprobe: handle files with empty stream json 2024-05-20 22:08:54 -05:00
asonix 260f9a158a Update dependencies (minor & point) 2024-05-19 10:39:21 -05:00
asonix a7c78cd54e Merge pull request 'Update rustls for tokio-postgres' (#58) from asonix/update-tokio-postgres-rustls into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/58
2024-05-19 15:35:47 +00:00
asonix d7dc2e506d Merge branch 'main' into asonix/update-tokio-postgres-rustls 2024-05-19 10:21:12 -05:00
asonix e48f60a6c6 Merge pull request 'Update rustls for actix-web' (#61) from asonix/update-actix-web-rustls into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/61
2024-05-19 15:18:37 +00:00
asonix 9d01aeb82c Update rustls for actix-web
includes update for rustls-channel-resolver
2024-05-19 10:08:48 -05:00
asonix bddfb3c9d0 Merge branch 'main' into asonix/update-tokio-postgres-rustls 2024-05-19 09:40:45 -05:00
asonix 7ae3c0c776 Merge pull request 'Update reqwest to 0.12' (#59) from asonix/update-reqwest into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/59
2024-05-19 14:37:50 +00:00
asonix 983e9ce151 Merge branch 'main' into asonix/update-reqwest 2024-05-19 09:36:54 -05:00
asonix 9302062b26 Merge pull request 'Update metrics-exporter-prometheus' (#60) from asonix/update-metrics-exporter-prometheus into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/60
2024-05-19 14:35:27 +00:00
asonix 33e72266f5 Add public mechanism for installing aws-lc-rs 2024-05-03 23:05:17 -05:00
asonix 39da69b1aa Use tokio-postgres-generic-rustls 2024-05-03 22:39:30 -05:00
asonix 64b8635059 Update rustls for tokio-postgres
This doesn't update rustls for actix-web (0.22), or rustls for reqwest (0.21)
2024-05-03 22:39:30 -05:00
asonix d45e3fa386 Remove unused 'remove' repo method 2024-05-03 22:35:20 -05:00
asonix bfd4fd4689 Remove unused StatusError type 2024-05-03 22:34:18 -05:00
asonix 89f3c447a8 clippy 2024-05-01 14:57:03 -05:00
asonix 46cfbf99a5 Update metrics-exporter-prometheus
This pulls in hyper 1 and http 1
2024-05-01 14:50:20 -05:00
asonix 58529a2eb2 Update reqwest to 0.12
This pulls in hyper 1 and http 1, but removes rustls 0.21
2024-05-01 14:46:29 -05:00
asonix 700aeb90e0 Fix time deprecation warnings 2024-05-01 14:33:07 -05:00
asonix ff39c30cc8 Update direct base64 dependency 2024-05-01 14:32:26 -05:00
asonix 9561c578dc Update dependencies (minor & point) 2024-05-01 14:30:22 -05:00
asonix dc7bdf7eeb Update flake.lock 2024-04-21 21:02:42 -05:00
asonix 33ba045ee1 Apparently imagemagick needs a shell to delegate to ffmpeg properly 2024-04-21 21:02:31 -05:00
asonix f082e48ed8 Attempt to set up nix-based docker for pict-rs
There's a bug when converting APNG files to WEBP files, which
imagemagick delegates to ffmpeg. When doing 'nix build' and running the
result, or running pict-rs in the dev shell, it works fine. In the
container, this doesn't work at all. imagemagick complains that there's
no media to convert, implying ffmpeg has output a zero-sized file.

This work is helping to narrow down exactly what pict-rs needs to run,
though. This still needs to be tested against h264, h265, vp8, vp9 and
av1.
2024-04-21 14:31:03 -05:00
asonix 97159e0030 Update release document 2024-04-15 21:17:40 -05:00
asonix 6d40fbee47 Prepare 0.5.13 2024-04-15 15:31:31 -05:00
asonix c4e99ef539 Add ability to disable colorized logs 2024-04-15 15:16:10 -05:00
asonix 3428c31f16 Use tokio channels again 2024-04-14 20:06:58 -05:00
asonix 4bb3bad703 Prepare 0.5.12 2024-04-05 13:05:16 -05:00
asonix 4021458be8 Prevent divided-by-zero for empty BytesStreams 2024-04-05 12:57:40 -05:00
asonix eca3697410 Add panic boundaries for background jobs 2024-04-05 12:57:32 -05:00
asonix d41fca5b6c Don't let the doctests step on each other via /tmp 2024-04-04 14:39:30 -05:00
asonix e3183c923f Remove dev-dependency on tokio-uring - unneeded 2024-04-04 12:53:08 -05:00
asonix d97cfe2a64 Remove 'armed' from NotificationEntryInner by only creating them when needed 2024-04-03 13:22:34 -05:00
asonix cef9a68307 Update dependencies (minor & point) 2024-04-01 18:08:57 -05:00
asonix 5f9efb2e1a Prepare 0.5.11 2024-04-01 18:08:46 -05:00
asonix dfb38c7144 Merge pull request 'Background variant processing' (#56) from asonix/backgrounded-variants into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/56
2024-04-01 22:17:30 +00:00
asonix a3bce4c2d3 clippy 2024-04-01 17:06:36 -05:00
asonix c013f697fd Update readme with new API information 2024-04-01 17:01:52 -05:00
asonix 960f6487b7 Queue generation jobs 2024-03-31 20:26:15 -05:00
asonix cd6fb84cc4 Add timeout, metrics back to processor 2024-03-31 16:34:50 -05:00
asonix 056b96d0ad Rename thumbnail_args to variant_args 2024-03-31 16:23:34 -05:00
asonix 74885f2932 Share notification map between sled, postgres 2024-03-31 16:00:23 -05:00
asonix d9d5ac5388 Make postgres work 2024-03-30 14:11:12 -05:00
asonix 612e4017d5 Postgres compiles 2024-03-30 12:10:31 -05:00
asonix b43a435e64 Broken!!!!! 2024-03-30 09:36:31 -05:00
asonix 6e9239fa36 Move variant methods into variant repo trait 2024-03-28 12:04:40 -05:00
asonix 525deffd8d Merge pull request 'Add per-upload limits and per-upload preprocess steps' (#55) from asonix/per-upload-limits-and-operations into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/55
2024-03-28 01:17:32 +00:00
asonix fe5a5723be Merge branch 'main' into asonix/per-upload-limits-and-operations 2024-03-27 19:20:50 -05:00
asonix 3211ce459e Update dependencies (minor & point) 2024-03-27 19:11:41 -05:00
asonix 4b46f1ae2a Use stable actix-form-data 2024-03-27 19:10:58 -05:00
asonix 55bc4b64c1 Add per-upload validations and per-upload preprocess steps 2024-03-27 19:00:54 -05:00
asonix 84a882392a Start threading upload configuration into ingest 2024-03-27 16:57:22 -05:00
51 changed files with 2375 additions and 1356 deletions

Cargo.lock (generated, 1267 lines changed): diff suppressed because it is too large.


@ -1,7 +1,7 @@
[package]
name = "pict-rs"
description = "A simple image hosting service"
version = "0.5.10"
version = "0.5.14"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"
@ -19,11 +19,11 @@ poll-timer-warnings = []
random-errors = ["dep:nanorand"]
[dependencies]
actix-form-data = "0.7.0-beta.6"
actix-web = { version = "4.0.0", default-features = false, features = ["rustls-0_22"] }
actix-form-data = "0.7.0-beta.7"
actix-web = { version = "4.6.0", default-features = false, features = ["rustls-0_23"] }
async-trait = "0.1.51"
barrel = { version = "0.7.0", features = ["pg"] }
base64 = "0.21.0"
base64 = "0.22.0"
bb8 = "0.8.3"
blurhash-update = "0.1.0"
clap = { version = "4.0.2", features = ["derive"] }
@ -34,12 +34,11 @@ dashmap = "5.1.0"
diesel = { version = "2.1.1", features = ["postgres_backend", "serde_json", "time", "uuid"] }
diesel-async = { version = "0.4.1", features = ["bb8", "postgres"] }
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
flume = "0.11.0"
futures-core = "0.3"
hex = "0.4.3"
md-5 = "0.10.5"
metrics = "0.22.0"
metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = ["http-listener"] }
metrics-exporter-prometheus = { version = "0.14.0", default-features = false, features = ["http-listener"] }
mime = "0.3.1"
nanorand = { version = "0.7", optional = true }
opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"] }
@ -47,19 +46,20 @@ opentelemetry = "0.22"
opentelemetry-otlp = "0.15"
pin-project-lite = "0.2.7"
refinery = { version = "0.8.10", features = ["tokio-postgres", "postgres"] }
reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls", "stream"] }
reqwest-middleware = "0.2.2"
reqwest-tracing = "0.4.5"
# pinned to tokio-postgres-rustls
rustls = "0.22.0"
reqwest = { version = "0.12.0", default-features = false, features = ["json", "rustls-tls", "stream"] }
reqwest-middleware = "0.3.0"
reqwest-tracing = "0.5.0"
# pinned to tokio-postgres-generic-rustls
# pinned to actix-web
rustls = "0.23"
# pinned to rustls
rustls-channel-resolver = "0.2.0"
rustls-channel-resolver = "0.3.0"
# pinned to rustls
rustls-pemfile = "2.0.0"
rusty-s3 = "0.5.0"
serde = { version = "1.0", features = ["derive"] }
serde-tuple-vec-map = "1.0.1"
serde_json = "1.0"
serde-tuple-vec-map = "1.0.1"
serde_urlencoded = "0.7.1"
sha2 = "0.10.0"
sled = { version = "0.34.7" }
@ -69,7 +69,7 @@ thiserror = "1.0"
time = { version = "0.3.0", features = ["serde", "serde-well-known"] }
tokio = { version = "1", features = ["full", "tracing"] }
tokio-postgres = { version = "0.7.10", features = ["with-uuid-1", "with-time-0_3", "with-serde_json-1"] }
tokio-postgres-rustls = "0.11.0"
tokio-postgres-generic-rustls = { version = "0.1.0", default-features = false, features = ["aws-lc-rs"] }
tokio-uring = { version = "0.4", optional = true, features = ["bytes"] }
tokio-util = { version = "0.7", default-features = false, features = [
"codec",
@ -97,6 +97,3 @@ webpki-roots = "0.26.0"
version = "0.7.10"
default-features = false
features = ["emit_event_on_error", "opentelemetry_0_22"]
[dev-dependencies]
tokio-uring = { version = "0.4", features = ["bytes"] }


@ -253,9 +253,27 @@ Example:
### API
pict-rs offers the following endpoints:
- `POST /image` for uploading an image. Uploaded content must be valid multipart/form-data with an
- `POST /image?{args}` for uploading an image. Uploaded content must be valid multipart/form-data with an
image array located within the `images[]` key
The {args} query serves multiple purposes for image uploads. The first is to provide
request-level validations for the uploaded media. Available keys are as follows:
- max_width: maximum width, in pixels, allowed for the uploaded media
- max_height: maximum height, in pixels, allowed for the uploaded media
- max_area: maximum area, in pixels, allowed for the uploaded media
- max_frame_count: maximum number of frames permitted for animations and videos
- max_file_size: maximum size, in megabytes, allowed
- allow_image: whether to permit still images in the upload
- allow_animation: whether to permit animations in the upload
- allow_video: whether to permit video in the upload
These validations apply in addition to the validations specified in the pict-rs configuration,
so uploaded media will be rejected if any of the validations fail.
The second purpose of the {args} query is to provide preprocess steps for the uploaded image.
The format is the same as in the process.{ext} endpoint. Images uploaded with these steps
will be processed before saving.
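As a rough illustration (the host, file name, and parameter values here are made up; adjust for
your deployment), a request combining per-upload validations with a preprocess step might look
like:
```bash
# Hypothetical upload: cap dimensions, disallow video, and blur the image before it is saved.
curl -X POST \
  -F 'images[]=@photo.jpg' \
  'http://localhost:8080/image?max_width=2000&max_height=2000&allow_video=false&blur=1.5'
```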
This endpoint returns the following JSON structure on success with a 201 Created status
```json
{
@ -294,7 +312,9 @@ pict-rs offers the following endpoints:
"msg": "ok"
}
```
- `POST /image/backgrounded` Upload an image, like the `/image` endpoint, but don't wait to validate and process it.
- `POST /image/backgrounded?{args}` Upload an image, like the `/image` endpoint, but don't wait to validate and process it.
The {args} query uses the same format as the inline image upload endpoint.
This endpoint returns the following JSON structure on success with a 202 Accepted status
```json
{


@ -16,6 +16,7 @@ concurrency = 32
format = "normal"
targets = "info"
log_spans = false
no_ansi = false
[tracing.console]
buffer_capacity = 102400


@ -5,11 +5,11 @@
"systems": "systems"
},
"locked": {
"lastModified": 1705309234,
"narHash": "sha256-uNRRNRKmJyCRC/8y1RqBkqWBLM034y4qN7EprSdmgyA=",
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "1ef2e671c3b0c19053962c07dbda38332dcebf26",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
@ -20,11 +20,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1705133751,
"narHash": "sha256-rCIsyE80jgiOU78gCWN3A0wE0tR2GI5nH6MlS+HaaSQ=",
"lastModified": 1713537308,
"narHash": "sha256-XtTSSIB2DA6tOv+l0FhvfDMiyCmhoRbNB+0SeInZkbk=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "9b19f5e77dd906cb52dade0b7bd280339d2a1f3d",
"rev": "5c24cf2f0a12ad855f444c30b2421d044120c66f",
"type": "github"
},
"original": {


@ -15,13 +15,29 @@
in
{
packages = rec {
imagemagick7_pict-rs = pkgs.callPackage ./nix/pkgs/imagemagick_pict-rs {};
ffmpeg6_pict-rs = pkgs.callPackage ./nix/pkgs/ffmpeg_pict-rs {};
pict-rs = pkgs.callPackage ./pict-rs.nix {
inherit (pkgs.darwin.apple_sdk.frameworks) Security;
inherit imagemagick7_pict-rs ffmpeg6_pict-rs;
};
default = pict-rs;
};
docker = pkgs.dockerTools.buildLayeredImage {
name = "pict-rs";
tag = "latest";
contents = [ pkgs.tini self.packages.${system}.pict-rs pkgs.bash ];
config = {
Entrypoint = [ "/bin/tini" "--" "/bin/pict-rs" ];
Cmd = [ "run" ];
};
};
apps = rec {
dev = flake-utils.lib.mkApp { drv = self.packages.${system}.pict-rs; };
default = dev;
@ -33,11 +49,13 @@
cargo-outdated
certstrap
clippy
curl
diesel-cli
exiftool
ffmpeg_6-full
garage
imagemagick
self.packages.${system}.imagemagick7_pict-rs
self.packages.${system}.ffmpeg6_pict-rs
jq
minio-client
rust-analyzer
rustc


@ -0,0 +1,5 @@
{ ffmpeg_6-headless }:
ffmpeg_6-headless.override {
withWebp = true;
}


@ -0,0 +1,23 @@
{ imagemagick7 }:
imagemagick7.override {
bzip2Support = true;
zlibSupport = true;
libX11Support = false;
libXtSupport = false;
fontconfigSupport = false;
freetypeSupport = false;
libjpegSupport = true;
djvulibreSupport = false;
lcms2Support = false;
openexrSupport = false;
libjxlSupport = true;
libpngSupport = true;
liblqr1Support = false;
librsvgSupport = false;
libtiffSupport = false;
libxml2Support = false;
openjpegSupport = true;
libwebpSupport = true;
libheifSupport = true;
}


@ -1,6 +1,6 @@
{ exiftool
, ffmpeg_6-full
, imagemagick
, ffmpeg6_pict-rs
, imagemagick7_pict-rs
, lib
, makeWrapper
, nixosTests
@ -11,7 +11,7 @@
rustPlatform.buildRustPackage {
pname = "pict-rs";
version = "0.5.10";
version = "0.5.14";
src = ./.;
cargoLock = {
@ -27,7 +27,7 @@ rustPlatform.buildRustPackage {
postInstall = ''
wrapProgram $out/bin/pict-rs \
--prefix PATH : "${lib.makeBinPath [ imagemagick ffmpeg_6-full exiftool ]}"
--prefix PATH : "${lib.makeBinPath [ imagemagick7_pict-rs ffmpeg6_pict-rs exiftool ]}"
'';
passthru.tests = { inherit (nixosTests) pict-rs; };


@ -98,6 +98,11 @@ targets = 'info'
# default: false
log_spans = false
## Optional: whether to disable colorized log output
# environment variable: PICTRS__TRACING__LOGGING__NO_ANSI
# default: false
no_ansi = false
## Console configuration
[tracing.console]

releases/0.5.11.md (new file, 82 lines)

@ -0,0 +1,82 @@
# pict-rs 0.5.11
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.11 introduces new per-upload media validations, and new per-upload media processing.
These features will enable applications to be more precise about their media requirements, such as
allowing different media types and sizes for different endpoints, or pre-processing certain media to
optimize for size.
### Features
- [Upload Validations](#upload-validations)
- [Upload Processing](#upload-processing)
### Changes
- [Backgrounded Variants](#backgrounded-variants)
## Upgrade Notes
For postgres-based installations, a small migration will be run when pict-rs 0.5.11 first launches
to create a new notifications table. No manual intervention is required. Upgrading should be as
simple as pulling a new version of pict-rs.
## Descriptions
### Upload Validations
When ingesting media using `POST /image`, `POST /image/backgrounded`, `POST /internal/import`, or
`GET /image/download`, validations can now be applied per-upload. These can be provided in the
request query. The following query parameters are supported:
- max_width: maximum width, in pixels, allowed for the uploaded media
- max_height: maximum height, in pixels, allowed for the uploaded media
- max_area: maximum area, in pixels, allowed for the uploaded media
- max_frame_count: maximum number of frames permitted for animations and videos
- max_file_size: maximum size, in megabytes, allowed
- allow_image: whether to permit still images in the upload
- allow_animation: whether to permit animations in the upload
- allow_video: whether to permit video in the upload
An example request could look like this: `POST /image/backgrounded?max_area=3200&allow_video=false`
Validations are performed in addition to the validations specified in the pict-rs configuration, so
if uploaded media violates any of the validations, it will fail to ingest.
### Upload Processing
In a similar vein to the upload validations, preprocessing steps can now be applied on a per-upload
basis. These are also provided as query parameters, and will be applied _instead of_ the configured
preprocess steps. The preprocess query parameters are provided and processed the same way as in the
`GET /image/process.{ext}` endpoint.
An example request could be `POST /image/backgrounded?blur=2.5&resize=300`, which would blur the
uploaded image and fit it inside a 300x300 box before saving it.
### Backgrounded Variants
When serving images from the /process.{ext} endpoint, pict-rs will now queue the processing to
happen via the job queue, rather than processing media inline. It will still wait up to 30 seconds
for the processing to be complete, and return the processed image the same way it always has.
If processing exceeds 30 seconds, pict-rs will return a timeout error, but the processing will
continue in the background. The same variant can be requested again, and it will wait for the same
background process to complete, rather than trying to process the variant a second time.
pict-rs has historically had a way to deduplicate variant processing so that two requests for the
same variant wouldn't do the same work, but this was only effective in environments running a single
copy of pict-rs. In environments with multiple replicas, each one could end up processing the
same variant if it was requested more than once at a time. This has been solved by using postgres as
a notification system to enable globally unique processing for a given variant.
In sled-based configurations there shouldn't be a noticeable difference, aside from the 30-second
timeout on variant endpoints.
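As a minimal sketch of the claim-or-wait shape (this is only an in-process analogue using
`tokio::sync::Notify`; the real implementation coordinates through postgres so it works across
replicas, and all names below are made up):
```rust
use std::{collections::HashMap, sync::{Arc, Mutex}, time::Duration};
use tokio::sync::Notify;

// Tracks which variants are currently being generated.
#[derive(Default, Clone)]
struct VariantClaims {
    inner: Arc<Mutex<HashMap<String, Arc<Notify>>>>,
}

impl VariantClaims {
    // Ok(()) means we won the claim and must do the work; Err(notify) means another
    // task is already generating this variant and we should wait on its notifier.
    fn claim(&self, variant: &str) -> Result<(), Arc<Notify>> {
        let mut map = self.inner.lock().unwrap();
        if let Some(existing) = map.get(variant) {
            Err(existing.clone())
        } else {
            map.insert(variant.to_string(), Arc::new(Notify::new()));
            Ok(())
        }
    }

    // Called by the winner once the variant exists: wake everyone waiting on it.
    fn finish(&self, variant: &str) {
        if let Some(notify) = self.inner.lock().unwrap().remove(variant) {
            notify.notify_waiters();
        }
    }
}

#[tokio::main]
async fn main() {
    let claims = VariantClaims::default();

    match claims.claim("thumbnail-300x300") {
        Ok(()) => {
            // Pretend to process the variant, then notify any waiters.
            tokio::time::sleep(Duration::from_millis(50)).await;
            claims.finish("thumbnail-300x300");
        }
        Err(notify) => {
            // Wait for the other worker, but give up after a timeout (pict-rs waits ~30s).
            let _ = tokio::time::timeout(Duration::from_secs(30), notify.notified()).await;
        }
    }
}
```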

releases/0.5.12.md (new file, 46 lines)

@ -0,0 +1,46 @@
# pict-rs 0.5.12
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.12 is a bugfix release addressing two issues that, when compounded, would cause pict-rs
to fail to process media.
### Fixes
- [Panic Handling in Background Jobs](#panic-handling-in-background-jobs)
- [BytesStream Divide-by-Zero](#bytes-stream-divide-by-zero)
## Upgrade Notes
There are no significant differences from 0.5.11. Upgrading should be as simple as pulling a new
version of pict-rs.
## Descriptions
### Panic Handling in Background Jobs
pict-rs makes an effort to never use explicitly panicking code, but since there's no static way to
guarantee that a given function won't panic, pict-rs needs to be able to deal with that. pict-rs
0.5.12 now wraps invocations of jobs in spawned tasks, which can catch and report panics that happen
in background jobs.
Previously, a panic in a background job would bring down that thread's job processor, which resulted
in future jobs never being processed. Now job processing should properly continue after panics
occur.
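A tiny sketch of the idea (the job body and loop are made up; the real job processor is more
involved), showing how awaiting a spawned task lets the processor observe a panic instead of dying
with it:
```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    for job_id in 0..3u32 {
        // Each job runs in its own spawned task, so a panic is caught at the JoinHandle.
        let handle = tokio::spawn(async move {
            if job_id == 1 {
                panic!("job {job_id} blew up");
            }
        });

        match handle.await {
            Ok(()) => eprintln!("job {job_id} completed"),
            Err(e) if e.is_panic() => eprintln!("job {job_id} panicked; continuing with the next job"),
            Err(_) => eprintln!("job {job_id} was cancelled"),
        }

        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}
```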
### BytesStream Divide-by-Zero
Part of my recent rework of BytesStream included adding debug logs around how many byte chunks
were in a given stream, and their average length. Unfortunately, if there were no bytes in the
stream, this would cause the "average chunk length" calculation to divide by 0. In previous versions
of pict-rs, this would generally result in a failed request for processed media, but in pict-rs
0.5.11 this would end up killing the background jobs processor.
This specific panic has been fixed by ensuring we divide by the number of chunks or 1, whichever is
greater.
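The guard itself is just a `.max(1)` on the divisor; a trivial standalone illustration (values are
made up):
```rust
fn average_chunk_length(total_len: usize, chunk_count: usize) -> usize {
    // An empty stream has zero chunks; dividing by chunk_count.max(1) avoids the
    // divide-by-zero panic and simply reports an average of 0.
    total_len / chunk_count.max(1)
}

fn main() {
    assert_eq!(average_chunk_length(0, 0), 0);  // empty stream: no panic
    assert_eq!(average_chunk_length(10, 4), 2); // integer average of a normal stream
}
```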

releases/0.5.13.md (new file, 62 lines)

@ -0,0 +1,62 @@
# pict-rs 0.5.13
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.13 is a maintenance release aiming to enable better logging in some scenarios.
### Features
- [Colorless Logging](#colorless-logging)
### Changes
- [Remove Flume](#remove-flume)
## Upgrade Notes
There are no significant changes from 0.5.12. Upgrading should be as simple as pulling a new version
of pict-rs.
## Descriptions
### Colorless Logging
When opting to use the `json` logger, the tracing subscriber automatically disables colored output.
This didn't remove colors from errors, though, and pict-rs hasn't had a way to disable colors while
using other log formats. pict-rs 0.5.13 introduces a new configuration value to remove colored
output from all logs regardless of logging format.
With pict-rs.toml
```toml
[tracing.logging]
no_ansi = true
```
With environment variables
```bash
PICTRS__TRACING__LOGGING__NO_ANSI=true
```
With commandline flags
```bash
pict-rs --no-log-ansi run
```
Colors in logs can be useful, so I imagine this option won't be used much. There has been a request
for this functionality, though, and it costs little to maintain.
### Remove Flume
Recently I've been debugging a memory usage issue in another project of mine. I wasn't able to fully
track down the cause, but I did notice that removing the
[flume channel library](https://github.com/zesterer/flume) seemed to make the leak go away. Since I
also use flume in pict-rs, I'm opting to replace it with tokio's native channel implementation. This
may or may not improve memory usage, but it does reduce the dependency count and therefore build time
for pict-rs.

releases/0.5.14.md (new file, 28 lines)

@ -0,0 +1,28 @@
# pict-rs 0.5.14
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.14 includes a bugfix for identifying certain MOV videos, as well as updated dependencies.
### Fixes
- [Empty Stream Parsing](#empty-stream-parsing)
## Upgrade Notes
There are no significant changes from 0.5.13. Upgrading should be as simple as pulling a new version
of pict-rs.
## Descriptions
### Empty Stream Parsing
Certain videos, when identified with ffprobe, contain stream JSON objects with no fields. This would
cause pict-rs to fail to parse the information for these videos, as it expects streams to at least
contain a codec field. In pict-rs 0.5.14, empty streams are now considered valid and are simply
ignored.
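A minimal sketch of the parsing behavior (simplified stand-in types, assuming `serde`,
`serde_json`, and an untagged enum like the one in the real parser):
```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum Stream {
    // Streams that carry a codec are parsed normally...
    Known { codec_name: String },
    // ...and an object with no fields ("{}") now falls through to this variant and is ignored.
    Empty {},
}

fn main() {
    let json = r#"[{"codec_name": "hevc"}, {}, {"codec_name": "aac"}, {}]"#;
    let streams: Vec<Stream> = serde_json::from_str(json).unwrap();

    let codecs: Vec<&str> = streams
        .iter()
        .filter_map(|stream| match stream {
            Stream::Known { codec_name } => Some(codec_name.as_str()),
            Stream::Empty {} => None,
        })
        .collect();

    assert_eq!(codecs, ["hevc", "aac"]);
}
```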


@ -35,7 +35,7 @@ impl BytesStream {
tracing::debug!(
"BytesStream with {} chunks, avg length {}",
bs.chunks_len(),
bs.len() / bs.chunks_len()
bs.len() / bs.chunks_len().max(1)
);
Ok(bs)


@ -1,172 +0,0 @@
use crate::{
details::Details,
error::{Error, UploadError},
repo::Hash,
};
use dashmap::{mapref::entry::Entry, DashMap};
use flume::{r#async::RecvFut, Receiver, Sender};
use std::{
future::Future,
path::PathBuf,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tracing::Span;
type OutcomeReceiver = Receiver<(Details, Arc<str>)>;
type ProcessMapKey = (Hash, PathBuf);
type ProcessMapInner = DashMap<ProcessMapKey, OutcomeReceiver>;
#[derive(Debug, Default, Clone)]
pub(crate) struct ProcessMap {
process_map: Arc<ProcessMapInner>,
}
impl ProcessMap {
pub(super) fn new() -> Self {
Self::default()
}
pub(super) async fn process<Fut>(
&self,
hash: Hash,
path: PathBuf,
fut: Fut,
) -> Result<(Details, Arc<str>), Error>
where
Fut: Future<Output = Result<(Details, Arc<str>), Error>>,
{
let key = (hash.clone(), path.clone());
let (sender, receiver) = flume::bounded(1);
let entry = self.process_map.entry(key.clone());
let (state, span) = match entry {
Entry::Vacant(vacant) => {
vacant.insert(receiver);
let span = tracing::info_span!(
"Processing image",
hash = ?hash,
path = ?path,
completed = &tracing::field::Empty,
);
metrics::counter!(crate::init_metrics::PROCESS_MAP_INSERTED).increment(1);
(CancelState::Sender { sender }, span)
}
Entry::Occupied(receiver) => {
let span = tracing::info_span!(
"Waiting for processed image",
hash = ?hash,
path = ?path,
);
let receiver = receiver.get().clone().into_recv_async();
(CancelState::Receiver { receiver }, span)
}
};
CancelSafeProcessor {
cancel_token: CancelToken {
span,
key,
state,
process_map: self.clone(),
},
fut,
}
.await
}
fn remove(&self, key: &ProcessMapKey) -> Option<OutcomeReceiver> {
self.process_map.remove(key).map(|(_, v)| v)
}
}
struct CancelToken {
span: Span,
key: ProcessMapKey,
state: CancelState,
process_map: ProcessMap,
}
enum CancelState {
Sender {
sender: Sender<(Details, Arc<str>)>,
},
Receiver {
receiver: RecvFut<'static, (Details, Arc<str>)>,
},
}
impl CancelState {
const fn is_sender(&self) -> bool {
matches!(self, Self::Sender { .. })
}
}
pin_project_lite::pin_project! {
struct CancelSafeProcessor<F> {
cancel_token: CancelToken,
#[pin]
fut: F,
}
}
impl<F> Future for CancelSafeProcessor<F>
where
F: Future<Output = Result<(Details, Arc<str>), Error>>,
{
type Output = Result<(Details, Arc<str>), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
let span = &this.cancel_token.span;
let process_map = &this.cancel_token.process_map;
let state = &mut this.cancel_token.state;
let key = &this.cancel_token.key;
let fut = this.fut;
span.in_scope(|| match state {
CancelState::Sender { sender } => {
let res = std::task::ready!(fut.poll(cx));
if process_map.remove(key).is_some() {
metrics::counter!(crate::init_metrics::PROCESS_MAP_REMOVED).increment(1);
}
if let Ok(tup) = &res {
let _ = sender.try_send(tup.clone());
}
Poll::Ready(res)
}
CancelState::Receiver { ref mut receiver } => Pin::new(receiver)
.poll(cx)
.map(|res| res.map_err(|_| UploadError::Canceled.into())),
})
}
}
impl Drop for CancelToken {
fn drop(&mut self) {
if self.state.is_sender() {
let completed = self.process_map.remove(&self.key).is_none();
self.span.record("completed", completed);
if !completed {
metrics::counter!(crate::init_metrics::PROCESS_MAP_REMOVED).increment(1);
}
}
}
}


@ -18,6 +18,7 @@ impl Args {
log_format,
log_targets,
log_spans,
no_log_ansi,
console_address,
console_buffer_capacity,
opentelemetry_url,
@ -38,6 +39,7 @@ impl Args {
format: log_format,
targets: log_targets.map(Serde::new),
log_spans,
no_ansi: no_log_ansi,
},
console: Console {
address: console_address,
@ -581,6 +583,8 @@ struct Logging {
targets: Option<Serde<Targets>>,
#[serde(skip_serializing_if = "std::ops::Not::not")]
log_spans: bool,
#[serde(skip_serializing_if = "std::ops::Not::not")]
no_ansi: bool,
}
#[derive(Debug, Default, serde::Serialize)]
@ -925,6 +929,10 @@ pub(super) struct Args {
#[arg(long)]
log_spans: bool,
#[arg(long)]
/// Whether to disable color-codes in log output
no_log_ansi: bool,
/// Address and port to expose tokio-console metrics
#[arg(long)]
console_address: Option<SocketAddr>,


@ -55,6 +55,7 @@ struct LoggingDefaults {
format: LogFormat,
targets: Serde<Targets>,
log_spans: bool,
no_ansi: bool,
}
#[derive(Clone, Debug, serde::Serialize)]
@ -235,6 +236,7 @@ impl Default for LoggingDefaults {
format: LogFormat::Normal,
targets: "info".parse().expect("Valid targets string"),
log_spans: false,
no_ansi: false,
}
}
}


@ -163,6 +163,8 @@ pub(crate) struct Logging {
pub(crate) targets: Serde<Targets>,
pub(crate) log_spans: bool,
pub(crate) no_ansi: bool,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]


@ -53,6 +53,7 @@ impl FfMpegStreams {
FfMpegStream::Unknown { codec_name } => {
tracing::info!("Encountered unknown stream {codec_name}");
}
FfMpegStream::Empty {} => {}
}
}
@ -135,6 +136,7 @@ enum FfMpegStream {
Audio(FfMpegAudioStream),
Video(FfMpegVideoStream),
Unknown { codec_name: String },
Empty {},
}
#[derive(Debug, serde::Deserialize)]


@ -0,0 +1,35 @@
{
"programs": [
],
"streams": [
{
"codec_name": "hevc",
"width": 1920,
"height": 1080,
"pix_fmt": "yuv420p10le",
"nb_read_frames": "187",
"side_data_list": [
{
}
]
},
{
"codec_name": "aac",
"nb_read_frames": "135"
},
{
},
{
},
{
}
],
"format": {
"format_name": "mov,mp4,m4a,3gp,3g2,mj2"
}
}


@ -1,11 +1,11 @@
use crate::formats::{
AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat, Mp4Codec,
WebmAlphaCodec, WebmCodec,
AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat,
Mp4AudioCodec, Mp4Codec, WebmAlphaCodec, WebmCodec,
};
use super::{Discovery, FfMpegDiscovery, PixelFormatOutput};
fn details_tests() -> [(&'static str, Option<Discovery>); 13] {
fn details_tests() -> [(&'static str, Option<Discovery>); 14] {
[
(
"animated_webp",
@ -151,6 +151,18 @@ fn details_tests() -> [(&'static str, Option<Discovery>); 13] {
frames: None,
}),
),
(
"mov",
Some(Discovery {
input: InputFile::Video(InputVideoFormat::Mp4 {
video_codec: Mp4Codec::H265,
audio_codec: Some(Mp4AudioCodec::Aac),
}),
width: 1920,
height: 1080,
frames: Some(187),
}),
),
]
}


@ -82,7 +82,7 @@ pub(crate) enum UploadError {
Io(#[from] std::io::Error),
#[error("Error validating upload")]
Validation(#[from] crate::validate::ValidationError),
Validation(#[from] crate::ingest::ValidationError),
#[error("Error in store")]
Store(#[source] crate::store::StoreError),
@ -111,6 +111,12 @@ pub(crate) enum UploadError {
#[error("Invalid job popped from job queue: {1}")]
InvalidJob(#[source] serde_json::Error, String),
#[error("Invalid query supplied")]
InvalidQuery(#[source] actix_web::error::QueryPayloadError),
#[error("Invalid json supplied")]
InvalidJson(#[source] actix_web::error::JsonPayloadError),
#[error("pict-rs is in read-only mode")]
ReadOnly,
@ -209,6 +215,8 @@ impl UploadError {
Self::ProcessTimeout => ErrorCode::COMMAND_TIMEOUT,
Self::FailedExternalValidation => ErrorCode::FAILED_EXTERNAL_VALIDATION,
Self::InvalidJob(_, _) => ErrorCode::INVALID_JOB,
Self::InvalidQuery(_) => ErrorCode::INVALID_QUERY,
Self::InvalidJson(_) => ErrorCode::INVALID_JSON,
#[cfg(feature = "random-errors")]
Self::RandomError => ErrorCode::RANDOM_ERROR,
}
@ -248,7 +256,7 @@ impl ResponseError for Error {
fn status_code(&self) -> StatusCode {
match self.kind() {
Some(UploadError::Upload(actix_form_data::Error::FileSize))
| Some(UploadError::Validation(crate::validate::ValidationError::Filesize)) => {
| Some(UploadError::Validation(crate::ingest::ValidationError::Filesize)) => {
StatusCode::PAYLOAD_TOO_LARGE
}
Some(
@ -261,6 +269,8 @@ impl ResponseError for Error {
))
| UploadError::Repo(crate::repo::RepoError::AlreadyClaimed)
| UploadError::Validation(_)
| UploadError::InvalidQuery(_)
| UploadError::InvalidJson(_)
| UploadError::UnsupportedProcessExtension
| UploadError::ReadOnly
| UploadError::FailedExternalValidation


@ -100,6 +100,9 @@ impl ErrorCode {
pub(crate) const VIDEO_DISABLED: ErrorCode = ErrorCode {
code: "video-disabled",
};
pub(crate) const MEDIA_DISALLOWED: ErrorCode = ErrorCode {
code: "media-disallowed",
};
pub(crate) const HTTP_CLIENT_ERROR: ErrorCode = ErrorCode {
code: "http-client-error",
};
@ -147,6 +150,12 @@ impl ErrorCode {
pub(crate) const INVALID_JOB: ErrorCode = ErrorCode {
code: "invalid-job",
};
pub(crate) const INVALID_QUERY: ErrorCode = ErrorCode {
code: "invalid-query",
};
pub(crate) const INVALID_JSON: ErrorCode = ErrorCode {
code: "invalid-json",
};
#[cfg(feature = "random-errors")]
pub(crate) const RANDOM_ERROR: ErrorCode = ErrorCode {
code: "random-error",


@ -2,18 +2,17 @@ mod ffmpeg;
mod magick;
use crate::{
concurrent_processor::ProcessMap,
details::Details,
error::{Error, UploadError},
formats::{ImageFormat, InputProcessableFormat, InternalVideoFormat, ProcessableFormat},
future::{WithMetrics, WithPollTimer, WithTimeout},
repo::{Hash, VariantAlreadyExists},
repo::{Hash, NotificationEntry, VariantAlreadyExists},
state::State,
store::Store,
};
use std::{
path::PathBuf,
future::Future,
sync::Arc,
time::{Duration, Instant},
};
@ -48,13 +47,12 @@ impl Drop for MetricsGuard {
}
}
#[tracing::instrument(skip(state, process_map, original_details, hash))]
#[tracing::instrument(skip(state, original_details, hash))]
pub(crate) async fn generate<S: Store + 'static>(
state: &State<S>,
process_map: &ProcessMap,
format: InputProcessableFormat,
thumbnail_path: PathBuf,
thumbnail_args: Vec<String>,
variant: String,
variant_args: Vec<String>,
original_details: &Details,
hash: Hash,
) -> Result<(Details, Arc<str>), Error> {
@ -67,25 +65,122 @@ pub(crate) async fn generate<S: Store + 'static>(
Ok((original_details.clone(), identifier))
} else {
let process_fut = process(
state,
format,
thumbnail_path.clone(),
thumbnail_args,
original_details,
hash.clone(),
)
.with_poll_timer("process-future");
let mut attempts = 0;
let tup = loop {
if attempts > 2 {
return Err(UploadError::ProcessTimeout.into());
}
let (details, identifier) = process_map
.process(hash, thumbnail_path, process_fut)
.with_poll_timer("process-map-future")
.with_timeout(Duration::from_secs(state.config.media.process_timeout * 4))
.with_metrics(crate::init_metrics::GENERATE_PROCESS)
.await
.map_err(|_| UploadError::ProcessTimeout)??;
match state
.repo
.claim_variant_processing_rights(hash.clone(), variant.clone())
.await?
{
Ok(()) => {
// process
let process_future = process(
state,
format,
variant.clone(),
variant_args,
original_details,
hash.clone(),
)
.with_poll_timer("process-future");
Ok((details, identifier))
let res = heartbeat(state, hash.clone(), variant.clone(), process_future)
.with_poll_timer("heartbeat-future")
.with_timeout(Duration::from_secs(state.config.media.process_timeout * 4))
.with_metrics(crate::init_metrics::GENERATE_PROCESS)
.await
.map_err(|_| Error::from(UploadError::ProcessTimeout));
state
.repo
.notify_variant(hash.clone(), variant.clone())
.await?;
break res???;
}
Err(entry) => {
if let Some(tuple) = wait_timeout(
hash.clone(),
variant.clone(),
entry,
state,
Duration::from_secs(20),
)
.await?
{
break tuple;
}
attempts += 1;
}
}
};
Ok(tup)
}
}
pub(crate) async fn wait_timeout<S: Store + 'static>(
hash: Hash,
variant: String,
mut entry: NotificationEntry,
state: &State<S>,
timeout: Duration,
) -> Result<Option<(Details, Arc<str>)>, Error> {
let notified = entry.notified_timeout(timeout);
if let Some(identifier) = state
.repo
.variant_identifier(hash.clone(), variant.clone())
.await?
{
let details = crate::ensure_details_identifier(state, &identifier).await?;
return Ok(Some((details, identifier)));
}
match notified.await {
Ok(()) => tracing::debug!("notified"),
Err(_) => tracing::debug!("timeout"),
}
Ok(None)
}
async fn heartbeat<S, O>(
state: &State<S>,
hash: Hash,
variant: String,
future: impl Future<Output = O>,
) -> Result<O, Error> {
let repo = state.repo.clone();
let handle = crate::sync::abort_on_drop(crate::sync::spawn("heartbeat-task", async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
if let Err(e) = repo.variant_heartbeat(hash.clone(), variant.clone()).await {
break Error::from(e);
}
}
}));
let future = std::pin::pin!(future);
tokio::select! {
biased;
output = future => {
Ok(output)
}
res = handle => {
Err(res.map_err(|_| UploadError::Canceled)?)
}
}
}
@ -93,8 +188,8 @@ pub(crate) async fn generate<S: Store + 'static>(
async fn process<S: Store + 'static>(
state: &State<S>,
output_format: InputProcessableFormat,
thumbnail_path: PathBuf,
thumbnail_args: Vec<String>,
variant: String,
variant_args: Vec<String>,
original_details: &Details,
hash: Hash,
) -> Result<(Details, Arc<str>), Error> {
@ -120,7 +215,7 @@ async fn process<S: Store + 'static>(
let stream = state.store.to_stream(&identifier, None, None).await?;
let bytes =
crate::magick::process_image_command(state, thumbnail_args, input_format, format, quality)
crate::magick::process_image_command(state, variant_args, input_format, format, quality)
.await?
.drive_with_stream(stream)
.into_bytes_stream()
@ -142,19 +237,21 @@ async fn process<S: Store + 'static>(
)
.await?;
if let Err(VariantAlreadyExists) = state
let identifier = if let Err(VariantAlreadyExists) = state
.repo
.relate_variant_identifier(
hash,
thumbnail_path.to_string_lossy().to_string(),
&identifier,
)
.relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
.await?
{
state.store.remove(&identifier).await?;
}
state.repo.relate_details(&identifier, &details).await?;
state
.repo
.variant_identifier(hash, variant)
.await?
.ok_or(UploadError::MissingIdentifier)?
} else {
state.repo.relate_details(&identifier, &details).await?;
identifier
};
guard.disarm();

src/http1.rs (new file, 3 lines)

@ -0,0 +1,3 @@
pub(crate) fn to_actix_status(status: reqwest::StatusCode) -> actix_web::http::StatusCode {
actix_web::http::StatusCode::from_u16(status.as_u16()).expect("status codes are always valid")
}


@ -1,3 +1,6 @@
mod hasher;
mod validate;
use std::{cell::RefCell, rc::Rc, sync::Arc, time::Duration};
use crate::{
@ -9,16 +12,17 @@ use crate::{
repo::{Alias, ArcRepo, DeleteToken, Hash},
state::State,
store::Store,
UploadQuery,
};
use actix_web::web::Bytes;
use futures_core::Stream;
use reqwest::Body;
use tracing::{Instrument, Span};
mod hasher;
use hasher::Hasher;
pub(crate) use validate::ValidationError;
#[derive(Debug)]
pub(crate) struct Session {
repo: ArcRepo,
@ -31,6 +35,7 @@ pub(crate) struct Session {
async fn process_ingest<S>(
state: &State<S>,
stream: impl Stream<Item = Result<Bytes, Error>>,
upload_query: &UploadQuery,
) -> Result<
(
InternalFormat,
@ -54,11 +59,18 @@ where
let permit = crate::process_semaphore().acquire().await?;
tracing::trace!("Validating bytes");
let (input_type, process_read) = crate::validate::validate_bytes_stream(state, bytes)
.with_poll_timer("validate-bytes-stream")
.await?;
let (input_type, process_read) =
validate::validate_bytes_stream(state, bytes, &upload_query.limits)
.with_poll_timer("validate-bytes-stream")
.await?;
let process_read = if let Some(operations) = state.config.media.preprocess_steps() {
let operations = if upload_query.operations.is_empty() {
state.config.media.preprocess_steps()
} else {
Some(upload_query.operations.as_ref())
};
let process_read = if let Some(operations) = operations {
if let Some(format) = input_type.processable_format() {
let (_, magick_args) =
crate::processor::build_chain(operations, format.file_extension())?;
@ -159,6 +171,7 @@ pub(crate) async fn ingest<S>(
state: &State<S>,
stream: impl Stream<Item = Result<Bytes, Error>>,
declared_alias: Option<Alias>,
upload_query: &UploadQuery,
) -> Result<Session, Error>
where
S: Store,
@ -166,7 +179,7 @@ where
let (input_type, identifier, details, hash_state) = if state.config.server.danger_dummy_mode {
dummy_ingest(state, stream).await?
} else {
process_ingest(state, stream)
process_ingest(state, stream, upload_query)
.with_poll_timer("ingest-future")
.await?
};


@ -14,6 +14,7 @@ use crate::{
future::WithPollTimer,
process::{Process, ProcessRead},
state::State,
UploadLimits,
};
#[derive(Debug, thiserror::Error)]
@ -38,6 +39,9 @@ pub(crate) enum ValidationError {
#[error("Video is disabled")]
VideoDisabled,
#[error("Media type wasn't allowed for this upload")]
MediaDisallowed,
}
impl ValidationError {
@ -50,6 +54,7 @@ impl ValidationError {
Self::Empty => ErrorCode::VALIDATE_FILE_EMPTY,
Self::Filesize => ErrorCode::VALIDATE_FILE_SIZE,
Self::VideoDisabled => ErrorCode::VIDEO_DISABLED,
Self::MediaDisallowed => ErrorCode::MEDIA_DISALLOWED,
}
}
}
@ -60,6 +65,7 @@ const MEGABYTES: usize = 1024 * 1024;
pub(crate) async fn validate_bytes_stream<S>(
state: &State<S>,
bytes: BytesStream,
upload_limits: &UploadLimits,
) -> Result<(InternalFormat, ProcessRead), Error> {
if bytes.is_empty() {
return Err(ValidationError::Empty.into());
@ -74,14 +80,16 @@ pub(crate) async fn validate_bytes_stream<S>(
.with_poll_timer("discover-bytes-stream")
.await?;
validate_upload(bytes.len(), width, height, frames, upload_limits)?;
match &input {
InputFile::Image(input) => {
InputFile::Image(input) if *upload_limits.allow_image => {
let (format, process) =
process_image_command(state, *input, bytes.len(), width, height).await?;
Ok((format, process.drive_with_stream(bytes.into_io_stream())))
}
InputFile::Animation(input) => {
InputFile::Animation(input) if *upload_limits.allow_animation => {
let (format, process) = process_animation_command(
state,
*input,
@ -94,20 +102,67 @@ pub(crate) async fn validate_bytes_stream<S>(
Ok((format, process.drive_with_stream(bytes.into_io_stream())))
}
InputFile::Video(input) => {
InputFile::Video(input) if *upload_limits.allow_video => {
let (format, process_read) =
process_video(state, bytes, *input, width, height, frames.unwrap_or(1)).await?;
Ok((format, process_read))
}
_ => Err(ValidationError::MediaDisallowed.into()),
}
}
fn validate_upload(
size: usize,
width: u16,
height: u16,
frames: Option<u32>,
upload_limits: &UploadLimits,
) -> Result<(), ValidationError> {
if upload_limits
.max_width
.is_some_and(|max_width| width > *max_width)
{
return Err(ValidationError::Width);
}
if upload_limits
.max_height
.is_some_and(|max_height| height > *max_height)
{
return Err(ValidationError::Height);
}
if upload_limits
.max_frame_count
.zip(frames)
.is_some_and(|(max_frame_count, frames)| frames > *max_frame_count)
{
return Err(ValidationError::Frames);
}
if upload_limits
.max_area
.is_some_and(|max_area| u32::from(width) * u32::from(height) > *max_area)
{
return Err(ValidationError::Area);
}
if upload_limits
.max_file_size
.is_some_and(|max_file_size| size > *max_file_size * MEGABYTES)
{
return Err(ValidationError::Filesize);
}
Ok(())
}
#[tracing::instrument(skip(state))]
async fn process_image_command<S>(
state: &State<S>,
input: ImageInput,
length: usize,
size: usize,
width: u16,
height: u16,
) -> Result<(InternalFormat, Process), Error> {
@ -122,7 +177,7 @@ async fn process_image_command<S>(
if u32::from(width) * u32::from(height) > validations.max_area {
return Err(ValidationError::Area.into());
}
if length > validations.max_file_size * MEGABYTES {
if size > validations.max_file_size * MEGABYTES {
return Err(ValidationError::Filesize.into());
}
@ -172,14 +227,14 @@ fn validate_animation(
async fn process_animation_command<S>(
state: &State<S>,
input: AnimationFormat,
length: usize,
size: usize,
width: u16,
height: u16,
frames: u32,
) -> Result<(InternalFormat, Process), Error> {
let validations = &state.config.media.animation;
validate_animation(length, width, height, frames, validations)?;
validate_animation(size, width, height, frames, validations)?;
let AnimationOutput {
format,


@ -1,4 +1,5 @@
use crate::config::{LogFormat, OpenTelemetry, Tracing};
use color_eyre::config::Theme;
use console_subscriber::ConsoleLayer;
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;
@ -11,7 +12,15 @@ use tracing_subscriber::{
};
pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> {
color_eyre::install()?;
let eyre_theme = if tracing.logging.no_ansi {
Theme::new()
} else {
Theme::dark()
};
color_eyre::config::HookBuilder::new()
.theme(eyre_theme)
.install()?;
LogTracer::init()?;
@ -23,7 +32,9 @@ pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> {
FmtSpan::NONE
};
let format_layer = tracing_subscriber::fmt::layer().with_span_events(fmt_span);
let format_layer = tracing_subscriber::fmt::layer()
.with_span_events(fmt_span)
.with_ansi(!tracing.logging.no_ansi);
match tracing.logging.format {
LogFormat::Compact => with_format(format_layer.compact(), tracing),


@ -1,7 +1,6 @@
mod backgrounded;
mod blurhash;
mod bytes_stream;
mod concurrent_processor;
mod config;
mod details;
mod discover;
@ -15,6 +14,7 @@ mod file_path;
mod formats;
mod future;
mod generate;
mod http1;
mod ingest;
mod init_metrics;
mod init_tracing;
@ -35,7 +35,6 @@ mod stream;
mod sync;
mod tls;
mod tmp_file;
mod validate;
use actix_form_data::{Field, Form, FormData, Multipart, Value};
use actix_web::{
@ -58,7 +57,7 @@ use state::State;
use std::{
marker::PhantomData,
path::Path,
path::PathBuf,
rc::Rc,
sync::{Arc, OnceLock},
time::{Duration, SystemTime},
};
@ -71,7 +70,6 @@ use tracing_actix_web::TracingLogger;
use self::{
backgrounded::Backgrounded,
concurrent_processor::ProcessMap,
config::{Configuration, Operation},
details::Details,
either::Either,
@ -123,6 +121,7 @@ async fn ensure_details<S: Store + 'static>(
ensure_details_identifier(state, &identifier).await
}
#[tracing::instrument(skip(state))]
async fn ensure_details_identifier<S: Store + 'static>(
state: &State<S>,
identifier: &Arc<str>,
@ -147,22 +146,64 @@ async fn ensure_details_identifier<S: Store + 'static>(
}
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(default)]
struct UploadLimits {
max_width: Option<Serde<u16>>,
max_height: Option<Serde<u16>>,
max_area: Option<Serde<u32>>,
max_frame_count: Option<Serde<u32>>,
max_file_size: Option<Serde<usize>>,
allow_image: Serde<bool>,
allow_animation: Serde<bool>,
allow_video: Serde<bool>,
}
impl Default for UploadLimits {
fn default() -> Self {
Self {
max_width: None,
max_height: None,
max_area: None,
max_frame_count: None,
max_file_size: None,
allow_image: Serde::new(true),
allow_animation: Serde::new(true),
allow_video: Serde::new(true),
}
}
}
#[derive(Clone, Default, Debug, serde::Deserialize, serde::Serialize)]
struct UploadQuery {
#[serde(flatten)]
limits: UploadLimits,
#[serde(with = "tuple_vec_map", flatten)]
operations: Vec<(String, String)>,
}
struct Upload<S>(Value<Session>, PhantomData<S>);
impl<S: Store + 'static> FormData for Upload<S> {
type Item = Session;
type Error = Error;
fn form(req: &HttpRequest) -> Form<Self::Item, Self::Error> {
fn form(req: &HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
let state = req
.app_data::<web::Data<State<S>>>()
.expect("No state in request")
.clone();
let web::Query(upload_query) = web::Query::<UploadQuery>::from_query(req.query_string())
.map_err(UploadError::InvalidQuery)?;
let upload_query = Rc::new(upload_query);
// Create a new Multipart Form validator
//
// This form is expecting a single array field, 'images' with at most 10 files in it
Form::new()
Ok(Form::new()
.max_files(state.config.server.max_file_count)
.max_file_size(state.config.media.max_file_size * MEGABYTES)
.transform_error(transform_error)
@ -170,6 +211,7 @@ impl<S: Store + 'static> FormData for Upload<S> {
"images",
Field::array(Field::file(move |filename, _, stream| {
let state = state.clone();
let upload_query = upload_query.clone();
metrics::counter!(crate::init_metrics::FILES, "upload" => "inline")
.increment(1);
@ -184,13 +226,13 @@ impl<S: Store + 'static> FormData for Upload<S> {
let stream = crate::stream::from_err(stream);
ingest::ingest(&state, stream, None).await
ingest::ingest(&state, stream, None, &upload_query).await
}
.with_poll_timer("file-upload")
.instrument(span),
)
})),
)
))
}
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error> {
@ -204,16 +246,21 @@ impl<S: Store + 'static> FormData for Import<S> {
type Item = Session;
type Error = Error;
fn form(req: &actix_web::HttpRequest) -> Form<Self::Item, Self::Error> {
fn form(req: &actix_web::HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
let state = req
.app_data::<web::Data<State<S>>>()
.expect("No state in request")
.clone();
let web::Query(upload_query) = web::Query::<UploadQuery>::from_query(req.query_string())
.map_err(UploadError::InvalidQuery)?;
let upload_query = Rc::new(upload_query);
// Create a new Multipart Form validator for internal imports
//
// This form is expecting a single array field, 'images' with at most 10 files in it
Form::new()
Ok(Form::new()
.max_files(state.config.server.max_file_count)
.max_file_size(state.config.media.max_file_size * MEGABYTES)
.transform_error(transform_error)
@ -221,6 +268,7 @@ impl<S: Store + 'static> FormData for Import<S> {
"images",
Field::array(Field::file(move |filename, _, stream| {
let state = state.clone();
let upload_query = upload_query.clone();
metrics::counter!(crate::init_metrics::FILES, "import" => "inline")
.increment(1);
@ -235,14 +283,19 @@ impl<S: Store + 'static> FormData for Import<S> {
let stream = crate::stream::from_err(stream);
ingest::ingest(&state, stream, Some(Alias::from_existing(&filename)))
.await
ingest::ingest(
&state,
stream,
Some(Alias::from_existing(&filename)),
&upload_query,
)
.await
}
.with_poll_timer("file-import")
.instrument(span),
)
})),
)
))
}
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
@ -320,16 +373,16 @@ impl<S: Store + 'static> FormData for BackgroundedUpload<S> {
type Item = Backgrounded;
type Error = Error;
fn form(req: &actix_web::HttpRequest) -> Form<Self::Item, Self::Error> {
// Create a new Multipart Form validator for backgrounded uploads
//
// This form is expecting a single array field, 'images' with at most 10 files in it
fn form(req: &actix_web::HttpRequest) -> Result<Form<Self::Item, Self::Error>, Self::Error> {
let state = req
.app_data::<web::Data<State<S>>>()
.expect("No state in request")
.clone();
Form::new()
// Create a new Multipart Form validator for backgrounded uploads
//
// This form is expecting a single array field, 'images' with at most 10 files in it
Ok(Form::new()
.max_files(state.config.server.max_file_count)
.max_file_size(state.config.media.max_file_size * MEGABYTES)
.transform_error(transform_error)
@ -357,7 +410,7 @@ impl<S: Store + 'static> FormData for BackgroundedUpload<S> {
.instrument(span),
)
})),
)
))
}
fn extract(value: Value<Self::Item>) -> Result<Self, Self::Error>
@ -372,7 +425,10 @@ impl<S: Store + 'static> FormData for BackgroundedUpload<S> {
async fn upload_backgrounded<S: Store>(
Multipart(BackgroundedUpload(value, _)): Multipart<BackgroundedUpload<S>>,
state: web::Data<State<S>>,
upload_query: web::Query<UploadQuery>,
) -> Result<HttpResponse, Error> {
let upload_query = upload_query.into_inner();
let images = value
.map()
.and_then(|mut m| m.remove("images"))
@ -389,7 +445,14 @@ async fn upload_backgrounded<S: Store>(
let upload_id = image.result.upload_id().expect("Upload ID exists");
let identifier = image.result.identifier().expect("Identifier exists");
queue::queue_ingest(&state.repo, identifier, upload_id, None).await?;
queue::queue_ingest(
&state.repo,
identifier,
upload_id,
None,
upload_query.clone(),
)
.await?;
files.push(serde_json::json!({
"upload_id": upload_id.to_string(),
@ -462,11 +525,21 @@ struct UrlQuery {
backgrounded: bool,
}
#[derive(Debug, serde::Deserialize)]
struct DownloadQuery {
#[serde(flatten)]
url_query: UrlQuery,
#[serde(flatten)]
upload_query: UploadQuery,
}
async fn ingest_inline<S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>>,
state: &State<S>,
upload_query: &UploadQuery,
) -> Result<(Alias, DeleteToken, Details), Error> {
let session = ingest::ingest(state, stream, None).await?;
let session = ingest::ingest(state, stream, None, upload_query).await?;
let alias = session.alias().expect("alias should exist").to_owned();
@ -480,15 +553,20 @@ async fn ingest_inline<S: Store + 'static>(
/// download an image from a URL
#[tracing::instrument(name = "Downloading file", skip(state))]
async fn download<S: Store + 'static>(
query: web::Query<UrlQuery>,
download_query: web::Query<DownloadQuery>,
state: web::Data<State<S>>,
) -> Result<HttpResponse, Error> {
let stream = download_stream(&query.url, &state).await?;
let DownloadQuery {
url_query,
upload_query,
} = download_query.into_inner();
if query.backgrounded {
do_download_backgrounded(stream, state).await
let stream = download_stream(&url_query.url, &state).await?;
if url_query.backgrounded {
do_download_backgrounded(stream, state, upload_query).await
} else {
do_download_inline(stream, &state).await
do_download_inline(stream, &state, &upload_query).await
}
}
@ -503,7 +581,7 @@ async fn download_stream<S>(
let res = state.client.get(url).send().await?;
if !res.status().is_success() {
return Err(UploadError::Download(res.status()).into());
return Err(UploadError::Download(http1::to_actix_status(res.status())).into());
}
let stream = crate::stream::limit(
@ -518,10 +596,11 @@ async fn download_stream<S>(
async fn do_download_inline<S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>>,
state: &State<S>,
upload_query: &UploadQuery,
) -> Result<HttpResponse, Error> {
metrics::counter!(crate::init_metrics::FILES, "download" => "inline").increment(1);
let (alias, delete_token, details) = ingest_inline(stream, state).await?;
let (alias, delete_token, details) = ingest_inline(stream, state, upload_query).await?;
Ok(HttpResponse::Created().json(&serde_json::json!({
"msg": "ok",
@ -537,6 +616,7 @@ async fn do_download_inline<S: Store + 'static>(
async fn do_download_backgrounded<S: Store + 'static>(
stream: impl Stream<Item = Result<web::Bytes, Error>>,
state: web::Data<State<S>>,
upload_query: UploadQuery,
) -> Result<HttpResponse, Error> {
metrics::counter!(crate::init_metrics::FILES, "download" => "background").increment(1);
@ -545,7 +625,7 @@ async fn do_download_backgrounded<S: Store + 'static>(
let upload_id = backgrounded.upload_id().expect("Upload ID exists");
let identifier = backgrounded.identifier().expect("Identifier exists");
queue::queue_ingest(&state.repo, identifier, upload_id, None).await?;
queue::queue_ingest(&state.repo, identifier, upload_id, None, upload_query).await?;
backgrounded.disarm();
@ -694,7 +774,7 @@ fn prepare_process(
config: &Configuration,
operations: Vec<(String, String)>,
ext: &str,
) -> Result<(InputProcessableFormat, PathBuf, Vec<String>), Error> {
) -> Result<(InputProcessableFormat, String, Vec<String>), Error> {
let operations = operations
.into_iter()
.filter(|(k, _)| config.media.filters.contains(&k.to_lowercase()))
@ -704,10 +784,9 @@ fn prepare_process(
.parse::<InputProcessableFormat>()
.map_err(|_| UploadError::UnsupportedProcessExtension)?;
let (thumbnail_path, thumbnail_args) =
self::processor::build_chain(&operations, &format.to_string())?;
let (variant, variant_args) = self::processor::build_chain(&operations, &format.to_string())?;
Ok((format, thumbnail_path, thumbnail_args))
Ok((format, variant, variant_args))
}
#[tracing::instrument(name = "Fetching derived details", skip(state))]
@ -718,7 +797,7 @@ async fn process_details<S: Store>(
) -> Result<HttpResponse, Error> {
let alias = alias_from_query(source.into(), &state).await?;
let (_, thumbnail_path, _) = prepare_process(&state.config, operations, ext.as_str())?;
let (_, variant, _) = prepare_process(&state.config, operations, ext.as_str())?;
let hash = state
.repo
@ -726,18 +805,16 @@ async fn process_details<S: Store>(
.await?
.ok_or(UploadError::MissingAlias)?;
let thumbnail_string = thumbnail_path.to_string_lossy().to_string();
if !state.config.server.read_only {
state
.repo
.accessed_variant(hash.clone(), thumbnail_string.clone())
.accessed_variant(hash.clone(), variant.clone())
.await?;
}
let identifier = state
.repo
.variant_identifier(hash, thumbnail_string)
.variant_identifier(hash, variant)
.await?
.ok_or(UploadError::MissingAlias)?;
@ -767,20 +844,16 @@ async fn not_found_hash(repo: &ArcRepo) -> Result<Option<(Alias, Hash)>, Error>
}
/// Process files
#[tracing::instrument(name = "Serving processed image", skip(state, process_map))]
#[tracing::instrument(name = "Serving processed image", skip(state))]
async fn process<S: Store + 'static>(
range: Option<web::Header<Range>>,
web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
ext: web::Path<String>,
state: web::Data<State<S>>,
process_map: web::Data<ProcessMap>,
) -> Result<HttpResponse, Error> {
let alias = proxy_alias_from_query(source.into(), &state).await?;
let (format, thumbnail_path, thumbnail_args) =
prepare_process(&state.config, operations, ext.as_str())?;
let path_string = thumbnail_path.to_string_lossy().to_string();
let (format, variant, variant_args) = prepare_process(&state.config, operations, ext.as_str())?;
let (hash, alias, not_found) = if let Some(hash) = state.repo.hash(&alias).await? {
(hash, alias, false)
@ -795,13 +868,13 @@ async fn process<S: Store + 'static>(
if !state.config.server.read_only {
state
.repo
.accessed_variant(hash.clone(), path_string.clone())
.accessed_variant(hash.clone(), variant.clone())
.await?;
}
let identifier_opt = state
.repo
.variant_identifier(hash.clone(), path_string)
.variant_identifier(hash.clone(), variant.clone())
.await?;
let (details, identifier) = if let Some(identifier) = identifier_opt {
@ -813,18 +886,34 @@ async fn process<S: Store + 'static>(
return Err(UploadError::ReadOnly.into());
}
let original_details = ensure_details(&state, &alias).await?;
queue_generate(&state.repo, format, alias, variant.clone(), variant_args).await?;
generate::generate(
&state,
&process_map,
format,
thumbnail_path,
thumbnail_args,
&original_details,
hash,
)
.await?
let mut attempts = 0;
loop {
if attempts > 6 {
return Err(UploadError::ProcessTimeout.into());
}
let entry = state
.repo
.variant_waiter(hash.clone(), variant.clone())
.await?;
let opt = generate::wait_timeout(
hash.clone(),
variant.clone(),
entry,
&state,
Duration::from_secs(5),
)
.await?;
if let Some(tuple) = opt {
break tuple;
}
attempts += 1;
}
};
if let Some(public_url) = state.store.public_url(&identifier) {
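
The handler above no longer generates the variant inline: it queues a Generate job, then waits on a repo-backed notification entry, re-checking roughly every five seconds and giving up after seven attempts with ProcessTimeout. A minimal, self-contained sketch of that wait-and-recheck shape using plain tokio primitives; the names here (wait_for_variant, produced) are illustrative stand-ins, not pict-rs APIs:

use std::{sync::Arc, time::Duration};
use tokio::sync::{Notify, RwLock};

// Illustrative stand-in for "check the repo, otherwise wait for a wake-up".
async fn wait_for_variant(
    produced: Arc<RwLock<Option<String>>>,
    notify: Arc<Notify>,
) -> Result<String, &'static str> {
    let mut attempts = 0;

    loop {
        if attempts > 6 {
            // Mirrors the UploadError::ProcessTimeout branch above.
            return Err("timed out waiting for variant");
        }

        // Another task may already have produced the variant identifier.
        if let Some(identifier) = produced.read().await.clone() {
            return Ok(identifier);
        }

        // Otherwise wait up to five seconds for a notification, then re-check.
        let _ = tokio::time::timeout(Duration::from_secs(5), notify.notified()).await;
        attempts += 1;
    }
}

The bound keeps a stuck or crashed producer from pinning the request forever; a waiter that runs out of attempts simply returns an error to the client.
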
@ -855,9 +944,8 @@ async fn process_head<S: Store + 'static>(
}
};
let (_, thumbnail_path, _) = prepare_process(&state.config, operations, ext.as_str())?;
let (_, variant, _) = prepare_process(&state.config, operations, ext.as_str())?;
let path_string = thumbnail_path.to_string_lossy().to_string();
let Some(hash) = state.repo.hash(&alias).await? else {
// Invalid alias
return Ok(HttpResponse::NotFound().finish());
@ -866,14 +954,11 @@ async fn process_head<S: Store + 'static>(
if !state.config.server.read_only {
state
.repo
.accessed_variant(hash.clone(), path_string.clone())
.accessed_variant(hash.clone(), variant.clone())
.await?;
}
let identifier_opt = state
.repo
.variant_identifier(hash.clone(), path_string)
.await?;
let identifier_opt = state.repo.variant_identifier(hash.clone(), variant).await?;
if let Some(identifier) = identifier_opt {
let details = ensure_details_identifier(&state, &identifier).await?;
@ -892,7 +977,7 @@ async fn process_head<S: Store + 'static>(
/// Process files
#[tracing::instrument(name = "Spawning image process", skip(state))]
async fn process_backgrounded<S: Store>(
async fn process_backgrounded<S: Store + 'static>(
web::Query(ProcessQuery { source, operations }): web::Query<ProcessQuery>,
ext: web::Path<String>,
state: web::Data<State<S>>,
@ -909,10 +994,9 @@ async fn process_backgrounded<S: Store>(
}
};
let (target_format, process_path, process_args) =
let (target_format, variant, variant_args) =
prepare_process(&state.config, operations, ext.as_str())?;
let path_string = process_path.to_string_lossy().to_string();
let Some(hash) = state.repo.hash(&source).await? else {
// Invalid alias
return Ok(HttpResponse::BadRequest().finish());
@ -920,7 +1004,7 @@ async fn process_backgrounded<S: Store>(
let identifier_opt = state
.repo
.variant_identifier(hash.clone(), path_string)
.variant_identifier(hash.clone(), variant.clone())
.await?;
if identifier_opt.is_some() {
@ -931,14 +1015,7 @@ async fn process_backgrounded<S: Store>(
return Err(UploadError::ReadOnly.into());
}
queue_generate(
&state.repo,
target_format,
source,
process_path,
process_args,
)
.await?;
queue_generate(&state.repo, target_format, source, variant, variant_args).await?;
Ok(HttpResponse::Accepted().finish())
}
@ -1212,7 +1289,7 @@ async fn proxy_alias_from_query<S: Store + 'static>(
} else if !state.config.server.read_only {
let stream = download_stream(proxy.as_str(), state).await?;
let (alias, _, _) = ingest_inline(stream, state).await?;
let (alias, _, _) = ingest_inline(stream, state, &Default::default()).await?;
state.repo.relate_url(proxy, alias.clone()).await?;
@ -1497,15 +1574,25 @@ fn build_client() -> Result<ClientWithMiddleware, Error> {
.build())
}
fn query_config() -> web::QueryConfig {
web::QueryConfig::default()
.error_handler(|err, _| Error::from(UploadError::InvalidQuery(err)).into())
}
fn json_config() -> web::JsonConfig {
web::JsonConfig::default()
.error_handler(|err, _| Error::from(UploadError::InvalidJson(err)).into())
}
fn configure_endpoints<S: Store + 'static, F: Fn(&mut web::ServiceConfig)>(
config: &mut web::ServiceConfig,
state: State<S>,
process_map: ProcessMap,
extra_config: F,
) {
config
.app_data(query_config())
.app_data(json_config())
.app_data(web::Data::new(state.clone()))
.app_data(web::Data::new(process_map.clone()))
.route("/healthz", web::get().to(healthz::<S>))
.service(
web::scope("/image")
@ -1613,12 +1700,12 @@ fn spawn_cleanup<S>(state: State<S>) {
});
}
fn spawn_workers<S>(state: State<S>, process_map: ProcessMap)
fn spawn_workers<S>(state: State<S>)
where
S: Store + 'static,
{
crate::sync::spawn("cleanup-worker", queue::process_cleanup(state.clone()));
crate::sync::spawn("process-worker", queue::process_images(state, process_map));
crate::sync::spawn("process-worker", queue::process_images(state));
}
fn watch_keys(tls: Tls, sender: ChannelSender) -> DropHandle<()> {
@ -1644,8 +1731,6 @@ async fn launch<
state: State<S>,
extra_config: F,
) -> color_eyre::Result<()> {
let process_map = ProcessMap::new();
let address = state.config.server.address;
let tls = Tls::from_config(&state.config);
@ -1655,18 +1740,15 @@ async fn launch<
let server = HttpServer::new(move || {
let extra_config = extra_config.clone();
let state = state.clone();
let process_map = process_map.clone();
spawn_workers(state.clone(), process_map.clone());
spawn_workers(state.clone());
App::new()
.wrap(TracingLogger::default())
.wrap(Deadline)
.wrap(Metrics)
.wrap(Payload::new())
.configure(move |sc| {
configure_endpoints(sc, state.clone(), process_map.clone(), extra_config)
})
.configure(move |sc| configure_endpoints(sc, state.clone(), extra_config))
});
if let Some(tls) = tls {
@ -1682,7 +1764,7 @@ async fn launch<
tracing::info!("Starting pict-rs with TLS on {address}");
server.bind_rustls_0_22(address, config)?.run().await?;
server.bind_rustls_0_23(address, config)?.run().await?;
handle.abort();
let _ = handle.await;
@ -1787,7 +1869,8 @@ impl<P: AsRef<Path>, T: serde::Serialize> ConfigSource<P, T> {
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let configuration = pict_rs::ConfigSource::memory(serde_json::json!({
/// "server": {
/// "address": "127.0.0.1:8080"
/// "address": "127.0.0.1:8080",
/// "temporary_directory": "/tmp/t1"
/// },
/// "repo": {
/// "type": "sled",
@ -1855,6 +1938,19 @@ impl PictRsConfiguration {
Ok(self)
}
/// Install aws-lc-rs as the default crypto provider
///
/// This would happen automatically anyway unless rustls crate features get mixed up
pub fn install_crypto_provider(self) -> Self {
if rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.is_err()
{
tracing::info!("rustls crypto provider already installed");
}
self
}
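
For context on the doc comment above: when both the ring and aws-lc-rs backends end up enabled through transitive crate features, rustls 0.23 cannot pick a process-default provider on its own and the first TLS setup can fail at runtime. Installing aws-lc-rs explicitly here, and tolerating the "already installed" error, sidesteps that; the binary's main functions later in this diff call it right after install_metrics().
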
/// Run the pict-rs application on a tokio `LocalSet`
///
/// This must be called from within `tokio::main` directly
@ -1864,13 +1960,16 @@ impl PictRsConfiguration {
/// #[tokio::main]
/// async fn main() -> color_eyre::Result<()> {
/// let pict_rs_server = pict_rs::ConfigSource::memory(serde_json::json!({
/// "server": {
/// "temporary_directory": "/tmp/t2"
/// },
/// "repo": {
/// "type": "sled",
/// "path": "/tmp/pict-rs/run-on-localset/sled-repo",
/// "path": "/tmp/pict-rs-run-on-localset/sled-repo",
/// },
/// "store": {
/// "type": "filesystem",
/// "path": "/tmp/pict-rs/run-on-localset/files",
/// "path": "/tmp/pict-rs-run-on-localset/files",
/// },
/// }))
/// .init::<&str>(None)?
@ -1895,13 +1994,16 @@ impl PictRsConfiguration {
/// fn main() -> color_eyre::Result<()> {
/// actix_web::rt::System::new().block_on(async move {
/// let pict_rs_server = pict_rs::ConfigSource::memory(serde_json::json!({
/// "server": {
/// "temporary_directory": "/tmp/t3"
/// },
/// "repo": {
/// "type": "sled",
/// "path": "/tmp/pict-rs/run/sled-repo",
/// "path": "/tmp/pict-rs-run/sled-repo",
/// },
/// "store": {
/// "type": "filesystem",
/// "path": "/tmp/pict-rs/run/files",
/// "path": "/tmp/pict-rs-run/files",
/// },
/// }))
/// .init::<&str>(None)?

View file

@ -4,6 +4,7 @@ fn main() -> color_eyre::Result<()> {
pict_rs::PictRsConfiguration::build_default()?
.install_tracing()?
.install_metrics()?
.install_crypto_provider()
.run()
.await
})
@ -18,6 +19,7 @@ fn main() -> color_eyre::Result<()> {
pict_rs::PictRsConfiguration::build_default()?
.install_tracing()?
.install_metrics()?
.install_crypto_provider()
.run_on_localset()
.await
})

View file

@ -45,10 +45,10 @@ impl Drop for MetricsGuard {
}
}
async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
async fn drain(mut rx: tokio::sync::mpsc::Receiver<actix_web::dev::Payload>) {
let mut set = JoinSet::new();
while let Ok(payload) = rx.recv_async().await {
while let Some(payload) = rx.recv().await {
tracing::trace!("drain: looping");
// draining a payload is a best-effort task - if we can't collect in 2 minutes we bail
@ -94,18 +94,18 @@ async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
struct DrainHandle(Option<Rc<tokio::task::JoinHandle<()>>>);
pub(crate) struct Payload {
sender: flume::Sender<actix_web::dev::Payload>,
sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
handle: DrainHandle,
}
pub(crate) struct PayloadMiddleware<S> {
inner: S,
sender: flume::Sender<actix_web::dev::Payload>,
sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
_handle: DrainHandle,
}
pub(crate) struct PayloadStream {
inner: Option<actix_web::dev::Payload>,
sender: flume::Sender<actix_web::dev::Payload>,
sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
}
impl DrainHandle {

View file

@ -59,9 +59,6 @@ impl Drop for MetricsGuard {
}
}
#[derive(Debug)]
struct StatusError(ExitStatus);
pub(crate) struct Process {
command: Arc<str>,
child: Child,
@ -487,11 +484,3 @@ impl ProcessRead {
}
}
}
impl std::fmt::Display for StatusError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Command failed with bad status: {}", self.0)
}
}
impl std::error::Error for StatusError {}

View file

@ -91,7 +91,7 @@ impl ResizeKind {
pub(crate) fn build_chain(
args: &[(String, String)],
ext: &str,
) -> Result<(PathBuf, Vec<String>), Error> {
) -> Result<(String, Vec<String>), Error> {
fn parse<P: Processor>(key: &str, value: &str) -> Result<Option<P>, Error> {
if key == P::NAME {
return Ok(Some(P::parse(key, value).ok_or(UploadError::ParsePath)?));
@ -122,7 +122,7 @@ pub(crate) fn build_chain(
path.push(ext);
Ok((path, args))
Ok((path.to_string_lossy().to_string(), args))
}
impl Processor for Identity {

View file

@ -1,5 +1,4 @@
use crate::{
concurrent_processor::ProcessMap,
error::{Error, UploadError},
formats::InputProcessableFormat,
future::{LocalBoxFuture, WithPollTimer},
@ -7,14 +6,16 @@ use crate::{
serde_str::Serde,
state::State,
store::Store,
UploadQuery,
};
use std::{
ops::Deref,
path::PathBuf,
rc::Rc,
sync::Arc,
time::{Duration, Instant},
};
use tokio::task::JoinError;
use tracing::Instrument;
pub(crate) mod cleanup;
@ -56,11 +57,13 @@ enum Process {
identifier: String,
upload_id: Serde<UploadId>,
declared_alias: Option<Serde<Alias>>,
#[serde(default)]
upload_query: UploadQuery,
},
Generate {
target_format: InputProcessableFormat,
source: Serde<Alias>,
process_path: PathBuf,
process_path: String,
process_args: Vec<String>,
},
}
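
Marking upload_query with #[serde(default)] keeps the queue forward-compatible: Ingest jobs serialized by an older pict-rs, which have no upload_query field, still deserialize after an upgrade. A small, self-contained illustration of that behavior, using simplified stand-in types rather than the real Process enum and UploadQuery:

use serde::{Deserialize, Serialize};

// Simplified stand-ins; the real pict-rs types carry more fields.
#[derive(Debug, Default, PartialEq, Deserialize, Serialize)]
struct UploadQuery {
    #[serde(default)]
    wait: bool,
}

#[derive(Debug, Deserialize, Serialize)]
struct IngestJob {
    identifier: String,
    #[serde(default)]
    upload_query: UploadQuery,
}

fn main() -> Result<(), serde_json::Error> {
    // A job pushed before this change has no `upload_query` key at all...
    let old: IngestJob = serde_json::from_str(r#"{"identifier":"abc"}"#)?;
    // ...and still deserializes, falling back to UploadQuery::default().
    assert_eq!(old.upload_query, UploadQuery::default());
    Ok(())
}
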
@ -158,11 +161,13 @@ pub(crate) async fn queue_ingest(
identifier: &Arc<str>,
upload_id: UploadId,
declared_alias: Option<Alias>,
upload_query: UploadQuery,
) -> Result<(), Error> {
let job = serde_json::to_value(Process::Ingest {
identifier: identifier.to_string(),
declared_alias: declared_alias.map(Serde::new),
upload_id: Serde::new(upload_id),
upload_query,
})
.map_err(UploadError::PushJob)?;
repo.push(PROCESS_QUEUE, job, None).await?;
@ -173,13 +178,13 @@ pub(crate) async fn queue_generate(
repo: &ArcRepo,
target_format: InputProcessableFormat,
source: Alias,
process_path: PathBuf,
variant: String,
process_args: Vec<String>,
) -> Result<(), Error> {
let job = serde_json::to_value(Process::Generate {
target_format,
source: Serde::new(source),
process_path,
process_path: variant,
process_args,
})
.map_err(UploadError::PushJob)?;
@ -191,8 +196,8 @@ pub(crate) async fn process_cleanup<S: Store + 'static>(state: State<S>) {
process_jobs(state, CLEANUP_QUEUE, cleanup::perform).await
}
pub(crate) async fn process_images<S: Store + 'static>(state: State<S>, process_map: ProcessMap) {
process_image_jobs(state, process_map, PROCESS_QUEUE, process::perform).await
pub(crate) async fn process_images<S: Store + 'static>(state: State<S>) {
process_jobs(state, PROCESS_QUEUE, process::perform).await
}
struct MetricsGuard {
@ -294,54 +299,66 @@ where
}
}
fn job_result(result: &JobResult) -> crate::repo::JobResult {
fn job_result(result: &Result<JobResult, JoinError>) -> crate::repo::JobResult {
match result {
Ok(()) => crate::repo::JobResult::Success,
Err(JobError::Retry(_)) => crate::repo::JobResult::Failure,
Err(JobError::Abort(_)) => crate::repo::JobResult::Aborted,
Ok(Ok(())) => crate::repo::JobResult::Success,
Ok(Err(JobError::Retry(_))) => crate::repo::JobResult::Failure,
Ok(Err(JobError::Abort(_))) => crate::repo::JobResult::Aborted,
Err(_) => crate::repo::JobResult::Aborted,
}
}
async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
where
S: Store,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
S: Store + 'static,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
{
let worker_id = uuid::Uuid::new_v4();
let state = Rc::new(state);
loop {
tracing::trace!("process_jobs: looping");
crate::sync::cooperate().await;
let res = job_loop(&state, worker_id, queue, callback)
.with_poll_timer("job-loop")
.await;
// add a panic boundary by spawning a task
let res = crate::sync::spawn(
"job-loop",
job_loop(state.clone(), worker_id, queue, callback),
)
.await;
if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", format!("{e}"));
tracing::warn!("{}", format!("{e:?}"));
match res {
// clean exit
Ok(Ok(())) => break,
if e.is_disconnected() {
tokio::time::sleep(Duration::from_secs(10)).await;
// job error
Ok(Err(e)) => {
tracing::warn!("Error processing jobs: {}", format!("{e}"));
tracing::warn!("{}", format!("{e:?}"));
if e.is_disconnected() {
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
continue;
// job panic
Err(_) => {
tracing::warn!("Panic while processing jobs");
}
}
break;
}
}
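
The worker loop now wraps each pass through job_loop in a spawned task, which acts as a panic boundary: a panicking job surfaces as a JoinError and the loop keeps running, instead of the panic unwinding through process_jobs and killing the worker. A rough, self-contained sketch of the same idea with plain tokio, where the names are placeholders and tokio::spawn stands in for crate::sync::spawn:

use std::time::Duration;

// Placeholder for one pass of the real job loop.
async fn run_jobs_once() -> Result<(), &'static str> {
    Ok(())
}

async fn worker_loop() {
    loop {
        // Spawning creates a panic boundary: a panic inside the job shows up
        // here as Err(JoinError) instead of unwinding through this loop.
        match tokio::spawn(run_jobs_once()).await {
            Ok(Ok(())) => break, // clean exit
            Ok(Err(e)) => {
                eprintln!("job error: {e}");
                tokio::time::sleep(Duration::from_secs(10)).await;
            }
            Err(_join_error) => {
                eprintln!("job panicked, restarting the loop");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    worker_loop().await;
}

The real code also distinguishes disconnect errors before sleeping; that detail is elided here.
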
async fn job_loop<S, F>(
state: &State<S>,
state: Rc<State<S>>,
worker_id: uuid::Uuid,
queue: &'static str,
callback: F,
) -> Result<(), Error>
where
S: Store,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
S: Store + 'static,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
{
loop {
tracing::trace!("job_loop: looping");
@ -352,114 +369,32 @@ where
let (job_id, job) = state
.repo
.pop(queue, worker_id)
.with_poll_timer("pop-cleanup")
.with_poll_timer("pop-job")
.await?;
let guard = MetricsGuard::guard(worker_id, queue);
let res = heartbeat(
&state.repo,
queue,
worker_id,
job_id,
(callback)(state, job),
)
.with_poll_timer("cleanup-job-and-heartbeat")
let state2 = state.clone();
let res = crate::sync::spawn("job-and-heartbeat", async move {
let state = state2;
heartbeat(
&state.repo,
queue,
worker_id,
job_id,
(callback)(&state, job),
)
.await
})
.await;
state
.repo
.complete_job(queue, worker_id, job_id, job_result(&res))
.with_poll_timer("cleanup-job-complete")
.with_poll_timer("job-complete")
.await?;
res?;
guard.disarm();
Ok(()) as Result<(), Error>
}
.instrument(tracing::info_span!("tick", %queue, %worker_id))
.await?;
}
}
async fn process_image_jobs<S, F>(
state: State<S>,
process_map: ProcessMap,
queue: &'static str,
callback: F,
) where
S: Store,
for<'a> F: Fn(&'a State<S>, &'a ProcessMap, serde_json::Value) -> JobFuture<'a> + Copy,
{
let worker_id = uuid::Uuid::new_v4();
loop {
tracing::trace!("process_image_jobs: looping");
crate::sync::cooperate().await;
let res = image_job_loop(&state, &process_map, worker_id, queue, callback)
.with_poll_timer("image-job-loop")
.await;
if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", format!("{e}"));
tracing::warn!("{}", format!("{e:?}"));
if e.is_disconnected() {
tokio::time::sleep(Duration::from_secs(10)).await;
}
continue;
}
break;
}
}
async fn image_job_loop<S, F>(
state: &State<S>,
process_map: &ProcessMap,
worker_id: uuid::Uuid,
queue: &'static str,
callback: F,
) -> Result<(), Error>
where
S: Store,
for<'a> F: Fn(&'a State<S>, &'a ProcessMap, serde_json::Value) -> JobFuture<'a> + Copy,
{
loop {
tracing::trace!("image_job_loop: looping");
crate::sync::cooperate().await;
async {
let (job_id, job) = state
.repo
.pop(queue, worker_id)
.with_poll_timer("pop-process")
.await?;
let guard = MetricsGuard::guard(worker_id, queue);
let res = heartbeat(
&state.repo,
queue,
worker_id,
job_id,
(callback)(state, process_map, job),
)
.with_poll_timer("process-job-and-heartbeat")
.await;
state
.repo
.complete_job(queue, worker_id, job_id, job_result(&res))
.await?;
res?;
res.map_err(|_| UploadError::Canceled)??;
guard.disarm();

View file

@ -1,8 +1,6 @@
use time::Instant;
use tracing::{Instrument, Span};
use crate::{
concurrent_processor::ProcessMap,
error::{Error, UploadError},
formats::InputProcessableFormat,
future::WithPollTimer,
@ -12,16 +10,13 @@ use crate::{
serde_str::Serde,
state::State,
store::Store,
UploadQuery,
};
use std::{path::PathBuf, sync::Arc};
use std::{sync::Arc, time::Instant};
use super::{JobContext, JobFuture, JobResult};
pub(super) fn perform<'a, S>(
state: &'a State<S>,
process_map: &'a ProcessMap,
job: serde_json::Value,
) -> JobFuture<'a>
pub(super) fn perform<S>(state: &State<S>, job: serde_json::Value) -> JobFuture<'_>
where
S: Store + 'static,
{
@ -37,12 +32,14 @@ where
identifier,
upload_id,
declared_alias,
upload_query,
} => {
process_ingest(
state,
Arc::from(identifier),
Serde::into_inner(upload_id),
declared_alias.map(Serde::into_inner),
upload_query,
)
.with_poll_timer("process-ingest")
.await?
@ -55,7 +52,6 @@ where
} => {
generate(
state,
process_map,
target_format,
Serde::into_inner(source),
process_path,
@ -93,7 +89,7 @@ impl UploadGuard {
impl Drop for UploadGuard {
fn drop(&mut self) {
metrics::counter!(crate::init_metrics::BACKGROUND_UPLOAD_INGEST, "completed" => (!self.armed).to_string()).increment(1);
metrics::histogram!(crate::init_metrics::BACKGROUND_UPLOAD_INGEST_DURATION, "completed" => (!self.armed).to_string()).record(self.start.elapsed().as_seconds_f64());
metrics::histogram!(crate::init_metrics::BACKGROUND_UPLOAD_INGEST_DURATION, "completed" => (!self.armed).to_string()).record(self.start.elapsed().as_secs_f64());
if self.armed {
tracing::warn!(
@ -110,6 +106,7 @@ async fn process_ingest<S>(
unprocessed_identifier: Arc<str>,
upload_id: UploadId,
declared_alias: Option<Alias>,
upload_query: UploadQuery,
) -> JobResult
where
S: Store + 'static,
@ -129,7 +126,8 @@ where
let stream =
crate::stream::from_err(state2.store.to_stream(&ident, None, None).await?);
let session = crate::ingest::ingest(&state2, stream, declared_alias).await?;
let session =
crate::ingest::ingest(&state2, stream, declared_alias, &upload_query).await?;
Ok(session) as Result<Session, Error>
}
@ -173,13 +171,12 @@ where
Ok(())
}
#[tracing::instrument(skip(state, process_map, process_path, process_args))]
#[tracing::instrument(skip(state, variant, process_args))]
async fn generate<S: Store + 'static>(
state: &State<S>,
process_map: &ProcessMap,
target_format: InputProcessableFormat,
source: Alias,
process_path: PathBuf,
variant: String,
process_args: Vec<String>,
) -> JobResult {
let hash = state
@ -190,10 +187,9 @@ async fn generate<S: Store + 'static>(
.ok_or(UploadError::MissingAlias)
.abort()?;
let path_string = process_path.to_string_lossy().to_string();
let identifier_opt = state
.repo
.variant_identifier(hash.clone(), path_string)
.variant_identifier(hash.clone(), variant.clone())
.await
.retry()?;
@ -206,9 +202,8 @@ async fn generate<S: Store + 'static>(
crate::generate::generate(
state,
process_map,
target_format,
process_path,
variant,
process_args,
&original_details,
hash,

View file

@ -3,6 +3,7 @@ mod delete_token;
mod hash;
mod metrics;
mod migrate;
mod notification_map;
use crate::{
config,
@ -23,6 +24,7 @@ pub(crate) use alias::Alias;
pub(crate) use delete_token::DeleteToken;
pub(crate) use hash::Hash;
pub(crate) use migrate::{migrate_04, migrate_repo};
pub(crate) use notification_map::NotificationEntry;
pub(crate) type ArcRepo = Arc<dyn FullRepo>;
@ -103,6 +105,7 @@ pub(crate) trait FullRepo:
+ AliasRepo
+ QueueRepo
+ HashRepo
+ VariantRepo
+ StoreMigrationRepo
+ AliasAccessRepo
+ VariantAccessRepo
@ -441,7 +444,6 @@ where
pub(crate) trait SettingsRepo: BaseRepo {
async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError>;
async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError>;
async fn remove(&self, key: &'static str) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
@ -456,10 +458,6 @@ where
async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
T::get(self, key).await
}
async fn remove(&self, key: &'static str) -> Result<(), RepoError> {
T::remove(self, key).await
}
}
#[async_trait::async_trait(?Send)]
@ -653,20 +651,6 @@ pub(crate) trait HashRepo: BaseRepo {
async fn identifier(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError>;
async fn relate_variant_identifier(
&self,
hash: Hash,
variant: String,
identifier: &Arc<str>,
) -> Result<Result<(), VariantAlreadyExists>, RepoError>;
async fn variant_identifier(
&self,
hash: Hash,
variant: String,
) -> Result<Option<Arc<str>>, RepoError>;
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError>;
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
async fn relate_blurhash(&self, hash: Hash, blurhash: Arc<str>) -> Result<(), RepoError>;
async fn blurhash(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError>;
@ -726,6 +710,96 @@ where
T::identifier(self, hash).await
}
async fn relate_blurhash(&self, hash: Hash, blurhash: Arc<str>) -> Result<(), RepoError> {
T::relate_blurhash(self, hash, blurhash).await
}
async fn blurhash(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
T::blurhash(self, hash).await
}
async fn relate_motion_identifier(
&self,
hash: Hash,
identifier: &Arc<str>,
) -> Result<(), RepoError> {
T::relate_motion_identifier(self, hash, identifier).await
}
async fn motion_identifier(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
T::motion_identifier(self, hash).await
}
async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> {
T::cleanup_hash(self, hash).await
}
}
#[async_trait::async_trait(?Send)]
pub(crate) trait VariantRepo: BaseRepo {
async fn claim_variant_processing_rights(
&self,
hash: Hash,
variant: String,
) -> Result<Result<(), NotificationEntry>, RepoError>;
async fn variant_waiter(
&self,
hash: Hash,
variant: String,
) -> Result<NotificationEntry, RepoError>;
async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
async fn relate_variant_identifier(
&self,
hash: Hash,
variant: String,
identifier: &Arc<str>,
) -> Result<Result<(), VariantAlreadyExists>, RepoError>;
async fn variant_identifier(
&self,
hash: Hash,
variant: String,
) -> Result<Option<Arc<str>>, RepoError>;
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError>;
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError>;
}
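
Taken together, these methods describe a claim-or-wait protocol for variant generation: a caller first tries to claim processing rights for a (hash, variant) pair; if the claim fails it receives a NotificationEntry to park on, while the claim holder heartbeats during generation, records the resulting identifier, and finally notifies waiters. A hedged sketch of how a caller might drive the trait; it assumes the pict-rs types above (Hash, RepoError, NotificationEntry, std::sync::Arc) are in scope, and the generate-and-store step is reduced to a placeholder identifier:

async fn produce_or_wait<R: VariantRepo>(
    repo: &R,
    hash: Hash,
    variant: String,
) -> Result<Option<Arc<str>>, RepoError> {
    match repo
        .claim_variant_processing_rights(hash.clone(), variant.clone())
        .await?
    {
        Ok(()) => {
            // We hold the claim: generate the file (elided), heartbeating
            // periodically so other workers don't treat the claim as stale.
            repo.variant_heartbeat(hash.clone(), variant.clone()).await?;
            let identifier: Arc<str> = Arc::from("placeholder-identifier");
            let _ = repo
                .relate_variant_identifier(hash.clone(), variant.clone(), &identifier)
                .await?;
            // Waking waiters is the last step, after the identifier is recorded.
            repo.notify_variant(hash, variant).await?;
            Ok(Some(identifier))
        }
        Err(mut entry) => {
            // Someone else claimed it, or it already exists: check first, then wait.
            if let Some(identifier) = repo
                .variant_identifier(hash.clone(), variant.clone())
                .await?
            {
                return Ok(Some(identifier));
            }
            let _ = entry
                .notified_timeout(std::time::Duration::from_secs(5))
                .await;
            repo.variant_identifier(hash, variant).await
        }
    }
}
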
#[async_trait::async_trait(?Send)]
impl<T> VariantRepo for Arc<T>
where
T: VariantRepo,
{
async fn claim_variant_processing_rights(
&self,
hash: Hash,
variant: String,
) -> Result<Result<(), NotificationEntry>, RepoError> {
T::claim_variant_processing_rights(self, hash, variant).await
}
async fn variant_waiter(
&self,
hash: Hash,
variant: String,
) -> Result<NotificationEntry, RepoError> {
T::variant_waiter(self, hash, variant).await
}
async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
T::variant_heartbeat(self, hash, variant).await
}
async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
T::notify_variant(self, hash, variant).await
}
async fn relate_variant_identifier(
&self,
hash: Hash,
@ -750,30 +824,6 @@ where
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
T::remove_variant(self, hash, variant).await
}
async fn relate_blurhash(&self, hash: Hash, blurhash: Arc<str>) -> Result<(), RepoError> {
T::relate_blurhash(self, hash, blurhash).await
}
async fn blurhash(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
T::blurhash(self, hash).await
}
async fn relate_motion_identifier(
&self,
hash: Hash,
identifier: &Arc<str>,
) -> Result<(), RepoError> {
T::relate_motion_identifier(self, hash, identifier).await
}
async fn motion_identifier(&self, hash: Hash) -> Result<Option<Arc<str>>, RepoError> {
T::motion_identifier(self, hash).await
}
async fn cleanup_hash(&self, hash: Hash) -> Result<(), RepoError> {
T::cleanup_hash(self, hash).await
}
}
#[async_trait::async_trait(?Send)]

View file

@ -0,0 +1,92 @@
use dashmap::{mapref::entry::Entry, DashMap};
use std::{
future::Future,
sync::{Arc, Weak},
time::Duration,
};
use tokio::sync::Notify;
use crate::future::WithTimeout;
type Map = Arc<DashMap<Arc<str>, Weak<NotificationEntryInner>>>;
#[derive(Clone)]
pub(super) struct NotificationMap {
map: Map,
}
pub(crate) struct NotificationEntry {
inner: Arc<NotificationEntryInner>,
}
struct NotificationEntryInner {
key: Arc<str>,
map: Map,
notify: Notify,
}
impl NotificationMap {
pub(super) fn new() -> Self {
Self {
map: Arc::new(DashMap::new()),
}
}
pub(super) fn register_interest(&self, key: Arc<str>) -> NotificationEntry {
match self.map.entry(key.clone()) {
Entry::Occupied(mut occupied) => {
if let Some(inner) = occupied.get().upgrade() {
NotificationEntry { inner }
} else {
let inner = Arc::new(NotificationEntryInner {
key,
map: self.map.clone(),
notify: crate::sync::bare_notify(),
});
occupied.insert(Arc::downgrade(&inner));
NotificationEntry { inner }
}
}
Entry::Vacant(vacant) => {
let inner = Arc::new(NotificationEntryInner {
key,
map: self.map.clone(),
notify: crate::sync::bare_notify(),
});
vacant.insert(Arc::downgrade(&inner));
NotificationEntry { inner }
}
}
}
pub(super) fn notify(&self, key: &str) {
if let Some(notifier) = self.map.get(key).and_then(|v| v.upgrade()) {
notifier.notify.notify_waiters();
}
}
}
impl NotificationEntry {
pub(crate) fn notified_timeout(
&mut self,
duration: Duration,
) -> impl Future<Output = Result<(), tokio::time::error::Elapsed>> + '_ {
self.inner.notify.notified().with_timeout(duration)
}
}
impl Default for NotificationMap {
fn default() -> Self {
Self::new()
}
}
impl Drop for NotificationEntryInner {
fn drop(&mut self) {
self.map.remove(&self.key);
}
}
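
The map above stores only Weak pointers and relies on NotificationEntryInner's Drop to remove its key, so interest disappears automatically once the last waiter goes away. A compact, self-contained sketch of the same weak-entry idea on top of dashmap and tokio; it skips the Drop-based cleanup and the entry-API race handling the real code has, and the key string is just an example:

use std::sync::{Arc, Weak};
use std::time::Duration;

use dashmap::DashMap;
use tokio::sync::Notify;

#[derive(Default)]
struct WaitMap {
    map: DashMap<String, Weak<Notify>>,
}

impl WaitMap {
    fn register(&self, key: &str) -> Arc<Notify> {
        // Reuse a live entry if one exists, otherwise insert a fresh one.
        if let Some(live) = self.map.get(key).and_then(|weak| weak.upgrade()) {
            return live;
        }
        let notify = Arc::new(Notify::new());
        self.map.insert(key.to_string(), Arc::downgrade(&notify));
        notify
    }

    fn notify(&self, key: &str) {
        if let Some(live) = self.map.get(key).and_then(|weak| weak.upgrade()) {
            live.notify_waiters();
        }
    }
}

#[tokio::main]
async fn main() {
    let map = Arc::new(WaitMap::default());
    let entry = map.register("hash-plus-variant");

    let waiter = tokio::spawn(async move {
        // The real NotificationEntry wraps this in notified_timeout.
        let _ = tokio::time::timeout(Duration::from_secs(1), entry.notified()).await;
        println!("woke up (notified or timed out)");
    });

    tokio::time::sleep(Duration::from_millis(50)).await;
    map.notify("hash-plus-variant");

    waiter.await.unwrap();
}
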

View file

@ -4,6 +4,7 @@ mod schema;
use std::{
collections::{BTreeSet, VecDeque},
future::Future,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
@ -25,7 +26,7 @@ use diesel_async::{
use futures_core::Stream;
use tokio::sync::Notify;
use tokio_postgres::{AsyncMessage, Connection, NoTls, Notification, Socket};
use tokio_postgres_rustls::MakeRustlsConnect;
use tokio_postgres_generic_rustls::{AwsLcRsDigest, MakeRustlsConnect};
use tracing::Instrument;
use url::Url;
use uuid::Uuid;
@ -43,10 +44,11 @@ use self::job_status::JobStatus;
use super::{
metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard},
notification_map::{NotificationEntry, NotificationMap},
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo,
FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
UploadResult, VariantAccessRepo, VariantAlreadyExists,
UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo,
};
#[derive(Clone)]
@ -62,6 +64,7 @@ struct Inner {
notifier_pool: Pool<AsyncPgConnection>,
queue_notifications: DashMap<String, Arc<Notify>>,
upload_notifications: DashMap<UploadId, Weak<Notify>>,
keyed_notifications: NotificationMap,
}
struct UploadInterest {
@ -81,6 +84,10 @@ struct UploadNotifierState<'a> {
inner: &'a Inner,
}
struct KeyedNotifierState<'a> {
inner: &'a Inner,
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum ConnectPostgresError {
#[error("Failed to connect to postgres for migrations")]
@ -102,7 +109,7 @@ pub(crate) enum PostgresError {
Pool(#[source] RunError),
#[error("Error in database")]
Diesel(#[source] diesel::result::Error),
Diesel(#[from] diesel::result::Error),
#[error("Error deserializing hex value")]
Hex(#[source] hex::FromHexError),
@ -166,7 +173,7 @@ impl PostgresError {
async fn build_tls_connector(
certificate_file: Option<PathBuf>,
) -> Result<MakeRustlsConnect, TlsError> {
) -> Result<MakeRustlsConnect<AwsLcRsDigest>, TlsError> {
let mut cert_store = rustls::RootCertStore {
roots: Vec::from(webpki_roots::TLS_SERVER_ROOTS),
};
@ -192,14 +199,14 @@ async fn build_tls_connector(
.with_root_certificates(cert_store)
.with_no_client_auth();
let tls = MakeRustlsConnect::new(config);
let tls = MakeRustlsConnect::new(config, AwsLcRsDigest);
Ok(tls)
}
async fn connect_for_migrations(
postgres_url: &Url,
tls_connector: Option<MakeRustlsConnect>,
tls_connector: Option<MakeRustlsConnect<AwsLcRsDigest>>,
) -> Result<
(
tokio_postgres::Client,
@ -258,8 +265,8 @@ where
async fn build_pool(
postgres_url: &Url,
tx: flume::Sender<Notification>,
connector: Option<MakeRustlsConnect>,
tx: tokio::sync::mpsc::Sender<Notification>,
connector: Option<MakeRustlsConnect<AwsLcRsDigest>>,
max_size: u32,
) -> Result<Pool<AsyncPgConnection>, ConnectPostgresError> {
let mut config = ManagerConfig::default();
@ -312,7 +319,7 @@ impl PostgresRepo {
.map(|u| u.into())
.unwrap_or(1_usize);
let (tx, rx) = flume::bounded(10);
let (tx, rx) = crate::sync::channel(10);
let pool = build_pool(
&postgres_url,
@ -331,6 +338,7 @@ impl PostgresRepo {
notifier_pool,
queue_notifications: DashMap::new(),
upload_notifications: DashMap::new(),
keyed_notifications: NotificationMap::new(),
});
let handle = crate::sync::abort_on_drop(crate::sync::spawn_sendable(
@ -363,8 +371,97 @@ impl PostgresRepo {
.with_poll_timer("postgres-get-notifier-connection")
.await
}
async fn insert_keyed_notifier(
&self,
input_key: &str,
) -> Result<Result<(), AlreadyInserted>, PostgresError> {
use schema::keyed_notifications::dsl::*;
let mut conn = self.get_connection().await?;
let timestamp = to_primitive(time::OffsetDateTime::now_utc());
diesel::delete(keyed_notifications)
.filter(heartbeat.le(timestamp.saturating_sub(time::Duration::minutes(2))))
.execute(&mut conn)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
let res = diesel::insert_into(keyed_notifications)
.values(key.eq(input_key))
.execute(&mut conn)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?;
match res {
Ok(_) => Ok(Ok(())),
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(Err(AlreadyInserted)),
Err(e) => Err(PostgresError::Diesel(e)),
}
}
async fn keyed_notifier_heartbeat(&self, input_key: &str) -> Result<(), PostgresError> {
use schema::keyed_notifications::dsl::*;
let mut conn = self.get_connection().await?;
let timestamp = to_primitive(time::OffsetDateTime::now_utc());
diesel::update(keyed_notifications)
.filter(key.eq(input_key))
.set(heartbeat.eq(timestamp))
.execute(&mut conn)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
Ok(())
}
fn listen_on_key(&self, key: Arc<str>) -> NotificationEntry {
self.inner.keyed_notifications.register_interest(key)
}
async fn register_interest(&self) -> Result<(), PostgresError> {
let mut notifier_conn = self.get_notifier_connection().await?;
diesel::sql_query("LISTEN keyed_notification_channel;")
.execute(&mut notifier_conn)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
Ok(())
}
async fn clear_keyed_notifier(&self, input_key: String) -> Result<(), PostgresError> {
use schema::keyed_notifications::dsl::*;
let mut conn = self.get_connection().await?;
diesel::delete(keyed_notifications)
.filter(key.eq(input_key))
.execute(&mut conn)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
Ok(())
}
}
struct AlreadyInserted;
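
Two details worth noting about the claim bookkeeping above: the DELETE that runs before every insert expires any claim whose heartbeat is more than two minutes old, so a worker that crashed mid-generation does not block the variant forever; and a unique-key violation on the INSERT is the signal that another live worker already holds the claim, which claim_variant_processing_rights below turns into "wait on the notification entry" rather than an error.
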
struct GetConnectionMetricsGuard {
start: Instant,
armed: bool,
@ -437,13 +534,15 @@ impl Inner {
}
impl UploadInterest {
async fn notified_timeout(&self, timeout: Duration) -> Result<(), tokio::time::error::Elapsed> {
fn notified_timeout(
&self,
timeout: Duration,
) -> impl Future<Output = Result<(), tokio::time::error::Elapsed>> + '_ {
self.interest
.as_ref()
.expect("interest exists")
.notified()
.with_timeout(timeout)
.await
}
}
@ -511,12 +610,18 @@ impl<'a> UploadNotifierState<'a> {
}
}
impl<'a> KeyedNotifierState<'a> {
fn handle(&self, key: &str) {
self.inner.keyed_notifications.notify(key);
}
}
type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
type ConfigFn =
Box<dyn Fn(&str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> + Send + Sync + 'static>;
async fn delegate_notifications(
receiver: flume::Receiver<Notification>,
mut receiver: tokio::sync::mpsc::Receiver<Notification>,
inner: Arc<Inner>,
capacity: usize,
) {
@ -529,7 +634,9 @@ async fn delegate_notifications(
let upload_notifier_state = UploadNotifierState { inner: &inner };
while let Ok(notification) = receiver.recv_async().await {
let keyed_notifier_state = KeyedNotifierState { inner: &inner };
while let Some(notification) = receiver.recv().await {
tracing::trace!("delegate_notifications: looping");
metrics::counter!(crate::init_metrics::POSTGRES_NOTIFICATION).increment(1);
@ -542,6 +649,10 @@ async fn delegate_notifications(
// new upload finished
upload_notifier_state.handle(notification.payload());
}
"keyed_notification_channel" => {
// new keyed notification
keyed_notifier_state.handle(notification.payload());
}
channel => {
tracing::info!(
"Unhandled postgres notification: {channel}: {}",
@ -555,8 +666,8 @@ async fn delegate_notifications(
}
fn build_handler(
sender: flume::Sender<Notification>,
connector: Option<MakeRustlsConnect>,
sender: tokio::sync::mpsc::Sender<Notification>,
connector: Option<MakeRustlsConnect<AwsLcRsDigest>>,
) -> ConfigFn {
Box::new(
move |config: &str| -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> {
@ -597,7 +708,7 @@ fn build_handler(
}
fn spawn_db_notification_task<S>(
sender: flume::Sender<Notification>,
sender: tokio::sync::mpsc::Sender<Notification>,
mut conn: Connection<Socket, S>,
) where
S: tokio_postgres::tls::TlsStream + Send + Unpin + 'static,
@ -618,7 +729,7 @@ fn spawn_db_notification_task<S>(
tracing::warn!("Database Notice {e:?}");
}
Ok(AsyncMessage::Notification(notification)) => {
if sender.send_async(notification).await.is_err() {
if sender.send(notification).await.is_err() {
tracing::warn!("Missed notification. Are we shutting down?");
}
}
@ -863,110 +974,6 @@ impl HashRepo for PostgresRepo {
Ok(opt.map(Arc::from))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn relate_variant_identifier(
&self,
input_hash: Hash,
input_variant: String,
input_identifier: &Arc<str>,
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
let res = diesel::insert_into(variants)
.values((
hash.eq(&input_hash),
variant.eq(&input_variant),
identifier.eq(input_identifier.as_ref()),
))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?;
match res {
Ok(_) => Ok(Ok(())),
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(Err(VariantAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
}
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variant_identifier(
&self,
input_hash: Hash,
input_variant: String,
) -> Result<Option<Arc<str>>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
let opt = variants
.select(identifier)
.filter(hash.eq(&input_hash))
.filter(variant.eq(&input_variant))
.get_result::<String>(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_IDENTIFIER)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.optional()
.map_err(PostgresError::Diesel)?
.map(Arc::from);
Ok(opt)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variants(&self, input_hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
let vec = variants
.select((variant, identifier))
.filter(hash.eq(&input_hash))
.get_results::<(String, String)>(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_FOR_HASH)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?
.into_iter()
.map(|(s, i)| (s, Arc::from(i)))
.collect();
Ok(vec)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn remove_variant(
&self,
input_hash: Hash,
input_variant: String,
) -> Result<(), RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
diesel::delete(variants)
.filter(hash.eq(&input_hash))
.filter(variant.eq(&input_variant))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_REMOVE)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self))]
async fn relate_blurhash(
&self,
@ -1083,6 +1090,167 @@ impl HashRepo for PostgresRepo {
}
}
#[async_trait::async_trait(?Send)]
impl VariantRepo for PostgresRepo {
#[tracing::instrument(level = "debug", skip(self))]
async fn claim_variant_processing_rights(
&self,
hash: Hash,
variant: String,
) -> Result<Result<(), NotificationEntry>, RepoError> {
let key = Arc::from(format!("{}{variant}", hash.to_base64()));
let entry = self.listen_on_key(Arc::clone(&key));
self.register_interest().await?;
if self
.variant_identifier(hash.clone(), variant.clone())
.await?
.is_some()
{
return Ok(Err(entry));
}
match self.insert_keyed_notifier(&key).await? {
Ok(()) => Ok(Ok(())),
Err(AlreadyInserted) => Ok(Err(entry)),
}
}
async fn variant_waiter(
&self,
hash: Hash,
variant: String,
) -> Result<NotificationEntry, RepoError> {
let key = Arc::from(format!("{}{variant}", hash.to_base64()));
let entry = self.listen_on_key(key);
self.register_interest().await?;
Ok(entry)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
let key = format!("{}{variant}", hash.to_base64());
self.keyed_notifier_heartbeat(&key)
.await
.map_err(Into::into)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
let key = format!("{}{variant}", hash.to_base64());
self.clear_keyed_notifier(key).await.map_err(Into::into)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn relate_variant_identifier(
&self,
input_hash: Hash,
input_variant: String,
input_identifier: &Arc<str>,
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
let res = diesel::insert_into(variants)
.values((
hash.eq(&input_hash),
variant.eq(&input_variant),
identifier.eq(input_identifier.to_string()),
))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?;
match res {
Ok(_) => Ok(Ok(())),
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(Err(VariantAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
}
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variant_identifier(
&self,
input_hash: Hash,
input_variant: String,
) -> Result<Option<Arc<str>>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
let opt = variants
.select(identifier)
.filter(hash.eq(&input_hash))
.filter(variant.eq(&input_variant))
.get_result::<String>(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_IDENTIFIER)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.optional()
.map_err(PostgresError::Diesel)?
.map(Arc::from);
Ok(opt)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variants(&self, input_hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
let vec = variants
.select((variant, identifier))
.filter(hash.eq(&input_hash))
.get_results::<(String, String)>(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_FOR_HASH)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?
.into_iter()
.map(|(s, i)| (s, Arc::from(i)))
.collect();
Ok(vec)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn remove_variant(
&self,
input_hash: Hash,
input_variant: String,
) -> Result<(), RepoError> {
use schema::variants::dsl::*;
let mut conn = self.get_connection().await?;
diesel::delete(variants)
.filter(hash.eq(&input_hash))
.filter(variant.eq(&input_variant))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_REMOVE)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
Ok(())
}
}
#[async_trait::async_trait(?Send)]
impl AliasRepo for PostgresRepo {
#[tracing::instrument(level = "debug", skip(self))]
@ -1244,24 +1412,6 @@ impl SettingsRepo for PostgresRepo {
Ok(opt)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn remove(&self, input_key: &'static str) -> Result<(), RepoError> {
use schema::settings::dsl::*;
let mut conn = self.get_connection().await?;
diesel::delete(settings)
.filter(key.eq(input_key))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_SETTINGS_REMOVE)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
Ok(())
}
}
#[async_trait::async_trait(?Send)]
@ -1279,16 +1429,22 @@ impl DetailsRepo for PostgresRepo {
let value =
serde_json::to_value(&input_details.inner).map_err(PostgresError::SerializeDetails)?;
diesel::insert_into(details)
let res = diesel::insert_into(details)
.values((identifier.eq(input_identifier.as_ref()), json.eq(&value)))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_DETAILS_RELATE)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
.map_err(|_| PostgresError::DbTimeout)?;
Ok(())
match res {
Ok(_)
| Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(()),
Err(e) => Err(PostgresError::Diesel(e).into()),
}
}
#[tracing::instrument(level = "debug", skip(self))]

View file

@ -0,0 +1,50 @@
use barrel::backend::Pg;
use barrel::functions::AutogenFunction;
use barrel::{types, Migration};
pub(crate) fn migration() -> String {
let mut m = Migration::new();
m.create_table("keyed_notifications", |t| {
t.add_column(
"key",
types::text().primary(true).unique(true).nullable(false),
);
t.add_column(
"heartbeat",
types::datetime()
.nullable(false)
.default(AutogenFunction::CurrentTimestamp),
);
t.add_index(
"keyed_notifications_heartbeat_index",
types::index(["heartbeat"]),
);
});
m.inject_custom(
r#"
CREATE OR REPLACE FUNCTION keyed_notify()
RETURNS trigger AS
$$
BEGIN
PERFORM pg_notify('keyed_notification_channel', OLD.key);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"#
.trim(),
);
m.inject_custom(
r#"
CREATE TRIGGER keyed_notification_removed
AFTER DELETE
ON keyed_notifications
FOR EACH ROW
EXECUTE PROCEDURE keyed_notify();
"#,
);
m.make::<Pg>().to_string()
}
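
This trigger is the cross-process half of the notification scheme: finishing (or abandoning) a claim deletes its keyed_notifications row via clear_keyed_notifier, the AFTER DELETE trigger runs pg_notify with the deleted key on keyed_notification_channel, and delegate_notifications in the postgres repo routes that payload to KeyedNotifierState, which wakes any local NotificationEntry waiters registered for the same key.
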

View file

@ -48,6 +48,13 @@ diesel::table! {
}
}
diesel::table! {
keyed_notifications (key) {
key -> Text,
heartbeat -> Timestamp,
}
}
diesel::table! {
proxies (url) {
url -> Text,
@ -109,6 +116,7 @@ diesel::allow_tables_to_appear_in_same_query!(
details,
hashes,
job_queue,
keyed_notifications,
proxies,
refinery_schema_history,
settings,

View file

@ -5,6 +5,7 @@ use crate::{
serde_str::Serde,
stream::{from_iterator, LocalBoxStream},
};
use dashmap::DashMap;
use sled::{transaction::TransactionError, Db, IVec, Transactional, Tree};
use std::{
collections::HashMap,
@ -22,10 +23,11 @@ use uuid::Uuid;
use super::{
hash::Hash,
metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard},
notification_map::{NotificationEntry, NotificationMap},
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details,
DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, JobResult, OrderedHash,
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
UploadResult, VariantAccessRepo, VariantAlreadyExists,
UploadResult, VariantAccessRepo, VariantAlreadyExists, VariantRepo,
};
macro_rules! b {
@ -113,6 +115,8 @@ pub(crate) struct SledRepo {
migration_identifiers: Tree,
cache_capacity: u64,
export_path: PathBuf,
variant_process_map: DashMap<(Hash, String), time::OffsetDateTime>,
notifications: NotificationMap,
db: Db,
}
@ -156,6 +160,8 @@ impl SledRepo {
migration_identifiers: db.open_tree("pict-rs-migration-identifiers-tree")?,
cache_capacity,
export_path,
variant_process_map: DashMap::new(),
notifications: NotificationMap::new(),
db,
})
}
@ -801,7 +807,7 @@ impl QueueRepo for SledRepo {
.read()
.unwrap()
.get(&queue_name)
.map(Arc::clone);
.cloned();
let notify = if let Some(notify) = opt {
notify
@ -939,13 +945,6 @@ impl SettingsRepo for SledRepo {
Ok(opt.map(|ivec| Arc::from(ivec.to_vec())))
}
#[tracing::instrument(level = "trace", skip(self))]
async fn remove(&self, key: &'static str) -> Result<(), RepoError> {
b!(self.settings, settings.remove(key));
Ok(())
}
}
fn variant_access_key(hash: &[u8], variant: &str) -> Vec<u8> {
@ -1331,88 +1330,6 @@ impl HashRepo for SledRepo {
Ok(opt.map(try_into_arc_str).transpose()?)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn relate_variant_identifier(
&self,
hash: Hash,
variant: String,
identifier: &Arc<str>,
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
let value = identifier.clone();
let hash_variant_identifiers = self.hash_variant_identifiers.clone();
crate::sync::spawn_blocking("sled-io", move || {
hash_variant_identifiers
.compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes()))
.map(|res| res.map_err(|_| VariantAlreadyExists))
})
.await
.map_err(|_| RepoError::Canceled)?
.map_err(SledError::from)
.map_err(RepoError::from)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn variant_identifier(
&self,
hash: Hash,
variant: String,
) -> Result<Option<Arc<str>>, RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
let opt = b!(
self.hash_variant_identifiers,
hash_variant_identifiers.get(key)
);
Ok(opt.map(try_into_arc_str).transpose()?)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
let hash = hash.to_ivec();
let vec = b!(
self.hash_variant_identifiers,
Ok(hash_variant_identifiers
.scan_prefix(hash.clone())
.filter_map(|res| res.ok())
.filter_map(|(key, ivec)| {
let identifier = try_into_arc_str(ivec).ok();
let variant = variant_from_key(&hash, &key);
if variant.is_none() {
tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key));
}
Some((variant?, identifier?))
})
.collect::<Vec<_>>()) as Result<Vec<_>, SledError>
);
Ok(vec)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
b!(
self.hash_variant_identifiers,
hash_variant_identifiers.remove(key)
);
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
async fn relate_blurhash(&self, hash: Hash, blurhash: Arc<str>) -> Result<(), RepoError> {
b!(
@ -1528,6 +1445,167 @@ impl HashRepo for SledRepo {
}
}
#[async_trait::async_trait(?Send)]
impl VariantRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self))]
async fn claim_variant_processing_rights(
&self,
hash: Hash,
variant: String,
) -> Result<Result<(), NotificationEntry>, RepoError> {
let key = (hash.clone(), variant.clone());
let now = time::OffsetDateTime::now_utc();
let entry = self
.notifications
.register_interest(Arc::from(format!("{}{variant}", hash.to_base64())));
match self.variant_process_map.entry(key.clone()) {
dashmap::mapref::entry::Entry::Occupied(mut occupied_entry) => {
if occupied_entry
.get()
.saturating_add(time::Duration::minutes(2))
> now
{
return Ok(Err(entry));
}
occupied_entry.insert(now);
}
dashmap::mapref::entry::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(now);
}
}
if self.variant_identifier(hash, variant).await?.is_some() {
self.variant_process_map.remove(&key);
return Ok(Err(entry));
}
Ok(Ok(()))
}
async fn variant_waiter(
&self,
hash: Hash,
variant: String,
) -> Result<NotificationEntry, RepoError> {
let entry = self
.notifications
.register_interest(Arc::from(format!("{}{variant}", hash.to_base64())));
Ok(entry)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn variant_heartbeat(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
let key = (hash, variant);
let now = time::OffsetDateTime::now_utc();
if let dashmap::mapref::entry::Entry::Occupied(mut occupied_entry) =
self.variant_process_map.entry(key)
{
occupied_entry.insert(now);
}
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
async fn notify_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
let key = (hash.clone(), variant.clone());
self.variant_process_map.remove(&key);
let key = format!("{}{variant}", hash.to_base64());
self.notifications.notify(&key);
Ok(())
}
#[tracing::instrument(level = "trace", skip(self))]
async fn relate_variant_identifier(
&self,
hash: Hash,
variant: String,
identifier: &Arc<str>,
) -> Result<Result<(), VariantAlreadyExists>, RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
let value = identifier.clone();
let hash_variant_identifiers = self.hash_variant_identifiers.clone();
let out = crate::sync::spawn_blocking("sled-io", move || {
hash_variant_identifiers
.compare_and_swap(key, Option::<&[u8]>::None, Some(value.as_bytes()))
.map(|res| res.map_err(|_| VariantAlreadyExists))
})
.await
.map_err(|_| RepoError::Canceled)?
.map_err(SledError::from)
.map_err(RepoError::from)?;
Ok(out)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn variant_identifier(
&self,
hash: Hash,
variant: String,
) -> Result<Option<Arc<str>>, RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
let opt = b!(
self.hash_variant_identifiers,
hash_variant_identifiers.get(key)
);
Ok(opt.map(try_into_arc_str).transpose()?)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn variants(&self, hash: Hash) -> Result<Vec<(String, Arc<str>)>, RepoError> {
let hash = hash.to_ivec();
let vec = b!(
self.hash_variant_identifiers,
Ok(hash_variant_identifiers
.scan_prefix(hash.clone())
.filter_map(|res| res.ok())
.filter_map(|(key, ivec)| {
let identifier = try_into_arc_str(ivec).ok();
let variant = variant_from_key(&hash, &key);
if variant.is_none() {
tracing::warn!("Skipping a variant: {}", String::from_utf8_lossy(&key));
}
Some((variant?, identifier?))
})
.collect::<Vec<_>>()) as Result<Vec<_>, SledError>
);
Ok(vec)
}
#[tracing::instrument(level = "trace", skip(self))]
async fn remove_variant(&self, hash: Hash, variant: String) -> Result<(), RepoError> {
let hash = hash.to_bytes();
let key = variant_key(&hash, &variant);
b!(
self.hash_variant_identifiers,
hash_variant_identifiers.remove(key)
);
Ok(())
}
}
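
Unlike the postgres implementation, the sled version keeps its claims in an in-process DashMap keyed by (Hash, variant), with the same two-minute staleness rule enforced at claim time. That is sufficient here because a sled repo is embedded in a single pict-rs process, so there are no other processes to coordinate with, and notify_variant only has to clear the local map entry and wake local waiters.
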
fn hash_alias_key(hash: &IVec, alias: &IVec) -> Vec<u8> {
let mut v = hash.to_vec();
v.extend_from_slice(alias);

View file

@ -3,7 +3,7 @@ use std::{
str::FromStr,
};
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct Serde<T> {
inner: T,
}
@ -44,6 +44,17 @@ impl<T> DerefMut for Serde<T> {
}
}
impl<T> Default for Serde<T>
where
T: Default,
{
fn default() -> Self {
Serde {
inner: T::default(),
}
}
}
impl<T> FromStr for Serde<T>
where
T: FromStr,

View file

@ -72,7 +72,7 @@ impl From<crate::store::object_store::ObjectError> for StoreError {
fn from(value: crate::store::object_store::ObjectError) -> Self {
match value {
e @ crate::store::object_store::ObjectError::Status(
actix_web::http::StatusCode::NOT_FOUND,
reqwest::StatusCode::NOT_FOUND,
_,
_,
) => Self::ObjectNotFound(e),

View file

@ -4,16 +4,16 @@ use crate::{
};
use actix_web::{
error::BlockingError,
http::{
header::{ByteRangeSpec, Range, CONTENT_LENGTH},
StatusCode,
},
http::header::{ByteRangeSpec, Range},
rt::task::JoinError,
web::Bytes,
};
use base64::{prelude::BASE64_STANDARD, Engine};
use futures_core::Stream;
use reqwest::{header::RANGE, Body, Response};
use reqwest::{
header::{CONTENT_LENGTH, RANGE},
Body, Response, StatusCode,
};
use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
use rusty_s3::{
actions::{CreateMultipartUpload, S3Action},

View file

@ -91,7 +91,7 @@ where
S: Stream + 'static,
S::Item: Send + Sync,
{
let (tx, rx) = crate::sync::channel(1);
let (tx, mut rx) = crate::sync::channel(1);
let handle = crate::sync::abort_on_drop(crate::sync::spawn("send-stream", async move {
let stream = std::pin::pin!(stream);
@ -100,16 +100,14 @@ where
while let Some(res) = streamer.next().await {
tracing::trace!("make send tx: looping");
if tx.send_async(res).await.is_err() {
if tx.send(res).await.is_err() {
break;
}
}
}));
streem::from_fn(|yiedler| async move {
let mut stream = rx.into_stream().into_streamer();
while let Some(res) = stream.next().await {
while let Some(res) = rx.recv().await {
tracing::trace!("make send rx: looping");
yiedler.yield_(res).await;
@ -124,20 +122,18 @@ where
I: IntoIterator + Send + 'static,
I::Item: Send + Sync,
{
let (tx, rx) = crate::sync::channel(buffer);
let (tx, mut rx) = crate::sync::channel(buffer);
let handle = crate::sync::spawn_blocking("blocking-iterator", move || {
for value in iterator {
if tx.send(value).is_err() {
if tx.blocking_send(value).is_err() {
break;
}
}
});
streem::from_fn(|yielder| async move {
let mut stream = rx.into_stream().into_streamer();
while let Some(res) = stream.next().await {
while let Some(res) = rx.recv().await {
tracing::trace!("from_iterator: looping");
yielder.yield_(res).await;

View file

@ -39,11 +39,13 @@ impl<T> std::future::Future for DropHandle<T> {
}
#[track_caller]
pub(crate) fn channel<T>(bound: usize) -> (flume::Sender<T>, flume::Receiver<T>) {
pub(crate) fn channel<T>(
bound: usize,
) -> (tokio::sync::mpsc::Sender<T>, tokio::sync::mpsc::Receiver<T>) {
let span = tracing::trace_span!(parent: None, "make channel");
let guard = span.enter();
let channel = flume::bounded(bound);
let channel = tokio::sync::mpsc::channel(bound);
drop(guard);
channel
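
Swapping flume for tokio::sync::mpsc is mostly a mechanical translation, and the call sites elsewhere in this diff follow it: send_async becomes send, recv_async becomes recv, the receiver binding must be mut, and blocking producers use blocking_send. A tiny self-contained example of the new shape, assuming the same bounded-channel semantics:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel, analogous to crate::sync::channel(10) above.
    let (tx, mut rx) = mpsc::channel::<u32>(10);

    tokio::spawn(async move {
        for n in 0..3 {
            // flume's tx.send_async(n).await becomes tx.send(n).await.
            if tx.send(n).await.is_err() {
                break; // receiver dropped
            }
        }
    });

    // flume's rx.recv_async() loop becomes rx.recv(), which needs `mut rx`.
    while let Some(n) = rx.recv().await {
        println!("got {n}");
    }
}
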

View file

@ -1,6 +1,6 @@
use std::path::PathBuf;
use rustls::{crypto::ring::sign::any_supported_type, sign::CertifiedKey, Error};
use rustls::{crypto::aws_lc_rs::sign::any_supported_type, sign::CertifiedKey, Error};
pub(super) struct Tls {
certificate: PathBuf,