Compare commits

...

29 commits

Author SHA1 Message Date
asonix 1c4e343d9d Prepare 0.5.14 2024-05-20 22:23:08 -05:00
asonix d03cc63d2b ffprobe: handle files with empty stream json 2024-05-20 22:08:54 -05:00
asonix 260f9a158a Update dependencies (minor & point) 2024-05-19 10:39:21 -05:00
asonix a7c78cd54e Merge pull request 'Update rustls for tokio-postgres' (#58) from asonix/update-tokio-postgres-rustls into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/58
2024-05-19 15:35:47 +00:00
asonix d7dc2e506d Merge branch 'main' into asonix/update-tokio-postgres-rustls 2024-05-19 10:21:12 -05:00
asonix e48f60a6c6 Merge pull request 'Update rustls for actix-web' (#61) from asonix/update-actix-web-rustls into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/61
2024-05-19 15:18:37 +00:00
asonix 9d01aeb82c Update rustls for actix-web
includes update for rustls-channel-resolver
2024-05-19 10:08:48 -05:00
asonix bddfb3c9d0 Merge branch 'main' into asonix/update-tokio-postgres-rustls 2024-05-19 09:40:45 -05:00
asonix 7ae3c0c776 Merge pull request 'Update reqwest to 0.12' (#59) from asonix/update-reqwest into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/59
2024-05-19 14:37:50 +00:00
asonix 983e9ce151 Merge branch 'main' into asonix/update-reqwest 2024-05-19 09:36:54 -05:00
asonix 9302062b26 Merge pull request 'Update metrics-exporter-prometheus' (#60) from asonix/update-metrics-exporter-prometheus into main
Reviewed-on: https://git.asonix.dog/asonix/pict-rs/pulls/60
2024-05-19 14:35:27 +00:00
asonix 33e72266f5 Add public mechanism for installing aws-lc-rs 2024-05-03 23:05:17 -05:00
asonix 39da69b1aa Use tokio-postgres-generic-rustls 2024-05-03 22:39:30 -05:00
asonix 64b8635059 Update rustls for tokio-postgres
This doesn't update rustls for actix-web (0.22), or rustls for reqwest (0.21)
2024-05-03 22:39:30 -05:00
asonix d45e3fa386 Remove unused 'remove' repo method 2024-05-03 22:35:20 -05:00
asonix bfd4fd4689 Remove unused StatusError type 2024-05-03 22:34:18 -05:00
asonix 89f3c447a8 clippy 2024-05-01 14:57:03 -05:00
asonix 46cfbf99a5 Update metrics-exporter-prometheus
This pulls in hyper 1 and http 1
2024-05-01 14:50:20 -05:00
asonix 58529a2eb2 Update reqwest to 0.12
This pulls in hyper 1 and http 1, but removes rustls 0.21
2024-05-01 14:46:29 -05:00
asonix 700aeb90e0 Fix time deprecation warnings 2024-05-01 14:33:07 -05:00
asonix ff39c30cc8 Update direct base64 dependency 2024-05-01 14:32:26 -05:00
asonix 9561c578dc Update dependencies (minor & point) 2024-05-01 14:30:22 -05:00
asonix dc7bdf7eeb Update flake.lock 2024-04-21 21:02:42 -05:00
asonix 33ba045ee1 Apparently imagemagick needs a shell to delegate to ffmpeg properly 2024-04-21 21:02:31 -05:00
asonix f082e48ed8 Attempt to set up nix-based docker for pict-rs
There's a bug when converting APNG files to WEBP files, which
imagemagick delegates to ffmpeg. When doing 'nix build' and running the
result, or running pict-rs in the dev shell, it works fine. In the
container, this doesn't work at all. imagemagick complains that there's
no media to convert, implying ffmpeg has output a zero-sized file.

This work is helping to narrow down exactly what pict-rs needs to run,
though. This still needs to be tested against h264, h265, vp8, vp9 and
av1.
2024-04-21 14:31:03 -05:00
asonix 97159e0030 Update release document 2024-04-15 21:17:40 -05:00
asonix 6d40fbee47 Prepare 0.5.13 2024-04-15 15:31:31 -05:00
asonix c4e99ef539 Add ability to disable colorized logs 2024-04-15 15:16:10 -05:00
asonix 3428c31f16 Use tokio channels again 2024-04-14 20:06:58 -05:00
32 changed files with 1000 additions and 634 deletions

1221
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,7 +1,7 @@
[package]
name = "pict-rs"
description = "A simple image hosting service"
version = "0.5.12"
version = "0.5.14"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"
@ -20,10 +20,10 @@ random-errors = ["dep:nanorand"]
[dependencies]
actix-form-data = "0.7.0-beta.7"
actix-web = { version = "4.0.0", default-features = false, features = ["rustls-0_22"] }
actix-web = { version = "4.6.0", default-features = false, features = ["rustls-0_23"] }
async-trait = "0.1.51"
barrel = { version = "0.7.0", features = ["pg"] }
base64 = "0.21.0"
base64 = "0.22.0"
bb8 = "0.8.3"
blurhash-update = "0.1.0"
clap = { version = "4.0.2", features = ["derive"] }
@ -34,12 +34,11 @@ dashmap = "5.1.0"
diesel = { version = "2.1.1", features = ["postgres_backend", "serde_json", "time", "uuid"] }
diesel-async = { version = "0.4.1", features = ["bb8", "postgres"] }
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
flume = "0.11.0"
futures-core = "0.3"
hex = "0.4.3"
md-5 = "0.10.5"
metrics = "0.22.0"
metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = ["http-listener"] }
metrics-exporter-prometheus = { version = "0.14.0", default-features = false, features = ["http-listener"] }
mime = "0.3.1"
nanorand = { version = "0.7", optional = true }
opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"] }
@ -47,13 +46,14 @@ opentelemetry = "0.22"
opentelemetry-otlp = "0.15"
pin-project-lite = "0.2.7"
refinery = { version = "0.8.10", features = ["tokio-postgres", "postgres"] }
reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls", "stream"] }
reqwest-middleware = "0.2.2"
reqwest-tracing = "0.4.5"
# pinned to tokio-postgres-rustls
rustls = "0.22.0"
reqwest = { version = "0.12.0", default-features = false, features = ["json", "rustls-tls", "stream"] }
reqwest-middleware = "0.3.0"
reqwest-tracing = "0.5.0"
# pinned to tokio-postgres-generic-rustls
# pinned to actix-web
rustls = "0.23"
# pinned to rustls
rustls-channel-resolver = "0.2.0"
rustls-channel-resolver = "0.3.0"
# pinned to rustls
rustls-pemfile = "2.0.0"
rusty-s3 = "0.5.0"
@ -69,7 +69,7 @@ thiserror = "1.0"
time = { version = "0.3.0", features = ["serde", "serde-well-known"] }
tokio = { version = "1", features = ["full", "tracing"] }
tokio-postgres = { version = "0.7.10", features = ["with-uuid-1", "with-time-0_3", "with-serde_json-1"] }
tokio-postgres-rustls = "0.11.0"
tokio-postgres-generic-rustls = { version = "0.1.0", default-features = false, features = ["aws-lc-rs"] }
tokio-uring = { version = "0.4", optional = true, features = ["bytes"] }
tokio-util = { version = "0.7", default-features = false, features = [
"codec",

View file

@ -16,6 +16,7 @@ concurrency = 32
format = "normal"
targets = "info"
log_spans = false
no_ansi = false
[tracing.console]
buffer_capacity = 102400

View file

@ -5,11 +5,11 @@
"systems": "systems"
},
"locked": {
"lastModified": 1705309234,
"narHash": "sha256-uNRRNRKmJyCRC/8y1RqBkqWBLM034y4qN7EprSdmgyA=",
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "1ef2e671c3b0c19053962c07dbda38332dcebf26",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
@ -20,11 +20,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1705133751,
"narHash": "sha256-rCIsyE80jgiOU78gCWN3A0wE0tR2GI5nH6MlS+HaaSQ=",
"lastModified": 1713537308,
"narHash": "sha256-XtTSSIB2DA6tOv+l0FhvfDMiyCmhoRbNB+0SeInZkbk=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "9b19f5e77dd906cb52dade0b7bd280339d2a1f3d",
"rev": "5c24cf2f0a12ad855f444c30b2421d044120c66f",
"type": "github"
},
"original": {

View file

@ -15,13 +15,29 @@
in
{
packages = rec {
imagemagick7_pict-rs = pkgs.callPackage ./nix/pkgs/imagemagick_pict-rs {};
ffmpeg6_pict-rs = pkgs.callPackage ./nix/pkgs/ffmpeg_pict-rs {};
pict-rs = pkgs.callPackage ./pict-rs.nix {
inherit (pkgs.darwin.apple_sdk.frameworks) Security;
inherit imagemagick7_pict-rs ffmpeg6_pict-rs;
};
default = pict-rs;
};
docker = pkgs.dockerTools.buildLayeredImage {
name = "pict-rs";
tag = "latest";
contents = [ pkgs.tini self.packages.${system}.pict-rs pkgs.bash ];
config = {
Entrypoint = [ "/bin/tini" "--" "/bin/pict-rs" ];
Cmd = [ "run" ];
};
};
apps = rec {
dev = flake-utils.lib.mkApp { drv = self.packages.${system}.pict-rs; };
default = dev;
@ -36,9 +52,9 @@
curl
diesel-cli
exiftool
ffmpeg_6-full
garage
imagemagick
self.packages.${system}.imagemagick7_pict-rs
self.packages.${system}.ffmpeg6_pict-rs
jq
minio-client
rust-analyzer

View file

@ -0,0 +1,5 @@
{ ffmpeg_6-headless }:
ffmpeg_6-headless.override {
withWebp = true;
}

View file

@ -0,0 +1,23 @@
{ imagemagick7 }:
imagemagick7.override {
bzip2Support = true;
zlibSupport = true;
libX11Support = false;
libXtSupport = false;
fontconfigSupport = false;
freetypeSupport = false;
libjpegSupport = true;
djvulibreSupport = false;
lcms2Support = false;
openexrSupport = false;
libjxlSupport = true;
libpngSupport = true;
liblqr1Support = false;
librsvgSupport = false;
libtiffSupport = false;
libxml2Support = false;
openjpegSupport = true;
libwebpSupport = true;
libheifSupport = true;
}

View file

@ -1,6 +1,6 @@
{ exiftool
, ffmpeg_6-full
, imagemagick
, ffmpeg6_pict-rs
, imagemagick7_pict-rs
, lib
, makeWrapper
, nixosTests
@ -11,7 +11,7 @@
rustPlatform.buildRustPackage {
pname = "pict-rs";
version = "0.5.12";
version = "0.5.14";
src = ./.;
cargoLock = {
@ -27,7 +27,7 @@ rustPlatform.buildRustPackage {
postInstall = ''
wrapProgram $out/bin/pict-rs \
--prefix PATH : "${lib.makeBinPath [ imagemagick ffmpeg_6-full exiftool ]}"
--prefix PATH : "${lib.makeBinPath [ imagemagick7_pict-rs ffmpeg6_pict-rs exiftool ]}"
'';
passthru.tests = { inherit (nixosTests) pict-rs; };

View file

@ -98,6 +98,11 @@ targets = 'info'
# default: false
log_spans = false
## Optional: whether to disable colorized log output
# environment variable: PICTRS__TRACING__LOGGING__NO_ANSI
# default: false
no_ansi = false
## Console configuration
[tracing.console]

62
releases/0.5.13.md Normal file
View file

@ -0,0 +1,62 @@
# pict-rs 0.5.13
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.13 is a maintenance release aiming to enable better logging in some scenarios.
### Features
- [Colorless Logging](#colorless-logging)
### Changes
- [Remove Flume](#remove-flume)
## Upgrade Notes
There are no significant changes from 0.5.12. Upgrading should be as simple as pulling a new version
of pict-rs.
## Descriptions
### Colorless Logging
When opting to use the `json` logger, the tracing subscriber automatically disables colored output.
This didn't remove colors from errors, though, and pict-rs hasn't had a way to disable colors while
using other log formats. pict-rs 0.5.13 introduces a new configuration value to remove colored
output from all logs regardless of logging format.
With pict-rs.toml
```toml
[tracing.logging]
no_ansi = true
```
With environment variables
```bash
PICTRS__TRACING__LOGGING__NO_ANSI=true
```
With commandline flags
```bash
pict-rs --no-log-ansi run
```
Colors in logs can be useful, so I imagine this option won't be used much. There has been a request
for this functionality though and it's little cost to maintain.
### Remove Flume
Recently I've been debugging a memory usage issue in another project of mine. I wasn't able to fully
track down the cause, but I did notice that removing the
[flume channel library](https://github.com/zesterer/flume) seemed to make the leak go away. Since I
also use flume in pict-rs, I'm opting to replace it with tokio's native channel implementation. This
may or may not improve memory usage, but it does reduce the depenency count and therefore build time
for pict-rs.

28
releases/0.5.14.md Normal file
View file

@ -0,0 +1,28 @@
# pict-rs 0.5.14
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.14 includes a bugfix for identifying certain MOV videos, as well as updated dependencies.
### Fixes
- [Empty Stream Parsing](#empty-stream-parsing)
## Upgrade Notes
There are no significant changes from 0.5.13. Upgrading should be as simple as pulling a new version
of pict-rs.
## Descriptions
### Empty Stream Parsing
Certain videos, when identified with ffprobe, contain stream json objects with no fields. This would
cause pict-rs to fail to parse the information for these videos, as it expects streams to at least
contain a codec field. In pict-rs 0.5.14, empty streams are now considered valid and are simply
ignored.

View file

@ -18,6 +18,7 @@ impl Args {
log_format,
log_targets,
log_spans,
no_log_ansi,
console_address,
console_buffer_capacity,
opentelemetry_url,
@ -38,6 +39,7 @@ impl Args {
format: log_format,
targets: log_targets.map(Serde::new),
log_spans,
no_ansi: no_log_ansi,
},
console: Console {
address: console_address,
@ -581,6 +583,8 @@ struct Logging {
targets: Option<Serde<Targets>>,
#[serde(skip_serializing_if = "std::ops::Not::not")]
log_spans: bool,
#[serde(skip_serializing_if = "std::ops::Not::not")]
no_ansi: bool,
}
#[derive(Debug, Default, serde::Serialize)]
@ -925,6 +929,10 @@ pub(super) struct Args {
#[arg(long)]
log_spans: bool,
#[arg(long)]
/// Whether to disable color-codes in log output
no_log_ansi: bool,
/// Address and port to expose tokio-console metrics
#[arg(long)]
console_address: Option<SocketAddr>,

View file

@ -55,6 +55,7 @@ struct LoggingDefaults {
format: LogFormat,
targets: Serde<Targets>,
log_spans: bool,
no_ansi: bool,
}
#[derive(Clone, Debug, serde::Serialize)]
@ -235,6 +236,7 @@ impl Default for LoggingDefaults {
format: LogFormat::Normal,
targets: "info".parse().expect("Valid targets string"),
log_spans: false,
no_ansi: false,
}
}
}

View file

@ -163,6 +163,8 @@ pub(crate) struct Logging {
pub(crate) targets: Serde<Targets>,
pub(crate) log_spans: bool,
pub(crate) no_ansi: bool,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]

View file

@ -53,6 +53,7 @@ impl FfMpegStreams {
FfMpegStream::Unknown { codec_name } => {
tracing::info!("Encountered unknown stream {codec_name}");
}
FfMpegStream::Empty {} => {}
}
}
@ -135,6 +136,7 @@ enum FfMpegStream {
Audio(FfMpegAudioStream),
Video(FfMpegVideoStream),
Unknown { codec_name: String },
Empty {},
}
#[derive(Debug, serde::Deserialize)]

View file

@ -0,0 +1,35 @@
{
"programs": [
],
"streams": [
{
"codec_name": "hevc",
"width": 1920,
"height": 1080,
"pix_fmt": "yuv420p10le",
"nb_read_frames": "187",
"side_data_list": [
{
}
]
},
{
"codec_name": "aac",
"nb_read_frames": "135"
},
{
},
{
},
{
}
],
"format": {
"format_name": "mov,mp4,m4a,3gp,3g2,mj2"
}
}

View file

@ -1,11 +1,11 @@
use crate::formats::{
AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat, Mp4Codec,
WebmAlphaCodec, WebmCodec,
AlphaCodec, AnimationFormat, ImageFormat, ImageInput, InputFile, InputVideoFormat,
Mp4AudioCodec, Mp4Codec, WebmAlphaCodec, WebmCodec,
};
use super::{Discovery, FfMpegDiscovery, PixelFormatOutput};
fn details_tests() -> [(&'static str, Option<Discovery>); 13] {
fn details_tests() -> [(&'static str, Option<Discovery>); 14] {
[
(
"animated_webp",
@ -151,6 +151,18 @@ fn details_tests() -> [(&'static str, Option<Discovery>); 13] {
frames: None,
}),
),
(
"mov",
Some(Discovery {
input: InputFile::Video(InputVideoFormat::Mp4 {
video_codec: Mp4Codec::H265,
audio_codec: Some(Mp4AudioCodec::Aac),
}),
width: 1920,
height: 1080,
frames: Some(187),
}),
),
]
}

3
src/http1.rs Normal file
View file

@ -0,0 +1,3 @@
pub(crate) fn to_actix_status(status: reqwest::StatusCode) -> actix_web::http::StatusCode {
actix_web::http::StatusCode::from_u16(status.as_u16()).expect("status codes are always valid")
}

View file

@ -1,4 +1,5 @@
use crate::config::{LogFormat, OpenTelemetry, Tracing};
use color_eyre::config::Theme;
use console_subscriber::ConsoleLayer;
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;
@ -11,7 +12,15 @@ use tracing_subscriber::{
};
pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> {
color_eyre::install()?;
let eyre_theme = if tracing.logging.no_ansi {
Theme::new()
} else {
Theme::dark()
};
color_eyre::config::HookBuilder::new()
.theme(eyre_theme)
.install()?;
LogTracer::init()?;
@ -23,7 +32,9 @@ pub(super) fn init_tracing(tracing: &Tracing) -> color_eyre::Result<()> {
FmtSpan::NONE
};
let format_layer = tracing_subscriber::fmt::layer().with_span_events(fmt_span);
let format_layer = tracing_subscriber::fmt::layer()
.with_span_events(fmt_span)
.with_ansi(!tracing.logging.no_ansi);
match tracing.logging.format {
LogFormat::Compact => with_format(format_layer.compact(), tracing),

View file

@ -14,6 +14,7 @@ mod file_path;
mod formats;
mod future;
mod generate;
mod http1;
mod ingest;
mod init_metrics;
mod init_tracing;
@ -580,7 +581,7 @@ async fn download_stream<S>(
let res = state.client.get(url).send().await?;
if !res.status().is_success() {
return Err(UploadError::Download(res.status()).into());
return Err(UploadError::Download(http1::to_actix_status(res.status())).into());
}
let stream = crate::stream::limit(
@ -1763,7 +1764,7 @@ async fn launch<
tracing::info!("Starting pict-rs with TLS on {address}");
server.bind_rustls_0_22(address, config)?.run().await?;
server.bind_rustls_0_23(address, config)?.run().await?;
handle.abort();
let _ = handle.await;
@ -1937,6 +1938,19 @@ impl PictRsConfiguration {
Ok(self)
}
/// Install aws-lc-rs as the default crypto provider
///
/// This would happen automatically anyway unless rustls crate features get mixed up
pub fn install_crypto_provider(self) -> Self {
if rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.is_err()
{
tracing::info!("rustls crypto provider already installed");
}
self
}
/// Run the pict-rs application on a tokio `LocalSet`
///
/// This must be called from within `tokio::main` directly

View file

@ -4,6 +4,7 @@ fn main() -> color_eyre::Result<()> {
pict_rs::PictRsConfiguration::build_default()?
.install_tracing()?
.install_metrics()?
.install_crypto_provider()
.run()
.await
})
@ -18,6 +19,7 @@ fn main() -> color_eyre::Result<()> {
pict_rs::PictRsConfiguration::build_default()?
.install_tracing()?
.install_metrics()?
.install_crypto_provider()
.run_on_localset()
.await
})

View file

@ -45,10 +45,10 @@ impl Drop for MetricsGuard {
}
}
async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
async fn drain(mut rx: tokio::sync::mpsc::Receiver<actix_web::dev::Payload>) {
let mut set = JoinSet::new();
while let Ok(payload) = rx.recv_async().await {
while let Some(payload) = rx.recv().await {
tracing::trace!("drain: looping");
// draining a payload is a best-effort task - if we can't collect in 2 minutes we bail
@ -94,18 +94,18 @@ async fn drain(rx: flume::Receiver<actix_web::dev::Payload>) {
struct DrainHandle(Option<Rc<tokio::task::JoinHandle<()>>>);
pub(crate) struct Payload {
sender: flume::Sender<actix_web::dev::Payload>,
sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
handle: DrainHandle,
}
pub(crate) struct PayloadMiddleware<S> {
inner: S,
sender: flume::Sender<actix_web::dev::Payload>,
sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
_handle: DrainHandle,
}
pub(crate) struct PayloadStream {
inner: Option<actix_web::dev::Payload>,
sender: flume::Sender<actix_web::dev::Payload>,
sender: tokio::sync::mpsc::Sender<actix_web::dev::Payload>,
}
impl DrainHandle {

View file

@ -59,9 +59,6 @@ impl Drop for MetricsGuard {
}
}
#[derive(Debug)]
struct StatusError(ExitStatus);
pub(crate) struct Process {
command: Arc<str>,
child: Child,
@ -487,11 +484,3 @@ impl ProcessRead {
}
}
}
impl std::fmt::Display for StatusError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Command failed with bad status: {}", self.0)
}
}
impl std::error::Error for StatusError {}

View file

@ -1,4 +1,3 @@
use time::Instant;
use tracing::{Instrument, Span};
use crate::{
@ -13,7 +12,7 @@ use crate::{
store::Store,
UploadQuery,
};
use std::sync::Arc;
use std::{sync::Arc, time::Instant};
use super::{JobContext, JobFuture, JobResult};
@ -90,7 +89,7 @@ impl UploadGuard {
impl Drop for UploadGuard {
fn drop(&mut self) {
metrics::counter!(crate::init_metrics::BACKGROUND_UPLOAD_INGEST, "completed" => (!self.armed).to_string()).increment(1);
metrics::histogram!(crate::init_metrics::BACKGROUND_UPLOAD_INGEST_DURATION, "completed" => (!self.armed).to_string()).record(self.start.elapsed().as_seconds_f64());
metrics::histogram!(crate::init_metrics::BACKGROUND_UPLOAD_INGEST_DURATION, "completed" => (!self.armed).to_string()).record(self.start.elapsed().as_secs_f64());
if self.armed {
tracing::warn!(

View file

@ -444,7 +444,6 @@ where
pub(crate) trait SettingsRepo: BaseRepo {
async fn set(&self, key: &'static str, value: Arc<[u8]>) -> Result<(), RepoError>;
async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError>;
async fn remove(&self, key: &'static str) -> Result<(), RepoError>;
}
#[async_trait::async_trait(?Send)]
@ -459,10 +458,6 @@ where
async fn get(&self, key: &'static str) -> Result<Option<Arc<[u8]>>, RepoError> {
T::get(self, key).await
}
async fn remove(&self, key: &'static str) -> Result<(), RepoError> {
T::remove(self, key).await
}
}
#[async_trait::async_trait(?Send)]

View file

@ -26,7 +26,7 @@ use diesel_async::{
use futures_core::Stream;
use tokio::sync::Notify;
use tokio_postgres::{AsyncMessage, Connection, NoTls, Notification, Socket};
use tokio_postgres_rustls::MakeRustlsConnect;
use tokio_postgres_generic_rustls::{AwsLcRsDigest, MakeRustlsConnect};
use tracing::Instrument;
use url::Url;
use uuid::Uuid;
@ -173,7 +173,7 @@ impl PostgresError {
async fn build_tls_connector(
certificate_file: Option<PathBuf>,
) -> Result<MakeRustlsConnect, TlsError> {
) -> Result<MakeRustlsConnect<AwsLcRsDigest>, TlsError> {
let mut cert_store = rustls::RootCertStore {
roots: Vec::from(webpki_roots::TLS_SERVER_ROOTS),
};
@ -199,14 +199,14 @@ async fn build_tls_connector(
.with_root_certificates(cert_store)
.with_no_client_auth();
let tls = MakeRustlsConnect::new(config);
let tls = MakeRustlsConnect::new(config, AwsLcRsDigest);
Ok(tls)
}
async fn connect_for_migrations(
postgres_url: &Url,
tls_connector: Option<MakeRustlsConnect>,
tls_connector: Option<MakeRustlsConnect<AwsLcRsDigest>>,
) -> Result<
(
tokio_postgres::Client,
@ -265,8 +265,8 @@ where
async fn build_pool(
postgres_url: &Url,
tx: flume::Sender<Notification>,
connector: Option<MakeRustlsConnect>,
tx: tokio::sync::mpsc::Sender<Notification>,
connector: Option<MakeRustlsConnect<AwsLcRsDigest>>,
max_size: u32,
) -> Result<Pool<AsyncPgConnection>, ConnectPostgresError> {
let mut config = ManagerConfig::default();
@ -319,7 +319,7 @@ impl PostgresRepo {
.map(|u| u.into())
.unwrap_or(1_usize);
let (tx, rx) = flume::bounded(10);
let (tx, rx) = crate::sync::channel(10);
let pool = build_pool(
&postgres_url,
@ -621,7 +621,7 @@ type ConfigFn =
Box<dyn Fn(&str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> + Send + Sync + 'static>;
async fn delegate_notifications(
receiver: flume::Receiver<Notification>,
mut receiver: tokio::sync::mpsc::Receiver<Notification>,
inner: Arc<Inner>,
capacity: usize,
) {
@ -636,7 +636,7 @@ async fn delegate_notifications(
let keyed_notifier_state = KeyedNotifierState { inner: &inner };
while let Ok(notification) = receiver.recv_async().await {
while let Some(notification) = receiver.recv().await {
tracing::trace!("delegate_notifications: looping");
metrics::counter!(crate::init_metrics::POSTGRES_NOTIFICATION).increment(1);
@ -666,8 +666,8 @@ async fn delegate_notifications(
}
fn build_handler(
sender: flume::Sender<Notification>,
connector: Option<MakeRustlsConnect>,
sender: tokio::sync::mpsc::Sender<Notification>,
connector: Option<MakeRustlsConnect<AwsLcRsDigest>>,
) -> ConfigFn {
Box::new(
move |config: &str| -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> {
@ -708,7 +708,7 @@ fn build_handler(
}
fn spawn_db_notification_task<S>(
sender: flume::Sender<Notification>,
sender: tokio::sync::mpsc::Sender<Notification>,
mut conn: Connection<Socket, S>,
) where
S: tokio_postgres::tls::TlsStream + Send + Unpin + 'static,
@ -729,7 +729,7 @@ fn spawn_db_notification_task<S>(
tracing::warn!("Database Notice {e:?}");
}
Ok(AsyncMessage::Notification(notification)) => {
if sender.send_async(notification).await.is_err() {
if sender.send(notification).await.is_err() {
tracing::warn!("Missed notification. Are we shutting down?");
}
}
@ -1412,24 +1412,6 @@ impl SettingsRepo for PostgresRepo {
Ok(opt)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn remove(&self, input_key: &'static str) -> Result<(), RepoError> {
use schema::settings::dsl::*;
let mut conn = self.get_connection().await?;
diesel::delete(settings)
.filter(key.eq(input_key))
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_SETTINGS_REMOVE)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
Ok(())
}
}
#[async_trait::async_trait(?Send)]

View file

@ -807,7 +807,7 @@ impl QueueRepo for SledRepo {
.read()
.unwrap()
.get(&queue_name)
.map(Arc::clone);
.cloned();
let notify = if let Some(notify) = opt {
notify
@ -945,13 +945,6 @@ impl SettingsRepo for SledRepo {
Ok(opt.map(|ivec| Arc::from(ivec.to_vec())))
}
#[tracing::instrument(level = "trace", skip(self))]
async fn remove(&self, key: &'static str) -> Result<(), RepoError> {
b!(self.settings, settings.remove(key));
Ok(())
}
}
fn variant_access_key(hash: &[u8], variant: &str) -> Vec<u8> {

View file

@ -72,7 +72,7 @@ impl From<crate::store::object_store::ObjectError> for StoreError {
fn from(value: crate::store::object_store::ObjectError) -> Self {
match value {
e @ crate::store::object_store::ObjectError::Status(
actix_web::http::StatusCode::NOT_FOUND,
reqwest::StatusCode::NOT_FOUND,
_,
_,
) => Self::ObjectNotFound(e),

View file

@ -4,16 +4,16 @@ use crate::{
};
use actix_web::{
error::BlockingError,
http::{
header::{ByteRangeSpec, Range, CONTENT_LENGTH},
StatusCode,
},
http::header::{ByteRangeSpec, Range},
rt::task::JoinError,
web::Bytes,
};
use base64::{prelude::BASE64_STANDARD, Engine};
use futures_core::Stream;
use reqwest::{header::RANGE, Body, Response};
use reqwest::{
header::{CONTENT_LENGTH, RANGE},
Body, Response, StatusCode,
};
use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
use rusty_s3::{
actions::{CreateMultipartUpload, S3Action},

View file

@ -91,7 +91,7 @@ where
S: Stream + 'static,
S::Item: Send + Sync,
{
let (tx, rx) = crate::sync::channel(1);
let (tx, mut rx) = crate::sync::channel(1);
let handle = crate::sync::abort_on_drop(crate::sync::spawn("send-stream", async move {
let stream = std::pin::pin!(stream);
@ -100,16 +100,14 @@ where
while let Some(res) = streamer.next().await {
tracing::trace!("make send tx: looping");
if tx.send_async(res).await.is_err() {
if tx.send(res).await.is_err() {
break;
}
}
}));
streem::from_fn(|yiedler| async move {
let mut stream = rx.into_stream().into_streamer();
while let Some(res) = stream.next().await {
while let Some(res) = rx.recv().await {
tracing::trace!("make send rx: looping");
yiedler.yield_(res).await;
@ -124,20 +122,18 @@ where
I: IntoIterator + Send + 'static,
I::Item: Send + Sync,
{
let (tx, rx) = crate::sync::channel(buffer);
let (tx, mut rx) = crate::sync::channel(buffer);
let handle = crate::sync::spawn_blocking("blocking-iterator", move || {
for value in iterator {
if tx.send(value).is_err() {
if tx.blocking_send(value).is_err() {
break;
}
}
});
streem::from_fn(|yielder| async move {
let mut stream = rx.into_stream().into_streamer();
while let Some(res) = stream.next().await {
while let Some(res) = rx.recv().await {
tracing::trace!("from_iterator: looping");
yielder.yield_(res).await;

View file

@ -39,11 +39,13 @@ impl<T> std::future::Future for DropHandle<T> {
}
#[track_caller]
pub(crate) fn channel<T>(bound: usize) -> (flume::Sender<T>, flume::Receiver<T>) {
pub(crate) fn channel<T>(
bound: usize,
) -> (tokio::sync::mpsc::Sender<T>, tokio::sync::mpsc::Receiver<T>) {
let span = tracing::trace_span!(parent: None, "make channel");
let guard = span.enter();
let channel = flume::bounded(bound);
let channel = tokio::sync::mpsc::channel(bound);
drop(guard);
channel

View file

@ -1,6 +1,6 @@
use std::path::PathBuf;
use rustls::{crypto::ring::sign::any_supported_type, sign::CertifiedKey, Error};
use rustls::{crypto::aws_lc_rs::sign::any_supported_type, sign::CertifiedKey, Error};
pub(super) struct Tls {
certificate: PathBuf,