diff --git a/bridge_test.go b/bridge_test.go index 33d4716..7d62d13 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -26,7 +26,8 @@ func TestHandlePacket(t *testing.T) { }{ { name: "empty", - }, { + }, + { name: "simple counter", in: "foo:2|c", out: Events{ @@ -36,7 +37,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{}, }, }, - }, { + }, + { name: "simple gauge", in: "foo:3|g", out: Events{ @@ -46,7 +48,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{}, }, }, - }, { + }, + { name: "gauge decrement", in: "foo:-10|g", out: Events{ @@ -57,7 +60,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{}, }, }, - }, { + }, + { name: "simple timer", in: "foo:200|ms", out: Events{ @@ -67,7 +71,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{}, }, }, - }, { + }, + { name: "datadog tag extension", in: "foo:100|c|#tag1:bar,tag2:baz", out: Events{ @@ -77,7 +82,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, }, - }, { + }, + { name: "datadog tag extension with # in all keys (as sent by datadog php client)", in: "foo:100|c|#tag1:bar,#tag2:baz", out: Events{ @@ -87,7 +93,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, }, - }, { + }, + { name: "datadog tag extension with tag keys unsupported by prometheus", in: "foo:100|c|#09digits:0,tag.with.dots:1", out: Events{ @@ -97,7 +104,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{"_09digits": "0", "tag_with_dots": "1"}, }, }, - }, { + }, + { name: "datadog tag extension with valueless tags: ignored", in: "foo:100|c|#tag_without_a_value", out: Events{ @@ -107,7 +115,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{}, }, }, - }, { + }, + { name: "datadog tag extension with valueless tags (edge case)", in: "foo:100|c|#tag_without_a_value,tag:value", out: Events{ @@ -117,7 +126,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{"tag": "value"}, }, }, - }, { + }, + { name: "datadog tag extension with empty tags (edge case)", in: "foo:100|c|#tag:value,,", out: Events{ @@ -127,7 +137,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{"tag": "value"}, }, }, - }, { + }, + { name: "datadog tag extension with sampling", in: "foo:100|c|@0.1|#tag1:bar,#tag2:baz", out: Events{ @@ -137,7 +148,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, }, - }, { + }, + { name: "datadog tag extension with multiple colons", in: "foo:100|c|@0.1|#tag1:foo:bar", out: Events{ @@ -147,13 +159,16 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{"tag1": "foo:bar"}, }, }, - }, { + }, + { name: "datadog tag extension with invalid utf8 tag values", in: "foo:100|c|@0.1|#tag:\xc3\x28invalid", - }, { + }, + { name: "datadog tag extension with both valid and invalid utf8 tag values", in: "foo:100|c|@0.1|#tag1:valid,tag2:\xc3\x28invalid", - }, { + }, + { name: "multiple metrics with invalid datadog utf8 tag values", in: "foo:200|c|#tag:value\nfoo:300|c|#tag:\xc3\x28invalid", out: Events{ @@ -163,7 +178,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{"tag": "value"}, }, }, - }, { + }, + { name: "combined multiline metrics", in: "foo:200|ms:300|ms:5|c|@0.1:6|g\nbar:1|c:5|ms", out: Events{ @@ -198,7 +214,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{}, }, }, - }, { + }, + { name: "timings with sampling factor", in: "foo.timing:0.5|ms|@0.1", out: Events{ @@ -213,16 +230,20 @@ func TestHandlePacket(t *testing.T) { &TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}}, &TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}}, }, - }, { + }, + { name: "bad line", in: "foo", - }, { + }, + { name: "bad component", in: "foo:1", - }, { + }, + { name: "bad value", in: "foo:1o|c", - }, { + }, + { name: "illegal sampling factor", in: "foo:1|c|@bar", out: Events{ @@ -232,7 +253,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{}, }, }, - }, { + }, + { name: "zero sampling factor", in: "foo:2|c|@0", out: Events{ @@ -242,7 +264,8 @@ func TestHandlePacket(t *testing.T) { labels: map[string]string{}, }, }, - }, { + }, + { name: "illegal stat type", in: "foo:2|t", }, @@ -288,7 +311,7 @@ func TestHandlePacket(t *testing.T) { } for j, expected := range scenario.out { - if !reflect.DeepEqual(&expected, &actual[j]) { + if !reflect.DeepEqual(&expected, &actual[j]) { //nolint:gosec t.Fatalf("%d.%d.%d. Expected %#v, got %#v in scenario '%s'", k, i, j, expected, actual[j], scenario.name) } } diff --git a/exporter.go b/exporter.go index 974f468..f0455c6 100644 --- a/exporter.go +++ b/exporter.go @@ -31,7 +31,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" - "github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -56,7 +55,9 @@ func labelNames(labels prometheus.Labels) []string { for labelName := range labels { names = append(names, labelName) } + sort.Strings(names) + return names } @@ -72,6 +73,7 @@ func hashNameAndLabels(name string, labels prometheus.Labels) uint64 { hash.Write(strBuf.Bytes()) binary.BigEndian.PutUint64(intBuf, model.LabelsToSignature(labels)) hash.Write(intBuf) + return hash.Sum64() } @@ -93,12 +95,20 @@ func (c *CounterContainer) Get(metricName string, labels prometheus.Labels, help Name: metricName, Help: help, }, labelNames(labels)) + if err := prometheus.Register(counterVec); err != nil { return nil, fmt.Errorf("failed to register to prometheus:%w", err) } + c.Elements[metricName] = counterVec } - return counterVec.GetMetricWith(labels) + + cnt, err := counterVec.GetMetricWith(labels) + if err != nil { + return nil, fmt.Errorf("failed to get metrics:%w", err) + } + + return cnt, nil } func (c *CounterContainer) Delete(metricName string, labels prometheus.Labels) { @@ -125,11 +135,18 @@ func (c *GaugeContainer) Get(metricName string, labels prometheus.Labels, help s Help: help, }, labelNames(labels)) if err := prometheus.Register(gaugeVec); err != nil { - return nil, err + return nil, fmt.Errorf("could not register prometheus:%w", err) } + c.Elements[metricName] = gaugeVec } - return gaugeVec.GetMetricWith(labels) + + gv, err := gaugeVec.GetMetricWith(labels) + if err != nil { + return nil, fmt.Errorf("failed to get metrics width:%w", err) + } + + return gv, nil } func (c *GaugeContainer) Delete(metricName string, labels prometheus.Labels) { @@ -157,22 +174,33 @@ func (c *SummaryContainer) Get(metricName string, labels prometheus.Labels, help if mapping != nil && mapping.Quantiles != nil && len(mapping.Quantiles) > 0 { quantiles = mapping.Quantiles } + objectives := make(map[float64]float64) + for _, q := range quantiles { objectives[q.Quantile] = q.Error } + summaryVec = prometheus.NewSummaryVec( prometheus.SummaryOpts{ Name: metricName, Help: help, Objectives: objectives, }, labelNames(labels)) + if err := prometheus.Register(summaryVec); err != nil { - return nil, err + return nil, fmt.Errorf("could not register to prometheus:%w", err) } + c.Elements[metricName] = summaryVec } - return summaryVec.GetMetricWith(labels) + + sv, err := summaryVec.GetMetricWith(labels) + if err != nil { + return nil, fmt.Errorf("failed to get metrics width:%w", err) + } + + return sv, nil } func (c *SummaryContainer) Delete(metricName string, labels prometheus.Labels) { @@ -200,18 +228,27 @@ func (c *HistogramContainer) Get(metricName string, labels prometheus.Labels, he if mapping != nil && mapping.Buckets != nil && len(mapping.Buckets) > 0 { buckets = mapping.Buckets } + histogramVec = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: metricName, Help: help, Buckets: buckets, }, labelNames(labels)) + if err := prometheus.Register(histogramVec); err != nil { - return nil, err + return nil, fmt.Errorf("could not register to prometheus:%w", err) } + c.Elements[metricName] = histogramVec } - return histogramVec.GetMetricWith(labels) + + po, err := histogramVec.GetMetricWith(labels) + if err != nil { + return nil, fmt.Errorf("failed to get metrics width:%w", err) + } + + return po, nil } func (c *HistogramContainer) Delete(metricName string, labels prometheus.Labels) { @@ -247,8 +284,8 @@ type GaugeEvent struct { func (g *GaugeEvent) MetricName() string { return g.metricName } func (g *GaugeEvent) Value() float64 { return g.value } -func (c *GaugeEvent) Labels() map[string]string { return c.labels } -func (c *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge } +func (g *GaugeEvent) Labels() map[string]string { return g.labels } +func (g *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge } type TimerEvent struct { metricName string @@ -258,8 +295,8 @@ type TimerEvent struct { func (t *TimerEvent) MetricName() string { return t.metricName } func (t *TimerEvent) Value() float64 { return t.value } -func (c *TimerEvent) Labels() map[string]string { return c.labels } -func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTimer } +func (t *TimerEvent) Labels() map[string]string { return t.labels } +func (t *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTimer } type Events []Event @@ -286,6 +323,7 @@ func escapeMetricName(metricName string) string { // Replace all illegal metric chars with underscores. metricName = illegalCharsRE.ReplaceAllString(metricName, "_") + return metricName } @@ -302,8 +340,10 @@ func (b *Exporter) Listen(e <-chan Events) { if !ok { log.Debug("Channel is closed. Break out of Exporter.Listener.") removeStaleMetricsTicker.Stop() + return } + for _, event := range events { b.handleEvent(event) } @@ -312,7 +352,7 @@ func (b *Exporter) Listen(e <-chan Events) { } // handleEvent processes a single Event according to the configured mapping. -func (b *Exporter) handleEvent(event Event) { +func (b *Exporter) handleEvent(event Event) { //nolint:gocognit,gocyclo mapping, labels, present := b.mapper.GetMapping(event.MetricName(), event.MetricType()) if mapping == nil { mapping = &mapper.MetricMapping{} @@ -330,10 +370,13 @@ func (b *Exporter) handleEvent(event Event) { help = mapping.HelpText } - metricName := "" + var metricName string + prometheusLabels := event.Labels() + if present { metricName = escapeMetricName(mapping.Name) + for label, value := range labels { prometheusLabels[label] = value } @@ -349,6 +392,7 @@ func (b *Exporter) handleEvent(event Event) { if event.Value() < 0.0 { log.Debugf("Counter %q is: '%f' (counter must be non-negative value)", metricName, event.Value()) eventStats.WithLabelValues("illegal_negative_counter").Inc() + return } @@ -379,7 +423,9 @@ func (b *Exporter) handleEvent(event Event) { } else { gauge.Set(event.Value()) } + b.saveLabelValues(metricName, prometheusLabels, mapping.Ttl) + eventStats.WithLabelValues("gauge").Inc() } else { log.Debugf(regErrF, metricName, err) @@ -391,6 +437,7 @@ func (b *Exporter) handleEvent(event Event) { if mapping != nil { t = mapping.TimerType } + if t == mapper.TimerTypeDefault { t = b.mapper.Defaults.TimerType } @@ -404,7 +451,8 @@ func (b *Exporter) handleEvent(event Event) { mapping, ) if err == nil { - histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond + // prometheus presumes seconds, statsd millisecond + histogram.Observe(event.Value() / 1000) //nolint:gomnd b.saveLabelValues(metricName, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("timer").Inc() } else { @@ -420,7 +468,8 @@ func (b *Exporter) handleEvent(event Event) { mapping, ) if err == nil { - summary.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond + // prometheus presumes seconds, statsd millisecond + summary.Observe(event.Value() / 1000) //nolint:gomnd b.saveLabelValues(metricName, prometheusLabels, mapping.Ttl) eventStats.WithLabelValues("timer").Inc() } else { @@ -438,7 +487,7 @@ func (b *Exporter) handleEvent(event Event) { } } -// removeStaleMetrics removes label values set from metric with stale values +// removeStaleMetrics removes label values set from metric with stale values. func (b *Exporter) removeStaleMetrics() { now := clock.Now() // delete timeseries with expired ttl @@ -447,6 +496,7 @@ func (b *Exporter) removeStaleMetrics() { if lvs.ttl == 0 { continue } + if lvs.lastRegisteredAt.Add(lvs.ttl).Before(now) { b.Counters.Delete(metricName, lvs.labels) b.Gauges.Delete(metricName, lvs.labels) @@ -458,14 +508,16 @@ func (b *Exporter) removeStaleMetrics() { } } -// saveLabelValues stores label values set to labelValues and update lastRegisteredAt time and ttl value +// saveLabelValues stores label values set to labelValues and update lastRegisteredAt time and ttl value. func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, ttl time.Duration) { metric, hasMetric := b.labelValues[metricName] if !hasMetric { metric = make(map[uint64]*LabelValues) b.labelValues[metricName] = metric } + hash := hashNameAndLabels(metricName, labels) + metricLabelValues, ok := metric[hash] if !ok { metricLabelValues = &LabelValues{ @@ -474,6 +526,7 @@ func (b *Exporter) saveLabelValues(metricName string, labels prometheus.Labels, } b.labelValues[metricName][hash] = metricLabelValues } + now := clock.Now() metricLabelValues.lastRegisteredAt = now // Update ttl from mapping @@ -496,20 +549,20 @@ func buildEvent(statType, metric string, value float64, relative bool, labels ma case "c": return &CounterEvent{ metricName: metric, - value: float64(value), + value: float64(value), //nolint:unconvert labels: labels, }, nil case "g": return &GaugeEvent{ metricName: metric, - value: float64(value), + value: float64(value), //nolint:unconvert relative: relative, labels: labels, }, nil case "ms", "h": return &TimerEvent{ metricName: metric, - value: float64(value), + value: float64(value), //nolint:unconvert labels: labels, }, nil case "s": @@ -521,24 +574,28 @@ func buildEvent(statType, metric string, value float64, relative bool, labels ma func parseDogStatsDTagsToLabels(component string) map[string]string { labels := map[string]string{} - tagsReceived.Inc() tags := strings.Split(component, ",") + + tagsReceived.Inc() + for _, t := range tags { t = strings.TrimPrefix(t, "#") - kv := strings.SplitN(t, ":", 2) + kv := strings.SplitN(t, ":", 2) //nolint:gomnd if len(kv) < 2 || len(kv[1]) == 0 { tagErrors.Inc() log.Debugf("Malformed or empty DogStatsD tag %s in component %s", t, component) + continue } labels[escapeMetricName(kv[0])] = kv[1] } + return labels } -func lineToEvents(line string) Events { +func lineToEvents(line string) Events { //nolint:gocyclo,gocognit events := Events{} if line == "" { return events @@ -548,10 +605,14 @@ func lineToEvents(line string) Events { if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) { sampleErrors.WithLabelValues("malformed_line").Inc() log.Debugln("Bad line from StatsD:", line) + return events } + metric := elements[0] + var samples []string + if strings.Contains(elements[1], "|#") { // using datadog extensions, disable multi-metrics samples = elements[1:] @@ -561,17 +622,18 @@ func lineToEvents(line string) Events { samples: for _, sample := range samples { samplesReceived.Inc() - samplingFactor := 1.0 + samplingFactor := 1.0 //nolint:ineffassign,wastedassign components := strings.Split(sample, "|") if len(components) < 2 || len(components) > 4 { sampleErrors.WithLabelValues("malformed_component").Inc() log.Debugln("Bad component on line:", line) + continue } valueStr, statType := components[0], components[1] - var relative = false + relative := false if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 { relative = true } @@ -580,16 +642,19 @@ samples: if err != nil { log.Debugf("Bad value %s on line: %s", valueStr, line) sampleErrors.WithLabelValues("malformed_value").Inc() + continue } multiplyEvents := 1 labels := map[string]string{} - if len(components) >= 3 { + + if len(components) >= 3 { //nolint:nestif,gomnd for _, component := range components[2:] { if len(component) == 0 { log.Debugln("Empty component on line: ", line) sampleErrors.WithLabelValues("malformed_component").Inc() + continue samples } } @@ -600,6 +665,7 @@ samples: if statType != "c" && statType != "ms" { log.Debugln("Illegal sampling factor for non-counter metric on line", line) sampleErrors.WithLabelValues("illegal_sample_factor").Inc() + continue } samplingFactor, err = strconv.ParseFloat(component[1:], 64) @@ -621,6 +687,7 @@ samples: default: log.Debugf("Invalid sampling factor or tag section %s on line %s", components[2], line) sampleErrors.WithLabelValues("invalid_sample_factor").Inc() + continue } } @@ -631,11 +698,13 @@ samples: if err != nil { log.Debugf("Error building event on line %s: %s", line, err) sampleErrors.WithLabelValues("illegal_event").Inc() + continue } events = append(events, event) } } + return events } @@ -645,7 +714,8 @@ type StatsDUDPListener struct { } func (l *StatsDUDPListener) Listen(e chan<- Events) { - buf := make([]byte, 65535) + buf := make([]byte, 65535) //nolint:gomnd + for { n, _, err := l.conn.ReadFromUDP(buf) if err != nil { @@ -662,10 +732,13 @@ func (l *StatsDUDPListener) Listen(e chan<- Events) { func (l *StatsDUDPListener) handlePacket(packet []byte, e chan<- Events) { udpPackets.Inc() - lines := strings.Split(string(packet), "\n") + events := Events{} + + lines := strings.Split(string(packet), "\n") for _, line := range lines { linesReceived.Inc() + events = append(events, lineToEvents(line)...) } e <- events @@ -681,6 +754,7 @@ func (l *StatsDTCPListener) Listen(e chan<- Events) { if err != nil { log.Fatalf("AcceptTCP failed: %v", err) } + go l.handleConn(c, e) } } @@ -691,6 +765,7 @@ func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) { tcpConnections.Inc() r := bufio.NewReader(c) + for { line, isPrefix, err := r.ReadLine() if err != nil { @@ -698,13 +773,17 @@ func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) { tcpErrors.Inc() log.Debugf("Read %s failed: %v", c.RemoteAddr(), err) } + break } + if isPrefix { tcpLineTooLong.Inc() log.Debugf("Read %s failed: line too long", c.RemoteAddr()) + break } + linesReceived.Inc() e <- lineToEvents(string(line)) } diff --git a/exporter_benchmark_test.go b/exporter_benchmark_test.go index 1c3104c..1090ae0 100644 --- a/exporter_benchmark_test.go +++ b/exporter_benchmark_test.go @@ -18,7 +18,7 @@ import ( "testing" ) -func benchmarkExporter(times int, b *testing.B) { +func benchmarkExporter(times int, b *testing.B) { //nolint:thelper input := []string{ "foo1:2|c", "foo2:3|g", @@ -31,12 +31,15 @@ func benchmarkExporter(times int, b *testing.B) { "foo15:200|ms:300|ms:5|c|@0.1:6|g\nfoo15a:1|c:5|ms", "some_very_useful_metrics_with_quite_a_log_name:13|c", } + bytesInput := make([]string, len(input)*times) + for run := 0; run < times; run++ { for i := 0; i < len(input); i++ { bytesInput[run*len(input)+i] = fmt.Sprintf("run%d%s", run, input[i]) } } + for n := 0; n < b.N; n++ { l := StatsDUDPListener{} // there are more events than input lines, need bigger buffer @@ -53,9 +56,11 @@ func benchmarkExporter(times int, b *testing.B) { func BenchmarkExporter1(b *testing.B) { benchmarkExporter(1, b) } + func BenchmarkExporter5(b *testing.B) { benchmarkExporter(5, b) } + func BenchmarkExporter50(b *testing.B) { benchmarkExporter(50, b) } diff --git a/exporter_test.go b/exporter_test.go index bb21d86..903121c 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -21,7 +21,6 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" - "github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -31,7 +30,7 @@ import ( func TestNegativeCounter(t *testing.T) { defer func() { if e := recover(); e != nil { - err := e.(error) + err := e.(error) //nolint:forcetypeassert if err.Error() == "counter cannot decrease in value" { t.Fatalf("Counter was negative and causes a panic.") } else { @@ -59,11 +58,11 @@ func TestNegativeCounter(t *testing.T) { // TestInvalidUtf8InDatadogTagValue validates robustness of exporter listener // against datadog tags with invalid tag values. // It sends the same tags first with a valid value, then with an invalid one. -// The exporter should not panic, but drop the invalid event +// The exporter should not panic, but drop the invalid event. func TestInvalidUtf8InDatadogTagValue(t *testing.T) { defer func() { if e := recover(); e != nil { - err := e.(error) + err := e.(error) //nolint:forcetypeassert t.Fatalf("Exporter listener should not panic on bad utf8: %q", err.Error()) } }() @@ -74,6 +73,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} { l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), events) } + close(events) }() @@ -108,10 +108,12 @@ func TestHistogramUnits(t *testing.T) { if err != nil { t.Fatalf("Cannot gather from DefaultGatherer: %v", err) } + value := getFloat64(metrics, name, prometheus.Labels{}) if value == nil { t.Fatal("Histogram value should not be nil") } + if *value == 300 { t.Fatalf("Histogram observations not scaled into Seconds") } else if *value != .300 { @@ -155,6 +157,7 @@ func (ml *mockStatsDTCPListener) handlePacket(packet []byte, e chan<- Events) { if err != nil { panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err)) } + ml.handleConn(sc, e) } @@ -178,7 +181,7 @@ func TestEscapeMetricName(t *testing.T) { // TestTtlExpiration validates expiration of time series. // foobar metric without mapping should expire with default ttl of 1s -// bazqux metric should expire with ttl of 2s +// bazqux metric should expire with ttl of 2s. func TestTtlExpiration(t *testing.T) { // Mock a time.NewTicker tickerCh := make(chan time.Time) @@ -196,10 +199,11 @@ mappings: ` // Create mapper from config and start an Exporter with a synchronous channel testMapper := &mapper.MetricMapper{} - err := testMapper.InitFromYAMLString(config) - if err != nil { + + if err := testMapper.InitFromYAMLString(config); err != nil { t.Fatalf("Config load error: %s %s", config, err) } + events := make(chan Events) defer close(events) @@ -222,9 +226,11 @@ mappings: }, } - var metrics []*dto.MetricFamily - var foobarValue *float64 - var bazquxValue *float64 + var ( + metrics []*dto.MetricFamily + foobarValue *float64 + bazquxValue *float64 + ) // Step 1. Send events with statsd metrics. // Send empty Events to wait for events are handled. @@ -234,18 +240,22 @@ mappings: events <- Events{} // Check values - metrics, err = prometheus.DefaultGatherer.Gather() + metrics, err := prometheus.DefaultGatherer.Gather() if err != nil { t.Fatal("Gather should not fail") } - foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) + + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) if foobarValue == nil || bazquxValue == nil { t.Fatalf("Gauge `foobar` and Summary `bazqux` should be gathered") } + if *foobarValue != 200 { t.Fatalf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue) } + if *bazquxValue != 42 { t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue) } @@ -260,14 +270,17 @@ mappings: if err != nil { t.Fatal("Gather should not fail") } + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) - bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) if foobarValue != nil { t.Fatalf("Gauge `foobar` should be expired") } + + bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) if bazquxValue == nil { t.Fatalf("Summary `bazqux` should be gathered") } + if *bazquxValue != 42 { t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue) } @@ -282,11 +295,14 @@ mappings: if err != nil { t.Fatal("Gather should not fail") } + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) if bazquxValue != nil { t.Fatalf("Summary `bazqux` should be expired") } + if foobarValue != nil { t.Fatalf("Gauge `foobar` should not be gathered after expiration") } @@ -296,18 +312,22 @@ mappings: // Method returns a value or nil if metric is not found. func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 { var metricFamily *dto.MetricFamily + for _, m := range metrics { if *m.Name == name { metricFamily = m break } } + if metricFamily == nil { return nil } var metric *dto.Metric + labelsHash := hashNameAndLabels(name, labels) + for _, m := range metricFamily.Metric { h := hashNameAndLabels(name, labelPairsAsLabels(m.GetLabel())) if h == labelsHash { @@ -315,45 +335,57 @@ func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labe break } } + if metric == nil { return nil } var value float64 + if metric.Gauge != nil { value = metric.Gauge.GetValue() return &value } + if metric.Counter != nil { value = metric.Counter.GetValue() return &value } + if metric.Histogram != nil { value = metric.Histogram.GetSampleSum() return &value } + if metric.Summary != nil { value = metric.Summary.GetSampleSum() return &value } + if metric.Untyped != nil { value = metric.Untyped.GetValue() return &value } + panic(fmt.Errorf("collected a non-gauge/counter/histogram/summary/untyped metric: %s", metric)) } func labelPairsAsLabels(pairs []*dto.LabelPair) (labels prometheus.Labels) { labels = prometheus.Labels{} + for _, pair := range pairs { if pair.Name == nil { continue } + value := "" + if pair.Value != nil { value = *pair.Value } + labels[*pair.Name] = value } + return } diff --git a/main.go b/main.go index eda9097..d11fa19 100644 --- a/main.go +++ b/main.go @@ -26,19 +26,19 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/version" - "gopkg.in/alecthomas/kingpin.v2" - "github.com/prometheus/statsd_exporter/pkg/mapper" + "gopkg.in/alecthomas/kingpin.v2" ) -func init() { +func init() { //nolint:gochecknoinits prometheus.MustRegister(version.NewCollector("statsd_exporter")) } func serveHTTP(listenAddress, metricsEndpoint string) { //lint:ignore SA1019 prometheus.Handler() is deprecated. - http.Handle(metricsEndpoint, prometheus.Handler()) + http.Handle(metricsEndpoint, prometheus.Handler()) //nolint:staticcheck http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + //nolint:errcheck w.Write([]byte(` StatsD Exporter @@ -47,7 +47,8 @@ func serveHTTP(listenAddress, metricsEndpoint string) { `)) }) - log.Fatal(http.ListenAndServe(listenAddress, nil)) + + log.Fatal(http.ListenAndServe(listenAddress, nil)) //nolint:gosec } func ipPortFromString(addr string) (*net.IPAddr, int) { @@ -59,6 +60,7 @@ func ipPortFromString(addr string) (*net.IPAddr, int) { if host == "" { host = "0.0.0.0" } + ip, err := net.ResolveIPAddr("ip", host) if err != nil { log.Fatalf("Unable to resolve %s: %s", host, err) @@ -74,6 +76,7 @@ func ipPortFromString(addr string) (*net.IPAddr, int) { func udpAddrFromString(addr string) *net.UDPAddr { ip, port := ipPortFromString(addr) + return &net.UDPAddr{ IP: ip.IP, Port: port, @@ -83,6 +86,7 @@ func udpAddrFromString(addr string) *net.UDPAddr { func tcpAddrFromString(addr string) *net.TCPAddr { ip, port := ipPortFromString(addr) + return &net.TCPAddr{ IP: ip.IP, Port: port, @@ -105,6 +109,7 @@ func watchConfig(fileName string, mapper *mapper.MetricMapper) { select { case ev := <-watcher.Event: log.Infof("Config file changed (%s), attempting reload", ev) + err = mapper.InitFromFile(fileName) if err != nil { log.Errorln("Error reloading config:", err) @@ -128,12 +133,15 @@ func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string) error { if err != nil { return fmt.Errorf("could not create file:%w", err) } + log.Infoln("Start dumping FSM to", dumpFilename) + w := bufio.NewWriter(f) mapper.FSM.DumpFSM(w) w.Flush() f.Close() log.Infoln("Finish dumping FSM") + return nil } @@ -145,7 +153,7 @@ func main() { statsdListenUDP = kingpin.Flag("statsd.listen-udp", "The UDP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String() - readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() + readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() //nolint:lll dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String() ) @@ -174,9 +182,10 @@ func main() { if *statsdListenUDP != "" { udpListenAddr := udpAddrFromString(*statsdListenUDP) + uconn, err := net.ListenUDP("udp", udpListenAddr) if err != nil { - log.Fatal(err) + log.Fatal(err) //nolint:gocritic } if *readBuffer != 0 { @@ -192,6 +201,7 @@ func main() { if *statsdListenTCP != "" { tcpListenAddr := tcpAddrFromString(*statsdListenTCP) + tconn, err := net.ListenTCP("tcp", tcpListenAddr) if err != nil { log.Fatal(err) @@ -208,20 +218,24 @@ func main() { if err != nil { log.Fatal("Error loading config:", err) } + if *dumpFSMPath != "" { err := dumpFSM(mapper, *dumpFSMPath) if err != nil { log.Fatal("Error dumping FSM:", err) } } + go watchConfig(*mappingConfig, mapper) } + exporter := NewExporter(mapper) exporter.Listen(events) } -func passthroughUdpConn(addr string) []net.Conn { - var out []net.Conn +func passthroughUdpConn(addr string) []net.Conn { //nolint:revive + var out []net.Conn //nolint:prealloc + addrs := strings.Split(addr, ",") for _, a := range addrs { diff --git a/pkg/mapper/fsm/formatter.go b/pkg/mapper/fsm/formatter.go index 567bbc2..44287ac 100644 --- a/pkg/mapper/fsm/formatter.go +++ b/pkg/mapper/fsm/formatter.go @@ -20,9 +20,7 @@ import ( "strings" ) -var ( - templateReplaceCaptureRE = regexp.MustCompile(`\$\{?([a-zA-Z0-9_\$]+)\}?`) -) +var templateReplaceCaptureRE = regexp.MustCompile(`\$\{?([a-zA-Z0-9_\$]+)\}?`) type TemplateFormatter struct { captureIndexes []int diff --git a/pkg/mapper/fsm/fsm.go b/pkg/mapper/fsm/fsm.go index cf8c00e..10d63c6 100644 --- a/pkg/mapper/fsm/fsm.go +++ b/pkg/mapper/fsm/fsm.go @@ -169,7 +169,8 @@ func (f *FSM) GetMapping(statsdMetric string, statsdMetricType string) (*mapping if !present || fieldsLeft > altState.maxRemainingLength || fieldsLeft < altState.minRemainingLength { } else { // push to backtracking stack - newCursor := fsmBacktrackStackCursor{prev: backtrackCursor, state: altState, + newCursor := fsmBacktrackStackCursor{ + prev: backtrackCursor, state: altState, fieldIndex: i, captureIndex: captureIdx, currentCapture: field, } diff --git a/pkg/mapper/mapper.go b/pkg/mapper/mapper.go index 4cb4fbe..5cb04dc 100644 --- a/pkg/mapper/mapper.go +++ b/pkg/mapper/mapper.go @@ -18,11 +18,11 @@ import ( "io/ioutil" "regexp" "sync" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/mapper/fsm" yaml "gopkg.in/yaml.v2" - "time" ) var ( diff --git a/pkg/mapper/mapper_test.go b/pkg/mapper/mapper_test.go index 8d1ea49..4a96a3e 100644 --- a/pkg/mapper/mapper_test.go +++ b/pkg/mapper/mapper_test.go @@ -141,7 +141,7 @@ mappings: }, }, }, - //Config with backtracking + // Config with backtracking { config: ` defaults: @@ -177,7 +177,7 @@ mappings: }, }, }, - //Config with backtracking, the non-matched rule has star(s) + // Config with backtracking, the non-matched rule has star(s) // A metric like full.name.anothertest will first match full.name.* and then tries // to match *.dummy.* and then failed. // This test case makes sure the captures in the non-matched later rule @@ -215,7 +215,7 @@ mappings: }, }, }, - //Config with super sets, disables ordering + // Config with super sets, disables ordering { config: ` defaults: @@ -249,7 +249,7 @@ mappings: }, }, }, - //Config with super sets, keeps ordering + // Config with super sets, keeps ordering { config: ` defaults: @@ -491,7 +491,7 @@ mappings: `, configBad: true, }, - //Config with uncompilable regex. + // Config with uncompilable regex. { config: `--- mappings: @@ -502,7 +502,7 @@ mappings: `, configBad: true, }, - //Config with non-matched metric. + // Config with non-matched metric. { config: `--- mappings: @@ -519,7 +519,7 @@ mappings: }, }, }, - //Config with no name. + // Config with no name. { config: `--- mappings: diff --git a/telemetry.go b/telemetry.go index 4e881aa..051f16e 100644 --- a/telemetry.go +++ b/telemetry.go @@ -104,7 +104,7 @@ var ( ) ) -func init() { +func init() { //nolint:gochecknoinits prometheus.MustRegister(eventStats) prometheus.MustRegister(eventsUnmapped) prometheus.MustRegister(udpPackets)