Measure bcrypt, change DashMap to RwLock<HashMap for collector

This commit is contained in:
asonix 2022-11-23 11:13:30 -06:00
parent 1a638f7f8d
commit e9f312bed5
2 changed files with 28 additions and 135 deletions

View file

@ -1,4 +1,3 @@
use dashmap::DashMap;
use indexmap::IndexMap; use indexmap::IndexMap;
use metrics::{Key, Recorder, SetRecorderError}; use metrics::{Key, Recorder, SetRecorderError};
use metrics_util::{ use metrics_util::{
@ -8,7 +7,7 @@ use metrics_util::{
use quanta::Clock; use quanta::Clock;
use std::{ use std::{
collections::{BTreeMap, HashMap}, collections::{BTreeMap, HashMap},
sync::{atomic::Ordering, Arc}, sync::{atomic::Ordering, Arc, RwLock},
time::Duration, time::Duration,
}; };
@ -23,8 +22,8 @@ pub struct MemoryCollector {
} }
struct Inner { struct Inner {
descriptions: DashMap<String, metrics::SharedString>, descriptions: RwLock<HashMap<String, metrics::SharedString>>,
distributions: DashMap<String, IndexMap<Vec<(String, String)>, Summary>>, distributions: RwLock<HashMap<String, IndexMap<Vec<(String, String)>, Summary>>>,
recency: Recency<Key>, recency: Recency<Key>,
registry: Registry<Key, GenerationalStorage<AtomicStorage>>, registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
} }
@ -273,24 +272,24 @@ impl Inner {
.recency .recency
.should_store_histogram(&key, gen, &self.registry) .should_store_histogram(&key, gen, &self.registry)
{ {
let delete_by_name = if let Some(mut by_name) = self.distributions.get_mut(&name) { let mut d = self.distributions.write().unwrap();
let delete_by_name = if let Some(by_name) = d.get_mut(&name) {
by_name.remove(&labels); by_name.remove(&labels);
by_name.is_empty() by_name.is_empty()
} else { } else {
false false
}; };
drop(d);
if delete_by_name { if delete_by_name {
self.descriptions.remove(&name); self.descriptions.write().unwrap().remove(&name);
} }
continue; continue;
} }
let mut outer_entry = self let mut d = self.distributions.write().unwrap();
.distributions let outer_entry = d.entry(name.clone()).or_insert_with(IndexMap::new);
.entry(name.clone())
.or_insert_with(IndexMap::new);
let entry = outer_entry let entry = outer_entry
.entry(labels) .entry(labels)
@ -303,16 +302,16 @@ impl Inner {
}) })
} }
self.distributions let d = self.distributions.read().unwrap().clone();
.iter() let h = d
.map(|entry| { .into_iter()
.map(|(key, value)| {
( (
entry.key().clone(), key,
entry value
.value() .into_iter()
.iter()
.map(|(labels, summary)| Histogram { .map(|(labels, summary)| Histogram {
labels: labels.iter().cloned().collect(), labels: labels.into_iter().collect(),
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0] value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
.into_iter() .into_iter()
.map(|q| (q, summary.quantile(q))) .map(|q| (q, summary.quantile(q)))
@ -321,7 +320,9 @@ impl Inner {
.collect(), .collect(),
) )
}) })
.collect() .collect();
h
} }
fn snapshot(&self) -> Snapshot { fn snapshot(&self) -> Snapshot {
@ -362,10 +363,8 @@ impl MemoryCollector {
key: &metrics::KeyName, key: &metrics::KeyName,
description: metrics::SharedString, description: metrics::SharedString,
) { ) {
self.inner let mut d = self.inner.descriptions.write().unwrap();
.descriptions d.entry(key.as_str().to_owned()).or_insert(description);
.entry(key.as_str().to_owned())
.or_insert(description);
} }
} }
@ -415,114 +414,3 @@ impl Recorder for MemoryCollector {
.get_or_create_histogram(key, |c| c.clone().into()) .get_or_create_histogram(key, |c| c.clone().into())
} }
} }
/*
struct Bucket {
begin: Instant,
summary: Summary,
}
pub(crate) struct RollingSummary {
buckets: Vec<Bucket>,
bucket_duration: Duration,
expire_after: Duration,
count: usize,
}
impl Default for RollingSummary {
fn default() -> Self {
Self::new(
Duration::from_secs(5 * MINUTES),
Duration::from_secs(1 * DAYS),
)
}
}
impl RollingSummary {
fn new(bucket_duration: Duration, expire_after: Duration) -> Self {
Self {
buckets: Vec::new(),
bucket_duration,
expire_after,
count: 0,
}
}
fn add(&mut self, value: f64, now: Instant) {
self.count += 1;
// try adding to existing bucket
for bucket in &mut self.buckets {
let end = bucket.begin + self.bucket_duration;
if now >= end {
break;
}
if now >= bucket.begin {
bucket.summary.add(value);
return;
}
}
// if we're adding a new bucket, clean old buckets first
if let Some(cutoff) = now.checked_sub(self.expire_after) {
self.buckets.retain(|b| b.begin > cutoff);
}
let mut summary = Summary::with_defaults();
summary.add(value);
// if there's no buckets, make one and return
if self.buckets.is_empty() {
self.buckets.push(Bucket {
summary,
begin: now,
});
return;
}
let mut begin = self.buckets[0].begin;
// there are buckets, but none can hold our value, see why
if now < self.buckets[0].begin {
// create an old bucket
while now < begin {
begin -= self.bucket_duration;
}
self.buckets.push(Bucket { begin, summary });
self.buckets.sort_unstable_by(|a, b| b.begin.cmp(&a.begin));
} else {
// create a new bucket
let mut end = self.buckets[0].begin + self.bucket_duration;
while now >= end {
begin += self.bucket_duration;
end += self.bucket_duration;
}
self.buckets.insert(0, Bucket { begin, summary });
}
}
fn snapshot(&self, now: Instant) -> Summary {
let cutoff = now.checked_sub(self.expire_after);
let mut acc = Summary::with_defaults();
let summaries = self
.buckets
.iter()
.filter(|b| cutoff.map(|c| b.begin > c).unwrap_or(true))
.map(|b| &b.summary);
for item in summaries {
acc.merge(item)
.expect("All summaries are created with default settings");
}
acc
}
}
*/

View file

@ -11,7 +11,7 @@ use actix_web::{
use bcrypt::{BcryptError, DEFAULT_COST}; use bcrypt::{BcryptError, DEFAULT_COST};
use futures_util::future::LocalBoxFuture; use futures_util::future::LocalBoxFuture;
use http_signature_normalization_actix::prelude::InvalidHeaderValue; use http_signature_normalization_actix::prelude::InvalidHeaderValue;
use std::{convert::Infallible, str::FromStr}; use std::{convert::Infallible, str::FromStr, time::Instant};
use tracing_error::SpanTrace; use tracing_error::SpanTrace;
use crate::db::Db; use crate::db::Db;
@ -178,10 +178,15 @@ impl FromRequest for Admin {
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
let now = Instant::now();
let res = Self::prepare_verify(req); let res = Self::prepare_verify(req);
Box::pin(async move { Box::pin(async move {
let (db, c, t) = res?; let (db, c, t) = res?;
Self::verify(c, t).await?; Self::verify(c, t).await?;
metrics::histogram!(
"relay.admin.verify",
now.elapsed().as_micros() as f64 / 1_000_000_f64
);
Ok(Admin { db }) Ok(Admin { db })
}) })
} }