diff --git a/exporter.go b/exporter.go index e22f5e4..bad7386 100644 --- a/exporter.go +++ b/exporter.go @@ -40,9 +40,9 @@ const ( ) var ( - hash = fnv.New64a() - strBuf bytes.Buffer // Used for hashing. - intBuf = make([]byte, 8) + fnvHash = fnv.New64a() + strBuf bytes.Buffer // Used for hashing. + intBuf = make([]byte, 8) ) // uncheckedCollector wraps a Collector but its Describe method yields no Desc. @@ -56,15 +56,6 @@ func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) { u.c.Collect(c) } -type metricType int - -const ( - CounterMetricType metricType = iota - GaugeMetricType - SummaryMetricType - HistogramMetricType -) - type metricChecker interface { metricConflicts(string, metricType) bool } @@ -88,7 +79,7 @@ func getContainerMapKey(metricName string, labelNames []string) string { // Not safe for concurrent use! (Uses a shared buffer and hasher to save on // allocations.) func hashNameAndLabels(name string, labelNames []string, labels prometheus.Labels) uint64 { - hash.Reset() + fnvHash.Reset() strBuf.Reset() strBuf.WriteString(name) strBuf.WriteByte(model.SeparatorByte) @@ -100,8 +91,8 @@ func hashNameAndLabels(name string, labelNames []string, labels prometheus.Label strBuf.WriteByte(model.SeparatorByte) } - hash.Write(strBuf.Bytes()) - return hash.Sum64() + fnvHash.Write(strBuf.Bytes()) + return fnvHash.Sum64() } type CounterContainer struct { @@ -356,6 +347,7 @@ type Exporter struct { Summaries *SummaryContainer Histograms *HistogramContainer mapper *mapper.MetricMapper + registry *registry labelValues map[string]map[uint64]*LabelValues } @@ -423,6 +415,7 @@ func (b *Exporter) Listen(e <-chan Events) { select { case <-removeStaleMetricsTicker.C: b.removeStaleMetrics() + b.registry.removeStaleMetrics() case events, ok := <-e: if !ok { log.Debug("Channel is closed. Break out of Exporter.Listener.") @@ -474,7 +467,6 @@ func (b *Exporter) handleEvent(event Event) { metricName = escapeMetricName(event.MetricName()) } - sortedLabelNames := getSortedLabelNames(prometheusLabels) switch ev := event.(type) { case *CounterEvent: // We don't accept negative values for counters. Incrementing the counter with a negative number @@ -485,16 +477,9 @@ func (b *Exporter) handleEvent(event Event) { return } - counter, err := b.Counters.Get( - metricName, - sortedLabelNames, - prometheusLabels, - b, - help, - ) + counter, err := b.registry.getCounter(metricName, prometheusLabels, help, mapping) if err == nil { counter.Add(event.Value()) - b.saveLabelValues(metricName, CounterMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("counter").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -502,13 +487,7 @@ func (b *Exporter) handleEvent(event Event) { } case *GaugeEvent: - gauge, err := b.Gauges.Get( - metricName, - sortedLabelNames, - prometheusLabels, - b, - help, - ) + gauge, err := b.registry.getGauge(metricName, prometheusLabels, help, mapping) if err == nil { if ev.relative { @@ -516,7 +495,6 @@ func (b *Exporter) handleEvent(event Event) { } else { gauge.Set(event.Value()) } - b.saveLabelValues(metricName, GaugeMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("gauge").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -534,17 +512,9 @@ func (b *Exporter) handleEvent(event Event) { switch t { case mapper.TimerTypeHistogram: - histogram, err := b.Histograms.Get( - metricName, - sortedLabelNames, - prometheusLabels, - b, - help, - mapping, - ) + histogram, err := b.registry.getHistogram(metricName, prometheusLabels, help, mapping) if err == nil { histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond - b.saveLabelValues(metricName, HistogramMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("timer").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -552,17 +522,9 @@ func (b *Exporter) handleEvent(event Event) { } case mapper.TimerTypeDefault, mapper.TimerTypeSummary: - summary, err := b.Summaries.Get( - metricName, - sortedLabelNames, - prometheusLabels, - b, - help, - mapping, - ) + summary, err := b.registry.getSummary(metricName, prometheusLabels, help, mapping) if err == nil { summary.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond - b.saveLabelValues(metricName, SummaryMetricType, sortedLabelNames, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("timer").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -653,6 +615,7 @@ func NewExporter(mapper *mapper.MetricMapper) *Exporter { Summaries: NewSummaryContainer(mapper), Histograms: NewHistogramContainer(mapper), mapper: mapper, + registry: newRegistry(mapper), labelValues: make(map[string]map[uint64]*LabelValues), } } diff --git a/exporter_test.go b/exporter_test.go index fc3c070..c449ca1 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -531,7 +531,7 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) { metrics, err := prometheus.DefaultGatherer.Gather() if err != nil { - t.Fatal("Gather should not fail") + t.Fatal("Gather should not fail: ", err) } var metricFamily *dto.MetricFamily diff --git a/registry.go b/registry.go new file mode 100644 index 0000000..d8d1db7 --- /dev/null +++ b/registry.go @@ -0,0 +1,414 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "fmt" + "hash" + "hash/fnv" + "sort" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/statsd_exporter/pkg/clock" + "github.com/prometheus/statsd_exporter/pkg/mapper" +) + +type metricType int + +const ( + CounterMetricType metricType = iota + GaugeMetricType + SummaryMetricType + HistogramMetricType +) + +type nameHash uint64 +type valueHash uint64 +type labelHash struct { + // This is a hash over the label names + names nameHash + // This is a hash over the label names + label values + values valueHash +} + +type metricHolder interface{} + +type registeredMetric struct { + lastRegisteredAt time.Time + labels prometheus.Labels + ttl time.Duration + metric metricHolder + vecKey nameHash +} + +type vectorHolder interface { + Delete(label prometheus.Labels) bool +} + +type vector struct { + holder vectorHolder + refCount uint64 +} + +type metric struct { + metricType metricType + // Vectors key is the hash of the label names + vectors map[nameHash]*vector + // Metrics key is a hash of the label names + label values + metrics map[valueHash]*registeredMetric +} + +type registry struct { + metrics map[string]metric + mapper *mapper.MetricMapper + // The below value and label variables are allocated in the registry struct + // so that we don't have to allocate them every time have to compute a label + // hash. + valueBuf, nameBuf bytes.Buffer + valueHasher, nameHasher hash.Hash64 +} + +func newRegistry(mapper *mapper.MetricMapper) *registry { + return ®istry{ + metrics: make(map[string]metric), + mapper: mapper, + valueHasher: fnv.New64a(), + nameHasher: fnv.New64a(), + } +} + +func (r *registry) metricConflicts(metricName string, metricType metricType) bool { + vector, hasMetric := r.metrics[metricName] + if !hasMetric { + // No metric with this name exists + return false + } + + if vector.metricType == metricType { + // We've found a copy of this metric with this type, but different + // labels, so it's safe to create a new one. + return false + } + + // The metric exists, but it's of a different type than we're trying to + // create. + return true +} + +func (r *registry) storeCounter(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.CounterVec, c prometheus.Counter, ttl time.Duration) { + r.store(metricName, hash, labels, vec, c, CounterMetricType, ttl) +} + +func (r *registry) storeGauge(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.GaugeVec, g prometheus.Counter, ttl time.Duration) { + r.store(metricName, hash, labels, vec, g, GaugeMetricType, ttl) +} + +func (r *registry) storeHistogram(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.HistogramVec, o prometheus.Observer, ttl time.Duration) { + r.store(metricName, hash, labels, vec, o, HistogramMetricType, ttl) +} + +func (r *registry) storeSummary(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.SummaryVec, o prometheus.Observer, ttl time.Duration) { + r.store(metricName, hash, labels, vec, o, SummaryMetricType, ttl) +} + +func (r *registry) store(metricName string, hash labelHash, labels prometheus.Labels, vh vectorHolder, mh metricHolder, metricType metricType, ttl time.Duration) { + metric, hasMetric := r.metrics[metricName] + if !hasMetric { + metric.metricType = metricType + metric.vectors = make(map[nameHash]*vector) + metric.metrics = make(map[valueHash]*registeredMetric) + + r.metrics[metricName] = metric + } + + v, ok := metric.vectors[hash.names] + if !ok { + v = &vector{holder: vh} + metric.vectors[hash.names] = v + } + + rm, ok := metric.metrics[hash.values] + if !ok { + rm = ®isteredMetric{ + labels: labels, + ttl: ttl, + metric: mh, + vecKey: hash.names, + } + metric.metrics[hash.values] = rm + v.refCount++ + } + now := clock.Now() + rm.lastRegisteredAt = now + // Update ttl from mapping + rm.ttl = ttl +} + +func (r *registry) get(metricName string, hash labelHash, metricType metricType) (vectorHolder, metricHolder) { + metric, hasMetric := r.metrics[metricName] + + if !hasMetric { + return nil, nil + } + if metric.metricType != metricType { + return nil, nil + } + + rm, ok := metric.metrics[hash.values] + if ok { + return metric.vectors[hash.names].holder, rm.metric + } + + vector, ok := metric.vectors[hash.names] + if ok { + return vector.holder, nil + } + + return nil, nil +} + +func (r *registry) getCounter(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Counter, error) { + hash, labelNames := r.hashLabels(labels) + vh, mh := r.get(metricName, hash, CounterMetricType) + if mh != nil { + return mh.(prometheus.Counter), nil + } + + if r.metricConflicts(metricName, CounterMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + + var counterVec *prometheus.CounterVec + if vh == nil { + metricsCount.WithLabelValues("counter").Inc() + counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: metricName, + Help: help, + }, labelNames) + + if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil { + return nil, err + } + } else { + counterVec = vh.(*prometheus.CounterVec) + } + + var counter prometheus.Counter + var err error + if counter, err = counterVec.GetMetricWith(labels); err != nil { + return nil, err + } + r.storeCounter(metricName, hash, labels, counterVec, counter, mapping.Ttl) + + return counter, nil +} + +func (r *registry) getGauge(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Gauge, error) { + hash, labelNames := r.hashLabels(labels) + vh, mh := r.get(metricName, hash, GaugeMetricType) + if mh != nil { + return mh.(prometheus.Gauge), nil + } + + if r.metricConflicts(metricName, GaugeMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + + var gaugeVec *prometheus.GaugeVec + if vh == nil { + metricsCount.WithLabelValues("gauge").Inc() + gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: metricName, + Help: help, + }, labelNames) + + if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil { + return nil, err + } + } else { + gaugeVec = vh.(*prometheus.GaugeVec) + } + + var gauge prometheus.Gauge + var err error + if gauge, err = gaugeVec.GetMetricWith(labels); err != nil { + return nil, err + } + r.storeGauge(metricName, hash, labels, gaugeVec, gauge, mapping.Ttl) + + return gauge, nil +} + +func (r *registry) getHistogram(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) { + hash, labelNames := r.hashLabels(labels) + vh, mh := r.get(metricName, hash, HistogramMetricType) + if mh != nil { + return mh.(prometheus.Observer), nil + } + + if r.metricConflicts(metricName, HistogramMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + if r.metricConflicts(metricName+"_sum", HistogramMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + if r.metricConflicts(metricName+"_count", HistogramMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + if r.metricConflicts(metricName+"_bucket", HistogramMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + + var histogramVec *prometheus.HistogramVec + if vh == nil { + metricsCount.WithLabelValues("histogram").Inc() + buckets := r.mapper.Defaults.Buckets + if mapping.Buckets != nil && len(mapping.Buckets) > 0 { + buckets = mapping.Buckets + } + histogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: metricName, + Help: help, + Buckets: buckets, + }, labelNames) + + if err := prometheus.Register(uncheckedCollector{histogramVec}); err != nil { + return nil, err + } + } else { + histogramVec = vh.(*prometheus.HistogramVec) + } + + var observer prometheus.Observer + var err error + if observer, err = histogramVec.GetMetricWith(labels); err != nil { + return nil, err + } + r.storeHistogram(metricName, hash, labels, histogramVec, observer, mapping.Ttl) + + return observer, nil +} + +func (r *registry) getSummary(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) { + hash, labelNames := r.hashLabels(labels) + vh, mh := r.get(metricName, hash, SummaryMetricType) + if mh != nil { + return mh.(prometheus.Observer), nil + } + + if r.metricConflicts(metricName, SummaryMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + if r.metricConflicts(metricName+"_sum", SummaryMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + if r.metricConflicts(metricName+"_count", SummaryMetricType) { + return nil, fmt.Errorf("metric with name %s is already registered", metricName) + } + + var summaryVec *prometheus.SummaryVec + if vh == nil { + metricsCount.WithLabelValues("summary").Inc() + quantiles := r.mapper.Defaults.Quantiles + if mapping != nil && mapping.Quantiles != nil && len(mapping.Quantiles) > 0 { + quantiles = mapping.Quantiles + } + objectives := make(map[float64]float64) + for _, q := range quantiles { + objectives[q.Quantile] = q.Error + } + // In the case of no mapping file, explicitly define the default quantiles + if len(objectives) == 0 { + objectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001} + } + summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: metricName, + Help: help, + Objectives: objectives, + }, labelNames) + + if err := prometheus.Register(uncheckedCollector{summaryVec}); err != nil { + return nil, err + } + } else { + summaryVec = vh.(*prometheus.SummaryVec) + } + + var observer prometheus.Observer + var err error + if observer, err = summaryVec.GetMetricWith(labels); err != nil { + return nil, err + } + r.storeSummary(metricName, hash, labels, summaryVec, observer, mapping.Ttl) + + return observer, nil +} + +func (r *registry) removeStaleMetrics() { + now := clock.Now() + // delete timeseries with expired ttl + for metricName, metric := range r.metrics { + for hash, rm := range metric.metrics { + if rm.ttl == 0 { + continue + } + if rm.lastRegisteredAt.Add(rm.ttl).Before(now) { + metric.vectors[rm.vecKey].holder.Delete(rm.labels) + metric.vectors[rm.vecKey].refCount-- + delete(metric.metrics, hash) + } + // If no individual instances exist, then delete the entire vector + if metric.vectors[rm.vecKey].refCount == 0 { + delete(r.metrics, metricName) + } + } + } +} + +// Calculates a hash of both the label names and the label names + label values. +// Note that this function goes to silly lengths to avoid allocating memory. +// This is in the fast path for every metric ingested and allocating and gc'ing +// temporary memory is expensive. +func (r *registry) hashLabels(labels prometheus.Labels) (labelHash, []string) { + r.nameHasher.Reset() + r.valueHasher.Reset() + r.nameBuf.Reset() + r.valueBuf.Reset() + labelNames := make([]string, 0, len(labels)) + + for labelName := range labels { + labelNames = append(labelNames, labelName) + } + sort.Strings(labelNames) + + r.valueBuf.WriteByte(model.SeparatorByte) + for _, labelName := range labelNames { + r.valueBuf.WriteString(labels[labelName]) + r.valueBuf.WriteByte(model.SeparatorByte) + + r.nameBuf.WriteString(labelName) + r.nameBuf.WriteByte(model.SeparatorByte) + } + + r.nameHasher.Write(r.nameBuf.Bytes()) + r.valueHasher.Write(r.nameBuf.Bytes()) + r.valueHasher.Write(r.valueBuf.Bytes()) + + return labelHash{ + names: nameHash(r.nameHasher.Sum64()), + values: valueHash(r.valueHasher.Sum64())}, labelNames +}