Compare commits


43 commits

Author SHA1 Message Date
asonix dd01548309 Fix backgrounded query boolean parsing 2024-06-05 12:54:05 -05:00
asonix f05eed2e36 Prepare 0.5.15 2024-06-04 11:49:13 -05:00
asonix 69470c8a38 Update defaults.toml 2024-06-04 11:34:28 -05:00
asonix 4e75764110 Add configuration option to control request logging 2024-06-03 22:15:52 -05:00
asonix 6ef9dc404f Update flake 2024-06-03 21:47:39 -05:00
asonix af3a6a260a Update dependencies (minor & point) 2024-06-03 21:40:08 -05:00
asonix 286bc8b97a Log for completed requests 2024-06-03 21:38:41 -05:00
asonix 8cf8b2bc05 Remove proxies row before deleting alias
This fixes the issue of invalidating the proxies alias foreign key
2024-06-03 16:29:13 -05:00
asonix 1c4e343d9d Prepare 0.5.14 2024-05-20 22:23:08 -05:00
asonix d03cc63d2b ffprobe: handle files with empty stream json 2024-05-20 22:08:54 -05:00
asonix 260f9a158a Update dependencies (minor & point) 2024-05-19 10:39:21 -05:00
asonix a7c78cd54e Merge pull request 'Update rustls for tokio-postgres' (#58) from asonix/update-tokio-postgres-rustls into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/58
2024-05-19 15:35:47 +00:00
asonix d7dc2e506d Merge branch 'main' into asonix/update-tokio-postgres-rustls 2024-05-19 10:21:12 -05:00
asonix e48f60a6c6 Merge pull request 'Update rustls for actix-web' (#61) from asonix/update-actix-web-rustls into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/61
2024-05-19 15:18:37 +00:00
asonix 9d01aeb82c Update rustls for actix-web
includes update for rustls-channel-resolver
2024-05-19 10:08:48 -05:00
asonix bddfb3c9d0 Merge branch 'main' into asonix/update-tokio-postgres-rustls 2024-05-19 09:40:45 -05:00
asonix 7ae3c0c776 Merge pull request 'Update reqwest to 0.12' (#59) from asonix/update-reqwest into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/59
2024-05-19 14:37:50 +00:00
asonix 983e9ce151 Merge branch 'main' into asonix/update-reqwest 2024-05-19 09:36:54 -05:00
asonix 9302062b26 Merge pull request 'Update metrics-exporter-prometheus' (#60) from asonix/update-metrics-exporter-prometheus into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/60
2024-05-19 14:35:27 +00:00
asonix 33e72266f5 Add public mechanism for installing aws-lc-rs 2024-05-03 23:05:17 -05:00
asonix 39da69b1aa Use tokio-postgres-generic-rustls 2024-05-03 22:39:30 -05:00
asonix 64b8635059 Update rustls for tokio-postgres
This doesn't update rustls for actix-web (0.22), or rustls for reqwest (0.21)
2024-05-03 22:39:30 -05:00
asonix d45e3fa386 Remove unused 'remove' repo method 2024-05-03 22:35:20 -05:00
asonix bfd4fd4689 Remove unused StatusError type 2024-05-03 22:34:18 -05:00
asonix 89f3c447a8 clippy 2024-05-01 14:57:03 -05:00
asonix 46cfbf99a5 Update metrics-exporter-prometheus
This pulls in hyper 1 and http 1
2024-05-01 14:50:20 -05:00
asonix 58529a2eb2 Update reqwest to 0.12
This pulls in hyper 1 and http 1, but removes rustls 0.21
2024-05-01 14:46:29 -05:00
asonix 700aeb90e0 Fix time deprecation warnings 2024-05-01 14:33:07 -05:00
asonix ff39c30cc8 Update direct base64 dependency 2024-05-01 14:32:26 -05:00
asonix 9561c578dc Update dependencies (minor & point) 2024-05-01 14:30:22 -05:00
asonix dc7bdf7eeb Update flake.lock 2024-04-21 21:02:42 -05:00
asonix 33ba045ee1 Apparently imagemagick needs a shell to delegate to ffmpeg properly 2024-04-21 21:02:31 -05:00
asonix f082e48ed8 Attempt to set up nix-based docker for pict-rs
There's a bug when converting APNG files to WEBP files, which
imagemagick delegates to ffmpeg. When doing 'nix build' and running the
result, or running pict-rs in the dev shell, it works fine. In the
container, this doesn't work at all. imagemagick complains that there's
no media to convert, implying ffmpeg has output a zero-sized file.

This work is helping to narrow down exactly what pict-rs needs to run,
though. This still needs to be tested against h264, h265, vp8, vp9 and
av1.
2024-04-21 14:31:03 -05:00
asonix 97159e0030 Update release document 2024-04-15 21:17:40 -05:00
asonix 6d40fbee47 Prepare 0.5.13 2024-04-15 15:31:31 -05:00
asonix c4e99ef539 Add ability to disable colorized logs 2024-04-15 15:16:10 -05:00
asonix 3428c31f16 Use tokio channels again 2024-04-14 20:06:58 -05:00
asonix 4bb3bad703 Prepare 0.5.12 2024-04-05 13:05:16 -05:00
asonix 4021458be8 Prevent divided-by-zero for empty BytesStreams 2024-04-05 12:57:40 -05:00
asonix eca3697410 Add panic boundaries for background jobs 2024-04-05 12:57:32 -05:00
asonix d41fca5b6c Don't let the doctests step on each other via /tmp 2024-04-04 14:39:30 -05:00
asonix e3183c923f Remove dev-dependency on tokio-uring - unneeded 2024-04-04 12:53:08 -05:00
asonix d97cfe2a64 Remove 'armed' from NotificationEntryInner by only creating them when needed 2024-04-03 13:22:34 -05:00
40 changed files with 1475 additions and 736 deletions

Cargo.lock (generated, 1261 lines changed)

File diff suppressed because it is too large.


@ -1,7 +1,7 @@
[package]
name = "pict-rs"
description = "A simple image hosting service"
version = "0.5.11"
version = "0.5.15"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"
@ -20,10 +20,10 @@ random-errors = ["dep:nanorand"]
[dependencies]
actix-form-data = "0.7.0-beta.7"
actix-web = { version = "4.0.0", default-features = false, features = ["rustls-0_22"] }
actix-web = { version = "4.6.0", default-features = false, features = ["rustls-0_23"] }
async-trait = "0.1.51"
barrel = { version = "0.7.0", features = ["pg"] }
base64 = "0.21.0"
base64 = "0.22.0"
bb8 = "0.8.3"
blurhash-update = "0.1.0"
clap = { version = "4.0.2", features = ["derive"] }
@ -34,12 +34,11 @@ dashmap = "5.1.0"
diesel = { version = "2.1.1", features = ["postgres_backend", "serde_json", "time", "uuid"] }
diesel-async = { version = "0.4.1", features = ["bb8", "postgres"] }
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
flume = "0.11.0"
futures-core = "0.3"
hex = "0.4.3"
md-5 = "0.10.5"
metrics = "0.22.0"
metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = ["http-listener"] }
metrics-exporter-prometheus = { version = "0.14.0", default-features = false, features = ["http-listener"] }
mime = "0.3.1"
nanorand = { version = "0.7", optional = true }
opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"] }
@ -47,13 +46,14 @@ opentelemetry = "0.22"
opentelemetry-otlp = "0.15"
pin-project-lite = "0.2.7"
refinery = { version = "0.8.10", features = ["tokio-postgres", "postgres"] }
reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls", "stream"] }
reqwest-middleware = "0.2.2"
reqwest-tracing = "0.4.5"
# pinned to tokio-postgres-rustls
rustls = "0.22.0"
reqwest = { version = "0.12.0", default-features = false, features = ["json", "rustls-tls", "stream"] }
reqwest-middleware = "0.3.0"
reqwest-tracing = "0.5.0"
# pinned to tokio-postgres-generic-rustls
# pinned to actix-web
rustls = "0.23"
# pinned to rustls
rustls-channel-resolver = "0.2.0"
rustls-channel-resolver = "0.3.0"
# pinned to rustls
rustls-pemfile = "2.0.0"
rusty-s3 = "0.5.0"
@ -69,7 +69,7 @@ thiserror = "1.0"
time = { version = "0.3.0", features = ["serde", "serde-well-known"] }
tokio = { version = "1", features = ["full", "tracing"] }
tokio-postgres = { version = "0.7.10", features = ["with-uuid-1", "with-time-0_3", "with-serde_json-1"] }
tokio-postgres-rustls = "0.11.0"
tokio-postgres-generic-rustls = { version = "0.1.0", default-features = false, features = ["aws-lc-rs"] }
tokio-uring = { version = "0.4", optional = true, features = ["bytes"] }
tokio-util = { version = "0.7", default-features = false, features = [
"codec",
@ -96,7 +96,4 @@ webpki-roots = "0.26.0"
[dependencies.tracing-actix-web]
version = "0.7.10"
default-features = false
features = ["emit_event_on_error", "opentelemetry_0_22"]
[dev-dependencies]
tokio-uring = { version = "0.4", features = ["bytes"] }
features = ["opentelemetry_0_22"]


@ -16,6 +16,8 @@ concurrency = 32
format = "normal"
targets = "info"
log_spans = false
no_ansi = false
log_requests = false
[tracing.console]
buffer_capacity = 102400


@ -5,11 +5,11 @@
"systems": "systems"
},
"locked": {
"lastModified": 1705309234,
"narHash": "sha256-uNRRNRKmJyCRC/8y1RqBkqWBLM034y4qN7EprSdmgyA=",
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "1ef2e671c3b0c19053962c07dbda38332dcebf26",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
@ -20,11 +20,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1705133751,
"narHash": "sha256-rCIsyE80jgiOU78gCWN3A0wE0tR2GI5nH6MlS+HaaSQ=",
"lastModified": 1717196966,
"narHash": "sha256-yZKhxVIKd2lsbOqYd5iDoUIwsRZFqE87smE2Vzf6Ck0=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "9b19f5e77dd906cb52dade0b7bd280339d2a1f3d",
"rev": "57610d2f8f0937f39dbd72251e9614b1561942d8",
"type": "github"
},
"original": {


@ -15,13 +15,29 @@
in
{
packages = rec {
imagemagick7_pict-rs = pkgs.callPackage ./nix/pkgs/imagemagick_pict-rs {};
ffmpeg6_pict-rs = pkgs.callPackage ./nix/pkgs/ffmpeg_pict-rs {};
pict-rs = pkgs.callPackage ./pict-rs.nix {
inherit (pkgs.darwin.apple_sdk.frameworks) Security;
inherit imagemagick7_pict-rs ffmpeg6_pict-rs;
};
default = pict-rs;
};
docker = pkgs.dockerTools.buildLayeredImage {
name = "pict-rs";
tag = "latest";
contents = [ pkgs.tini self.packages.${system}.pict-rs pkgs.bash ];
config = {
Entrypoint = [ "/bin/tini" "--" "/bin/pict-rs" ];
Cmd = [ "run" ];
};
};
apps = rec {
dev = flake-utils.lib.mkApp { drv = self.packages.${system}.pict-rs; };
default = dev;
@ -36,9 +52,9 @@
curl
diesel-cli
exiftool
ffmpeg_6-full
garage
imagemagick
self.packages.${system}.imagemagick7_pict-rs
self.packages.${system}.ffmpeg6_pict-rs
jq
minio-client
rust-analyzer


@ -0,0 +1,5 @@
{ ffmpeg_6-headless }:
ffmpeg_6-headless.override {
withWebp = true;
}


@ -0,0 +1,23 @@
{ imagemagick7 }:
imagemagick7.override {
bzip2Support = true;
zlibSupport = true;
libX11Support = false;
libXtSupport = false;
fontconfigSupport = false;
freetypeSupport = false;
libjpegSupport = true;
djvulibreSupport = false;
lcms2Support = false;
openexrSupport = false;
libjxlSupport = true;
libpngSupport = true;
liblqr1Support = false;
librsvgSupport = false;
libtiffSupport = false;
libxml2Support = false;
openjpegSupport = true;
libwebpSupport = true;
libheifSupport = true;
}


@ -1,6 +1,6 @@
{ exiftool
, ffmpeg_6-full
, imagemagick
, ffmpeg6_pict-rs
, imagemagick7_pict-rs
, lib
, makeWrapper
, nixosTests
@ -11,7 +11,7 @@
rustPlatform.buildRustPackage {
pname = "pict-rs";
version = "0.5.11";
version = "0.5.15";
src = ./.;
cargoLock = {
@ -27,7 +27,7 @@ rustPlatform.buildRustPackage {
postInstall = ''
wrapProgram $out/bin/pict-rs \
--prefix PATH : "${lib.makeBinPath [ imagemagick ffmpeg_6-full exiftool ]}"
--prefix PATH : "${lib.makeBinPath [ imagemagick7_pict-rs ffmpeg6_pict-rs exiftool ]}"
'';
passthru.tests = { inherit (nixosTests) pict-rs; };


@ -98,6 +98,16 @@ targets = 'info'
# default: false
log_spans = false
## Optional: whether to disable colorized log output
# environment variable: PICTRS__TRACING__LOGGING__NO_ANSI
# default: false
no_ansi = false
## Optional: whether to log upon request completion
# environment variable: PICTRS__TRACING__LOGGING__LOG_REQUESTS
# default: false
log_requests = false
## Console configuration
[tracing.console]

releases/0.5.12.md (new file, 46 lines)

@ -0,0 +1,46 @@
# pict-rs 0.5.12
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.12 is a bugfix release addressing two issues that, when compounded, could cause pict-rs
to stop processing media.
### Fixes
- [Panic Handling in Background Jobs](#panic-handling-in-background-jobs)
- [BytesStream Divide-by-Zero](#bytesstream-divide-by-zero)
## Upgrade Notes
There are no significant differences from 0.5.11. Upgrading should be as simple as pulling a new
version of pict-rs.
## Descriptions
### Panic Handling in Background Jobs
pict-rs makes an effort to never use explicitly panicking code, but since there's no static way to
guarantee that a given function won't panic, pict-rs needs to be able to cope when one does. pict-rs
0.5.12 now wraps each job invocation in a spawned task, which can catch and report panics that
happen in background jobs.
Previously, a panic in a background job would bring down that thread's job processor, which resulted
in future jobs never being processed. Now job processing should properly continue after panics
occur.
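The mechanism can be sketched with plain tokio: running each job on its own spawned task turns a
panic into a `JoinError` the loop can inspect, rather than an unwind that takes the processor down.
A minimal sketch, with `run_job` as a hypothetical stand-in for a real pict-rs job:
```rust
// Hypothetical stand-in for a queued job.
async fn run_job(id: u64) -> Result<(), &'static str> {
    if id == 2 {
        panic!("job {id} hit a bug");
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    for id in 0..4u64 {
        // Spawning is the panic boundary: a panicking job surfaces here as a
        // JoinError rather than unwinding through this loop.
        match tokio::spawn(run_job(id)).await {
            Ok(Ok(())) => println!("job {id} succeeded"),
            Ok(Err(e)) => println!("job {id} failed: {e}"),
            Err(e) if e.is_panic() => println!("job {id} panicked; continuing"),
            Err(_) => println!("job {id} was cancelled"),
        }
    }
}
```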
### BytesStream Divide-by-Zero
Part of my recent rework of BytesStream included adding debug logs for how many byte chunks were in
a given stream and their average length. Unfortunately, if there were no bytes in the
stream, this would cause the "average chunk length" calculation to divide by 0. In previous versions
of pict-rs, this would generally result in a failed request for processed media, but in pict-rs
0.5.11 this would end up killing the background jobs processor.
This specific panic has been fixed by ensuring we divide by the number of chunks or 1, whichever is
greater.
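Reduced to its essence (hypothetical free function; the real code lives on `BytesStream`, and the
one-line fix appears in the diff below):
```rust
fn average_chunk_len(total_len: usize, chunks: usize) -> usize {
    // max(1) keeps the empty-stream case from dividing by zero; an empty
    // stream simply reports an average chunk length of 0.
    total_len / chunks.max(1)
}

fn main() {
    assert_eq!(average_chunk_len(1024, 4), 256);
    assert_eq!(average_chunk_len(0, 0), 0); // previously: panic
}
```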

releases/0.5.13.md (new file, 62 lines)

@ -0,0 +1,62 @@
# pict-rs 0.5.13
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.13 is a maintenance release aiming to enable better logging in some scenarios.
### Features
- [Colorless Logging](#colorless-logging)
### Changes
- [Remove Flume](#remove-flume)
## Upgrade Notes
There are no significant changes from 0.5.12. Upgrading should be as simple as pulling a new version
of pict-rs.
## Descriptions
### Colorless Logging
When opting to use the `json` logger, the tracing subscriber automatically disables colored output.
This didn't remove colors from errors, though, and pict-rs hasn't had a way to disable colors while
using other log formats. pict-rs 0.5.13 introduces a new configuration value to remove colored
output from all logs regardless of logging format.
With pict-rs.toml
```toml
[tracing.logging]
no_ansi = true
```
With environment variables
```bash
PICTRS__TRACING__LOGGING__NO_ANSI=true
```
With commandline flags
```bash
pict-rs --no-log-ansi run
```
Colors in logs can be useful, so I imagine this option won't be used much. There has been a request
for this functionality, though, and it costs little to maintain.
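Mechanically, the flag mostly boils down to toggling ANSI output on the tracing-subscriber format
layer; pict-rs applies the same flag to its color-eyre theme. A simplified sketch:
```rust
use tracing_subscriber::{fmt, prelude::*};

fn init_logging(no_ansi: bool) {
    // Build a format layer with ANSI colors disabled when no_ansi is set.
    tracing_subscriber::registry()
        .with(fmt::layer().with_ansi(!no_ansi))
        .init();
}

fn main() {
    init_logging(true);
    tracing::info!("plain, uncolored output");
}
```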
### Remove Flume
Recently I've been debugging a memory usage issue in another project of mine. I wasn't able to fully
track down the cause, but I did notice that removing the
[flume channel library](https://github.com/zesterer/flume) seemed to make the leak go away. Since I
also use flume in pict-rs, I'm opting to replace it with tokio's native channel implementation. This
may or may not improve memory usage, but it does reduce the dependency count, and therefore the build time,
for pict-rs.
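The swap is mostly mechanical, as the diffs below show: flume's `send_async`/`recv_async` calls
become tokio's `send`/`recv`, and receivers become `mut`. A minimal sketch of the tokio side:
```rust
#[tokio::main]
async fn main() {
    // Bounded channel, equivalent in spirit to flume::bounded(10).
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(10);

    tokio::spawn(async move {
        for v in 0..3 {
            // send() only fails once the receiver has been dropped.
            if tx.send(v).await.is_err() {
                break;
            }
        }
    });

    // recv() yields None after all senders are dropped, ending the loop.
    while let Some(v) = rx.recv().await {
        println!("got {v}");
    }
}
```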

releases/0.5.14.md (new file, 28 lines)

@ -0,0 +1,28 @@
# pict-rs 0.5.14
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.14 includes a bugfix for identifying certain MOV videos, as well as updated dependencies.
### Fixes
- [Empty Stream Parsing](#empty-stream-parsing)
## Upgrade Notes
There are no significant changes from 0.5.13. Upgrading should be as simple as pulling a new version
of pict-rs.
## Descriptions
### Empty Stream Parsing
Certain videos, when identified with ffprobe, produce stream JSON objects with no fields. This would
cause pict-rs to fail to parse the information for these videos, as it expects streams to at least
contain a codec field. In pict-rs 0.5.14, empty streams are now considered valid and are simply
ignored.
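One way to accept such objects is a trailing field-less variant on an untagged serde enum, in the
spirit of the `FfMpegStream::Empty {}` variant in the diff below: variants are tried in order, so
the empty variant absorbs stream objects with no recognizable fields. A pared-down sketch, assuming
untagged deserialization:
```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum Stream {
    // Tried first: any stream that carries a codec field.
    Known { codec_name: String },
    // Tried last: absorbs an empty object like `{}`.
    Empty {},
}

fn main() {
    let streams: Vec<Stream> =
        serde_json::from_str(r#"[{"codec_name": "aac"}, {}]"#).unwrap();
    assert!(matches!(&streams[1], Stream::Empty {}));
    println!("{streams:?}");
}
```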

releases/0.5.15.md (new file, 58 lines)

@ -0,0 +1,58 @@
# pict-rs 0.5.15
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.15 includes a bugfix for cleaning proxied media, updated dependencies, and a new option
to log requests.
### Fixes
- [Proxied Media Cleanup](#proxied-media-cleanup)
### Additions
- [Request Logging](#request-logging)
## Upgrade Notes
There are no significant changes from 0.5.14. Upgrading should be as simple as pulling a new version
of pict-rs.
## Descriptions
### Proxied Media Cleanup
At some point, the cleanup logic for proxied media got flipped around and tried to remove the
internal alias before removing the proxy record. This works fine with the sled backend, but not with
the postgres backend, where deleting the alias first violates a foreign key constraint on the
proxies table. pict-rs 0.5.15 fixes this by ensuring the related proxy record is removed first.
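The ordering constraint can be modeled with a toy in-memory repo (hypothetical types; the actual fix
is in the queue cleanup job shown further down):
```rust
use std::collections::{HashMap, HashSet};

struct Repo {
    aliases: HashSet<String>,
    proxies: HashMap<String, String>, // url -> alias, a foreign key into aliases
}

impl Repo {
    fn remove_alias(&mut self, alias: &str) -> Result<(), String> {
        // Emulates postgres rejecting a delete that would orphan a child row.
        if self.proxies.values().any(|a| a.as_str() == alias) {
            return Err(format!("foreign key violation: proxies references {alias}"));
        }
        self.aliases.remove(alias);
        Ok(())
    }

    fn remove_proxy(&mut self, alias: &str) {
        self.proxies.retain(|_, a| a.as_str() != alias);
    }
}

fn main() {
    let mut repo = Repo {
        aliases: HashSet::from(["abc123.png".to_string()]),
        proxies: HashMap::from([(
            "https://example.com/cat.png".to_string(),
            "abc123.png".to_string(),
        )]),
    };

    // 0.5.15 ordering: remove the proxy record first, then the alias.
    repo.remove_proxy("abc123.png");
    assert!(repo.remove_alias("abc123.png").is_ok());
}
```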
### Request Logging
A new configuration option has been added to pict-rs to surface more information about what it is
doing. By default, pict-rs only logs what it considers to be errors, but when
`log_requests` is enabled, it will also log information about successful requests. This can help
with debugging without enabling full debug logs or resorting to logging spans.
It can be configured via toml
```toml
[tracing.logging]
log_requests = true
```
via environment variables
```bash
PICTRS__TRACING__LOGGING__LOG_REQUESTS=true
```
or via the commandline
```bash
pict-rs --log-requests run
```
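The gist of the new `Log` middleware's behavior, condensed from the `emit` function in
`src/middleware/log.rs` (full code in the diff below): the response status picks the log level, and
`log_requests` only changes what happens to successful responses.
```rust
// Condensed sketch of the level selection; the real middleware also defers
// logging until the response body finishes streaming.
fn level_for(status: u16, log_requests: bool) -> &'static str {
    match status {
        500..=599 => "error",
        400..=499 => "warn",
        300..=399 => "info",
        _ if log_requests => "info",
        _ => "debug",
    }
}

fn main() {
    assert_eq!(level_for(200, false), "debug");
    assert_eq!(level_for(200, true), "info");
    assert_eq!(level_for(502, true), "error");
}
```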


@ -35,7 +35,7 @@ impl BytesStream {
tracing::debug!(
"BytesStream with {} chunks, avg length {}",
bs.chunks_len(),
bs.len() / bs.chunks_len()
bs.len() / bs.chunks_len().max(1)
);
Ok(bs)


@ -18,6 +18,8 @@ impl Args {
log_format,
log_targets,
log_spans,
log_requests,
no_log_ansi,
console_address,
console_buffer_capacity,
opentelemetry_url,
@ -38,6 +40,8 @@ impl Args {
format: log_format,
targets: log_targets.map(Serde::new),
log_spans,
no_ansi: no_log_ansi,
log_requests,
},
console: Console {
address: console_address,
@ -581,6 +585,10 @@ struct Logging {
targets: Option<Serde<Targets>>,
#[serde(skip_serializing_if = "std::ops::Not::not")]
log_spans: bool,
#[serde(skip_serializing_if = "std::ops::Not::not")]
log_requests: bool,
#[serde(skip_serializing_if = "std::ops::Not::not")]
no_ansi: bool,
}
#[derive(Debug, Default, serde::Serialize)]
@ -924,6 +932,13 @@ pub(super) struct Args {
/// Whether to log opening and closing of tracing spans to stdout
#[arg(long)]
log_spans: bool,
/// Whether to log request completions at an INFO level
#[arg(long)]
log_requests: bool,
#[arg(long)]
/// Whether to disable color-codes in log output
no_log_ansi: bool,
/// Address and port to expose tokio-console metrics
#[arg(long)]


@ -55,6 +55,8 @@ struct LoggingDefaults {
format: LogFormat,
targets: Serde<Targets>,
log_spans: bool,
log_requests: bool,
no_ansi: bool,
}
#[derive(Clone, Debug, serde::Serialize)]
@ -235,6 +237,8 @@ impl Default for LoggingDefaults {
format: LogFormat::Normal,
targets: "info".parse().expect("Valid targets string"),
log_spans: false,
log_requests: false,
no_ansi: false,
}
}
}


@ -163,6 +163,10 @@ pub(crate) struct Logging {
pub(crate) targets: Serde<Targets>,
pub(crate) log_spans: bool,
pub(crate) no_ansi: bool,
pub(crate) log_requests: bool,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]


@ -53,6 +53,7 @@ impl FfMpegStreams {
FfMpegStream::Unknown { codec_name } => {
tracing::info!("Encountered unknown stream {codec_name}");
}
FfMpegStream::Empty {} => {}
}
}
@ -135,6 +136,7 @@ enum FfMpegStream {
Audio(FfMpegAudioStream),
Video(FfMpegVideoStream),
Unknown { codec_name: String },
Empty {},
}
#[derive(Debug, serde::Deserialize)]


@ -0,0 +1,35 @@
{
"programs": [
],
"streams": [
{
"codec_name": "hevc",
"width": 1920,
"height": 1080,
"pix_fmt": "yuv420p10le",
"nb_read_frames": "187",
"side_data_list": [
{
}
]
},
{
"codec_name": "aac",
"nb_read_frames": "135"
},
{
},
{
},
{
}
],
"format": {
"format_name": "mov,mp4,m4a,3gp,3g2,mj2"
}
}


@ -1,11 +1,11 @@
use crate::formats::{
AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat, Mp4Codec,
WebmAlphaCodec, WebmCodec,
AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat,
Mp4AudioCodec, Mp4Codec, WebmAlphaCodec, WebmCodec,
};
use super::{Discovery, FfMpegDiscovery, PixelFormatOutput};
fn details_tests() -> [(&'static str, Option<Discovery>); 13] {
fn details_tests() -> [(&'static str, Option<Discovery>); 14] {
[
(
"animated_webp",
@ -151,6 +151,18 @@ fn details_tests() -> [(&'static str, Option<Discovery>); 13] {
frames: None,
}),
),
(
"mov",
Some(Discovery {
input: InputFile::Video(InputVideoFormat::Mp4 {
video_codec: Mp4Codec::H265,
audio_codec: Some(Mp4AudioCodec::Aac),
}),
width: 1920,
height: 1080,
frames: Some(187),
}),
),
]
}

src/http1.rs (new file, 3 lines)

@ -0,0 +1,3 @@
pub(crate) fn to_actix_status(status: reqwest::StatusCode) -> actix_web::http::StatusCode {
actix_web::http::StatusCode::from_u16(status.as_u16()).expect("status codes are always valid")
}


@ -1,4 +1,5 @@
use crate::config::{LogFormat, OpenTelemetry, Tracing};
use color_eyre::config::Theme;
use console_subscriber::ConsoleLayer;
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;
@ -11,7 +12,15 @@ use tracing_subscriber::{
};
pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> {
color_eyre::install()?;
let eyre_theme = if tracing.logging.no_ansi {
Theme::new()
} else {
Theme::dark()
};
color_eyre::config::HookBuilder::new()
.theme(eyre_theme)
.install()?;
LogTracer::init()?;
@ -23,7 +32,9 @@ pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> {
FmtSpan::NONE
};
let format_layer = tracing_subscriber::fmt::layer().with_span_events(fmt_span);
let format_layer = tracing_subscriber::fmt::layer()
.with_span_events(fmt_span)
.with_ansi(!tracing.logging.no_ansi);
match tracing.logging.format {
LogFormat::Compact => with_format(format_layer.compact(), tracing),


@ -14,6 +14,7 @@ mod file_path;
mod formats;
mod future;
mod generate;
mod http1;
mod ingest;
mod init_metrics;
mod init_tracing;
@ -41,18 +42,12 @@ use actix_web::{
http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES},
web, App, HttpRequest, HttpResponse, HttpResponseBuilder, HttpServer,
};
use details::{ApiDetails, HumanDate};
use future::{WithPollTimer, WithTimeout};
use futures_core::Stream;
use magick::ArcPolicyDir;
use metrics_exporter_prometheus::PrometheusBuilder;
use middleware::{Metrics, Payload};
use repo::ArcRepo;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_tracing::TracingMiddleware;
use rustls_channel_resolver::ChannelSender;
use rusty_s3::UrlStyle;
use state::State;
use std::{
marker::PhantomData,
path::Path,
@ -61,8 +56,6 @@ use std::{
time::{Duration, SystemTime},
};
use streem::IntoStreamer;
use sync::DropHandle;
use tmp_file::{ArcTmpDir, TmpDir};
use tokio::sync::Semaphore;
use tracing::Instrument;
use tracing_actix_web::TracingLogger;
@ -70,20 +63,25 @@ use tracing_actix_web::TracingLogger;
use self::{
backgrounded::Backgrounded,
config::{Configuration, Operation},
details::Details,
details::{ApiDetails, Details, HumanDate},
either::Either,
error::{Error, UploadError},
formats::InputProcessableFormat,
future::{WithPollTimer, WithTimeout},
ingest::Session,
init_tracing::init_tracing,
middleware::{Deadline, Internal},
magick::ArcPolicyDir,
middleware::{Deadline, Internal, Log, Metrics, Payload},
migrate_store::migrate_store,
queue::queue_generate,
repo::{sled::SledRepo, Alias, DeleteToken, Hash, Repo, UploadId, UploadResult},
repo::{sled::SledRepo, Alias, ArcRepo, DeleteToken, Hash, Repo, UploadId, UploadResult},
serde_str::Serde,
state::State,
store::{file_store::FileStore, object_store::ObjectStore, Store},
stream::empty,
sync::DropHandle,
tls::Tls,
tmp_file::{ArcTmpDir, TmpDir},
};
pub use self::config::{ConfigSource, PictRsConfiguration};
@ -521,7 +519,7 @@ struct UrlQuery {
url: String,
#[serde(default)]
backgrounded: bool,
backgrounded: Serde<bool>,
}
#[derive(Debug, serde::Deserialize)]
@ -562,7 +560,7 @@ async fn download<S: Store + 'static>(
let stream = download_stream(&url_query.url, &state).await?;
if url_query.backgrounded {
if *url_query.backgrounded {
do_download_backgrounded(stream, state, upload_query).await
} else {
do_download_inline(stream, &state, &upload_query).await
@ -580,7 +578,7 @@ async fn download_stream<S>(
let res = state.client.get(url).send().await?;
if !res.status().is_success() {
return Err(UploadError::Download(res.status()).into());
return Err(UploadError::Download(http1::to_actix_status(res.status())).into());
}
let stream = crate::stream::limit(
@ -1743,6 +1741,7 @@ async fn launch<
spawn_workers(state.clone());
App::new()
.wrap(Log::new(state.config.tracing.logging.log_requests))
.wrap(TracingLogger::default())
.wrap(Deadline)
.wrap(Metrics)
@ -1763,7 +1762,7 @@ async fn launch<
tracing::info!("Starting pict-rs with TLS on {address}");
server.bind_rustls_0_22(address, config)?.run().await?;
server.bind_rustls_0_23(address, config)?.run().await?;
handle.abort();
let _ = handle.await;
@ -1868,7 +1867,8 @@ impl<P: AsRef<Path>, T: serde::Serialize> ConfigSource<P, T> {
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let configuration = pict_rs::ConfigSource::memory(serde_json::json!({
/// "server": {
/// "address": "127.0.0.1:8080"
/// "address": "127.0.0.1:8080",
/// "temporary_directory": "/tmp/t1"
/// },
/// "repo": {
/// "type": "sled",
@ -1936,6 +1936,19 @@ impl PictRsConfiguration {
Ok(self)
}
/// Install aws-lc-rs as the default crypto provider
///
/// This would happen automatically anyway unless rustls crate features get mixed up
pub fn install_crypto_provider(self) -> Self {
if rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.is_err()
{
tracing::info!("rustls crypto provider already installed");
}
self
}
/// Run the pict-rs application on a tokio `LocalSet`
///
/// This must be called from within `tokio::main` directly
@ -1945,13 +1958,16 @@ impl PictRsConfiguration {
/// #[tokio::main]
/// async fn main() -> color_eyre::Result<()> {
/// let pict_rs_server = pict_rs::ConfigSource::memory(serde_json::json!({
/// "server": {
/// "temporary_directory": "/tmp/t2"
/// },
/// "repo": {
/// "type": "sled",
/// "path": "/tmp/pict-rs/run-on-localset/sled-repo",
/// "path": "/tmp/pict-rs-run-on-localset/sled-repo",
/// },
/// "store": {
/// "type": "filesystem",
/// "path": "/tmp/pict-rs/run-on-localset/files",
/// "path": "/tmp/pict-rs-run-on-localset/files",
/// },
/// }))
/// .init::<&str>(None)?
@ -1976,13 +1992,16 @@ impl PictRsConfiguration {
/// fn main() -> color_eyre::Result<()> {
/// actix_web::rt::System::new().block_on(async move {
/// let pict_rs_server = pict_rs::ConfigSource::memory(serde_json::json!({
/// "server": {
/// "temporary_directory": "/tmp/t3"
/// },
/// "repo": {
/// "type": "sled",
/// "path": "/tmp/pict-rs/run/sled-repo",
/// "path": "/tmp/pict-rs-run/sled-repo",
/// },
/// "store": {
/// "type": "filesystem",
/// "path": "/tmp/pict-rs/run/files",
/// "path": "/tmp/pict-rs-run/files",
/// },
/// }))
/// .init::<&str>(None)?


@ -4,6 +4,7 @@ fn main() -> color_eyre::Result<()> {
pict_rs::PictRsConfiguration::build_default()?
.install_tracing()?
.install_metrics()?
.install_crypto_provider()
.run()
.await
})
@ -18,6 +19,7 @@ fn main() -> color_eyre::Result<()> {
pict_rs::PictRsConfiguration::build_default()?
.install_tracing()?
.install_metrics()?
.install_crypto_provider()
.run_on_localset()
.await
})


@ -1,9 +1,11 @@
mod deadline;
mod internal;
mod log;
mod metrics;
mod payload;
pub(crate) use self::deadline::Deadline;
pub(crate) use self::internal::Internal;
pub(crate) use self::log::Log;
pub(crate) use self::metrics::Metrics;
pub(crate) use self::payload::Payload;

src/middleware/log.rs (new file, 223 lines)

@ -0,0 +1,223 @@
use std::future::{ready, Future, Ready};
use actix_web::{
body::MessageBody,
dev::{Service, ServiceRequest, ServiceResponse, Transform},
http::StatusCode,
ResponseError,
};
pub(crate) struct Log {
info: bool,
}
pub(crate) struct LogMiddleware<S> {
info: bool,
inner: S,
}
impl Log {
pub(crate) fn new(info: bool) -> Self {
Self { info }
}
}
#[derive(Debug)]
pub(crate) struct LogError {
info: bool,
error: actix_web::Error,
}
pin_project_lite::pin_project! {
pub(crate) struct LogFuture<F> {
info: bool,
#[pin]
inner: F,
}
}
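// Wraps the response body so the completion log line can be deferred until the
// body finishes streaming, rather than when the response headers are produced.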
pin_project_lite::pin_project! {
pub(crate) struct LogBody<B> {
info: bool,
status: Option<StatusCode>,
#[pin]
inner: B,
}
}
impl<S, B> Transform<S, ServiceRequest> for Log
where
B: MessageBody,
S: Service<ServiceRequest, Response = ServiceResponse<B>>,
S::Future: 'static,
S::Error: Into<actix_web::Error>,
{
type Response = ServiceResponse<LogBody<B>>;
type Error = actix_web::Error;
type InitError = ();
type Transform = LogMiddleware<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(LogMiddleware {
info: self.info,
inner: service,
}))
}
}
impl<S, B> Service<ServiceRequest> for LogMiddleware<S>
where
B: MessageBody,
S: Service<ServiceRequest, Response = ServiceResponse<B>>,
S::Future: 'static,
S::Error: Into<actix_web::Error>,
{
type Response = ServiceResponse<LogBody<B>>;
type Error = actix_web::Error;
type Future = LogFuture<S::Future>;
fn poll_ready(
&self,
ctx: &mut core::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(ctx).map(|res| {
res.map_err(|e| {
LogError {
info: self.info,
error: e.into(),
}
.into()
})
})
}
fn call(&self, req: ServiceRequest) -> Self::Future {
LogFuture {
info: self.info,
inner: self.inner.call(req),
}
}
}
impl<F, B, E> Future for LogFuture<F>
where
B: MessageBody,
F: Future<Output = Result<ServiceResponse<B>, E>>,
E: Into<actix_web::Error>,
{
type Output = Result<ServiceResponse<LogBody<B>>, actix_web::Error>;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let info = self.info;
let this = self.project();
std::task::Poll::Ready(match std::task::ready!(this.inner.poll(cx)) {
Ok(response) => {
let status = response.status();
let status = if response.response().body().size().is_eof() {
emit(status, info);
None
} else {
Some(status)
};
Ok(response.map_body(|_, inner| LogBody {
info,
status,
inner,
}))
}
Err(e) => Err(LogError {
info,
error: e.into(),
}
.into()),
})
}
}
impl<B> MessageBody for LogBody<B>
where
B: MessageBody,
{
type Error = B::Error;
fn size(&self) -> actix_web::body::BodySize {
self.inner.size()
}
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<actix_web::web::Bytes, Self::Error>>> {
let this = self.project();
let opt = std::task::ready!(this.inner.poll_next(cx));
if opt.is_none() {
if let Some(status) = this.status.take() {
emit(status, *this.info);
}
}
std::task::Poll::Ready(opt)
}
}
impl std::fmt::Display for LogError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.error.fmt(f)
}
}
impl std::error::Error for LogError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.error.source()
}
}
impl ResponseError for LogError {
fn status_code(&self) -> actix_web::http::StatusCode {
self.error.as_response_error().status_code()
}
fn error_response(&self) -> actix_web::HttpResponse<actix_web::body::BoxBody> {
let response = self.error.error_response();
let status = response.status();
if response.body().size().is_eof() {
emit(status, self.info);
response
} else {
response.map_body(|_, inner| {
LogBody {
info: self.info,
status: Some(status),
inner,
}
.boxed()
})
}
}
}
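// Map the response status to a log level: server errors always log at ERROR,
// client errors at WARN, redirects at INFO, and successful requests at INFO
// only when log_requests is enabled (DEBUG otherwise).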
fn emit(status: StatusCode, info: bool) {
if status.is_server_error() {
tracing::error!("server error");
} else if status.is_client_error() {
tracing::warn!("client error");
} else if status.is_redirection() {
tracing::info!("redirected");
} else if info {
tracing::info!("completed");
} else {
tracing::debug!("completed");
}
}


@ -45,10 +45,10 @@ impl Drop for MetricsGuard {
}
}
async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
async fn drain(mut rx: tokio::sync::mpsc::Receiver<actix_web::dev::Payload>) {
let mut set = JoinSet::new();
while let Ok(payload) = rx.recv_async().await {
while let Some(payload) = rx.recv().await {
tracing::trace!("drain: looping");
// draining a payload is a best-effort task - if we can't collect in 2 minutes we bail
@ -94,18 +94,18 @@ async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
struct DrainHandle(Option<Rc<tokio::task::JoinHandle<()>>>);
pub(crate) struct Payload {
sender: flume::Sender<actix_web::dev::Payload>,
sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
handle: DrainHandle,
}
pub(crate) struct PayloadMiddleware<S> {
inner: S,
sender: flume::Sender<actix_web::dev::Payload>,
sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
_handle: DrainHandle,
}
pub(crate) struct PayloadStream {
inner: Option<actix_web::dev::Payload>,
sender: flume::Sender<actix_web::dev::Payload>,
sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
}
impl DrainHandle {


@ -59,9 +59,6 @@ impl Drop for MetricsGuard {
}
}
#[derive(Debug)]
struct StatusError(ExitStatus);
pub(crate) struct Process {
command: Arc<str>,
child: Child,
@ -487,11 +484,3 @@ impl ProcessRead {
}
}
}
impl std::fmt::Display for StatusError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Command failed with bad status: {}", self.0)
}
}
impl std::error::Error for StatusError {}


@ -11,9 +11,11 @@ use crate::{
use std::{
ops::Deref,
rc::Rc,
sync::Arc,
time::{Duration, Instant},
};
use tokio::task::JoinError;
use tracing::Instrument;
pub(crate) mod cleanup;
@ -297,54 +299,66 @@ where
}
}
fn job_result(result: &JobResult) -> crate::repo::JobResult {
fn job_result(result: &Result<JobResult, JoinError>) -> crate::repo::JobResult {
match result {
Ok(()) => crate::repo::JobResult::Success,
Err(JobError::Retry(_)) => crate::repo::JobResult::Failure,
Err(JobError::Abort(_)) => crate::repo::JobResult::Aborted,
Ok(Ok(())) => crate::repo::JobResult::Success,
Ok(Err(JobError::Retry(_))) => crate::repo::JobResult::Failure,
Ok(Err(JobError::Abort(_))) => crate::repo::JobResult::Aborted,
Err(_) => crate::repo::JobResult::Aborted,
}
}
async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
where
S: Store,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
S: Store + 'static,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
{
let worker_id = uuid::Uuid::new_v4();
let state = Rc::new(state);
loop {
tracing::trace!("process_jobs: looping");
crate::sync::cooperate().await;
let res = job_loop(&state, worker_id, queue, callback)
.with_poll_timer("job-loop")
.await;
// add a panic boundary by spawning a task
let res = crate::sync::spawn(
"job-loop",
job_loop(state.clone(), worker_id, queue, callback),
)
.await;
if let Err(e) = res {
tracing::warn!("Error processing jobs: {}", format!("{e}"));
tracing::warn!("{}", format!("{e:?}"));
match res {
// clean exit
Ok(Ok(())) => break,
if e.is_disconnected() {
tokio::time::sleep(Duration::from_secs(10)).await;
// job error
Ok(Err(e)) => {
tracing::warn!("Error processing jobs: {}", format!("{e}"));
tracing::warn!("{}", format!("{e:?}"));
if e.is_disconnected() {
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
continue;
// job panic
Err(_) => {
tracing::warn!("Panic while processing jobs");
}
}
break;
}
}
async fn job_loop<S, F>(
state: &State<S>,
state: Rc<State<S>>,
worker_id: uuid::Uuid,
queue: &'static str,
callback: F,
) -> Result<(), Error>
where
S: Store,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy,
S: Store + 'static,
for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
{
loop {
tracing::trace!("job_loop: looping");
@ -360,14 +374,18 @@ where
let guard = MetricsGuard::guard(worker_id, queue);
let res = heartbeat(
&state.repo,
queue,
worker_id,
job_id,
(callback)(state, job),
)
.with_poll_timer("job-and-heartbeat")
let state2 = state.clone();
let res = crate::sync::spawn("job-and-heartbeat", async move {
let state = state2;
heartbeat(
&state.repo,
queue,
worker_id,
job_id,
(callback)(&state, job),
)
.await
})
.await;
state
@ -376,7 +394,7 @@ where
.with_poll_timer("job-complete")
.await?;
res?;
res.map_err(|_| UploadError::Canceled)??;
guard.disarm();


@ -147,15 +147,24 @@ async fn hash(repo: &ArcRepo, hash: Hash) -> JobResult {
pub(crate) async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> JobResult {
let saved_delete_token = repo.delete_token(&alias).await.retry()?;
if saved_delete_token.is_none() {
let hash = repo.hash(&alias).await.retry()?;
// already deleted
if hash.is_none() {
return Ok(());
}
}
if !saved_delete_token.is_some_and(|t| t.ct_eq(&token)) {
return Err(UploadError::InvalidToken).abort();
}
let hash = repo.hash(&alias).await.retry()?;
repo.cleanup_alias(&alias).await.retry()?;
repo.remove_relation(alias.clone()).await.retry()?;
repo.remove_alias_access(alias.clone()).await.retry()?;
repo.cleanup_alias(&alias).await.retry()?;
let hash = hash.ok_or(UploadError::MissingAlias).abort()?;


@ -1,4 +1,3 @@
use time::Instant;
use tracing::{Instrument, Span};
use crate::{
@ -13,7 +12,7 @@ use crate::{
store::Store,
UploadQuery,
};
use std::sync::Arc;
use std::{sync::Arc, time::Instant};
use super::{JobContext, JobFuture, JobResult};
@ -90,7 +89,7 @@ impl UploadGuard {
impl Drop for UploadGuard {
fn drop(&mut self) {
metrics::counter!(crate::init_metrics::BACKGROUND_UPLOAD_INGEST, "completed" => (!self.armed).to_string()).increment(1);
metrics::histogram!(crate::init_metrics::BACKGROUND_UPLOAD_INGEST_DURATION, "completed" => (!self.armed).to_string()).record(self.start.elapsed().as_seconds_f64());
metrics::histogram!(crate::init_metrics::BACKGROUND_UPLOAD_INGEST_DURATION, "completed" => (!self.armed).to_string()).record(self.start.elapsed().as_secs_f64());
if self.armed {
tracing::warn!(


@ -444,7 +444,6 @@ where
pub(crate) trait SettingsRepo: BaseRepo {
async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError>;
async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError>;
async fn remove(&self, key: &'static str) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
@ -459,10 +458,6 @@ where
async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
T::get(self, key).await
}
async fn remove(&self, key: &'static str) -> Result<(), RepoError> {
T::remove(self, key).await
}
}
#[async_trait::async_trait(?Send)]


@ -1,10 +1,7 @@
use dashmap::DashMap;
use dashmap::{mapref::entry::Entry, DashMap};
use std::{
future::Future,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Weak,
},
sync::{Arc, Weak},
time::Duration,
};
use tokio::sync::Notify;
@ -26,7 +23,6 @@ struct NotificationEntryInner {
key: Arc<str>,
map: Map,
notify: Notify,
armed: AtomicBool,
}
impl NotificationMap {
@ -37,30 +33,34 @@ impl NotificationMap {
}
pub(super) fn register_interest(&self, key: Arc<str>) -> NotificationEntry {
let new_entry = Arc::new(NotificationEntryInner {
key: key.clone(),
map: self.map.clone(),
notify: crate::sync::bare_notify(),
armed: AtomicBool::new(false),
});
match self.map.entry(key.clone()) {
Entry::Occupied(mut occupied) => {
if let Some(inner) = occupied.get().upgrade() {
NotificationEntry { inner }
} else {
let inner = Arc::new(NotificationEntryInner {
key,
map: self.map.clone(),
notify: crate::sync::bare_notify(),
});
let mut key_entry = self
.map
.entry(key)
.or_insert_with(|| Arc::downgrade(&new_entry));
occupied.insert(Arc::downgrade(&inner));
let upgraded_entry = key_entry.value().upgrade();
NotificationEntry { inner }
}
}
Entry::Vacant(vacant) => {
let inner = Arc::new(NotificationEntryInner {
key,
map: self.map.clone(),
notify: crate::sync::bare_notify(),
});
let inner = if let Some(entry) = upgraded_entry {
entry
} else {
*key_entry.value_mut() = Arc::downgrade(&new_entry);
new_entry
};
vacant.insert(Arc::downgrade(&inner));
inner.armed.store(true, Ordering::Release);
NotificationEntry { inner }
NotificationEntry { inner }
}
}
}
pub(super) fn notify(&self, key: &str) {
@ -87,8 +87,6 @@ impl Default for NotificationMap {
impl Drop for NotificationEntryInner {
fn drop(&mut self) {
if self.armed.load(Ordering::Acquire) {
self.map.remove(&self.key);
}
self.map.remove(&self.key);
}
}


@ -26,7 +26,7 @@ use diesel_async::{
use futures_core::Stream;
use tokio::sync::Notify;
use tokio_postgres::{AsyncMessage, Connection, NoTls, Notification, Socket};
use tokio_postgres_rustls::MakeRustlsConnect;
use tokio_postgres_generic_rustls::{AwsLcRsDigest, MakeRustlsConnect};
use tracing::Instrument;
use url::Url;
use uuid::Uuid;
@ -173,7 +173,7 @@ impl PostgresError {
async fn build_tls_connector(
certificate_file: Option<PathBuf>,
) -> Result<MakeRustlsConnect, TlsError> {
) -> Result<MakeRustlsConnect<AwsLcRsDigest>, TlsError> {
let mut cert_store = rustls::RootCertStore {
roots: Vec::from(webpki_roots::TLS_SERVER_ROOTS),
};
@ -199,14 +199,14 @@ async fn build_tls_connector(
.with_root_certificates(cert_store)
.with_no_client_auth();
let tls = MakeRustlsConnect::new(config);
let tls = MakeRustlsConnect::new(config, AwsLcRsDigest);
Ok(tls)
}
async fn connect_for_migrations(
postgres_url: &Url,
tls_connector: Option<MakeRustlsConnect>,
tls_connector: Option<MakeRustlsConnect<AwsLcRsDigest>>,
) -> Result<
(
tokio_postgres::Client,
@ -265,8 +265,8 @@ where
async fn build_pool(
postgres_url: &Url,
tx: flume::Sender<Notification>,
connector: Option<MakeRustlsConnect>,
tx: tokio::sync::mpsc::Sender<Notification>,
connector: Option<MakeRustlsConnect<AwsLcRsDigest>>,
max_size: u32,
) -> Result<Pool<AsyncPgConnection>, ConnectPostgresError> {
let mut config = ManagerConfig::default();
@ -319,7 +319,7 @@ impl PostgresRepo {
.map(|u| u.into())
.unwrap_or(1_usize);
let (tx, rx) = flume::bounded(10);
let (tx, rx) = crate::sync::channel(10);
let pool = build_pool(
&postgres_url,
@ -621,7 +621,7 @@ type ConfigFn =
Box<dyn Fn(&str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> + Send + Sync + 'static>;
async fn delegate_notifications(
receiver: flume::Receiver<Notification>,
mut receiver: tokio::sync::mpsc::Receiver<Notification>,
inner: Arc<Inner>,
capacity: usize,
) {
@ -636,7 +636,7 @@ async fn delegate_notifications(
let keyed_notifier_state = KeyedNotifierState { inner: &inner };
while let Ok(notification) = receiver.recv_async().await {
while let Some(notification) = receiver.recv().await {
tracing::trace!("delegate_notifications: looping");
metrics::counter!(crate::init_metrics::POSTGRES_NOTIFICATION).increment(1);
@ -666,8 +666,8 @@ async fn delegate_notifications(
}
fn build_handler(
sender: flume::Sender<Notification>,
connector: Option<MakeRustlsConnect>,
sender: tokio::sync::mpsc::Sender<Notification>,
connector: Option<MakeRustlsConnect<AwsLcRsDigest>>,
) -> ConfigFn {
Box::new(
move |config: &str| -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> {
@ -708,7 +708,7 @@ fn build_handler(
}
fn spawn_db_notification_task<S>(
sender: flume::Sender<Notification>,
sender: tokio::sync::mpsc::Sender<Notification>,
mut conn: Connection<Socket, S>,
) where
S: tokio_postgres::tls::TlsStream + Send + Unpin + 'static,
@ -729,7 +729,7 @@ fn spawn_db_notification_task<S>(
tracing::warn!("Database Notice {e:?}");
}
Ok(AsyncMessage::Notification(notification)) => {
if sender.send_async(notification).await.is_err() {
if sender.send(notification).await.is_err() {
tracing::warn!("Missed notification. Are we shutting down?");
}
}
@ -1412,24 +1412,6 @@ impl SettingsRepo for PostgresRepo {
Ok(opt)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn remove(&self, input_key: &'static str) -> Result<(), RepoError> {
use schema::settings::dsl::*;
let mut conn = self.get_connection().await?;
diesel::delete(settings)
.filter(key.eq(input_key))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_SETTINGS_REMOVE)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
Ok(())
}
}
#[async_trait::async_trait(?Send)]


@ -807,7 +807,7 @@ impl QueueRepo for SledRepo {
.read()
.unwrap()
.get(&queue_name)
.map(Arc::clone);
.cloned();
let notify = if let Some(notify) = opt {
notify
@ -945,13 +945,6 @@ impl SettingsRepo for SledRepo {
Ok(opt.map(|ivec| Arc::from(ivec.to_vec())))
}
#[tracing::instrument(level = "trace", skip(self))]
async fn remove(&self, key: &'static str) -> Result<(), RepoError> {
b!(self.settings, settings.remove(key));
Ok(())
}
}
fn variant_access_key(hash: &[u8], variant: &str) -> Vec<u8> {


@ -72,7 +72,7 @@ impl From<crate::store::object_store::ObjectError> for StoreError {
fn from(value: crate::store::object_store::ObjectError) -> Self {
match value {
e @ crate::store::object_store::ObjectError::Status(
actix_web::http::StatusCode::NOT_FOUND,
reqwest::StatusCode::NOT_FOUND,
_,
_,
) => Self::ObjectNotFound(e),


@ -4,16 +4,16 @@ use crate::{
};
use actix_web::{
error::BlockingError,
http::{
header::{ByteRangeSpec, Range, CONTENT_LENGTH},
StatusCode,
},
http::header::{ByteRangeSpec, Range},
rt::task::JoinError,
web::Bytes,
};
use base64::{prelude::BASE64_STANDARD, Engine};
use futures_core::Stream;
use reqwest::{header::RANGE, Body, Response};
use reqwest::{
header::{CONTENT_LENGTH, RANGE},
Body, Response, StatusCode,
};
use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
use rusty_s3::{
actions::{CreateMultipartUpload, S3Action},


@ -91,7 +91,7 @@ where
S: Stream + 'static,
S::Item: Send + Sync,
{
let (tx, rx) = crate::sync::channel(1);
let (tx, mut rx) = crate::sync::channel(1);
let handle = crate::sync::abort_on_drop(crate::sync::spawn("send-stream", async move {
let stream = std::pin::pin!(stream);
@ -100,16 +100,14 @@ where
while let Some(res) = streamer.next().await {
tracing::trace!("make send tx: looping");
if tx.send_async(res).await.is_err() {
if tx.send(res).await.is_err() {
break;
}
}
}));
streem::from_fn(|yiedler| async move {
let mut stream = rx.into_stream().into_streamer();
while let Some(res) = stream.next().await {
while let Some(res) = rx.recv().await {
tracing::trace!("make send rx: looping");
yiedler.yield_(res).await;
@ -124,20 +122,18 @@ where
I: IntoIterator + Send + 'static,
I::Item: Send + Sync,
{
let (tx, rx) = crate::sync::channel(buffer);
let (tx, mut rx) = crate::sync::channel(buffer);
let handle = crate::sync::spawn_blocking("blocking-iterator", move || {
for value in iterator {
if tx.send(value).is_err() {
if tx.blocking_send(value).is_err() {
break;
}
}
});
streem::from_fn(|yielder| async move {
let mut stream = rx.into_stream().into_streamer();
while let Some(res) = stream.next().await {
while let Some(res) = rx.recv().await {
tracing::trace!("from_iterator: looping");
yielder.yield_(res).await;


@ -39,11 +39,13 @@ impl<T> std::future::Future for DropHandle<T> {
}
#[track_caller]
pub(crate) fn channel<T>(bound: usize) -> (flume::Sender<T>, flume::Receiver<T>) {
pub(crate) fn channel<T>(
bound: usize,
) -> (tokio::sync::mpsc::Sender<T>, tokio::sync::mpsc::Receiver<T>) {
let span = tracing::trace_span!(parent: None, "make channel");
let guard = span.enter();
let channel = flume::bounded(bound);
let channel = tokio::sync::mpsc::channel(bound);
drop(guard);
channel


@ -1,6 +1,6 @@
use std::path::PathBuf;
use rustls::{crypto::ring::sign::any_supported_type, sign::CertifiedKey, Error};
use rustls::{crypto::aws_lc_rs::sign::any_supported_type, sign::CertifiedKey, Error};
pub(super) struct Tls {
certificate: PathBuf,