postgres: Rework job & upload notifications (more)

postgres: Add metrics to job push & pop, upload wait
sled: add upload wait metrics
This commit is contained in:
asonix 2023-09-04 16:19:46 -05:00
parent ac9777782a
commit 37448722de
8 changed files with 283 additions and 95 deletions

View file

@ -103,6 +103,7 @@ impl Details {
))
}
#[tracing::instrument(level = "DEBUG")]
pub(crate) async fn from_store<S: Store>(
store: &S,
identifier: &Arc<str>,

View file

@ -9,6 +9,7 @@ use crate::{
use super::Discovery;
#[tracing::instrument(level = "DEBUG", skip_all)]
pub(super) async fn check_reorient(
Discovery {
input,

View file

@ -97,6 +97,7 @@ pub(super) async fn confirm_bytes(
.await
}
#[tracing::instrument(level = "DEBUG", skip(f))]
async fn count_avif_frames<F, Fut>(f: F, timeout: u64) -> Result<u32, MagickError>
where
F: FnOnce(crate::file::File) -> Fut,
@ -147,6 +148,7 @@ where
Ok(lines)
}
#[tracing::instrument(level = "DEBUG", skip(f))]
async fn discover_file<F, Fut>(f: F, timeout: u64) -> Result<Discovery, MagickError>
where
F: FnOnce(crate::file::File) -> Fut,

View file

@ -1,3 +1,9 @@
mod alias;
mod delete_token;
mod hash;
mod metrics;
mod migrate;
use crate::{
config,
details::Details,
@ -9,10 +15,6 @@ use std::{fmt::Debug, sync::Arc};
use url::Url;
use uuid::Uuid;
mod alias;
mod delete_token;
mod hash;
mod migrate;
pub(crate) mod postgres;
pub(crate) mod sled;

74
src/repo/metrics.rs Normal file
View file

@ -0,0 +1,74 @@
use std::time::Instant;
/// Drop guard for a queue push: its `Drop` impl bumps the `pict-rs.queue.push` counter.
pub(super) struct PushMetricsGuard {
    // Queue name, attached to the metric as the `queue` label.
    queue: &'static str,
    // Starts `true`; cleared by `disarm`. Reported inverted as the `completed` label.
    armed: bool,
}
/// Drop guard for a queue pop: its `Drop` impl records a duration histogram and a counter.
pub(super) struct PopMetricsGuard {
    // Queue name, attached to both metrics as the `queue` label.
    queue: &'static str,
    // Moment the guard was created; the recorded duration is measured from here.
    start: Instant,
    // Starts `true`; cleared by `disarm`. Reported inverted as the `completed` label.
    armed: bool,
}
/// Drop guard for an upload wait: its `Drop` impl records a duration histogram and a counter.
pub(super) struct WaitMetricsGuard {
    // Moment the guard was created; the recorded duration is measured from here.
    start: Instant,
    // Starts `true`; cleared by `disarm`. Reported inverted as the `completed` label.
    armed: bool,
}
impl PushMetricsGuard {
pub(super) fn guard(queue: &'static str) -> Self {
Self { queue, armed: true }
}
pub(super) fn disarm(mut self) {
self.armed = false;
}
}
impl PopMetricsGuard {
pub(super) fn guard(queue: &'static str) -> Self {
Self {
queue,
start: Instant::now(),
armed: true,
}
}
pub(super) fn disarm(mut self) {
self.armed = false;
}
}
impl WaitMetricsGuard {
pub(super) fn guard() -> Self {
Self {
start: Instant::now(),
armed: true,
}
}
pub(super) fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for PushMetricsGuard {
    /// Counts the push, labelled with the queue name and whether the guard was
    /// disarmed before being dropped.
    fn drop(&mut self) {
        let completed = !self.armed;

        metrics::increment_counter!("pict-rs.queue.push", "completed" => completed.to_string(), "queue" => self.queue);
    }
}
impl Drop for PopMetricsGuard {
    /// Records how long the pop took and counts it, both labelled with the
    /// queue name and whether the guard was disarmed before being dropped.
    fn drop(&mut self) {
        let completed = !self.armed;
        let elapsed_secs = self.start.elapsed().as_secs_f64();

        metrics::histogram!("pict-rs.queue.pop.duration", elapsed_secs, "completed" => completed.to_string(), "queue" => self.queue);
        metrics::increment_counter!("pict-rs.queue.pop", "completed" => completed.to_string(), "queue" => self.queue);
    }
}
impl Drop for WaitMetricsGuard {
    /// Records how long the wait took and counts it, both labelled with
    /// whether the guard was disarmed before being dropped.
    fn drop(&mut self) {
        let completed = !self.armed;
        let elapsed_secs = self.start.elapsed().as_secs_f64();

        metrics::histogram!("pict-rs.upload.wait.duration", elapsed_secs, "completed" => completed.to_string());
        metrics::increment_counter!("pict-rs.upload.wait", "completed" => completed.to_string());
    }
}

View file

@ -3,11 +3,12 @@ mod job_status;
mod schema;
use std::{
collections::{BTreeSet, VecDeque},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Arc, Weak,
},
time::Duration,
time::{Duration, Instant},
};
use dashmap::DashMap;
@ -35,6 +36,7 @@ use crate::{
use self::job_status::JobStatus;
use super::{
metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard},
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, DetailsRepo,
FullRepo, Hash, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, ProxyRepo,
QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult,
@ -52,35 +54,24 @@ struct Inner {
health_count: AtomicU64,
pool: Pool<AsyncPgConnection>,
queue_notifications: DashMap<String, Arc<Notify>>,
upload_notifier: Notify,
upload_notifications: DashMap<UploadId, Weak<Notify>>,
}
async fn delegate_notifications(receiver: flume::Receiver<Notification>, inner: Arc<Inner>) {
while let Ok(notification) = receiver.recv_async().await {
match notification.channel() {
"queue_status_channel" => {
// new job inserted for queue
let queue_name = notification.payload().to_string();
inner
.queue_notifications
.entry(queue_name)
.or_insert_with(crate::sync::notify)
.notify_waiters();
}
"upload_completion_channel" => {
inner.upload_notifier.notify_waiters();
}
channel => {
tracing::info!(
"Unhandled postgres notification: {channel}: {}",
notification.payload()
);
}
}
/// A waiter's registration for notifications about one upload.
struct UploadInterest {
    // Shared repo state; owns the `upload_notifications` map used on drop.
    inner: Arc<Inner>,
    // Strong handle to the notifier. Wrapped in `Option` so `Drop` can take it by value.
    interest: Option<Arc<Notify>>,
    // Key of this registration in `upload_notifications`.
    upload_id: UploadId,
}
tracing::warn!("Notification delegator shutting down");
/// State used while handling `queue_status_channel` notifications.
struct JobNotifierState<'a> {
    // Shared repo state; provides the per-queue notifiers.
    inner: &'a Inner,
    // Maximum number of job IDs remembered before the oldest is evicted.
    capacity: usize,
    // Recently seen job IDs, used to ignore duplicate notifications.
    jobs: BTreeSet<JobId>,
    // The same job IDs in arrival order, so the oldest can be evicted first.
    jobs_ordered: VecDeque<JobId>,
}
/// State used while handling `upload_completion_channel` notifications.
struct UploadNotifierState<'a> {
    // Shared repo state; provides the per-upload notifier registrations.
    inner: &'a Inner,
}
#[derive(Debug, thiserror::Error)]
@ -166,7 +157,7 @@ impl PostgresRepo {
health_count: AtomicU64::new(0),
pool,
queue_notifications: DashMap::new(),
upload_notifier: crate::sync::bare_notify(),
upload_notifications: DashMap::new(),
});
let handle = crate::sync::spawn(delegate_notifications(rx, inner.clone()));
@ -184,10 +175,125 @@ impl PostgresRepo {
}
}
/// Drop guard for a pool checkout: its `Drop` impl records a counter and a duration histogram.
struct GetConnectionMetricsGuard {
    // Moment the guard was created; the recorded duration is measured from here.
    start: Instant,
    // Starts `true`; cleared by `disarm`. Reported inverted as the `completed` label.
    armed: bool,
}
impl GetConnectionMetricsGuard {
fn guard() -> Self {
GetConnectionMetricsGuard {
start: Instant::now(),
armed: true,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for GetConnectionMetricsGuard {
    /// Counts the finished pool checkout and then records its duration, both
    /// labelled with whether the guard was disarmed before being dropped.
    fn drop(&mut self) {
        let completed = !self.armed;

        metrics::increment_counter!("pict-rs.postgres.pool.get.end", "completed" => completed.to_string());
        metrics::histogram!("pict-rs.postgres.pool.get.duration", self.start.elapsed().as_secs_f64(), "completed" => completed.to_string());
    }
}
impl Inner {
#[tracing::instrument(level = "DEBUG", skip(self))]
#[tracing::instrument(level = "TRACE", skip(self))]
async fn get_connection(&self) -> Result<Object<AsyncPgConnection>, PostgresError> {
self.pool.get().await.map_err(PostgresError::Pool)
let guard = GetConnectionMetricsGuard::guard();
let res = self.pool.get().await.map_err(PostgresError::Pool);
guard.disarm();
res
}
/// Registers a fresh notifier for `upload_id` and returns the handle that
/// owns it.
///
/// Only a `Weak` reference is stored in `upload_notifications`; the returned
/// `UploadInterest` holds the strong one. The entry is written with
/// `insert`, so any previous entry under the same key is replaced.
fn interest(self: &Arc<Self>, upload_id: UploadId) -> UploadInterest {
    let strong = crate::sync::notify();
    let weak = Arc::downgrade(&strong);

    self.upload_notifications.insert(upload_id, weak);

    UploadInterest {
        upload_id,
        interest: Some(strong),
        inner: Arc::clone(self),
    }
}
}
impl UploadInterest {
async fn notified_timeout(&self, timeout: Duration) -> Result<(), tokio::time::error::Elapsed> {
actix_rt::time::timeout(
timeout,
self.interest.as_ref().expect("interest exists").notified(),
)
.await
}
}
impl Drop for UploadInterest {
    /// Removes this interest's registration from `upload_notifications`.
    ///
    /// The entry is removed only when the stored `Weak` points at this
    /// interest's own notifier. Registering a second interest for the same
    /// upload ID overwrites the map entry, so removing purely by key would let
    /// the older interest delete the newer one's registration when it drops.
    /// Comparing by pointer also means the entry is cleaned up even if another
    /// strong reference to the notifier exists briefly (for example one
    /// obtained through `Weak::upgrade`), instead of leaving a dead entry behind.
    fn drop(&mut self) {
        let Some(interest) = self.interest.take() else {
            return;
        };

        let registered = Arc::downgrade(&interest);
        // Release our strong reference before touching the map.
        drop(interest);

        self.inner
            .upload_notifications
            .remove_if(&self.upload_id, |_, current| current.ptr_eq(&registered));
    }
}
impl<'a> JobNotifierState<'a> {
fn handle(&mut self, payload: &str) {
let Some((job_id, queue_name)) = payload.split_once(' ') else {
tracing::warn!("Invalid queue payload {payload}");
return;
};
let Ok(job_id) = job_id.parse::<Uuid>().map(JobId) else {
tracing::warn!("Invalid job ID {job_id}");
return;
};
if !self.jobs.insert(job_id) {
// duplicate job
return;
}
self.jobs_ordered.push_back(job_id);
if self.jobs_ordered.len() > self.capacity {
if let Some(job_id) = self.jobs_ordered.pop_front() {
self.jobs.remove(&job_id);
}
}
self.inner
.queue_notifications
.entry(queue_name.to_string())
.or_insert_with(crate::sync::notify)
.notify_one();
metrics::increment_counter!("pict-rs.postgres.job-notifier.notified", "queue" => queue_name.to_string());
}
}
impl<'a> UploadNotifierState<'a> {
    /// Handles one upload-completion payload, which is expected to parse as an `UploadId`.
    ///
    /// An unparseable payload is logged and ignored. If a notifier is
    /// registered for the upload and is still alive, its current waiters are
    /// notified and a counter is incremented; otherwise nothing happens.
    fn handle(&self, payload: &str) {
        let upload_id = match payload.parse::<UploadId>() {
            Ok(upload_id) => upload_id,
            Err(_) => {
                tracing::warn!("Invalid upload id {payload}");
                return;
            }
        };

        // Look up and upgrade in one step; the map guard is gone once this
        // statement ends.
        let notifier = match self.inner.upload_notifications.get(&upload_id) {
            Some(weak) => weak.upgrade(),
            None => None,
        };

        if let Some(notifier) = notifier {
            notifier.notify_waiters();
            metrics::increment_counter!("pict-rs.postgres.upload-notifier.notified");
        }
    }
}
@ -195,6 +301,44 @@ type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> +
type ConfigFn =
Box<dyn Fn(&str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> + Send + Sync + 'static>;
/// Receives postgres notifications from `receiver` and dispatches them by channel.
///
/// `queue_status_channel` payloads go to the job notifier state and
/// `upload_completion_channel` payloads go to the upload notifier state; any
/// other channel is logged. Every received notification increments a counter.
/// The loop ends when receiving fails, after which a warning is logged.
async fn delegate_notifications(receiver: flume::Receiver<Notification>, inner: Arc<Inner>) {
    let parallelism = std::thread::available_parallelism().map_or(1_usize, usize::from);

    let upload_notifier_state = UploadNotifierState { inner: &inner };

    let mut job_notifier_state = JobNotifierState {
        inner: &inner,
        // Number of recent job IDs remembered for de-duplication.
        capacity: parallelism * 8,
        jobs: BTreeSet::new(),
        jobs_ordered: VecDeque::new(),
    };

    loop {
        let Ok(notification) = receiver.recv_async().await else {
            break;
        };

        metrics::increment_counter!("pict-rs.postgres.notification");

        let channel = notification.channel();

        if channel == "queue_status_channel" {
            // new job inserted for queue
            job_notifier_state.handle(notification.payload());
        } else if channel == "upload_completion_channel" {
            // new upload finished
            upload_notifier_state.handle(notification.payload());
        } else {
            tracing::info!(
                "Unhandled postgres notification: {channel}: {}",
                notification.payload()
            );
        }
    }

    tracing::warn!("Notification delegator shutting down");
}
fn build_handler(sender: flume::Sender<Notification>) -> ConfigFn {
Box::new(
move |config: &str| -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> {
@ -834,6 +978,8 @@ impl QueueRepo for PostgresRepo {
queue_name: &'static str,
job_json: serde_json::Value,
) -> Result<JobId, RepoError> {
let guard = PushMetricsGuard::guard(queue_name);
use schema::job_queue::dsl::*;
let mut conn = self.get_connection().await?;
@ -845,6 +991,8 @@ impl QueueRepo for PostgresRepo {
.await
.map_err(PostgresError::Diesel)?;
guard.disarm();
Ok(JobId(job_id))
}
@ -854,6 +1002,8 @@ impl QueueRepo for PostgresRepo {
queue_name: &'static str,
worker_id: Uuid,
) -> Result<(JobId, serde_json::Value), RepoError> {
let guard = PopMetricsGuard::guard(queue_name);
use schema::job_queue::dsl::*;
loop {
@ -923,6 +1073,7 @@ impl QueueRepo for PostgresRepo {
};
if let Some((job_id, job_json)) = opt {
guard.disarm();
return Ok((JobId(job_id), job_json));
}
@ -1334,9 +1485,14 @@ impl UploadRepo for PostgresRepo {
#[tracing::instrument(level = "DEBUG", skip(self))]
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
let guard = WaitMetricsGuard::guard();
use schema::uploads::dsl::*;
let interest = self.inner.interest(upload_id);
loop {
let interest_future = interest.notified_timeout(Duration::from_secs(5));
let mut conn = self.get_connection().await?;
diesel::sql_query("LISTEN upload_completion_channel;")
@ -1359,6 +1515,7 @@ impl UploadRepo for PostgresRepo {
serde_json::from_value(upload_result)
.map_err(PostgresError::DeserializeUploadResult)?;
guard.disarm();
return Ok(upload_result.into());
}
}
@ -1369,13 +1526,7 @@ impl UploadRepo for PostgresRepo {
drop(conn);
if actix_rt::time::timeout(
Duration::from_secs(5),
self.inner.upload_notifier.notified(),
)
.await
.is_ok()
{
if interest_future.await.is_ok() {
tracing::debug!("Notified");
} else {
tracing::debug!("Timed out");

View file

@ -34,7 +34,7 @@ CREATE OR REPLACE FUNCTION queue_status_notify()
RETURNS trigger AS
$$
BEGIN
PERFORM pg_notify('queue_status_channel', NEW.queue::text);
PERFORM pg_notify('queue_status_channel', NEW.id::text || ' ' || NEW.queue::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

View file

@ -12,17 +12,18 @@ use std::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
time::Instant,
};
use tokio::sync::Notify;
use url::Url;
use uuid::Uuid;
use super::{
hash::Hash, Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken,
Details, DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash,
ProxyRepo, QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo,
UploadResult, VariantAccessRepo, VariantAlreadyExists,
hash::Hash,
metrics::{PopMetricsGuard, PushMetricsGuard, WaitMetricsGuard},
Alias, AliasAccessRepo, AliasAlreadyExists, AliasRepo, BaseRepo, DeleteToken, Details,
DetailsRepo, FullRepo, HashAlreadyExists, HashPage, HashRepo, JobId, OrderedHash, ProxyRepo,
QueueRepo, RepoError, SettingsRepo, StoreMigrationRepo, UploadId, UploadRepo, UploadResult,
VariantAccessRepo, VariantAlreadyExists,
};
macro_rules! b {
@ -490,54 +491,6 @@ impl From<InnerUploadResult> for UploadResult {
}
}
struct PushMetricsGuard {
queue: &'static str,
armed: bool,
}
struct PopMetricsGuard {
queue: &'static str,
start: Instant,
armed: bool,
}
impl PushMetricsGuard {
fn guard(queue: &'static str) -> Self {
Self { queue, armed: true }
}
fn disarm(mut self) {
self.armed = false;
}
}
impl PopMetricsGuard {
fn guard(queue: &'static str) -> Self {
Self {
queue,
start: Instant::now(),
armed: true,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for PushMetricsGuard {
fn drop(&mut self) {
metrics::increment_counter!("pict-rs.queue.push", "completed" => (!self.armed).to_string(), "queue" => self.queue);
}
}
impl Drop for PopMetricsGuard {
fn drop(&mut self) {
metrics::histogram!("pict-rs.queue.pop.duration", self.start.elapsed().as_secs_f64(), "completed" => (!self.armed).to_string(), "queue" => self.queue);
metrics::increment_counter!("pict-rs.queue.pop", "completed" => (!self.armed).to_string(), "queue" => self.queue);
}
}
#[async_trait::async_trait(?Send)]
impl UploadRepo for SledRepo {
#[tracing::instrument(level = "trace", skip(self))]
@ -551,6 +504,7 @@ impl UploadRepo for SledRepo {
#[tracing::instrument(skip(self))]
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, RepoError> {
let guard = WaitMetricsGuard::guard();
let mut subscriber = self.uploads.watch_prefix(upload_id.as_bytes());
let bytes = upload_id.as_bytes().to_vec();
@ -560,6 +514,7 @@ impl UploadRepo for SledRepo {
if bytes != b"1" {
let result: InnerUploadResult =
serde_json::from_slice(&bytes).map_err(SledError::UploadResult)?;
guard.disarm();
return Ok(result.into());
}
} else {
@ -575,6 +530,8 @@ impl UploadRepo for SledRepo {
if value != b"1" {
let result: InnerUploadResult =
serde_json::from_slice(&value).map_err(SledError::UploadResult)?;
guard.disarm();
return Ok(result.into());
}
}