diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..6931bfb --- /dev/null +++ b/.gitattributes @@ -0,0 +1,4 @@ +# Managing line ending conversions +# See http://git-scm.com/docs/gitattributes#_end-of-line_conversion +* text=auto +* eol=lf diff --git a/.gitignore b/.gitignore index e06c0a0..b9f1b6b 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ dependencies-stamp /.deps /.release /.tarballs +*~ diff --git a/bridge_test.go b/bridge_test.go index c8862df..f45cefa 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -14,281 +14,291 @@ package main import ( + "fmt" + "net" "reflect" "testing" + "time" "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/statsd_exporter/pkg/clock" + "github.com/prometheus/statsd_exporter/pkg/event" + "github.com/prometheus/statsd_exporter/pkg/exporter" + "github.com/prometheus/statsd_exporter/pkg/listener" + "github.com/prometheus/statsd_exporter/pkg/mapper" ) func TestHandlePacket(t *testing.T) { scenarios := []struct { name string in string - out Events + out event.Events }{ { name: "empty", }, { name: "simple counter", in: "foo:2|c", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 2, - labels: map[string]string{}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 2, + CLabels: map[string]string{}, }, }, }, { name: "simple gauge", in: "foo:3|g", - out: Events{ - &GaugeEvent{ - metricName: "foo", - value: 3, - labels: map[string]string{}, + out: event.Events{ + &event.GaugeEvent{ + GMetricName: "foo", + GValue: 3, + GLabels: map[string]string{}, }, }, }, { name: "gauge with sampling", in: "foo:3|g|@0.2", - out: Events{ - &GaugeEvent{ - metricName: "foo", - value: 3, - labels: map[string]string{}, + out: event.Events{ + &event.GaugeEvent{ + GMetricName: "foo", + GValue: 3, + GLabels: map[string]string{}, }, }, }, { name: "gauge decrement", in: "foo:-10|g", - out: Events{ - &GaugeEvent{ - metricName: "foo", - value: -10, - relative: true, - labels: map[string]string{}, + out: event.Events{ + &event.GaugeEvent{ + GMetricName: "foo", + GValue: -10, + GRelative: true, + GLabels: map[string]string{}, }, }, }, { name: "simple timer", in: "foo:200|ms", - out: Events{ - &TimerEvent{ - metricName: "foo", - value: 200, - labels: map[string]string{}, + out: event.Events{ + &event.TimerEvent{ + TMetricName: "foo", + TValue: 200, + TLabels: map[string]string{}, }, }, }, { name: "simple histogram", in: "foo:200|h", - out: Events{ - &TimerEvent{ - metricName: "foo", - value: 200, - labels: map[string]string{}, + out: event.Events{ + &event.TimerEvent{ + TMetricName: "foo", + TValue: 200, + TLabels: map[string]string{}, }, }, }, { name: "simple distribution", in: "foo:200|d", - out: Events{ - &TimerEvent{ - metricName: "foo", - value: 200, - labels: map[string]string{}, + out: event.Events{ + &event.TimerEvent{ + TMetricName: "foo", + TValue: 200, + TLabels: map[string]string{}, }, }, }, { name: "distribution with sampling", in: "foo:0.01|d|@0.2|#tag1:bar,#tag2:baz", - out: Events{ - &TimerEvent{ - metricName: "foo", - value: 0.01, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + out: event.Events{ + &event.TimerEvent{ + TMetricName: "foo", + TValue: 0.01, + TLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, - &TimerEvent{ - metricName: "foo", - value: 0.01, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + &event.TimerEvent{ + TMetricName: "foo", + TValue: 0.01, + TLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, - &TimerEvent{ - metricName: "foo", - value: 0.01, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + &event.TimerEvent{ + TMetricName: "foo", + TValue: 0.01, + TLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, - &TimerEvent{ - metricName: "foo", - value: 0.01, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + &event.TimerEvent{ + TMetricName: "foo", + TValue: 0.01, + TLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, - &TimerEvent{ - metricName: "foo", - value: 0.01, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + &event.TimerEvent{ + TMetricName: "foo", + TValue: 0.01, + TLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, }, }, { name: "librato tag extension", in: "foo#tag1=bar,tag2=baz:100|c", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 100, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 100, + CLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, }, }, { name: "librato tag extension with tag keys unsupported by prometheus", in: "foo#09digits=0,tag.with.dots=1:100|c", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 100, - labels: map[string]string{"_09digits": "0", "tag_with_dots": "1"}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 100, + CLabels: map[string]string{"_09digits": "0", "tag_with_dots": "1"}, }, }, }, { name: "influxdb tag extension", in: "foo,tag1=bar,tag2=baz:100|c", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 100, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 100, + CLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, }, }, { name: "influxdb tag extension with tag keys unsupported by prometheus", in: "foo,09digits=0,tag.with.dots=1:100|c", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 100, - labels: map[string]string{"_09digits": "0", "tag_with_dots": "1"}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 100, + CLabels: map[string]string{"_09digits": "0", "tag_with_dots": "1"}, }, }, }, { name: "datadog tag extension", in: "foo:100|c|#tag1:bar,tag2:baz", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 100, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 100, + CLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, }, }, { name: "datadog tag extension with # in all keys (as sent by datadog php client)", in: "foo:100|c|#tag1:bar,#tag2:baz", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 100, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 100, + CLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, }, }, { name: "datadog tag extension with tag keys unsupported by prometheus", in: "foo:100|c|#09digits:0,tag.with.dots:1", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 100, - labels: map[string]string{"_09digits": "0", "tag_with_dots": "1"}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 100, + CLabels: map[string]string{"_09digits": "0", "tag_with_dots": "1"}, }, }, }, { name: "datadog tag extension with valueless tags: ignored", in: "foo:100|c|#tag_without_a_value", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 100, - labels: map[string]string{}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 100, + CLabels: map[string]string{}, }, }, }, { name: "datadog tag extension with valueless tags (edge case)", in: "foo:100|c|#tag_without_a_value,tag:value", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 100, - labels: map[string]string{"tag": "value"}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 100, + CLabels: map[string]string{"tag": "value"}, }, }, }, { name: "datadog tag extension with empty tags (edge case)", in: "foo:100|c|#tag:value,,", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 100, - labels: map[string]string{"tag": "value"}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 100, + CLabels: map[string]string{"tag": "value"}, }, }, }, { name: "datadog tag extension with sampling", in: "foo:100|c|@0.1|#tag1:bar,#tag2:baz", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 1000, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 1000, + CLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, }, }, { name: "librato/dogstatsd mixed tag styles without sampling", in: "foo#tag1=foo,tag3=bing:100|c|#tag1:bar,#tag2:baz", - out: Events{}, + out: event.Events{}, }, { name: "influxdb/dogstatsd mixed tag styles without sampling", in: "foo,tag1=foo,tag3=bing:100|c|#tag1:bar,#tag2:baz", - out: Events{}, + out: event.Events{}, }, { name: "mixed tag styles with sampling", in: "foo#tag1=foo,tag3=bing:100|c|@0.1|#tag1:bar,#tag2:baz", - out: Events{}, + out: event.Events{}, }, { name: "histogram with sampling", in: "foo:0.01|h|@0.2|#tag1:bar,#tag2:baz", - out: Events{ - &TimerEvent{ - metricName: "foo", - value: 0.01, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + out: event.Events{ + &event.TimerEvent{ + TMetricName: "foo", + TValue: 0.01, + TLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, - &TimerEvent{ - metricName: "foo", - value: 0.01, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + &event.TimerEvent{ + TMetricName: "foo", + TValue: 0.01, + TLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, - &TimerEvent{ - metricName: "foo", - value: 0.01, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + &event.TimerEvent{ + TMetricName: "foo", + TValue: 0.01, + TLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, - &TimerEvent{ - metricName: "foo", - value: 0.01, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + &event.TimerEvent{ + TMetricName: "foo", + TValue: 0.01, + TLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, - &TimerEvent{ - metricName: "foo", - value: 0.01, - labels: map[string]string{"tag1": "bar", "tag2": "baz"}, + &event.TimerEvent{ + TMetricName: "foo", + TValue: 0.01, + TLabels: map[string]string{"tag1": "bar", "tag2": "baz"}, }, }, }, { name: "datadog tag extension with multiple colons", in: "foo:100|c|@0.1|#tag1:foo:bar", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 1000, - labels: map[string]string{"tag1": "foo:bar"}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 1000, + CLabels: map[string]string{"tag1": "foo:bar"}, }, }, }, { @@ -300,62 +310,62 @@ func TestHandlePacket(t *testing.T) { }, { name: "multiple metrics with invalid datadog utf8 tag values", in: "foo:200|c|#tag:value\nfoo:300|c|#tag:\xc3\x28invalid", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 200, - labels: map[string]string{"tag": "value"}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 200, + CLabels: map[string]string{"tag": "value"}, }, }, }, { name: "combined multiline metrics", in: "foo:200|ms:300|ms:5|c|@0.1:6|g\nbar:1|c:5|ms", - out: Events{ - &TimerEvent{ - metricName: "foo", - value: 200, - labels: map[string]string{}, + out: event.Events{ + &event.TimerEvent{ + TMetricName: "foo", + TValue: 200, + TLabels: map[string]string{}, }, - &TimerEvent{ - metricName: "foo", - value: 300, - labels: map[string]string{}, + &event.TimerEvent{ + TMetricName: "foo", + TValue: 300, + TLabels: map[string]string{}, }, - &CounterEvent{ - metricName: "foo", - value: 50, - labels: map[string]string{}, + &event.CounterEvent{ + CMetricName: "foo", + CValue: 50, + CLabels: map[string]string{}, }, - &GaugeEvent{ - metricName: "foo", - value: 6, - labels: map[string]string{}, + &event.GaugeEvent{ + GMetricName: "foo", + GValue: 6, + GLabels: map[string]string{}, }, - &CounterEvent{ - metricName: "bar", - value: 1, - labels: map[string]string{}, + &event.CounterEvent{ + CMetricName: "bar", + CValue: 1, + CLabels: map[string]string{}, }, - &TimerEvent{ - metricName: "bar", - value: 5, - labels: map[string]string{}, + &event.TimerEvent{ + TMetricName: "bar", + TValue: 5, + TLabels: map[string]string{}, }, }, }, { name: "timings with sampling factor", in: "foo.timing:0.5|ms|@0.1", - out: Events{ - &TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}}, - &TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}}, - &TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}}, - &TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}}, - &TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}}, - &TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}}, - &TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}}, - &TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}}, - &TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}}, - &TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}}, + out: event.Events{ + &event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}}, + &event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}}, + &event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}}, + &event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}}, + &event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}}, + &event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}}, + &event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}}, + &event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}}, + &event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}}, + &event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}}, }, }, { name: "bad line", @@ -369,21 +379,21 @@ func TestHandlePacket(t *testing.T) { }, { name: "illegal sampling factor", in: "foo:1|c|@bar", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 1, - labels: map[string]string{}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 1, + CLabels: map[string]string{}, }, }, }, { name: "zero sampling factor", in: "foo:2|c|@0", - out: Events{ - &CounterEvent{ - metricName: "foo", - value: 2, - labels: map[string]string{}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: 2, + CLabels: map[string]string{}, }, }, }, { @@ -405,25 +415,49 @@ func TestHandlePacket(t *testing.T) { { name: "some invalid utf8", in: "valid_utf8:1|c\ninvalid\xc3\x28utf8:1|c", - out: Events{ - &CounterEvent{ - metricName: "valid_utf8", - value: 1, - labels: map[string]string{}, + out: event.Events{ + &event.CounterEvent{ + CMetricName: "valid_utf8", + CValue: 1, + CLabels: map[string]string{}, }, }, }, } - for k, l := range []statsDPacketHandler{&StatsDUDPListener{nil, nil, log.NewNopLogger()}, &mockStatsDTCPListener{StatsDTCPListener{nil, nil, log.NewNopLogger()}, log.NewNopLogger()}} { - events := make(chan Events, 32) - l.SetEventHandler(&unbufferedEventHandler{c: events}) + for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{ + Conn: nil, + EventHandler: nil, + Logger: log.NewNopLogger(), + UDPPackets: udpPackets, + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + }, &mockStatsDTCPListener{listener.StatsDTCPListener{ + Conn: nil, + EventHandler: nil, + Logger: log.NewNopLogger(), + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + TCPConnections: tcpConnections, + TCPErrors: tcpErrors, + TCPLineTooLong: tcpLineTooLong, + }, log.NewNopLogger()}} { + events := make(chan event.Events, 32) + l.SetEventHandler(&event.UnbufferedEventHandler{C: events}) for i, scenario := range scenarios { - l.handlePacket([]byte(scenario.in)) + l.HandlePacket([]byte(scenario.in)) le := len(events) // Flatten actual events. - actual := Events{} + actual := event.Events{} for i := 0; i < le; i++ { actual = append(actual, <-events...) } @@ -440,3 +474,225 @@ func TestHandlePacket(t *testing.T) { } } } + +type statsDPacketHandler interface { + HandlePacket(packet []byte) + SetEventHandler(eh event.EventHandler) +} + +type mockStatsDTCPListener struct { + listener.StatsDTCPListener + log.Logger +} + +func (ml *mockStatsDTCPListener) HandlePacket(packet []byte) { + // Forcing IPv4 because the TravisCI build environment does not have IPv6 + // addresses. + lc, err := net.ListenTCP("tcp4", nil) + if err != nil { + panic(fmt.Sprintf("mockStatsDTCPListener: listen failed: %v", err)) + } + + defer lc.Close() + + go func() { + cc, err := net.DialTCP("tcp", nil, lc.Addr().(*net.TCPAddr)) + if err != nil { + panic(fmt.Sprintf("mockStatsDTCPListener: dial failed: %v", err)) + } + + defer cc.Close() + + n, err := cc.Write(packet) + if err != nil || n != len(packet) { + panic(fmt.Sprintf("mockStatsDTCPListener: write failed: %v,%d", err, n)) + } + }() + + sc, err := lc.AcceptTCP() + if err != nil { + panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err)) + } + ml.HandleConn(sc) +} + +// TestTtlExpiration validates expiration of time series. +// foobar metric without mapping should expire with default ttl of 1s +// bazqux metric should expire with ttl of 2s +func TestTtlExpiration(t *testing.T) { + // Mock a time.NewTicker + tickerCh := make(chan time.Time) + clock.ClockInstance = &clock.Clock{ + TickerCh: tickerCh, + } + + config := ` +defaults: + ttl: 1s +mappings: +- match: bazqux.* + name: bazqux + ttl: 2s +` + // Create mapper from config and start an Exporter with a synchronous channel + testMapper := &mapper.MetricMapper{} + err := testMapper.InitFromYAMLString(config, 0) + if err != nil { + t.Fatalf("Config load error: %s %s", config, err) + } + events := make(chan event.Events) + defer close(events) + go func() { + ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Listen(events) + }() + + ev := event.Events{ + // event with default ttl = 1s + &event.GaugeEvent{ + GMetricName: "foobar", + GValue: 200, + }, + // event with ttl = 2s from a mapping + &event.TimerEvent{ + TMetricName: "bazqux.main", + TValue: 42000, + }, + } + + var metrics []*dto.MetricFamily + var foobarValue *float64 + var bazquxValue *float64 + + // Step 1. Send events with statsd metrics. + // Send empty Events to wait for events are handled. + // saveLabelValues will use fake instant as a lastRegisteredAt time. + clock.ClockInstance.Instant = time.Unix(0, 0) + events <- ev + events <- event.Events{} + + // Check values + metrics, err = prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatal("Gather should not fail") + } + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) + if foobarValue == nil || bazquxValue == nil { + t.Fatalf("Gauge `foobar` and Summary `bazqux` should be gathered") + } + if *foobarValue != 200 { + t.Fatalf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue) + } + if *bazquxValue != 42 { + t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue) + } + + // Step 2. Increase Instant to emulate metrics expiration after 1s + clock.ClockInstance.Instant = time.Unix(1, 10) + clock.ClockInstance.TickerCh <- time.Unix(0, 0) + events <- event.Events{} + + // Check values + metrics, err = prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatal("Gather should not fail") + } + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) + if foobarValue != nil { + t.Fatalf("Gauge `foobar` should be expired") + } + if bazquxValue == nil { + t.Fatalf("Summary `bazqux` should be gathered") + } + if *bazquxValue != 42 { + t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue) + } + + // Step 3. Increase Instant to emulate metrics expiration after 2s + clock.ClockInstance.Instant = time.Unix(2, 200) + clock.ClockInstance.TickerCh <- time.Unix(0, 0) + events <- event.Events{} + + // Check values + metrics, err = prometheus.DefaultGatherer.Gather() + if err != nil { + t.Fatal("Gather should not fail") + } + foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{}) + bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{}) + if bazquxValue != nil { + t.Fatalf("Summary `bazqux` should be expired") + } + if foobarValue != nil { + t.Fatalf("Gauge `foobar` should not be gathered after expiration") + } +} + +// getFloat64 search for metric by name in array of MetricFamily and then search a value by labels. +// Method returns a value or nil if metric is not found. +func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 { + var metricFamily *dto.MetricFamily + for _, m := range metrics { + if *m.Name == name { + metricFamily = m + break + } + } + if metricFamily == nil { + return nil + } + + var metric *dto.Metric + labelStr := fmt.Sprintf("%v", labels) + for _, m := range metricFamily.Metric { + l := labelPairsAsLabels(m.GetLabel()) + ls := fmt.Sprintf("%v", l) + if labelStr == ls { + metric = m + break + } + } + if metric == nil { + return nil + } + + var value float64 + if metric.Gauge != nil { + value = metric.Gauge.GetValue() + return &value + } + if metric.Counter != nil { + value = metric.Counter.GetValue() + return &value + } + if metric.Histogram != nil { + value = metric.Histogram.GetSampleSum() + return &value + } + if metric.Summary != nil { + value = metric.Summary.GetSampleSum() + return &value + } + if metric.Untyped != nil { + value = metric.Untyped.GetValue() + return &value + } + panic(fmt.Errorf("collected a non-gauge/counter/histogram/summary/untyped metric: %s", metric)) +} + +func labelPairsAsLabels(pairs []*dto.LabelPair) (labels prometheus.Labels) { + labels = prometheus.Labels{} + for _, pair := range pairs { + if pair.Name == nil { + continue + } + value := "" + if pair.Value != nil { + value = *pair.Value + } + labels[*pair.Name] = value + } + return +} diff --git a/exporter.go b/exporter.go deleted file mode 100644 index e3984b7..0000000 --- a/exporter.go +++ /dev/null @@ -1,546 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "bufio" - "fmt" - "io" - "net" - "os" - "strconv" - "strings" - "time" - "unicode/utf8" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/client_golang/prometheus" - - "github.com/prometheus/statsd_exporter/pkg/clock" - "github.com/prometheus/statsd_exporter/pkg/mapper" -) - -const ( - defaultHelp = "Metric autogenerated by statsd_exporter." - regErrF = "Failed to update metric" -) - -// uncheckedCollector wraps a Collector but its Describe method yields no Desc. -// This allows incoming metrics to have inconsistent label sets -type uncheckedCollector struct { - c prometheus.Collector -} - -func (u uncheckedCollector) Describe(_ chan<- *prometheus.Desc) {} -func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) { - u.c.Collect(c) -} - -type Exporter struct { - mapper *mapper.MetricMapper - registry *registry - logger log.Logger -} - -// Listen handles all events sent to the given channel sequentially. It -// terminates when the channel is closed. -func (b *Exporter) Listen(e <-chan Events) { - removeStaleMetricsTicker := clock.NewTicker(time.Second) - - for { - select { - case <-removeStaleMetricsTicker.C: - b.registry.removeStaleMetrics() - case events, ok := <-e: - if !ok { - level.Debug(b.logger).Log("msg", "Channel is closed. Break out of Exporter.Listener.") - removeStaleMetricsTicker.Stop() - return - } - for _, event := range events { - b.handleEvent(event) - } - } - } -} - -// handleEvent processes a single Event according to the configured mapping. -func (b *Exporter) handleEvent(event Event) { - mapping, labels, present := b.mapper.GetMapping(event.MetricName(), event.MetricType()) - if mapping == nil { - mapping = &mapper.MetricMapping{} - if b.mapper.Defaults.Ttl != 0 { - mapping.Ttl = b.mapper.Defaults.Ttl - } - } - - if mapping.Action == mapper.ActionTypeDrop { - eventsActions.WithLabelValues("drop").Inc() - return - } - - help := defaultHelp - if mapping.HelpText != "" { - help = mapping.HelpText - } - - metricName := "" - prometheusLabels := event.Labels() - if present { - if mapping.Name == "" { - level.Debug(b.logger).Log("msg", "The mapping generates an empty metric name", "metric_name", event.MetricName(), "match", mapping.Match) - errorEventStats.WithLabelValues("empty_metric_name").Inc() - return - } - metricName = mapper.EscapeMetricName(mapping.Name) - for label, value := range labels { - prometheusLabels[label] = value - } - eventsActions.WithLabelValues(string(mapping.Action)).Inc() - } else { - eventsUnmapped.Inc() - metricName = mapper.EscapeMetricName(event.MetricName()) - } - - switch ev := event.(type) { - case *CounterEvent: - // We don't accept negative values for counters. Incrementing the counter with a negative number - // will cause the exporter to panic. Instead we will warn and continue to the next event. - if event.Value() < 0.0 { - level.Debug(b.logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", event.Value()) - errorEventStats.WithLabelValues("illegal_negative_counter").Inc() - return - } - - counter, err := b.registry.getCounter(metricName, prometheusLabels, help, mapping) - if err == nil { - counter.Add(event.Value()) - eventStats.WithLabelValues("counter").Inc() - } else { - level.Debug(b.logger).Log("msg", regErrF, "metric", metricName, "error", err) - conflictingEventStats.WithLabelValues("counter").Inc() - } - - case *GaugeEvent: - gauge, err := b.registry.getGauge(metricName, prometheusLabels, help, mapping) - - if err == nil { - if ev.relative { - gauge.Add(event.Value()) - } else { - gauge.Set(event.Value()) - } - eventStats.WithLabelValues("gauge").Inc() - } else { - level.Debug(b.logger).Log("msg", regErrF, "metric", metricName, "error", err) - conflictingEventStats.WithLabelValues("gauge").Inc() - } - - case *TimerEvent: - t := mapper.TimerTypeDefault - if mapping != nil { - t = mapping.TimerType - } - if t == mapper.TimerTypeDefault { - t = b.mapper.Defaults.TimerType - } - - switch t { - case mapper.TimerTypeHistogram: - histogram, err := b.registry.getHistogram(metricName, prometheusLabels, help, mapping) - if err == nil { - histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond - eventStats.WithLabelValues("timer").Inc() - } else { - level.Debug(b.logger).Log("msg", regErrF, "metric", metricName, "error", err) - conflictingEventStats.WithLabelValues("timer").Inc() - } - - case mapper.TimerTypeDefault, mapper.TimerTypeSummary: - summary, err := b.registry.getSummary(metricName, prometheusLabels, help, mapping) - if err == nil { - summary.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond - eventStats.WithLabelValues("timer").Inc() - } else { - level.Debug(b.logger).Log("msg", regErrF, "metric", metricName, "error", err) - conflictingEventStats.WithLabelValues("timer").Inc() - } - - default: - level.Error(b.logger).Log("msg", "unknown timer type", "type", t) - os.Exit(1) - } - - default: - level.Debug(b.logger).Log("msg", "Unsupported event type") - eventStats.WithLabelValues("illegal").Inc() - } -} - -func NewExporter(mapper *mapper.MetricMapper, logger log.Logger) *Exporter { - return &Exporter{ - mapper: mapper, - registry: newRegistry(mapper), - logger: logger, - } -} - -func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (Event, error) { - switch statType { - case "c": - return &CounterEvent{ - metricName: metric, - value: float64(value), - labels: labels, - }, nil - case "g": - return &GaugeEvent{ - metricName: metric, - value: float64(value), - relative: relative, - labels: labels, - }, nil - case "ms", "h", "d": - return &TimerEvent{ - metricName: metric, - value: float64(value), - labels: labels, - }, nil - case "s": - return nil, fmt.Errorf("no support for StatsD sets") - default: - return nil, fmt.Errorf("bad stat type %s", statType) - } -} - -func parseTag(component, tag string, separator rune, labels map[string]string, logger log.Logger) { - // Entirely empty tag is an error - if len(tag) == 0 { - tagErrors.Inc() - level.Debug(logger).Log("msg", "Empty name tag", "component", component) - return - } - - for i, c := range tag { - if c == separator { - k := tag[:i] - v := tag[i+1:] - - if len(k) == 0 || len(v) == 0 { - // Empty key or value is an error - tagErrors.Inc() - level.Debug(logger).Log("msg", "Malformed name tag", "k", k, "v", v, "component", component) - } else { - labels[mapper.EscapeMetricName(k)] = v - } - return - } - } - - // Missing separator (no value) is an error - tagErrors.Inc() - level.Debug(logger).Log("msg", "Malformed name tag", "tag", tag, "component", component) -} - -func parseNameTags(component string, labels map[string]string, logger log.Logger) { - lastTagEndIndex := 0 - for i, c := range component { - if c == ',' { - tag := component[lastTagEndIndex:i] - lastTagEndIndex = i + 1 - parseTag(component, tag, '=', labels, logger) - } - } - - // If we're not off the end of the string, add the last tag - if lastTagEndIndex < len(component) { - tag := component[lastTagEndIndex:] - parseTag(component, tag, '=', labels, logger) - } -} - -func trimLeftHash(s string) string { - if s != "" && s[0] == '#' { - return s[1:] - } - return s -} - -func parseDogStatsDTags(component string, labels map[string]string, logger log.Logger) { - lastTagEndIndex := 0 - for i, c := range component { - if c == ',' { - tag := component[lastTagEndIndex:i] - lastTagEndIndex = i + 1 - parseTag(component, trimLeftHash(tag), ':', labels, logger) - } - } - - // If we're not off the end of the string, add the last tag - if lastTagEndIndex < len(component) { - tag := component[lastTagEndIndex:] - parseTag(component, trimLeftHash(tag), ':', labels, logger) - } -} - -func parseNameAndTags(name string, labels map[string]string, logger log.Logger) string { - for i, c := range name { - // `#` delimits start of tags by Librato - // https://www.librato.com/docs/kb/collect/collection_agents/stastd/#stat-level-tags - // `,` delimits start of tags by InfluxDB - // https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/#introducing-influx-statsd - if c == '#' || c == ',' { - parseNameTags(name[i+1:], labels, logger) - return name[:i] - } - } - return name -} - -func lineToEvents(line string, logger log.Logger) Events { - events := Events{} - if line == "" { - return events - } - - elements := strings.SplitN(line, ":", 2) - if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) { - sampleErrors.WithLabelValues("malformed_line").Inc() - level.Debug(logger).Log("msg", "Bad line from StatsD", "line", line) - return events - } - - labels := map[string]string{} - metric := parseNameAndTags(elements[0], labels, logger) - - var samples []string - if strings.Contains(elements[1], "|#") { - // using DogStatsD tags - - // don't allow mixed tagging styles - if len(labels) > 0 { - sampleErrors.WithLabelValues("mixed_tagging_styles").Inc() - level.Debug(logger).Log("msg", "Bad line (multiple tagging styles) from StatsD", "line", line) - return events - } - - // disable multi-metrics - samples = elements[1:] - } else { - samples = strings.Split(elements[1], ":") - } -samples: - for _, sample := range samples { - samplesReceived.Inc() - components := strings.Split(sample, "|") - samplingFactor := 1.0 - if len(components) < 2 || len(components) > 4 { - sampleErrors.WithLabelValues("malformed_component").Inc() - level.Debug(logger).Log("msg", "Bad component", "line", line) - continue - } - valueStr, statType := components[0], components[1] - - var relative = false - if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 { - relative = true - } - - value, err := strconv.ParseFloat(valueStr, 64) - if err != nil { - level.Debug(logger).Log("msg", "Bad value", "value", valueStr, "line", line) - sampleErrors.WithLabelValues("malformed_value").Inc() - continue - } - - multiplyEvents := 1 - if len(components) >= 3 { - for _, component := range components[2:] { - if len(component) == 0 { - level.Debug(logger).Log("msg", "Empty component", "line", line) - sampleErrors.WithLabelValues("malformed_component").Inc() - continue samples - } - } - - for _, component := range components[2:] { - switch component[0] { - case '@': - - samplingFactor, err = strconv.ParseFloat(component[1:], 64) - if err != nil { - level.Debug(logger).Log("msg", "Invalid sampling factor", "component", component[1:], "line", line) - sampleErrors.WithLabelValues("invalid_sample_factor").Inc() - } - if samplingFactor == 0 { - samplingFactor = 1 - } - - if statType == "g" { - continue - } else if statType == "c" { - value /= samplingFactor - } else if statType == "ms" || statType == "h" || statType == "d" { - multiplyEvents = int(1 / samplingFactor) - } - case '#': - parseDogStatsDTags(component[1:], labels, logger) - default: - level.Debug(logger).Log("msg", "Invalid sampling factor or tag section", "component", components[2], "line", line) - sampleErrors.WithLabelValues("invalid_sample_factor").Inc() - continue - } - } - } - - if len(labels) > 0 { - tagsReceived.Inc() - } - - for i := 0; i < multiplyEvents; i++ { - event, err := buildEvent(statType, metric, value, relative, labels) - if err != nil { - level.Debug(logger).Log("msg", "Error building event", "line", line, "error", err) - sampleErrors.WithLabelValues("illegal_event").Inc() - continue - } - events = append(events, event) - } - } - return events -} - -type StatsDUDPListener struct { - conn *net.UDPConn - eventHandler eventHandler - logger log.Logger -} - -func (l *StatsDUDPListener) SetEventHandler(eh eventHandler) { - l.eventHandler = eh -} - -func (l *StatsDUDPListener) Listen() { - buf := make([]byte, 65535) - for { - n, _, err := l.conn.ReadFromUDP(buf) - if err != nil { - // https://github.com/golang/go/issues/4373 - // ignore net: errClosing error as it will occur during shutdown - if strings.HasSuffix(err.Error(), "use of closed network connection") { - return - } - level.Error(l.logger).Log("error", err) - return - } - l.handlePacket(buf[0:n]) - } -} - -func (l *StatsDUDPListener) handlePacket(packet []byte) { - udpPackets.Inc() - lines := strings.Split(string(packet), "\n") - for _, line := range lines { - linesReceived.Inc() - l.eventHandler.queue(lineToEvents(line, l.logger)) - } -} - -type StatsDTCPListener struct { - conn *net.TCPListener - eventHandler eventHandler - logger log.Logger -} - -func (l *StatsDTCPListener) SetEventHandler(eh eventHandler) { - l.eventHandler = eh -} - -func (l *StatsDTCPListener) Listen() { - for { - c, err := l.conn.AcceptTCP() - if err != nil { - // https://github.com/golang/go/issues/4373 - // ignore net: errClosing error as it will occur during shutdown - if strings.HasSuffix(err.Error(), "use of closed network connection") { - return - } - level.Error(l.logger).Log("msg", "AcceptTCP failed", "error", err) - os.Exit(1) - } - go l.handleConn(c) - } -} - -func (l *StatsDTCPListener) handleConn(c *net.TCPConn) { - defer c.Close() - - tcpConnections.Inc() - - r := bufio.NewReader(c) - for { - line, isPrefix, err := r.ReadLine() - if err != nil { - if err != io.EOF { - tcpErrors.Inc() - level.Debug(l.logger).Log("msg", "Read failed", "addr", c.RemoteAddr(), "error", err) - } - break - } - if isPrefix { - tcpLineTooLong.Inc() - level.Debug(l.logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr()) - break - } - linesReceived.Inc() - l.eventHandler.queue(lineToEvents(string(line), l.logger)) - } -} - -type StatsDUnixgramListener struct { - conn *net.UnixConn - eventHandler eventHandler - logger log.Logger -} - -func (l *StatsDUnixgramListener) SetEventHandler(eh eventHandler) { - l.eventHandler = eh -} - -func (l *StatsDUnixgramListener) Listen() { - buf := make([]byte, 65535) - for { - n, _, err := l.conn.ReadFromUnix(buf) - if err != nil { - // https://github.com/golang/go/issues/4373 - // ignore net: errClosing error as it will occur during shutdown - if strings.HasSuffix(err.Error(), "use of closed network connection") { - return - } - level.Error(l.logger).Log(err) - os.Exit(1) - } - l.handlePacket(buf[:n]) - } -} - -func (l *StatsDUnixgramListener) handlePacket(packet []byte) { - unixgramPackets.Inc() - lines := strings.Split(string(packet), "\n") - for _, line := range lines { - linesReceived.Inc() - l.eventHandler.queue(lineToEvents(string(line), l.logger)) - } -} diff --git a/exporter_benchmark_test.go b/exporter_benchmark_test.go index 1c86479..900941b 100644 --- a/exporter_benchmark_test.go +++ b/exporter_benchmark_test.go @@ -18,6 +18,9 @@ import ( "testing" "github.com/go-kit/kit/log" + "github.com/prometheus/statsd_exporter/pkg/event" + "github.com/prometheus/statsd_exporter/pkg/exporter" + "github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -42,12 +45,12 @@ func benchmarkUDPListener(times int, b *testing.B) { } for n := 0; n < b.N; n++ { // there are more events than input lines, need bigger buffer - events := make(chan Events, len(bytesInput)*times*2) - l := StatsDUDPListener{eventHandler: &unbufferedEventHandler{c: events}} + events := make(chan event.Events, len(bytesInput)*times*2) + l := listener.StatsDUDPListener{EventHandler: &event.UnbufferedEventHandler{C: events}} for i := 0; i < times; i++ { for _, line := range bytesInput { - l.handlePacket([]byte(line)) + l.HandlePacket([]byte(line)) } } } @@ -64,52 +67,52 @@ func BenchmarkUDPListener50(b *testing.B) { } func BenchmarkExporterListener(b *testing.B) { - events := Events{ - &CounterEvent{ // simple counter - metricName: "counter", - value: 2, + events := event.Events{ + &event.CounterEvent{ // simple counter + CMetricName: "counter", + CValue: 2, }, - &GaugeEvent{ // simple gauge - metricName: "gauge", - value: 10, + &event.GaugeEvent{ // simple gauge + GMetricName: "gauge", + GValue: 10, }, - &TimerEvent{ // simple timer - metricName: "timer", - value: 200, + &event.TimerEvent{ // simple timer + TMetricName: "timer", + TValue: 200, }, - &TimerEvent{ // simple histogram - metricName: "histogram.test", - value: 200, + &event.TimerEvent{ // simple histogram + TMetricName: "histogram.test", + TValue: 200, }, - &CounterEvent{ // simple_tags - metricName: "simple_tags", - value: 100, - labels: map[string]string{ + &event.CounterEvent{ // simple_tags + CMetricName: "simple_tags", + CValue: 100, + CLabels: map[string]string{ "alpha": "bar", "bravo": "baz", }, }, - &CounterEvent{ // slightly different tags - metricName: "simple_tags", - value: 100, - labels: map[string]string{ + &event.CounterEvent{ // slightly different tags + CMetricName: "simple_tags", + CValue: 100, + CLabels: map[string]string{ "alpha": "bar", "charlie": "baz", }, }, - &CounterEvent{ // and even more different tags - metricName: "simple_tags", - value: 100, - labels: map[string]string{ + &event.CounterEvent{ // and even more different tags + CMetricName: "simple_tags", + CValue: 100, + CLabels: map[string]string{ "alpha": "bar", "bravo": "baz", "golf": "looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong", }, }, - &CounterEvent{ // datadog tag extension with complex tags - metricName: "foo", - value: 100, - labels: map[string]string{ + &event.CounterEvent{ // datadog tag extension with complex tags + CMetricName: "foo", + CValue: 100, + CLabels: map[string]string{ "action": "test", "application": "testapp", "application_component": "testcomp", @@ -139,9 +142,9 @@ mappings: b.Fatalf("Config load error: %s %s", config, err) } - ex := NewExporter(testMapper, log.NewNopLogger()) + ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) for i := 0; i < b.N; i++ { - ec := make(chan Events, 1000) + ec := make(chan event.Events, 1000) go func() { for i := 0; i < 1000; i++ { ec <- events diff --git a/main.go b/main.go index b36d3c4..7a31261 100644 --- a/main.go +++ b/main.go @@ -15,7 +15,6 @@ package main import ( "bufio" - "fmt" "net" "net/http" _ "net/http/pprof" @@ -33,11 +32,171 @@ import ( "github.com/prometheus/common/version" "gopkg.in/alecthomas/kingpin.v2" + "github.com/prometheus/statsd_exporter/pkg/address" + "github.com/prometheus/statsd_exporter/pkg/event" + "github.com/prometheus/statsd_exporter/pkg/exporter" + "github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/mapper" ) +const ( + defaultHelp = "Metric autogenerated by statsd_exporter." + regErrF = "Failed to update metric" +) + +var ( + eventStats = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_total", + Help: "The total number of StatsD events seen.", + }, + []string{"type"}, + ) + eventsFlushed = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_event_queue_flushed_total", + Help: "Number of times events were flushed to exporter", + }, + ) + eventsUnmapped = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_unmapped_total", + Help: "The total number of StatsD events no mapping was found for.", + }) + udpPackets = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_udp_packets_total", + Help: "The total number of StatsD packets received over UDP.", + }, + ) + tcpConnections = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tcp_connections_total", + Help: "The total number of TCP connections handled.", + }, + ) + tcpErrors = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tcp_connection_errors_total", + Help: "The number of errors encountered reading from TCP.", + }, + ) + tcpLineTooLong = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tcp_too_long_lines_total", + Help: "The number of lines discarded due to being too long.", + }, + ) + unixgramPackets = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_unixgram_packets_total", + Help: "The total number of StatsD packets received over Unixgram.", + }, + ) + linesReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_lines_total", + Help: "The total number of StatsD lines received.", + }, + ) + samplesReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_samples_total", + Help: "The total number of StatsD samples received.", + }, + ) + sampleErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_sample_errors_total", + Help: "The total number of errors parsing StatsD samples.", + }, + []string{"reason"}, + ) + tagsReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tags_total", + Help: "The total number of DogStatsD tags processed.", + }, + ) + tagErrors = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tag_errors_total", + Help: "The number of errors parsing DogStatsD tags.", + }, + ) + configLoads = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_config_reloads_total", + Help: "The number of configuration reloads.", + }, + []string{"outcome"}, + ) + mappingsCount = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "statsd_exporter_loaded_mappings", + Help: "The current number of configured metric mappings.", + }) + conflictingEventStats = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_conflict_total", + Help: "The total number of StatsD events with conflicting names.", + }, + []string{"type"}, + ) + errorEventStats = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_error_total", + Help: "The total number of StatsD events discarded due to errors.", + }, + []string{"reason"}, + ) + eventsActions = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_actions_total", + Help: "The total number of StatsD events by action.", + }, + []string{"action"}, + ) + metricsCount = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "statsd_exporter_metrics_total", + Help: "The total number of metrics.", + }, + []string{"type"}, + ) +) + func init() { prometheus.MustRegister(version.NewCollector("statsd_exporter")) + prometheus.MustRegister(eventStats) + prometheus.MustRegister(eventsFlushed) + prometheus.MustRegister(eventsUnmapped) + prometheus.MustRegister(udpPackets) + prometheus.MustRegister(tcpConnections) + prometheus.MustRegister(tcpErrors) + prometheus.MustRegister(tcpLineTooLong) + prometheus.MustRegister(unixgramPackets) + prometheus.MustRegister(linesReceived) + prometheus.MustRegister(samplesReceived) + prometheus.MustRegister(sampleErrors) + prometheus.MustRegister(tagsReceived) + prometheus.MustRegister(tagErrors) + prometheus.MustRegister(configLoads) + prometheus.MustRegister(mappingsCount) + prometheus.MustRegister(conflictingEventStats) + prometheus.MustRegister(errorEventStats) + prometheus.MustRegister(eventsActions) + prometheus.MustRegister(metricsCount) +} + +// uncheckedCollector wraps a Collector but its Describe method yields no Desc. +// This allows incoming metrics to have inconsistent label sets +type uncheckedCollector struct { + c prometheus.Collector +} + +func (u uncheckedCollector) Describe(_ chan<- *prometheus.Desc) {} +func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) { + u.c.Collect(c) } func serveHTTP(listenAddress, metricsEndpoint string, logger log.Logger) { @@ -55,52 +214,6 @@ func serveHTTP(listenAddress, metricsEndpoint string, logger log.Logger) { os.Exit(1) } -func ipPortFromString(addr string) (*net.IPAddr, int, error) { - host, portStr, err := net.SplitHostPort(addr) - if err != nil { - return nil, 0, fmt.Errorf("bad StatsD listening address: %s", addr) - } - - if host == "" { - host = "0.0.0.0" - } - ip, err := net.ResolveIPAddr("ip", host) - if err != nil { - return nil, 0, fmt.Errorf("Unable to resolve %s: %s", host, err) - } - - port, err := strconv.Atoi(portStr) - if err != nil || port < 0 || port > 65535 { - return nil, 0, fmt.Errorf("Bad port %s: %s", portStr, err) - } - - return ip, port, nil -} - -func udpAddrFromString(addr string) (*net.UDPAddr, error) { - ip, port, err := ipPortFromString(addr) - if err != nil { - return nil, err - } - return &net.UDPAddr{ - IP: ip.IP, - Port: port, - Zone: ip.Zone, - }, nil -} - -func tcpAddrFromString(addr string) (*net.TCPAddr, error) { - ip, port, err := ipPortFromString(addr) - if err != nil { - return nil, err - } - return &net.TCPAddr{ - IP: ip.IP, - Port: port, - Zone: ip.Zone, - }, nil -} - func configReloader(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger, option mapper.CacheOption) { signals := make(chan os.Signal, 1) @@ -177,12 +290,12 @@ func main() { go serveHTTP(*listenAddress, *metricsEndpoint, logger) - events := make(chan Events, *eventQueueSize) + events := make(chan event.Events, *eventQueueSize) defer close(events) - eventQueue := newEventQueue(events, *eventFlushThreshold, *eventFlushInterval) + eventQueue := event.NewEventQueue(events, *eventFlushThreshold, *eventFlushInterval, eventsFlushed) if *statsdListenUDP != "" { - udpListenAddr, err := udpAddrFromString(*statsdListenUDP) + udpListenAddr, err := address.UDPAddrFromString(*statsdListenUDP) if err != nil { level.Error(logger).Log("msg", "invalid UDP listen address", "address", *statsdListenUDP, "error", err) os.Exit(1) @@ -201,12 +314,24 @@ func main() { } } - ul := &StatsDUDPListener{conn: uconn, eventHandler: eventQueue, logger: logger} + ul := &listener.StatsDUDPListener{ + Conn: uconn, + EventHandler: eventQueue, + Logger: logger, + UDPPackets: udpPackets, + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + } + go ul.Listen() } if *statsdListenTCP != "" { - tcpListenAddr, err := tcpAddrFromString(*statsdListenTCP) + tcpListenAddr, err := address.TCPAddrFromString(*statsdListenTCP) if err != nil { level.Error(logger).Log("msg", "invalid TCP listen address", "address", *statsdListenUDP, "error", err) os.Exit(1) @@ -218,7 +343,21 @@ func main() { } defer tconn.Close() - tl := &StatsDTCPListener{conn: tconn, eventHandler: eventQueue, logger: logger} + tl := &listener.StatsDTCPListener{ + Conn: tconn, + EventHandler: eventQueue, + Logger: logger, + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + TCPConnections: tcpConnections, + TCPErrors: tcpErrors, + TCPLineTooLong: tcpLineTooLong, + } + go tl.Listen() } @@ -247,7 +386,19 @@ func main() { } } - ul := &StatsDUnixgramListener{conn: uxgconn, eventHandler: eventQueue, logger: logger} + ul := &listener.StatsDUnixgramListener{ + Conn: uxgconn, + EventHandler: eventQueue, + Logger: logger, + UnixgramPackets: unixgramPackets, + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + } + go ul.Listen() // if it's an abstract unix domain socket, it won't exist on fs @@ -291,7 +442,7 @@ func main() { go configReloader(*mappingConfig, mapper, *cacheSize, logger, cacheOption) - exporter := NewExporter(mapper, logger) + exporter := exporter.NewExporter(mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt, syscall.SIGTERM) diff --git a/pkg/address/address.go b/pkg/address/address.go new file mode 100644 index 0000000..75f4736 --- /dev/null +++ b/pkg/address/address.go @@ -0,0 +1,66 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package address + +import ( + "fmt" + "net" + "strconv" +) + +func IPPortFromString(addr string) (*net.IPAddr, int, error) { + host, portStr, err := net.SplitHostPort(addr) + if err != nil { + return nil, 0, fmt.Errorf("bad StatsD listening address: %s", addr) + } + + if host == "" { + host = "0.0.0.0" + } + ip, err := net.ResolveIPAddr("ip", host) + if err != nil { + return nil, 0, fmt.Errorf("Unable to resolve %s: %s", host, err) + } + + port, err := strconv.Atoi(portStr) + if err != nil || port < 0 || port > 65535 { + return nil, 0, fmt.Errorf("Bad port %s: %s", portStr, err) + } + + return ip, port, nil +} + +func UDPAddrFromString(addr string) (*net.UDPAddr, error) { + ip, port, err := IPPortFromString(addr) + if err != nil { + return nil, err + } + return &net.UDPAddr{ + IP: ip.IP, + Port: port, + Zone: ip.Zone, + }, nil +} + +func TCPAddrFromString(addr string) (*net.TCPAddr, error) { + ip, port, err := IPPortFromString(addr) + if err != nil { + return nil, err + } + return &net.TCPAddr{ + IP: ip.IP, + Port: port, + Zone: ip.Zone, + }, nil +} diff --git a/event.go b/pkg/event/event.go similarity index 56% rename from event.go rename to pkg/event/event.go index 8c9cc50..bc22629 100644 --- a/event.go +++ b/pkg/event/event.go @@ -11,12 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package event import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/clock" "github.com/prometheus/statsd_exporter/pkg/mapper" ) @@ -29,105 +30,108 @@ type Event interface { } type CounterEvent struct { - metricName string - value float64 - labels map[string]string + CMetricName string + CValue float64 + CLabels map[string]string } -func (c *CounterEvent) MetricName() string { return c.metricName } -func (c *CounterEvent) Value() float64 { return c.value } -func (c *CounterEvent) Labels() map[string]string { return c.labels } +func (c *CounterEvent) MetricName() string { return c.CMetricName } +func (c *CounterEvent) Value() float64 { return c.CValue } +func (c *CounterEvent) Labels() map[string]string { return c.CLabels } func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter } type GaugeEvent struct { - metricName string - value float64 - relative bool - labels map[string]string + GMetricName string + GValue float64 + GRelative bool + GLabels map[string]string } -func (g *GaugeEvent) MetricName() string { return g.metricName } -func (g *GaugeEvent) Value() float64 { return g.value } -func (c *GaugeEvent) Labels() map[string]string { return c.labels } +func (g *GaugeEvent) MetricName() string { return g.GMetricName } +func (g *GaugeEvent) Value() float64 { return g.GValue } +func (c *GaugeEvent) Labels() map[string]string { return c.GLabels } func (c *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge } type TimerEvent struct { - metricName string - value float64 - labels map[string]string + TMetricName string + TValue float64 + TLabels map[string]string } -func (t *TimerEvent) MetricName() string { return t.metricName } -func (t *TimerEvent) Value() float64 { return t.value } -func (c *TimerEvent) Labels() map[string]string { return c.labels } +func (t *TimerEvent) MetricName() string { return t.TMetricName } +func (t *TimerEvent) Value() float64 { return t.TValue } +func (c *TimerEvent) Labels() map[string]string { return c.TLabels } func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTimer } type Events []Event -type eventQueue struct { - c chan Events +type EventQueue struct { + C chan Events q Events m sync.Mutex - flushThreshold int flushTicker *time.Ticker + flushThreshold int + flushInterval time.Duration + eventsFlushed prometheus.Counter } -type eventHandler interface { - queue(event Events) +type EventHandler interface { + Queue(event Events) } -func newEventQueue(c chan Events, flushThreshold int, flushInterval time.Duration) *eventQueue { +func NewEventQueue(c chan Events, flushThreshold int, flushInterval time.Duration, eventsFlushed prometheus.Counter) *EventQueue { ticker := clock.NewTicker(flushInterval) - eq := &eventQueue{ - c: c, + eq := &EventQueue{ + C: c, flushThreshold: flushThreshold, flushTicker: ticker, q: make([]Event, 0, flushThreshold), + eventsFlushed: eventsFlushed, } go func() { for { <-ticker.C - eq.flush() + eq.Flush() } }() return eq } -func (eq *eventQueue) queue(events Events) { +func (eq *EventQueue) Queue(events Events) { eq.m.Lock() defer eq.m.Unlock() for _, e := range events { eq.q = append(eq.q, e) if len(eq.q) >= eq.flushThreshold { - eq.flushUnlocked() + eq.FlushUnlocked() } } } -func (eq *eventQueue) flush() { +func (eq *EventQueue) Flush() { eq.m.Lock() defer eq.m.Unlock() - eq.flushUnlocked() + eq.FlushUnlocked() } -func (eq *eventQueue) flushUnlocked() { - eq.c <- eq.q +func (eq *EventQueue) FlushUnlocked() { + eq.C <- eq.q eq.q = make([]Event, 0, cap(eq.q)) - eventsFlushed.Inc() + eq.eventsFlushed.Inc() } -func (eq *eventQueue) len() int { +func (eq *EventQueue) Len() int { eq.m.Lock() defer eq.m.Unlock() return len(eq.q) } -type unbufferedEventHandler struct { - c chan Events +type UnbufferedEventHandler struct { + C chan Events } -func (ueh *unbufferedEventHandler) queue(events Events) { - ueh.c <- events +func (ueh *UnbufferedEventHandler) Queue(events Events) { + ueh.C <- events } diff --git a/event_test.go b/pkg/event/event_test.go similarity index 74% rename from event_test.go rename to pkg/event/event_test.go index 97a2722..d458ca5 100644 --- a/event_test.go +++ b/pkg/event/event_test.go @@ -11,22 +11,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package event import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/clock" ) +var eventsFlushed = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_event_queue_flushed_total", + Help: "Number of times events were flushed to exporter", + }, +) + func TestEventThresholdFlush(t *testing.T) { c := make(chan Events, 100) // We're not going to flush during this test, so the duration doesn't matter. - eq := newEventQueue(c, 5, time.Second) + eq := NewEventQueue(c, 5, time.Second, eventsFlushed) e := make(Events, 13) go func() { - eq.queue(e) + eq.Queue(e) }() batch := <-c @@ -52,25 +60,25 @@ func TestEventIntervalFlush(t *testing.T) { clock.ClockInstance.Instant = time.Unix(0, 0) c := make(chan Events, 100) - eq := newEventQueue(c, 1000, time.Second*1000) + eq := NewEventQueue(c, 1000, time.Second*1000, eventsFlushed) e := make(Events, 10) - eq.queue(e) + eq.Queue(e) - if eq.len() != 10 { - t.Fatal("Expected 10 events to be queued, but got", eq.len()) + if eq.Len() != 10 { + t.Fatal("Expected 10 events to be queued, but got", eq.Len()) } - if len(eq.c) != 0 { - t.Fatal("Expected 0 events in the event channel, but got", len(eq.c)) + if len(eq.C) != 0 { + t.Fatal("Expected 0 events in the event channel, but got", len(eq.C)) } // Tick time forward to trigger a flush clock.ClockInstance.Instant = time.Unix(10000, 0) clock.ClockInstance.TickerCh <- time.Unix(10000, 0) - events := <-eq.c - if eq.len() != 0 { - t.Fatal("Expected 0 events to be queued, but got", eq.len()) + events := <-eq.C + if eq.Len() != 0 { + t.Fatal("Expected 0 events to be queued, but got", eq.Len()) } if len(events) != 10 { diff --git a/pkg/exporter/exporter.go b/pkg/exporter/exporter.go new file mode 100644 index 0000000..66876cc --- /dev/null +++ b/pkg/exporter/exporter.go @@ -0,0 +1,196 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "os" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/statsd_exporter/pkg/clock" + "github.com/prometheus/statsd_exporter/pkg/event" + "github.com/prometheus/statsd_exporter/pkg/mapper" + "github.com/prometheus/statsd_exporter/pkg/registry" +) + +const ( + defaultHelp = "Metric autogenerated by statsd_exporter." + regErrF = "Failed to update metric" +) + +type Exporter struct { + Mapper *mapper.MetricMapper + Registry *registry.Registry + Logger log.Logger + EventsActions *prometheus.CounterVec + EventsUnmapped prometheus.Counter + ErrorEventStats *prometheus.CounterVec + EventStats *prometheus.CounterVec + ConflictingEventStats *prometheus.CounterVec + MetricsCount *prometheus.GaugeVec +} + +// Listen handles all events sent to the given channel sequentially. It +// terminates when the channel is closed. +func (b *Exporter) Listen(e <-chan event.Events) { + + removeStaleMetricsTicker := clock.NewTicker(time.Second) + + for { + select { + case <-removeStaleMetricsTicker.C: + b.Registry.RemoveStaleMetrics() + case events, ok := <-e: + if !ok { + level.Debug(b.Logger).Log("msg", "Channel is closed. Break out of Exporter.Listener.") + removeStaleMetricsTicker.Stop() + return + } + for _, event := range events { + b.handleEvent(event) + } + } + } +} + +// handleEvent processes a single Event according to the configured mapping. +func (b *Exporter) handleEvent(thisEvent event.Event) { + + mapping, labels, present := b.Mapper.GetMapping(thisEvent.MetricName(), thisEvent.MetricType()) + if mapping == nil { + mapping = &mapper.MetricMapping{} + if b.Mapper.Defaults.Ttl != 0 { + mapping.Ttl = b.Mapper.Defaults.Ttl + } + } + + if mapping.Action == mapper.ActionTypeDrop { + b.EventsActions.WithLabelValues("drop").Inc() + return + } + + metricName := "" + + help := defaultHelp + if mapping.HelpText != "" { + help = mapping.HelpText + } + + prometheusLabels := thisEvent.Labels() + if present { + if mapping.Name == "" { + level.Debug(b.Logger).Log("msg", "The mapping generates an empty metric name", "metric_name", thisEvent.MetricName(), "match", mapping.Match) + b.ErrorEventStats.WithLabelValues("empty_metric_name").Inc() + return + } + metricName = mapper.EscapeMetricName(mapping.Name) + for label, value := range labels { + prometheusLabels[label] = value + } + b.EventsActions.WithLabelValues(string(mapping.Action)).Inc() + } else { + b.EventsUnmapped.Inc() + metricName = mapper.EscapeMetricName(thisEvent.MetricName()) + } + + switch ev := thisEvent.(type) { + case *event.CounterEvent: + // We don't accept negative values for counters. Incrementing the counter with a negative number + // will cause the exporter to panic. Instead we will warn and continue to the next event. + if thisEvent.Value() < 0.0 { + level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", thisEvent.Value()) + b.ErrorEventStats.WithLabelValues("illegal_negative_counter").Inc() + return + } + + counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, b.MetricsCount) + if err == nil { + counter.Add(thisEvent.Value()) + b.EventStats.WithLabelValues("counter").Inc() + } else { + level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) + b.ConflictingEventStats.WithLabelValues("counter").Inc() + } + + case *event.GaugeEvent: + gauge, err := b.Registry.GetGauge(metricName, prometheusLabels, help, mapping, b.MetricsCount) + + if err == nil { + if ev.GRelative { + gauge.Add(thisEvent.Value()) + } else { + gauge.Set(thisEvent.Value()) + } + b.EventStats.WithLabelValues("gauge").Inc() + } else { + level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) + b.ConflictingEventStats.WithLabelValues("gauge").Inc() + } + + case *event.TimerEvent: + t := mapper.TimerTypeDefault + if mapping != nil { + t = mapping.TimerType + } + if t == mapper.TimerTypeDefault { + t = b.Mapper.Defaults.TimerType + } + + switch t { + case mapper.TimerTypeHistogram: + histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, b.MetricsCount) + if err == nil { + histogram.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond + b.EventStats.WithLabelValues("timer").Inc() + } else { + level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) + b.ConflictingEventStats.WithLabelValues("timer").Inc() + } + + case mapper.TimerTypeDefault, mapper.TimerTypeSummary: + summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, b.MetricsCount) + if err == nil { + summary.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond + b.EventStats.WithLabelValues("timer").Inc() + } else { + level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) + b.ConflictingEventStats.WithLabelValues("timer").Inc() + } + + default: + level.Error(b.Logger).Log("msg", "unknown timer type", "type", t) + os.Exit(1) + } + + default: + level.Debug(b.Logger).Log("msg", "Unsupported event type") + b.EventStats.WithLabelValues("illegal").Inc() + } +} + +func NewExporter(mapper *mapper.MetricMapper, logger log.Logger, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) *Exporter { + return &Exporter{ + Mapper: mapper, + Registry: registry.NewRegistry(mapper), + Logger: logger, + EventsActions: eventsActions, + EventsUnmapped: eventsUnmapped, + ErrorEventStats: errorEventStats, + EventStats: eventStats, + ConflictingEventStats: conflictingEventStats, + MetricsCount: metricsCount, + } +} diff --git a/exporter_test.go b/pkg/exporter/exporter_test.go similarity index 59% rename from exporter_test.go rename to pkg/exporter/exporter_test.go index c64a5a2..86cb040 100644 --- a/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package main +package exporter import ( "fmt" @@ -24,7 +24,132 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/prometheus/statsd_exporter/pkg/clock" + "github.com/prometheus/statsd_exporter/pkg/event" + "github.com/prometheus/statsd_exporter/pkg/line" + "github.com/prometheus/statsd_exporter/pkg/listener" "github.com/prometheus/statsd_exporter/pkg/mapper" + "github.com/prometheus/statsd_exporter/pkg/registry" +) + +var ( + eventStats = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_total", + Help: "The total number of StatsD events seen.", + }, + []string{"type"}, + ) + eventsFlushed = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_event_queue_flushed_total", + Help: "Number of times events were flushed to exporter", + }, + ) + eventsUnmapped = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_unmapped_total", + Help: "The total number of StatsD events no mapping was found for.", + }) + udpPackets = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_udp_packets_total", + Help: "The total number of StatsD packets received over UDP.", + }, + ) + tcpConnections = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tcp_connections_total", + Help: "The total number of TCP connections handled.", + }, + ) + tcpErrors = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tcp_connection_errors_total", + Help: "The number of errors encountered reading from TCP.", + }, + ) + tcpLineTooLong = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tcp_too_long_lines_total", + Help: "The number of lines discarded due to being too long.", + }, + ) + unixgramPackets = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_unixgram_packets_total", + Help: "The total number of StatsD packets received over Unixgram.", + }, + ) + linesReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_lines_total", + Help: "The total number of StatsD lines received.", + }, + ) + samplesReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_samples_total", + Help: "The total number of StatsD samples received.", + }, + ) + sampleErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_sample_errors_total", + Help: "The total number of errors parsing StatsD samples.", + }, + []string{"reason"}, + ) + tagsReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tags_total", + Help: "The total number of DogStatsD tags processed.", + }, + ) + tagErrors = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_tag_errors_total", + Help: "The number of errors parsing DogStatsD tags.", + }, + ) + configLoads = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_config_reloads_total", + Help: "The number of configuration reloads.", + }, + []string{"outcome"}, + ) + mappingsCount = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "statsd_exporter_loaded_mappings", + Help: "The current number of configured metric mappings.", + }) + conflictingEventStats = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_conflict_total", + Help: "The total number of StatsD events with conflicting names.", + }, + []string{"type"}, + ) + errorEventStats = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_error_total", + Help: "The total number of StatsD events discarded due to errors.", + }, + []string{"reason"}, + ) + eventsActions = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "statsd_exporter_events_actions_total", + Help: "The total number of StatsD events by action.", + }, + []string{"action"}, + ) + metricsCount = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "statsd_exporter_metrics_total", + Help: "The total number of metrics.", + }, + []string{"type"}, + ) ) // TestNegativeCounter validates when we send a negative @@ -41,12 +166,12 @@ func TestNegativeCounter(t *testing.T) { } }() - events := make(chan Events) + events := make(chan event.Events) go func() { - c := Events{ - &CounterEvent{ - metricName: "foo", - value: -1, + c := event.Events{ + &event.CounterEvent{ + CMetricName: "foo", + CValue: -1, }, } events <- c @@ -59,7 +184,7 @@ func TestNegativeCounter(t *testing.T) { testMapper := mapper.MetricMapper{} testMapper.InitCache(0) - ex := NewExporter(&testMapper, log.NewNopLogger()) + ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) updated := getTelemetryCounterValue(errorCounter) @@ -80,48 +205,48 @@ func TestInconsistentLabelSets(t *testing.T) { secondLabelSet["foo"] = "1" secondLabelSet["bar"] = "2" - events := make(chan Events) + events := make(chan event.Events) go func() { - c := Events{ - &CounterEvent{ - metricName: "counter_test", - value: 1, - labels: firstLabelSet, + c := event.Events{ + &event.CounterEvent{ + CMetricName: "counter_test", + CValue: 1, + CLabels: firstLabelSet, }, - &CounterEvent{ - metricName: "counter_test", - value: 1, - labels: secondLabelSet, + &event.CounterEvent{ + CMetricName: "counter_test", + CValue: 1, + CLabels: secondLabelSet, }, - &GaugeEvent{ - metricName: "gauge_test", - value: 1, - labels: firstLabelSet, + &event.GaugeEvent{ + GMetricName: "gauge_test", + GValue: 1, + GLabels: firstLabelSet, }, - &GaugeEvent{ - metricName: "gauge_test", - value: 1, - labels: secondLabelSet, + &event.GaugeEvent{ + GMetricName: "gauge_test", + GValue: 1, + GLabels: secondLabelSet, }, - &TimerEvent{ - metricName: "histogram.test", - value: 1, - labels: firstLabelSet, + &event.TimerEvent{ + TMetricName: "histogram.test", + TValue: 1, + TLabels: firstLabelSet, }, - &TimerEvent{ - metricName: "histogram.test", - value: 1, - labels: secondLabelSet, + &event.TimerEvent{ + TMetricName: "histogram.test", + TValue: 1, + TLabels: secondLabelSet, }, - &TimerEvent{ - metricName: "summary_test", - value: 1, - labels: firstLabelSet, + &event.TimerEvent{ + TMetricName: "summary_test", + TValue: 1, + TLabels: firstLabelSet, }, - &TimerEvent{ - metricName: "summary_test", - value: 1, - labels: secondLabelSet, + &event.TimerEvent{ + TMetricName: "summary_test", + TValue: 1, + TLabels: secondLabelSet, }, } events <- c @@ -140,7 +265,7 @@ mappings: t.Fatalf("Config load error: %s %s", config, err) } - ex := NewExporter(testMapper, log.NewNopLogger()) + ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) metrics, err := prometheus.DefaultGatherer.Gather() @@ -166,18 +291,18 @@ mappings: func TestLabelParsing(t *testing.T) { codes := [2]string{"200", "300"} - events := make(chan Events) + events := make(chan event.Events) go func() { - c := Events{ - &CounterEvent{ - metricName: "counter.test.200", - value: 1, - labels: make(map[string]string), + c := event.Events{ + &event.CounterEvent{ + CMetricName: "counter.test.200", + CValue: 1, + CLabels: make(map[string]string), }, - &CounterEvent{ - metricName: "counter.test.300", - value: 1, - labels: make(map[string]string), + &event.CounterEvent{ + CMetricName: "counter.test.300", + CValue: 1, + CLabels: make(map[string]string), }, } events <- c @@ -198,7 +323,7 @@ mappings: t.Fatalf("Config load error: %s %s", config, err) } - ex := NewExporter(testMapper, log.NewNopLogger()) + ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) metrics, err := prometheus.DefaultGatherer.Gather() @@ -222,173 +347,173 @@ func TestConflictingMetrics(t *testing.T) { scenarios := []struct { name string expected []float64 - in Events + in event.Events }{ { name: "counter vs gauge", expected: []float64{1}, - in: Events{ - &CounterEvent{ - metricName: "cvg_test", - value: 1, + in: event.Events{ + &event.CounterEvent{ + CMetricName: "cvg_test", + CValue: 1, }, - &GaugeEvent{ - metricName: "cvg_test", - value: 2, + &event.GaugeEvent{ + GMetricName: "cvg_test", + GValue: 2, }, }, }, { name: "counter vs gauge with different labels", expected: []float64{1, 2}, - in: Events{ - &CounterEvent{ - metricName: "cvgl_test", - value: 1, - labels: map[string]string{"tag": "1"}, + in: event.Events{ + &event.CounterEvent{ + CMetricName: "cvgl_test", + CValue: 1, + CLabels: map[string]string{"tag": "1"}, }, - &CounterEvent{ - metricName: "cvgl_test", - value: 2, - labels: map[string]string{"tag": "2"}, + &event.CounterEvent{ + CMetricName: "cvgl_test", + CValue: 2, + CLabels: map[string]string{"tag": "2"}, }, - &GaugeEvent{ - metricName: "cvgl_test", - value: 3, - labels: map[string]string{"tag": "1"}, + &event.GaugeEvent{ + GMetricName: "cvgl_test", + GValue: 3, + GLabels: map[string]string{"tag": "1"}, }, }, }, { name: "counter vs gauge with same labels", expected: []float64{3}, - in: Events{ - &CounterEvent{ - metricName: "cvgsl_test", - value: 1, - labels: map[string]string{"tag": "1"}, + in: event.Events{ + &event.CounterEvent{ + CMetricName: "cvgsl_test", + CValue: 1, + CLabels: map[string]string{"tag": "1"}, }, - &CounterEvent{ - metricName: "cvgsl_test", - value: 2, - labels: map[string]string{"tag": "1"}, + &event.CounterEvent{ + CMetricName: "cvgsl_test", + CValue: 2, + CLabels: map[string]string{"tag": "1"}, }, - &GaugeEvent{ - metricName: "cvgsl_test", - value: 3, - labels: map[string]string{"tag": "1"}, + &event.GaugeEvent{ + GMetricName: "cvgsl_test", + GValue: 3, + GLabels: map[string]string{"tag": "1"}, }, }, }, { name: "gauge vs counter", expected: []float64{2}, - in: Events{ - &GaugeEvent{ - metricName: "gvc_test", - value: 2, + in: event.Events{ + &event.GaugeEvent{ + GMetricName: "gvc_test", + GValue: 2, }, - &CounterEvent{ - metricName: "gvc_test", - value: 1, + &event.CounterEvent{ + CMetricName: "gvc_test", + CValue: 1, }, }, }, { name: "counter vs histogram", expected: []float64{1}, - in: Events{ - &CounterEvent{ - metricName: "histogram_test1", - value: 1, + in: event.Events{ + &event.CounterEvent{ + CMetricName: "histogram_test1", + CValue: 1, }, - &TimerEvent{ - metricName: "histogram.test1", - value: 2, + &event.TimerEvent{ + TMetricName: "histogram.test1", + TValue: 2, }, }, }, { name: "counter vs histogram sum", expected: []float64{1}, - in: Events{ - &CounterEvent{ - metricName: "histogram_test1_sum", - value: 1, + in: event.Events{ + &event.CounterEvent{ + CMetricName: "histogram_test1_sum", + CValue: 1, }, - &TimerEvent{ - metricName: "histogram.test1", - value: 2, + &event.TimerEvent{ + TMetricName: "histogram.test1", + TValue: 2, }, }, }, { name: "counter vs histogram count", expected: []float64{1}, - in: Events{ - &CounterEvent{ - metricName: "histogram_test2_count", - value: 1, + in: event.Events{ + &event.CounterEvent{ + CMetricName: "histogram_test2_count", + CValue: 1, }, - &TimerEvent{ - metricName: "histogram.test2", - value: 2, + &event.TimerEvent{ + TMetricName: "histogram.test2", + TValue: 2, }, }, }, { name: "counter vs histogram bucket", expected: []float64{1}, - in: Events{ - &CounterEvent{ - metricName: "histogram_test3_bucket", - value: 1, + in: event.Events{ + &event.CounterEvent{ + CMetricName: "histogram_test3_bucket", + CValue: 1, }, - &TimerEvent{ - metricName: "histogram.test3", - value: 2, + &event.TimerEvent{ + TMetricName: "histogram.test3", + TValue: 2, }, }, }, { name: "counter vs summary quantile", expected: []float64{1}, - in: Events{ - &CounterEvent{ - metricName: "cvsq_test", - value: 1, + in: event.Events{ + &event.CounterEvent{ + CMetricName: "cvsq_test", + CValue: 1, }, - &TimerEvent{ - metricName: "cvsq_test", - value: 2, + &event.TimerEvent{ + TMetricName: "cvsq_test", + TValue: 2, }, }, }, { name: "counter vs summary count", expected: []float64{1}, - in: Events{ - &CounterEvent{ - metricName: "cvsc_count", - value: 1, + in: event.Events{ + &event.CounterEvent{ + CMetricName: "cvsc_count", + CValue: 1, }, - &TimerEvent{ - metricName: "cvsc", - value: 2, + &event.TimerEvent{ + TMetricName: "cvsc", + TValue: 2, }, }, }, { name: "counter vs summary sum", expected: []float64{1}, - in: Events{ - &CounterEvent{ - metricName: "cvss_sum", - value: 1, + in: event.Events{ + &event.CounterEvent{ + CMetricName: "cvss_sum", + CValue: 1, }, - &TimerEvent{ - metricName: "cvss", - value: 2, + &event.TimerEvent{ + TMetricName: "cvss", + TValue: 2, }, }, }, @@ -408,12 +533,12 @@ mappings: t.Fatalf("Config load error: %s %s", config, err) } - events := make(chan Events) + events := make(chan event.Events) go func() { events <- s.in close(events) }() - ex := NewExporter(testMapper, log.NewNopLogger()) + ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) metrics, err := prometheus.DefaultGatherer.Gather() @@ -441,12 +566,12 @@ mappings: // being the empty string after applying the match replacements // tha we don't panic the Exporter Listener. func TestEmptyStringMetric(t *testing.T) { - events := make(chan Events) + events := make(chan event.Events) go func() { - c := Events{ - &CounterEvent{ - metricName: "foo_bar", - value: 1, + c := event.Events{ + &event.CounterEvent{ + CMetricName: "foo_bar", + CValue: 1, }, } events <- c @@ -468,7 +593,7 @@ mappings: errorCounter := errorEventStats.WithLabelValues("empty_metric_name") prev := getTelemetryCounterValue(errorCounter) - ex := NewExporter(testMapper, log.NewNopLogger()) + ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) updated := getTelemetryCounterValue(errorCounter) @@ -489,13 +614,37 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { } }() - events := make(chan Events) - ueh := &unbufferedEventHandler{c: events} + events := make(chan event.Events) + ueh := &event.UnbufferedEventHandler{C: events} go func() { - for _, l := range []statsDPacketHandler{&StatsDUDPListener{nil, nil, log.NewNopLogger()}, &mockStatsDTCPListener{StatsDTCPListener{nil, nil, log.NewNopLogger()}, log.NewNopLogger()}} { + for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{ + Conn: nil, + EventHandler: nil, + Logger: log.NewNopLogger(), + UDPPackets: udpPackets, + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + }, &mockStatsDTCPListener{listener.StatsDTCPListener{ + Conn: nil, + EventHandler: nil, + Logger: log.NewNopLogger(), + LinesReceived: linesReceived, + EventsFlushed: eventsFlushed, + SampleErrors: *sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + TCPConnections: tcpConnections, + TCPErrors: tcpErrors, + TCPLineTooLong: tcpLineTooLong, + }, log.NewNopLogger()}} { l.SetEventHandler(ueh) - l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid")) + l.HandlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid")) } close(events) }() @@ -503,7 +652,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { testMapper := mapper.MetricMapper{} testMapper.InitCache(0) - ex := NewExporter(&testMapper, log.NewNopLogger()) + ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) } @@ -512,24 +661,24 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { // as well as the sum/count metrics func TestSummaryWithQuantilesEmptyMapping(t *testing.T) { // Start exporter with a synchronous channel - events := make(chan Events) + events := make(chan event.Events) go func() { testMapper := mapper.MetricMapper{} testMapper.InitCache(0) - ex := NewExporter(&testMapper, log.NewNopLogger()) + ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) }() name := "default_foo" - c := Events{ - &TimerEvent{ - metricName: name, - value: 300, + c := event.Events{ + &event.TimerEvent{ + TMetricName: name, + TValue: 300, }, } events <- c - events <- Events{} + events <- event.Events{} close(events) metrics, err := prometheus.DefaultGatherer.Gather() @@ -557,26 +706,26 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) { func TestHistogramUnits(t *testing.T) { // Start exporter with a synchronous channel - events := make(chan Events) + events := make(chan event.Events) go func() { testMapper := mapper.MetricMapper{} testMapper.InitCache(0) - ex := NewExporter(&testMapper, log.NewNopLogger()) - ex.mapper.Defaults.TimerType = mapper.TimerTypeHistogram + ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + ex.Mapper.Defaults.TimerType = mapper.TimerTypeHistogram ex.Listen(events) }() // Synchronously send a statsd event to wait for handleEvent execution. // Then close events channel to stop a listener. name := "foo" - c := Events{ - &TimerEvent{ - metricName: name, - value: 300, + c := event.Events{ + &event.TimerEvent{ + TMetricName: name, + TValue: 300, }, } events <- c - events <- Events{} + events <- event.Events{} close(events) // Check histogram value @@ -596,11 +745,11 @@ func TestHistogramUnits(t *testing.T) { } func TestCounterIncrement(t *testing.T) { // Start exporter with a synchronous channel - events := make(chan Events) + events := make(chan event.Events) go func() { testMapper := mapper.MetricMapper{} testMapper.InitCache(0) - ex := NewExporter(&testMapper, log.NewNopLogger()) + ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) }() @@ -610,21 +759,21 @@ func TestCounterIncrement(t *testing.T) { labels := map[string]string{ "foo": "bar", } - c := Events{ - &CounterEvent{ - metricName: name, - value: 1, - labels: labels, + c := event.Events{ + &event.CounterEvent{ + CMetricName: name, + CValue: 1, + CLabels: labels, }, - &CounterEvent{ - metricName: name, - value: 1, - labels: labels, + &event.CounterEvent{ + CMetricName: name, + CValue: 1, + CLabels: labels, }, } events <- c // Push empty event so that we block until the first event is consumed. - events <- Events{} + events <- event.Events{} close(events) // Check histogram value @@ -642,16 +791,16 @@ func TestCounterIncrement(t *testing.T) { } type statsDPacketHandler interface { - handlePacket(packet []byte) - SetEventHandler(eh eventHandler) + HandlePacket(packet []byte) + SetEventHandler(eh event.EventHandler) } type mockStatsDTCPListener struct { - StatsDTCPListener + listener.StatsDTCPListener log.Logger } -func (ml *mockStatsDTCPListener) handlePacket(packet []byte) { +func (ml *mockStatsDTCPListener) HandlePacket(packet []byte) { // Forcing IPv4 because the TravisCI build environment does not have IPv6 // addresses. lc, err := net.ListenTCP("tcp4", nil) @@ -679,7 +828,7 @@ func (ml *mockStatsDTCPListener) handlePacket(packet []byte) { if err != nil { panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err)) } - ml.handleConn(sc) + ml.HandleConn(sc) } // TestTtlExpiration validates expiration of time series. @@ -706,23 +855,23 @@ mappings: if err != nil { t.Fatalf("Config load error: %s %s", config, err) } - events := make(chan Events) + events := make(chan event.Events) defer close(events) go func() { - ex := NewExporter(testMapper, log.NewNopLogger()) + ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) }() - ev := Events{ + ev := event.Events{ // event with default ttl = 1s - &GaugeEvent{ - metricName: "foobar", - value: 200, + &event.GaugeEvent{ + GMetricName: "foobar", + GValue: 200, }, // event with ttl = 2s from a mapping - &TimerEvent{ - metricName: "bazqux.main", - value: 42000, + &event.TimerEvent{ + TMetricName: "bazqux.main", + TValue: 42000, }, } @@ -735,7 +884,7 @@ mappings: // saveLabelValues will use fake instant as a lastRegisteredAt time. clock.ClockInstance.Instant = time.Unix(0, 0) events <- ev - events <- Events{} + events <- event.Events{} // Check values metrics, err = prometheus.DefaultGatherer.Gather() @@ -757,7 +906,7 @@ mappings: // Step 2. Increase Instant to emulate metrics expiration after 1s clock.ClockInstance.Instant = time.Unix(1, 10) clock.ClockInstance.TickerCh <- time.Unix(0, 0) - events <- Events{} + events <- event.Events{} // Check values metrics, err = prometheus.DefaultGatherer.Gather() @@ -779,7 +928,7 @@ mappings: // Step 3. Increase Instant to emulate metrics expiration after 2s clock.ClockInstance.Instant = time.Unix(2, 200) clock.ClockInstance.TickerCh <- time.Unix(0, 0) - events <- Events{} + events <- event.Events{} // Check values metrics, err = prometheus.DefaultGatherer.Gather() @@ -797,32 +946,32 @@ mappings: } func TestHashLabelNames(t *testing.T) { - r := newRegistry(nil) + r := registry.NewRegistry(nil) // Validate value hash changes and name has doesn't when just the value changes. - hash1, _ := r.hashLabels(map[string]string{ + hash1, _ := r.HashLabels(map[string]string{ "label": "value1", }) - hash2, _ := r.hashLabels(map[string]string{ + hash2, _ := r.HashLabels(map[string]string{ "label": "value2", }) - if hash1.names != hash2.names { + if hash1.Names != hash2.Names { t.Fatal("Hash of label names should match, but doesn't") } - if hash1.values == hash2.values { + if hash1.Values == hash2.Values { t.Fatal("Hash of label names shouldn't match, but do") } // Validate value and name hashes change when the name changes. - hash1, _ = r.hashLabels(map[string]string{ + hash1, _ = r.HashLabels(map[string]string{ "label1": "value", }) - hash2, _ = r.hashLabels(map[string]string{ + hash2, _ = r.HashLabels(map[string]string{ "label2": "value", }) - if hash1.names == hash2.names { + if hash1.Names == hash2.Names { t.Fatal("Hash of label names shouldn't match, but do") } - if hash1.values == hash2.values { + if hash1.Values == hash2.Values { t.Fatal("Hash of label names shouldn't match, but do") } } @@ -916,7 +1065,7 @@ func BenchmarkParseDogStatsDTags(b *testing.B) { b.Run(name, func(b *testing.B) { for n := 0; n < b.N; n++ { labels := map[string]string{} - parseDogStatsDTags(tags, labels, log.NewNopLogger()) + line.ParseDogStatsDTags(tags, labels, tagErrors, log.NewNopLogger()) } }) } @@ -953,11 +1102,11 @@ func BenchmarkHashNameAndLabels(b *testing.B) { }, } - r := newRegistry(nil) + r := registry.NewRegistry(nil) for _, s := range scenarios { b.Run(s.name, func(b *testing.B) { for n := 0; n < b.N; n++ { - r.hashLabels(s.labels) + r.HashLabels(s.labels) } }) } diff --git a/pkg/line/line.go b/pkg/line/line.go new file mode 100644 index 0000000..730dbcf --- /dev/null +++ b/pkg/line/line.go @@ -0,0 +1,254 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package line + +import ( + "fmt" + "strconv" + "strings" + "unicode/utf8" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/statsd_exporter/pkg/event" + "github.com/prometheus/statsd_exporter/pkg/mapper" +) + +func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (event.Event, error) { + switch statType { + case "c": + return &event.CounterEvent{ + CMetricName: metric, + CValue: float64(value), + CLabels: labels, + }, nil + case "g": + return &event.GaugeEvent{ + GMetricName: metric, + GValue: float64(value), + GRelative: relative, + GLabels: labels, + }, nil + case "ms", "h", "d": + return &event.TimerEvent{ + TMetricName: metric, + TValue: float64(value), + TLabels: labels, + }, nil + case "s": + return nil, fmt.Errorf("no support for StatsD sets") + default: + return nil, fmt.Errorf("bad stat type %s", statType) + } +} + +func parseTag(component, tag string, separator rune, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) { + // Entirely empty tag is an error + if len(tag) == 0 { + tagErrors.Inc() + level.Debug(logger).Log("msg", "Empty name tag", "component", component) + return + } + + for i, c := range tag { + if c == separator { + k := tag[:i] + v := tag[i+1:] + + if len(k) == 0 || len(v) == 0 { + // Empty key or value is an error + tagErrors.Inc() + level.Debug(logger).Log("msg", "Malformed name tag", "k", k, "v", v, "component", component) + } else { + labels[mapper.EscapeMetricName(k)] = v + } + return + } + } + + // Missing separator (no value) is an error + tagErrors.Inc() + level.Debug(logger).Log("msg", "Malformed name tag", "tag", tag, "component", component) +} + +func parseNameTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) { + lastTagEndIndex := 0 + for i, c := range component { + if c == ',' { + tag := component[lastTagEndIndex:i] + lastTagEndIndex = i + 1 + parseTag(component, tag, '=', labels, tagErrors, logger) + } + } + + // If we're not off the end of the string, add the last tag + if lastTagEndIndex < len(component) { + tag := component[lastTagEndIndex:] + parseTag(component, tag, '=', labels, tagErrors, logger) + } +} + +func trimLeftHash(s string) string { + if s != "" && s[0] == '#' { + return s[1:] + } + return s +} + +func ParseDogStatsDTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) { + lastTagEndIndex := 0 + for i, c := range component { + if c == ',' { + tag := component[lastTagEndIndex:i] + lastTagEndIndex = i + 1 + parseTag(component, trimLeftHash(tag), ':', labels, tagErrors, logger) + } + } + + // If we're not off the end of the string, add the last tag + if lastTagEndIndex < len(component) { + tag := component[lastTagEndIndex:] + parseTag(component, trimLeftHash(tag), ':', labels, tagErrors, logger) + } +} + +func parseNameAndTags(name string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) string { + for i, c := range name { + // `#` delimits start of tags by Librato + // https://www.librato.com/docs/kb/collect/collection_agents/stastd/#stat-level-tags + // `,` delimits start of tags by InfluxDB + // https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/#introducing-influx-statsd + if c == '#' || c == ',' { + parseNameTags(name[i+1:], labels, tagErrors, logger) + return name[:i] + } + } + return name +} + +func LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter, logger log.Logger) event.Events { + events := event.Events{} + if line == "" { + return events + } + + elements := strings.SplitN(line, ":", 2) + if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) { + sampleErrors.WithLabelValues("malformed_line").Inc() + level.Debug(logger).Log("msg", "Bad line from StatsD", "line", line) + return events + } + + labels := map[string]string{} + metric := parseNameAndTags(elements[0], labels, tagErrors, logger) + + var samples []string + if strings.Contains(elements[1], "|#") { + // using DogStatsD tags + + // don't allow mixed tagging styles + if len(labels) > 0 { + sampleErrors.WithLabelValues("mixed_tagging_styles").Inc() + level.Debug(logger).Log("msg", "Bad line (multiple tagging styles) from StatsD", "line", line) + return events + } + + // disable multi-metrics + samples = elements[1:] + } else { + samples = strings.Split(elements[1], ":") + } + +samples: + for _, sample := range samples { + samplesReceived.Inc() + components := strings.Split(sample, "|") + samplingFactor := 1.0 + if len(components) < 2 || len(components) > 4 { + sampleErrors.WithLabelValues("malformed_component").Inc() + level.Debug(logger).Log("msg", "Bad component", "line", line) + continue + } + valueStr, statType := components[0], components[1] + + var relative = false + if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 { + relative = true + } + + value, err := strconv.ParseFloat(valueStr, 64) + if err != nil { + level.Debug(logger).Log("msg", "Bad value", "value", valueStr, "line", line) + sampleErrors.WithLabelValues("malformed_value").Inc() + continue + } + + multiplyEvents := 1 + if len(components) >= 3 { + for _, component := range components[2:] { + if len(component) == 0 { + level.Debug(logger).Log("msg", "Empty component", "line", line) + sampleErrors.WithLabelValues("malformed_component").Inc() + continue samples + } + } + + for _, component := range components[2:] { + switch component[0] { + case '@': + + samplingFactor, err = strconv.ParseFloat(component[1:], 64) + if err != nil { + level.Debug(logger).Log("msg", "Invalid sampling factor", "component", component[1:], "line", line) + sampleErrors.WithLabelValues("invalid_sample_factor").Inc() + } + if samplingFactor == 0 { + samplingFactor = 1 + } + + if statType == "g" { + continue + } else if statType == "c" { + value /= samplingFactor + } else if statType == "ms" || statType == "h" || statType == "d" { + multiplyEvents = int(1 / samplingFactor) + } + case '#': + ParseDogStatsDTags(component[1:], labels, tagErrors, logger) + default: + level.Debug(logger).Log("msg", "Invalid sampling factor or tag section", "component", components[2], "line", line) + sampleErrors.WithLabelValues("invalid_sample_factor").Inc() + continue + } + } + } + + if len(labels) > 0 { + tagsReceived.Inc() + } + + for i := 0; i < multiplyEvents; i++ { + event, err := buildEvent(statType, metric, value, relative, labels) + if err != nil { + level.Debug(logger).Log("msg", "Error building event", "line", line, "error", err) + sampleErrors.WithLabelValues("illegal_event").Inc() + continue + } + events = append(events, event) + } + } + return events +} diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go new file mode 100644 index 0000000..9b9eb7d --- /dev/null +++ b/pkg/listener/listener.go @@ -0,0 +1,174 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package listener + +import ( + "bufio" + "io" + "net" + "os" + "strings" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/statsd_exporter/pkg/event" + pkgLine "github.com/prometheus/statsd_exporter/pkg/line" +) + +type StatsDUDPListener struct { + Conn *net.UDPConn + EventHandler event.EventHandler + Logger log.Logger + UDPPackets prometheus.Counter + LinesReceived prometheus.Counter + EventsFlushed prometheus.Counter + SampleErrors prometheus.CounterVec + SamplesReceived prometheus.Counter + TagErrors prometheus.Counter + TagsReceived prometheus.Counter +} + +func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) { + l.EventHandler = eh +} + +func (l *StatsDUDPListener) Listen() { + buf := make([]byte, 65535) + for { + n, _, err := l.Conn.ReadFromUDP(buf) + if err != nil { + // https://github.com/golang/go/issues/4373 + // ignore net: errClosing error as it will occur during shutdown + if strings.HasSuffix(err.Error(), "use of closed network connection") { + return + } + level.Error(l.Logger).Log("error", err) + return + } + l.HandlePacket(buf[0:n]) + } +} + +func (l *StatsDUDPListener) HandlePacket(packet []byte) { + l.UDPPackets.Inc() + lines := strings.Split(string(packet), "\n") + for _, line := range lines { + l.LinesReceived.Inc() + l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) + } +} + +type StatsDTCPListener struct { + Conn *net.TCPListener + EventHandler event.EventHandler + Logger log.Logger + LinesReceived prometheus.Counter + EventsFlushed prometheus.Counter + SampleErrors prometheus.CounterVec + SamplesReceived prometheus.Counter + TagErrors prometheus.Counter + TagsReceived prometheus.Counter + TCPConnections prometheus.Counter + TCPErrors prometheus.Counter + TCPLineTooLong prometheus.Counter +} + +func (l *StatsDTCPListener) SetEventHandler(eh event.EventHandler) { + l.EventHandler = eh +} + +func (l *StatsDTCPListener) Listen() { + for { + c, err := l.Conn.AcceptTCP() + if err != nil { + // https://github.com/golang/go/issues/4373 + // ignore net: errClosing error as it will occur during shutdown + if strings.HasSuffix(err.Error(), "use of closed network connection") { + return + } + level.Error(l.Logger).Log("msg", "AcceptTCP failed", "error", err) + os.Exit(1) + } + go l.HandleConn(c) + } +} + +func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) { + defer c.Close() + + l.TCPConnections.Inc() + + r := bufio.NewReader(c) + for { + line, isPrefix, err := r.ReadLine() + if err != nil { + if err != io.EOF { + l.TCPErrors.Inc() + level.Debug(l.Logger).Log("msg", "Read failed", "addr", c.RemoteAddr(), "error", err) + } + break + } + if isPrefix { + l.TCPLineTooLong.Inc() + level.Debug(l.Logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr()) + break + } + l.LinesReceived.Inc() + l.EventHandler.Queue(pkgLine.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) + } +} + +type StatsDUnixgramListener struct { + Conn *net.UnixConn + EventHandler event.EventHandler + Logger log.Logger + UnixgramPackets prometheus.Counter + LinesReceived prometheus.Counter + EventsFlushed prometheus.Counter + SampleErrors prometheus.CounterVec + SamplesReceived prometheus.Counter + TagErrors prometheus.Counter + TagsReceived prometheus.Counter +} + +func (l *StatsDUnixgramListener) SetEventHandler(eh event.EventHandler) { + l.EventHandler = eh +} + +func (l *StatsDUnixgramListener) Listen() { + buf := make([]byte, 65535) + for { + n, _, err := l.Conn.ReadFromUnix(buf) + if err != nil { + // https://github.com/golang/go/issues/4373 + // ignore net: errClosing error as it will occur during shutdown + if strings.HasSuffix(err.Error(), "use of closed network connection") { + return + } + level.Error(l.Logger).Log(err) + os.Exit(1) + } + l.HandlePacket(buf[:n]) + } +} + +func (l *StatsDUnixgramListener) HandlePacket(packet []byte) { + l.UnixgramPackets.Inc() + lines := strings.Split(string(packet), "\n") + for _, line := range lines { + l.LinesReceived.Inc() + l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) + } +} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 0000000..23186bd --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,67 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type MetricType int + +const ( + CounterMetricType MetricType = iota + GaugeMetricType + SummaryMetricType + HistogramMetricType +) + +type NameHash uint64 + +type ValueHash uint64 + +type LabelHash struct { + // This is a hash over the label names + Names NameHash + // This is a hash over the label names + label values + Values ValueHash +} + +type MetricHolder interface{} + +type VectorHolder interface { + Delete(label prometheus.Labels) bool +} + +type Vector struct { + Holder VectorHolder + RefCount uint64 +} + +type Metric struct { + MetricType MetricType + // Vectors key is the hash of the label names + Vectors map[NameHash]*Vector + // Metrics key is a hash of the label names + label values + Metrics map[ValueHash]*RegisteredMetric +} + +type RegisteredMetric struct { + LastRegisteredAt time.Time + Labels prometheus.Labels + TTL time.Duration + Metric MetricHolder + VecKey NameHash +} diff --git a/pkg/registry/registry.go b/pkg/registry/registry.go new file mode 100644 index 0000000..0022848 --- /dev/null +++ b/pkg/registry/registry.go @@ -0,0 +1,383 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry + +import ( + "bytes" + "fmt" + "hash" + "hash/fnv" + "sort" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/statsd_exporter/pkg/clock" + "github.com/prometheus/statsd_exporter/pkg/mapper" + "github.com/prometheus/statsd_exporter/pkg/metrics" +) + +// uncheckedCollector wraps a Collector but its Describe method yields no Desc. +// This allows incoming metrics to have inconsistent label sets +type uncheckedCollector struct { + c prometheus.Collector +} + +func (u uncheckedCollector) Describe(_ chan<- *prometheus.Desc) {} +func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) { + u.c.Collect(c) +} + +type Registry struct { + Metrics map[string]metrics.Metric + Mapper *mapper.MetricMapper + // The below value and label variables are allocated in the registry struct + // so that we don't have to allocate them every time have to compute a label + // hash. + ValueBuf, NameBuf bytes.Buffer + Hasher hash.Hash64 +} + +func NewRegistry(mapper *mapper.MetricMapper) *Registry { + return &Registry{ + Metrics: make(map[string]metrics.Metric), + Mapper: mapper, + Hasher: fnv.New64a(), + } +} + +func (r *Registry) MetricConflicts(metricName string, metricType metrics.MetricType) bool { + vector, hasMetrics := r.Metrics[metricName] + if !hasMetrics { + // No metrics.Metric with this name exists + return false + } + + if vector.MetricType == metricType { + // We've found a copy of this metrics.Metric with this type, but different + // labels, so it's safe to create a new one. + return false + } + + // The metrics.Metric exists, but it's of a different type than we're trying to + // create. + return true +} + +func (r *Registry) StoreCounter(metricName string, hash metrics.LabelHash, labels prometheus.Labels, vec *prometheus.CounterVec, c prometheus.Counter, ttl time.Duration) { + r.Store(metricName, hash, labels, vec, c, metrics.CounterMetricType, ttl) +} + +func (r *Registry) StoreGauge(metricName string, hash metrics.LabelHash, labels prometheus.Labels, vec *prometheus.GaugeVec, g prometheus.Counter, ttl time.Duration) { + r.Store(metricName, hash, labels, vec, g, metrics.GaugeMetricType, ttl) +} + +func (r *Registry) StoreHistogram(metricName string, hash metrics.LabelHash, labels prometheus.Labels, vec *prometheus.HistogramVec, o prometheus.Observer, ttl time.Duration) { + r.Store(metricName, hash, labels, vec, o, metrics.HistogramMetricType, ttl) +} + +func (r *Registry) StoreSummary(metricName string, hash metrics.LabelHash, labels prometheus.Labels, vec *prometheus.SummaryVec, o prometheus.Observer, ttl time.Duration) { + r.Store(metricName, hash, labels, vec, o, metrics.SummaryMetricType, ttl) +} + +func (r *Registry) Store(metricName string, hash metrics.LabelHash, labels prometheus.Labels, vh metrics.VectorHolder, mh metrics.MetricHolder, metricType metrics.MetricType, ttl time.Duration) { + metric, hasMetrics := r.Metrics[metricName] + if !hasMetrics { + metric.MetricType = metricType + metric.Vectors = make(map[metrics.NameHash]*metrics.Vector) + metric.Metrics = make(map[metrics.ValueHash]*metrics.RegisteredMetric) + + r.Metrics[metricName] = metric + } + + v, ok := metric.Vectors[hash.Names] + if !ok { + v = &metrics.Vector{Holder: vh} + metric.Vectors[hash.Names] = v + } + + rm, ok := metric.Metrics[hash.Values] + if !ok { + rm = &metrics.RegisteredMetric{ + Labels: labels, + TTL: ttl, + Metric: mh, + VecKey: hash.Names, + } + metric.Metrics[hash.Values] = rm + v.RefCount++ + } + now := clock.Now() + rm.LastRegisteredAt = now + // Update ttl from mapping + rm.TTL = ttl +} + +func (r *Registry) Get(metricName string, hash metrics.LabelHash, metricType metrics.MetricType) (metrics.VectorHolder, metrics.MetricHolder) { + metric, hasMetric := r.Metrics[metricName] + + if !hasMetric { + return nil, nil + } + if metric.MetricType != metricType { + return nil, nil + } + + rm, ok := metric.Metrics[hash.Values] + if ok { + now := clock.Now() + rm.LastRegisteredAt = now + return metric.Vectors[hash.Names].Holder, rm.Metric + } + + vector, ok := metric.Vectors[hash.Names] + if ok { + return vector.Holder, nil + } + + return nil, nil +} + +func (r *Registry) GetCounter(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping, metricsCount *prometheus.GaugeVec) (prometheus.Counter, error) { + hash, labelNames := r.HashLabels(labels) + vh, mh := r.Get(metricName, hash, metrics.CounterMetricType) + if mh != nil { + return mh.(prometheus.Counter), nil + } + + if r.MetricConflicts(metricName, metrics.CounterMetricType) { + return nil, fmt.Errorf("Metric with name %s is already registered", metricName) + } + + var counterVec *prometheus.CounterVec + if vh == nil { + metricsCount.WithLabelValues("counter").Inc() + counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: metricName, + Help: help, + }, labelNames) + + if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil { + return nil, err + } + } else { + counterVec = vh.(*prometheus.CounterVec) + } + + var counter prometheus.Counter + var err error + if counter, err = counterVec.GetMetricWith(labels); err != nil { + return nil, err + } + r.StoreCounter(metricName, hash, labels, counterVec, counter, mapping.Ttl) + + return counter, nil +} + +func (r *Registry) GetGauge(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping, metricsCount *prometheus.GaugeVec) (prometheus.Gauge, error) { + hash, labelNames := r.HashLabels(labels) + vh, mh := r.Get(metricName, hash, metrics.GaugeMetricType) + if mh != nil { + return mh.(prometheus.Gauge), nil + } + + if r.MetricConflicts(metricName, metrics.GaugeMetricType) { + return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName) + } + + var gaugeVec *prometheus.GaugeVec + if vh == nil { + metricsCount.WithLabelValues("gauge").Inc() + gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: metricName, + Help: help, + }, labelNames) + + if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil { + return nil, err + } + } else { + gaugeVec = vh.(*prometheus.GaugeVec) + } + + var gauge prometheus.Gauge + var err error + if gauge, err = gaugeVec.GetMetricWith(labels); err != nil { + return nil, err + } + r.StoreGauge(metricName, hash, labels, gaugeVec, gauge, mapping.Ttl) + + return gauge, nil +} + +func (r *Registry) GetHistogram(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping, metricsCount *prometheus.GaugeVec) (prometheus.Observer, error) { + hash, labelNames := r.HashLabels(labels) + vh, mh := r.Get(metricName, hash, metrics.HistogramMetricType) + if mh != nil { + return mh.(prometheus.Observer), nil + } + + if r.MetricConflicts(metricName, metrics.HistogramMetricType) { + return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName) + } + if r.MetricConflicts(metricName+"_sum", metrics.HistogramMetricType) { + return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName) + } + if r.MetricConflicts(metricName+"_count", metrics.HistogramMetricType) { + return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName) + } + if r.MetricConflicts(metricName+"_bucket", metrics.HistogramMetricType) { + return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName) + } + + var histogramVec *prometheus.HistogramVec + if vh == nil { + metricsCount.WithLabelValues("histogram").Inc() + buckets := r.Mapper.Defaults.Buckets + if mapping.HistogramOptions != nil && len(mapping.HistogramOptions.Buckets) > 0 { + buckets = mapping.HistogramOptions.Buckets + } + histogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: metricName, + Help: help, + Buckets: buckets, + }, labelNames) + + if err := prometheus.Register(uncheckedCollector{histogramVec}); err != nil { + return nil, err + } + } else { + histogramVec = vh.(*prometheus.HistogramVec) + } + + var observer prometheus.Observer + var err error + if observer, err = histogramVec.GetMetricWith(labels); err != nil { + return nil, err + } + r.StoreHistogram(metricName, hash, labels, histogramVec, observer, mapping.Ttl) + + return observer, nil +} + +func (r *Registry) GetSummary(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping, metricsCount *prometheus.GaugeVec) (prometheus.Observer, error) { + hash, labelNames := r.HashLabels(labels) + vh, mh := r.Get(metricName, hash, metrics.SummaryMetricType) + if mh != nil { + return mh.(prometheus.Observer), nil + } + + if r.MetricConflicts(metricName, metrics.SummaryMetricType) { + return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName) + } + if r.MetricConflicts(metricName+"_sum", metrics.SummaryMetricType) { + return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName) + } + if r.MetricConflicts(metricName+"_count", metrics.SummaryMetricType) { + return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName) + } + + var summaryVec *prometheus.SummaryVec + if vh == nil { + metricsCount.WithLabelValues("summary").Inc() + quantiles := r.Mapper.Defaults.Quantiles + if mapping != nil && mapping.SummaryOptions != nil && len(mapping.SummaryOptions.Quantiles) > 0 { + quantiles = mapping.SummaryOptions.Quantiles + } + summaryOptions := mapper.SummaryOptions{} + if mapping != nil && mapping.SummaryOptions != nil { + summaryOptions = *mapping.SummaryOptions + } + objectives := make(map[float64]float64) + for _, q := range quantiles { + objectives[q.Quantile] = q.Error + } + // In the case of no mapping file, explicitly define the default quantiles + if len(objectives) == 0 { + objectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001} + } + summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: metricName, + Help: help, + Objectives: objectives, + MaxAge: summaryOptions.MaxAge, + AgeBuckets: summaryOptions.AgeBuckets, + BufCap: summaryOptions.BufCap, + }, labelNames) + + if err := prometheus.Register(uncheckedCollector{summaryVec}); err != nil { + return nil, err + } + } else { + summaryVec = vh.(*prometheus.SummaryVec) + } + + var observer prometheus.Observer + var err error + if observer, err = summaryVec.GetMetricWith(labels); err != nil { + return nil, err + } + r.StoreSummary(metricName, hash, labels, summaryVec, observer, mapping.Ttl) + + return observer, nil +} + +func (r *Registry) RemoveStaleMetrics() { + now := clock.Now() + // delete timeseries with expired ttl + for _, metric := range r.Metrics { + for hash, rm := range metric.Metrics { + if rm.TTL == 0 { + continue + } + if rm.LastRegisteredAt.Add(rm.TTL).Before(now) { + metric.Vectors[rm.VecKey].Holder.Delete(rm.Labels) + metric.Vectors[rm.VecKey].RefCount-- + delete(metric.Metrics, hash) + } + } + } +} + +// Calculates a hash of both the label names and the label names and values. +func (r *Registry) HashLabels(labels prometheus.Labels) (metrics.LabelHash, []string) { + r.Hasher.Reset() + r.NameBuf.Reset() + r.ValueBuf.Reset() + labelNames := make([]string, 0, len(labels)) + + for labelName := range labels { + labelNames = append(labelNames, labelName) + } + sort.Strings(labelNames) + + r.ValueBuf.WriteByte(model.SeparatorByte) + for _, labelName := range labelNames { + r.ValueBuf.WriteString(labels[labelName]) + r.ValueBuf.WriteByte(model.SeparatorByte) + + r.NameBuf.WriteString(labelName) + r.NameBuf.WriteByte(model.SeparatorByte) + } + + lh := metrics.LabelHash{} + r.Hasher.Write(r.NameBuf.Bytes()) + lh.Names = metrics.NameHash(r.Hasher.Sum64()) + + // Now add the values to the names we've already hashed. + r.Hasher.Write(r.ValueBuf.Bytes()) + lh.Values = metrics.ValueHash(r.Hasher.Sum64()) + + return lh, labelNames +} diff --git a/registry.go b/registry.go deleted file mode 100644 index d29dbce..0000000 --- a/registry.go +++ /dev/null @@ -1,416 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "bytes" - "fmt" - "hash" - "hash/fnv" - "sort" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/prometheus/statsd_exporter/pkg/clock" - "github.com/prometheus/statsd_exporter/pkg/mapper" -) - -type metricType int - -const ( - CounterMetricType metricType = iota - GaugeMetricType - SummaryMetricType - HistogramMetricType -) - -type nameHash uint64 -type valueHash uint64 -type labelHash struct { - // This is a hash over the label names - names nameHash - // This is a hash over the label names + label values - values valueHash -} - -type metricHolder interface{} - -type registeredMetric struct { - lastRegisteredAt time.Time - labels prometheus.Labels - ttl time.Duration - metric metricHolder - vecKey nameHash -} - -type vectorHolder interface { - Delete(label prometheus.Labels) bool -} - -type vector struct { - holder vectorHolder - refCount uint64 -} - -type metric struct { - metricType metricType - // Vectors key is the hash of the label names - vectors map[nameHash]*vector - // Metrics key is a hash of the label names + label values - metrics map[valueHash]*registeredMetric -} - -type registry struct { - metrics map[string]metric - mapper *mapper.MetricMapper - // The below value and label variables are allocated in the registry struct - // so that we don't have to allocate them every time have to compute a label - // hash. - valueBuf, nameBuf bytes.Buffer - hasher hash.Hash64 -} - -func newRegistry(mapper *mapper.MetricMapper) *registry { - return ®istry{ - metrics: make(map[string]metric), - mapper: mapper, - hasher: fnv.New64a(), - } -} - -func (r *registry) metricConflicts(metricName string, metricType metricType) bool { - vector, hasMetric := r.metrics[metricName] - if !hasMetric { - // No metric with this name exists - return false - } - - if vector.metricType == metricType { - // We've found a copy of this metric with this type, but different - // labels, so it's safe to create a new one. - return false - } - - // The metric exists, but it's of a different type than we're trying to - // create. - return true -} - -func (r *registry) storeCounter(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.CounterVec, c prometheus.Counter, ttl time.Duration) { - r.store(metricName, hash, labels, vec, c, CounterMetricType, ttl) -} - -func (r *registry) storeGauge(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.GaugeVec, g prometheus.Counter, ttl time.Duration) { - r.store(metricName, hash, labels, vec, g, GaugeMetricType, ttl) -} - -func (r *registry) storeHistogram(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.HistogramVec, o prometheus.Observer, ttl time.Duration) { - r.store(metricName, hash, labels, vec, o, HistogramMetricType, ttl) -} - -func (r *registry) storeSummary(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.SummaryVec, o prometheus.Observer, ttl time.Duration) { - r.store(metricName, hash, labels, vec, o, SummaryMetricType, ttl) -} - -func (r *registry) store(metricName string, hash labelHash, labels prometheus.Labels, vh vectorHolder, mh metricHolder, metricType metricType, ttl time.Duration) { - metric, hasMetric := r.metrics[metricName] - if !hasMetric { - metric.metricType = metricType - metric.vectors = make(map[nameHash]*vector) - metric.metrics = make(map[valueHash]*registeredMetric) - - r.metrics[metricName] = metric - } - - v, ok := metric.vectors[hash.names] - if !ok { - v = &vector{holder: vh} - metric.vectors[hash.names] = v - } - - rm, ok := metric.metrics[hash.values] - if !ok { - rm = ®isteredMetric{ - labels: labels, - ttl: ttl, - metric: mh, - vecKey: hash.names, - } - metric.metrics[hash.values] = rm - v.refCount++ - } - now := clock.Now() - rm.lastRegisteredAt = now - // Update ttl from mapping - rm.ttl = ttl -} - -func (r *registry) get(metricName string, hash labelHash, metricType metricType) (vectorHolder, metricHolder) { - metric, hasMetric := r.metrics[metricName] - - if !hasMetric { - return nil, nil - } - if metric.metricType != metricType { - return nil, nil - } - - rm, ok := metric.metrics[hash.values] - if ok { - now := clock.Now() - rm.lastRegisteredAt = now - return metric.vectors[hash.names].holder, rm.metric - } - - vector, ok := metric.vectors[hash.names] - if ok { - return vector.holder, nil - } - - return nil, nil -} - -func (r *registry) getCounter(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Counter, error) { - hash, labelNames := r.hashLabels(labels) - vh, mh := r.get(metricName, hash, CounterMetricType) - if mh != nil { - return mh.(prometheus.Counter), nil - } - - if r.metricConflicts(metricName, CounterMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - - var counterVec *prometheus.CounterVec - if vh == nil { - metricsCount.WithLabelValues("counter").Inc() - counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: metricName, - Help: help, - }, labelNames) - - if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil { - return nil, err - } - } else { - counterVec = vh.(*prometheus.CounterVec) - } - - var counter prometheus.Counter - var err error - if counter, err = counterVec.GetMetricWith(labels); err != nil { - return nil, err - } - r.storeCounter(metricName, hash, labels, counterVec, counter, mapping.Ttl) - - return counter, nil -} - -func (r *registry) getGauge(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Gauge, error) { - hash, labelNames := r.hashLabels(labels) - vh, mh := r.get(metricName, hash, GaugeMetricType) - if mh != nil { - return mh.(prometheus.Gauge), nil - } - - if r.metricConflicts(metricName, GaugeMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - - var gaugeVec *prometheus.GaugeVec - if vh == nil { - metricsCount.WithLabelValues("gauge").Inc() - gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: metricName, - Help: help, - }, labelNames) - - if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil { - return nil, err - } - } else { - gaugeVec = vh.(*prometheus.GaugeVec) - } - - var gauge prometheus.Gauge - var err error - if gauge, err = gaugeVec.GetMetricWith(labels); err != nil { - return nil, err - } - r.storeGauge(metricName, hash, labels, gaugeVec, gauge, mapping.Ttl) - - return gauge, nil -} - -func (r *registry) getHistogram(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) { - hash, labelNames := r.hashLabels(labels) - vh, mh := r.get(metricName, hash, HistogramMetricType) - if mh != nil { - return mh.(prometheus.Observer), nil - } - - if r.metricConflicts(metricName, HistogramMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if r.metricConflicts(metricName+"_sum", HistogramMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if r.metricConflicts(metricName+"_count", HistogramMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if r.metricConflicts(metricName+"_bucket", HistogramMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - - var histogramVec *prometheus.HistogramVec - if vh == nil { - metricsCount.WithLabelValues("histogram").Inc() - buckets := r.mapper.Defaults.Buckets - if mapping.HistogramOptions != nil && len(mapping.HistogramOptions.Buckets) > 0 { - buckets = mapping.HistogramOptions.Buckets - } - histogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: metricName, - Help: help, - Buckets: buckets, - }, labelNames) - - if err := prometheus.Register(uncheckedCollector{histogramVec}); err != nil { - return nil, err - } - } else { - histogramVec = vh.(*prometheus.HistogramVec) - } - - var observer prometheus.Observer - var err error - if observer, err = histogramVec.GetMetricWith(labels); err != nil { - return nil, err - } - r.storeHistogram(metricName, hash, labels, histogramVec, observer, mapping.Ttl) - - return observer, nil -} - -func (r *registry) getSummary(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) { - hash, labelNames := r.hashLabels(labels) - vh, mh := r.get(metricName, hash, SummaryMetricType) - if mh != nil { - return mh.(prometheus.Observer), nil - } - - if r.metricConflicts(metricName, SummaryMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if r.metricConflicts(metricName+"_sum", SummaryMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if r.metricConflicts(metricName+"_count", SummaryMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - - var summaryVec *prometheus.SummaryVec - if vh == nil { - metricsCount.WithLabelValues("summary").Inc() - quantiles := r.mapper.Defaults.Quantiles - if mapping != nil && mapping.SummaryOptions != nil && len(mapping.SummaryOptions.Quantiles) > 0 { - quantiles = mapping.SummaryOptions.Quantiles - } - summaryOptions := mapper.SummaryOptions{} - if mapping != nil && mapping.SummaryOptions != nil { - summaryOptions = *mapping.SummaryOptions - } - objectives := make(map[float64]float64) - for _, q := range quantiles { - objectives[q.Quantile] = q.Error - } - // In the case of no mapping file, explicitly define the default quantiles - if len(objectives) == 0 { - objectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001} - } - summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Name: metricName, - Help: help, - Objectives: objectives, - MaxAge: summaryOptions.MaxAge, - AgeBuckets: summaryOptions.AgeBuckets, - BufCap: summaryOptions.BufCap, - }, labelNames) - - if err := prometheus.Register(uncheckedCollector{summaryVec}); err != nil { - return nil, err - } - } else { - summaryVec = vh.(*prometheus.SummaryVec) - } - - var observer prometheus.Observer - var err error - if observer, err = summaryVec.GetMetricWith(labels); err != nil { - return nil, err - } - r.storeSummary(metricName, hash, labels, summaryVec, observer, mapping.Ttl) - - return observer, nil -} - -func (r *registry) removeStaleMetrics() { - now := clock.Now() - // delete timeseries with expired ttl - for _, metric := range r.metrics { - for hash, rm := range metric.metrics { - if rm.ttl == 0 { - continue - } - if rm.lastRegisteredAt.Add(rm.ttl).Before(now) { - metric.vectors[rm.vecKey].holder.Delete(rm.labels) - metric.vectors[rm.vecKey].refCount-- - delete(metric.metrics, hash) - } - } - } -} - -// Calculates a hash of both the label names and the label names and values. -func (r *registry) hashLabels(labels prometheus.Labels) (labelHash, []string) { - r.hasher.Reset() - r.nameBuf.Reset() - r.valueBuf.Reset() - labelNames := make([]string, 0, len(labels)) - - for labelName := range labels { - labelNames = append(labelNames, labelName) - } - sort.Strings(labelNames) - - r.valueBuf.WriteByte(model.SeparatorByte) - for _, labelName := range labelNames { - r.valueBuf.WriteString(labels[labelName]) - r.valueBuf.WriteByte(model.SeparatorByte) - - r.nameBuf.WriteString(labelName) - r.nameBuf.WriteByte(model.SeparatorByte) - } - - lh := labelHash{} - r.hasher.Write(r.nameBuf.Bytes()) - lh.names = nameHash(r.hasher.Sum64()) - - // Now add the values to the names we've already hashed. - r.hasher.Write(r.valueBuf.Bytes()) - lh.values = valueHash(r.hasher.Sum64()) - - return lh, labelNames -} diff --git a/telemetry.go b/telemetry.go deleted file mode 100644 index cfc22b1..0000000 --- a/telemetry.go +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -var ( - eventStats = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "statsd_exporter_events_total", - Help: "The total number of StatsD events seen.", - }, - []string{"type"}, - ) - eventsFlushed = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "statsd_exporter_event_queue_flushed_total", - Help: "Number of times events were flushed to exporter", - }, - ) - eventsUnmapped = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "statsd_exporter_events_unmapped_total", - Help: "The total number of StatsD events no mapping was found for.", - }) - udpPackets = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "statsd_exporter_udp_packets_total", - Help: "The total number of StatsD packets received over UDP.", - }, - ) - tcpConnections = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "statsd_exporter_tcp_connections_total", - Help: "The total number of TCP connections handled.", - }, - ) - tcpErrors = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "statsd_exporter_tcp_connection_errors_total", - Help: "The number of errors encountered reading from TCP.", - }, - ) - tcpLineTooLong = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "statsd_exporter_tcp_too_long_lines_total", - Help: "The number of lines discarded due to being too long.", - }, - ) - unixgramPackets = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "statsd_exporter_unixgram_packets_total", - Help: "The total number of StatsD packets received over Unixgram.", - }, - ) - linesReceived = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "statsd_exporter_lines_total", - Help: "The total number of StatsD lines received.", - }, - ) - samplesReceived = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "statsd_exporter_samples_total", - Help: "The total number of StatsD samples received.", - }, - ) - sampleErrors = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "statsd_exporter_sample_errors_total", - Help: "The total number of errors parsing StatsD samples.", - }, - []string{"reason"}, - ) - tagsReceived = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "statsd_exporter_tags_total", - Help: "The total number of DogStatsD tags processed.", - }, - ) - tagErrors = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "statsd_exporter_tag_errors_total", - Help: "The number of errors parsing DogStatsD tags.", - }, - ) - configLoads = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "statsd_exporter_config_reloads_total", - Help: "The number of configuration reloads.", - }, - []string{"outcome"}, - ) - mappingsCount = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "statsd_exporter_loaded_mappings", - Help: "The current number of configured metric mappings.", - }) - conflictingEventStats = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "statsd_exporter_events_conflict_total", - Help: "The total number of StatsD events with conflicting names.", - }, - []string{"type"}, - ) - errorEventStats = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "statsd_exporter_events_error_total", - Help: "The total number of StatsD events discarded due to errors.", - }, - []string{"reason"}, - ) - eventsActions = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "statsd_exporter_events_actions_total", - Help: "The total number of StatsD events by action.", - }, - []string{"action"}, - ) - metricsCount = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "statsd_exporter_metrics_total", - Help: "The total number of metrics.", - }, - []string{"type"}, - ) -) - -func init() { - prometheus.MustRegister(eventStats) - prometheus.MustRegister(eventsFlushed) - prometheus.MustRegister(eventsUnmapped) - prometheus.MustRegister(udpPackets) - prometheus.MustRegister(tcpConnections) - prometheus.MustRegister(tcpErrors) - prometheus.MustRegister(tcpLineTooLong) - prometheus.MustRegister(unixgramPackets) - prometheus.MustRegister(linesReceived) - prometheus.MustRegister(samplesReceived) - prometheus.MustRegister(sampleErrors) - prometheus.MustRegister(tagsReceived) - prometheus.MustRegister(tagErrors) - prometheus.MustRegister(configLoads) - prometheus.MustRegister(mappingsCount) - prometheus.MustRegister(conflictingEventStats) - prometheus.MustRegister(errorEventStats) - prometheus.MustRegister(eventsActions) - prometheus.MustRegister(metricsCount) -} diff --git a/vendor/github.com/sirupsen/logrus/appveyor.yml b/vendor/github.com/sirupsen/logrus/appveyor.yml index 96c2ce1..b4ffca2 100644 --- a/vendor/github.com/sirupsen/logrus/appveyor.yml +++ b/vendor/github.com/sirupsen/logrus/appveyor.yml @@ -1,14 +1,14 @@ -version: "{build}" -platform: x64 -clone_folder: c:\gopath\src\github.com\sirupsen\logrus -environment: - GOPATH: c:\gopath -branches: - only: - - master -install: - - set PATH=%GOPATH%\bin;c:\go\bin;%PATH% - - go version -build_script: - - go get -t - - go test +version: "{build}" +platform: x64 +clone_folder: c:\gopath\src\github.com\sirupsen\logrus +environment: + GOPATH: c:\gopath +branches: + only: + - master +install: + - set PATH=%GOPATH%\bin;c:\go\bin;%PATH% + - go version +build_script: + - go get -t + - go test