Merge pull request #298 from davidsonff/issue-234

:Issue 234: Split into reusable packages.
This commit is contained in:
Matthias Rampke 2020-04-17 13:43:30 +02:00 committed by GitHub
commit 1f3adf69c6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 2287 additions and 1693 deletions

4
.gitattributes vendored Normal file
View file

@ -0,0 +1,4 @@
# Managing line ending conversions
# See http://git-scm.com/docs/gitattributes#_end-of-line_conversion
* text=auto
* eol=lf

1
.gitignore vendored
View file

@ -4,3 +4,4 @@ dependencies-stamp
/.deps
/.release
/.tarballs
*~

View file

@ -14,281 +14,291 @@
package main
import (
"fmt"
"net"
"reflect"
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)
func TestHandlePacket(t *testing.T) {
scenarios := []struct {
name string
in string
out Events
out event.Events
}{
{
name: "empty",
}, {
name: "simple counter",
in: "foo:2|c",
out: Events{
&CounterEvent{
metricName: "foo",
value: 2,
labels: map[string]string{},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 2,
CLabels: map[string]string{},
},
},
}, {
name: "simple gauge",
in: "foo:3|g",
out: Events{
&GaugeEvent{
metricName: "foo",
value: 3,
labels: map[string]string{},
out: event.Events{
&event.GaugeEvent{
GMetricName: "foo",
GValue: 3,
GLabels: map[string]string{},
},
},
}, {
name: "gauge with sampling",
in: "foo:3|g|@0.2",
out: Events{
&GaugeEvent{
metricName: "foo",
value: 3,
labels: map[string]string{},
out: event.Events{
&event.GaugeEvent{
GMetricName: "foo",
GValue: 3,
GLabels: map[string]string{},
},
},
}, {
name: "gauge decrement",
in: "foo:-10|g",
out: Events{
&GaugeEvent{
metricName: "foo",
value: -10,
relative: true,
labels: map[string]string{},
out: event.Events{
&event.GaugeEvent{
GMetricName: "foo",
GValue: -10,
GRelative: true,
GLabels: map[string]string{},
},
},
}, {
name: "simple timer",
in: "foo:200|ms",
out: Events{
&TimerEvent{
metricName: "foo",
value: 200,
labels: map[string]string{},
out: event.Events{
&event.TimerEvent{
TMetricName: "foo",
TValue: 200,
TLabels: map[string]string{},
},
},
}, {
name: "simple histogram",
in: "foo:200|h",
out: Events{
&TimerEvent{
metricName: "foo",
value: 200,
labels: map[string]string{},
out: event.Events{
&event.TimerEvent{
TMetricName: "foo",
TValue: 200,
TLabels: map[string]string{},
},
},
}, {
name: "simple distribution",
in: "foo:200|d",
out: Events{
&TimerEvent{
metricName: "foo",
value: 200,
labels: map[string]string{},
out: event.Events{
&event.TimerEvent{
TMetricName: "foo",
TValue: 200,
TLabels: map[string]string{},
},
},
}, {
name: "distribution with sampling",
in: "foo:0.01|d|@0.2|#tag1:bar,#tag2:baz",
out: Events{
&TimerEvent{
metricName: "foo",
value: 0.01,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
out: event.Events{
&event.TimerEvent{
TMetricName: "foo",
TValue: 0.01,
TLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
&TimerEvent{
metricName: "foo",
value: 0.01,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
&event.TimerEvent{
TMetricName: "foo",
TValue: 0.01,
TLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
&TimerEvent{
metricName: "foo",
value: 0.01,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
&event.TimerEvent{
TMetricName: "foo",
TValue: 0.01,
TLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
&TimerEvent{
metricName: "foo",
value: 0.01,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
&event.TimerEvent{
TMetricName: "foo",
TValue: 0.01,
TLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
&TimerEvent{
metricName: "foo",
value: 0.01,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
&event.TimerEvent{
TMetricName: "foo",
TValue: 0.01,
TLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
},
}, {
name: "librato tag extension",
in: "foo#tag1=bar,tag2=baz:100|c",
out: Events{
&CounterEvent{
metricName: "foo",
value: 100,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 100,
CLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
},
}, {
name: "librato tag extension with tag keys unsupported by prometheus",
in: "foo#09digits=0,tag.with.dots=1:100|c",
out: Events{
&CounterEvent{
metricName: "foo",
value: 100,
labels: map[string]string{"_09digits": "0", "tag_with_dots": "1"},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 100,
CLabels: map[string]string{"_09digits": "0", "tag_with_dots": "1"},
},
},
}, {
name: "influxdb tag extension",
in: "foo,tag1=bar,tag2=baz:100|c",
out: Events{
&CounterEvent{
metricName: "foo",
value: 100,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 100,
CLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
},
}, {
name: "influxdb tag extension with tag keys unsupported by prometheus",
in: "foo,09digits=0,tag.with.dots=1:100|c",
out: Events{
&CounterEvent{
metricName: "foo",
value: 100,
labels: map[string]string{"_09digits": "0", "tag_with_dots": "1"},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 100,
CLabels: map[string]string{"_09digits": "0", "tag_with_dots": "1"},
},
},
}, {
name: "datadog tag extension",
in: "foo:100|c|#tag1:bar,tag2:baz",
out: Events{
&CounterEvent{
metricName: "foo",
value: 100,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 100,
CLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
},
}, {
name: "datadog tag extension with # in all keys (as sent by datadog php client)",
in: "foo:100|c|#tag1:bar,#tag2:baz",
out: Events{
&CounterEvent{
metricName: "foo",
value: 100,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 100,
CLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
},
}, {
name: "datadog tag extension with tag keys unsupported by prometheus",
in: "foo:100|c|#09digits:0,tag.with.dots:1",
out: Events{
&CounterEvent{
metricName: "foo",
value: 100,
labels: map[string]string{"_09digits": "0", "tag_with_dots": "1"},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 100,
CLabels: map[string]string{"_09digits": "0", "tag_with_dots": "1"},
},
},
}, {
name: "datadog tag extension with valueless tags: ignored",
in: "foo:100|c|#tag_without_a_value",
out: Events{
&CounterEvent{
metricName: "foo",
value: 100,
labels: map[string]string{},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 100,
CLabels: map[string]string{},
},
},
}, {
name: "datadog tag extension with valueless tags (edge case)",
in: "foo:100|c|#tag_without_a_value,tag:value",
out: Events{
&CounterEvent{
metricName: "foo",
value: 100,
labels: map[string]string{"tag": "value"},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 100,
CLabels: map[string]string{"tag": "value"},
},
},
}, {
name: "datadog tag extension with empty tags (edge case)",
in: "foo:100|c|#tag:value,,",
out: Events{
&CounterEvent{
metricName: "foo",
value: 100,
labels: map[string]string{"tag": "value"},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 100,
CLabels: map[string]string{"tag": "value"},
},
},
}, {
name: "datadog tag extension with sampling",
in: "foo:100|c|@0.1|#tag1:bar,#tag2:baz",
out: Events{
&CounterEvent{
metricName: "foo",
value: 1000,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 1000,
CLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
},
}, {
name: "librato/dogstatsd mixed tag styles without sampling",
in: "foo#tag1=foo,tag3=bing:100|c|#tag1:bar,#tag2:baz",
out: Events{},
out: event.Events{},
}, {
name: "influxdb/dogstatsd mixed tag styles without sampling",
in: "foo,tag1=foo,tag3=bing:100|c|#tag1:bar,#tag2:baz",
out: Events{},
out: event.Events{},
}, {
name: "mixed tag styles with sampling",
in: "foo#tag1=foo,tag3=bing:100|c|@0.1|#tag1:bar,#tag2:baz",
out: Events{},
out: event.Events{},
}, {
name: "histogram with sampling",
in: "foo:0.01|h|@0.2|#tag1:bar,#tag2:baz",
out: Events{
&TimerEvent{
metricName: "foo",
value: 0.01,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
out: event.Events{
&event.TimerEvent{
TMetricName: "foo",
TValue: 0.01,
TLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
&TimerEvent{
metricName: "foo",
value: 0.01,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
&event.TimerEvent{
TMetricName: "foo",
TValue: 0.01,
TLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
&TimerEvent{
metricName: "foo",
value: 0.01,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
&event.TimerEvent{
TMetricName: "foo",
TValue: 0.01,
TLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
&TimerEvent{
metricName: "foo",
value: 0.01,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
&event.TimerEvent{
TMetricName: "foo",
TValue: 0.01,
TLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
&TimerEvent{
metricName: "foo",
value: 0.01,
labels: map[string]string{"tag1": "bar", "tag2": "baz"},
&event.TimerEvent{
TMetricName: "foo",
TValue: 0.01,
TLabels: map[string]string{"tag1": "bar", "tag2": "baz"},
},
},
}, {
name: "datadog tag extension with multiple colons",
in: "foo:100|c|@0.1|#tag1:foo:bar",
out: Events{
&CounterEvent{
metricName: "foo",
value: 1000,
labels: map[string]string{"tag1": "foo:bar"},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 1000,
CLabels: map[string]string{"tag1": "foo:bar"},
},
},
}, {
@ -300,62 +310,62 @@ func TestHandlePacket(t *testing.T) {
}, {
name: "multiple metrics with invalid datadog utf8 tag values",
in: "foo:200|c|#tag:value\nfoo:300|c|#tag:\xc3\x28invalid",
out: Events{
&CounterEvent{
metricName: "foo",
value: 200,
labels: map[string]string{"tag": "value"},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 200,
CLabels: map[string]string{"tag": "value"},
},
},
}, {
name: "combined multiline metrics",
in: "foo:200|ms:300|ms:5|c|@0.1:6|g\nbar:1|c:5|ms",
out: Events{
&TimerEvent{
metricName: "foo",
value: 200,
labels: map[string]string{},
out: event.Events{
&event.TimerEvent{
TMetricName: "foo",
TValue: 200,
TLabels: map[string]string{},
},
&TimerEvent{
metricName: "foo",
value: 300,
labels: map[string]string{},
&event.TimerEvent{
TMetricName: "foo",
TValue: 300,
TLabels: map[string]string{},
},
&CounterEvent{
metricName: "foo",
value: 50,
labels: map[string]string{},
&event.CounterEvent{
CMetricName: "foo",
CValue: 50,
CLabels: map[string]string{},
},
&GaugeEvent{
metricName: "foo",
value: 6,
labels: map[string]string{},
&event.GaugeEvent{
GMetricName: "foo",
GValue: 6,
GLabels: map[string]string{},
},
&CounterEvent{
metricName: "bar",
value: 1,
labels: map[string]string{},
&event.CounterEvent{
CMetricName: "bar",
CValue: 1,
CLabels: map[string]string{},
},
&TimerEvent{
metricName: "bar",
value: 5,
labels: map[string]string{},
&event.TimerEvent{
TMetricName: "bar",
TValue: 5,
TLabels: map[string]string{},
},
},
}, {
name: "timings with sampling factor",
in: "foo.timing:0.5|ms|@0.1",
out: Events{
&TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}},
&TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}},
&TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}},
&TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}},
&TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}},
&TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}},
&TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}},
&TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}},
&TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}},
&TimerEvent{metricName: "foo.timing", value: 0.5, labels: map[string]string{}},
out: event.Events{
&event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}},
&event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}},
&event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}},
&event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}},
&event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}},
&event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}},
&event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}},
&event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}},
&event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}},
&event.TimerEvent{TMetricName: "foo.timing", TValue: 0.5, TLabels: map[string]string{}},
},
}, {
name: "bad line",
@ -369,21 +379,21 @@ func TestHandlePacket(t *testing.T) {
}, {
name: "illegal sampling factor",
in: "foo:1|c|@bar",
out: Events{
&CounterEvent{
metricName: "foo",
value: 1,
labels: map[string]string{},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 1,
CLabels: map[string]string{},
},
},
}, {
name: "zero sampling factor",
in: "foo:2|c|@0",
out: Events{
&CounterEvent{
metricName: "foo",
value: 2,
labels: map[string]string{},
out: event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: 2,
CLabels: map[string]string{},
},
},
}, {
@ -405,25 +415,49 @@ func TestHandlePacket(t *testing.T) {
{
name: "some invalid utf8",
in: "valid_utf8:1|c\ninvalid\xc3\x28utf8:1|c",
out: Events{
&CounterEvent{
metricName: "valid_utf8",
value: 1,
labels: map[string]string{},
out: event.Events{
&event.CounterEvent{
CMetricName: "valid_utf8",
CValue: 1,
CLabels: map[string]string{},
},
},
},
}
for k, l := range []statsDPacketHandler{&StatsDUDPListener{nil, nil, log.NewNopLogger()}, &mockStatsDTCPListener{StatsDTCPListener{nil, nil, log.NewNopLogger()}, log.NewNopLogger()}} {
events := make(chan Events, 32)
l.SetEventHandler(&unbufferedEventHandler{c: events})
for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
}, &mockStatsDTCPListener{listener.StatsDTCPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
}, log.NewNopLogger()}} {
events := make(chan event.Events, 32)
l.SetEventHandler(&event.UnbufferedEventHandler{C: events})
for i, scenario := range scenarios {
l.handlePacket([]byte(scenario.in))
l.HandlePacket([]byte(scenario.in))
le := len(events)
// Flatten actual events.
actual := Events{}
actual := event.Events{}
for i := 0; i < le; i++ {
actual = append(actual, <-events...)
}
@ -440,3 +474,225 @@ func TestHandlePacket(t *testing.T) {
}
}
}
type statsDPacketHandler interface {
HandlePacket(packet []byte)
SetEventHandler(eh event.EventHandler)
}
type mockStatsDTCPListener struct {
listener.StatsDTCPListener
log.Logger
}
func (ml *mockStatsDTCPListener) HandlePacket(packet []byte) {
// Forcing IPv4 because the TravisCI build environment does not have IPv6
// addresses.
lc, err := net.ListenTCP("tcp4", nil)
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: listen failed: %v", err))
}
defer lc.Close()
go func() {
cc, err := net.DialTCP("tcp", nil, lc.Addr().(*net.TCPAddr))
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: dial failed: %v", err))
}
defer cc.Close()
n, err := cc.Write(packet)
if err != nil || n != len(packet) {
panic(fmt.Sprintf("mockStatsDTCPListener: write failed: %v,%d", err, n))
}
}()
sc, err := lc.AcceptTCP()
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err))
}
ml.HandleConn(sc)
}
// TestTtlExpiration validates expiration of time series.
// foobar metric without mapping should expire with default ttl of 1s
// bazqux metric should expire with ttl of 2s
func TestTtlExpiration(t *testing.T) {
// Mock a time.NewTicker
tickerCh := make(chan time.Time)
clock.ClockInstance = &clock.Clock{
TickerCh: tickerCh,
}
config := `
defaults:
ttl: 1s
mappings:
- match: bazqux.*
name: bazqux
ttl: 2s
`
// Create mapper from config and start an Exporter with a synchronous channel
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config, 0)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
events := make(chan event.Events)
defer close(events)
go func() {
ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()
ev := event.Events{
// event with default ttl = 1s
&event.GaugeEvent{
GMetricName: "foobar",
GValue: 200,
},
// event with ttl = 2s from a mapping
&event.TimerEvent{
TMetricName: "bazqux.main",
TValue: 42000,
},
}
var metrics []*dto.MetricFamily
var foobarValue *float64
var bazquxValue *float64
// Step 1. Send events with statsd metrics.
// Send empty Events to wait for events are handled.
// saveLabelValues will use fake instant as a lastRegisteredAt time.
clock.ClockInstance.Instant = time.Unix(0, 0)
events <- ev
events <- event.Events{}
// Check values
metrics, err = prometheus.DefaultGatherer.Gather()
if err != nil {
t.Fatal("Gather should not fail")
}
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
if foobarValue == nil || bazquxValue == nil {
t.Fatalf("Gauge `foobar` and Summary `bazqux` should be gathered")
}
if *foobarValue != 200 {
t.Fatalf("Gauge `foobar` observation %f is not expected. Should be 200", *foobarValue)
}
if *bazquxValue != 42 {
t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
}
// Step 2. Increase Instant to emulate metrics expiration after 1s
clock.ClockInstance.Instant = time.Unix(1, 10)
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
events <- event.Events{}
// Check values
metrics, err = prometheus.DefaultGatherer.Gather()
if err != nil {
t.Fatal("Gather should not fail")
}
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
if foobarValue != nil {
t.Fatalf("Gauge `foobar` should be expired")
}
if bazquxValue == nil {
t.Fatalf("Summary `bazqux` should be gathered")
}
if *bazquxValue != 42 {
t.Fatalf("Summary `bazqux` observation %f is not expected. Should be 42", *bazquxValue)
}
// Step 3. Increase Instant to emulate metrics expiration after 2s
clock.ClockInstance.Instant = time.Unix(2, 200)
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
events <- event.Events{}
// Check values
metrics, err = prometheus.DefaultGatherer.Gather()
if err != nil {
t.Fatal("Gather should not fail")
}
foobarValue = getFloat64(metrics, "foobar", prometheus.Labels{})
bazquxValue = getFloat64(metrics, "bazqux", prometheus.Labels{})
if bazquxValue != nil {
t.Fatalf("Summary `bazqux` should be expired")
}
if foobarValue != nil {
t.Fatalf("Gauge `foobar` should not be gathered after expiration")
}
}
// getFloat64 search for metric by name in array of MetricFamily and then search a value by labels.
// Method returns a value or nil if metric is not found.
func getFloat64(metrics []*dto.MetricFamily, name string, labels prometheus.Labels) *float64 {
var metricFamily *dto.MetricFamily
for _, m := range metrics {
if *m.Name == name {
metricFamily = m
break
}
}
if metricFamily == nil {
return nil
}
var metric *dto.Metric
labelStr := fmt.Sprintf("%v", labels)
for _, m := range metricFamily.Metric {
l := labelPairsAsLabels(m.GetLabel())
ls := fmt.Sprintf("%v", l)
if labelStr == ls {
metric = m
break
}
}
if metric == nil {
return nil
}
var value float64
if metric.Gauge != nil {
value = metric.Gauge.GetValue()
return &value
}
if metric.Counter != nil {
value = metric.Counter.GetValue()
return &value
}
if metric.Histogram != nil {
value = metric.Histogram.GetSampleSum()
return &value
}
if metric.Summary != nil {
value = metric.Summary.GetSampleSum()
return &value
}
if metric.Untyped != nil {
value = metric.Untyped.GetValue()
return &value
}
panic(fmt.Errorf("collected a non-gauge/counter/histogram/summary/untyped metric: %s", metric))
}
func labelPairsAsLabels(pairs []*dto.LabelPair) (labels prometheus.Labels) {
labels = prometheus.Labels{}
for _, pair := range pairs {
if pair.Name == nil {
continue
}
value := ""
if pair.Value != nil {
value = *pair.Value
}
labels[*pair.Name] = value
}
return
}

View file

@ -1,546 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bufio"
"fmt"
"io"
"net"
"os"
"strconv"
"strings"
"time"
"unicode/utf8"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)
const (
defaultHelp = "Metric autogenerated by statsd_exporter."
regErrF = "Failed to update metric"
)
// uncheckedCollector wraps a Collector but its Describe method yields no Desc.
// This allows incoming metrics to have inconsistent label sets
type uncheckedCollector struct {
c prometheus.Collector
}
func (u uncheckedCollector) Describe(_ chan<- *prometheus.Desc) {}
func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
u.c.Collect(c)
}
type Exporter struct {
mapper *mapper.MetricMapper
registry *registry
logger log.Logger
}
// Listen handles all events sent to the given channel sequentially. It
// terminates when the channel is closed.
func (b *Exporter) Listen(e <-chan Events) {
removeStaleMetricsTicker := clock.NewTicker(time.Second)
for {
select {
case <-removeStaleMetricsTicker.C:
b.registry.removeStaleMetrics()
case events, ok := <-e:
if !ok {
level.Debug(b.logger).Log("msg", "Channel is closed. Break out of Exporter.Listener.")
removeStaleMetricsTicker.Stop()
return
}
for _, event := range events {
b.handleEvent(event)
}
}
}
}
// handleEvent processes a single Event according to the configured mapping.
func (b *Exporter) handleEvent(event Event) {
mapping, labels, present := b.mapper.GetMapping(event.MetricName(), event.MetricType())
if mapping == nil {
mapping = &mapper.MetricMapping{}
if b.mapper.Defaults.Ttl != 0 {
mapping.Ttl = b.mapper.Defaults.Ttl
}
}
if mapping.Action == mapper.ActionTypeDrop {
eventsActions.WithLabelValues("drop").Inc()
return
}
help := defaultHelp
if mapping.HelpText != "" {
help = mapping.HelpText
}
metricName := ""
prometheusLabels := event.Labels()
if present {
if mapping.Name == "" {
level.Debug(b.logger).Log("msg", "The mapping generates an empty metric name", "metric_name", event.MetricName(), "match", mapping.Match)
errorEventStats.WithLabelValues("empty_metric_name").Inc()
return
}
metricName = mapper.EscapeMetricName(mapping.Name)
for label, value := range labels {
prometheusLabels[label] = value
}
eventsActions.WithLabelValues(string(mapping.Action)).Inc()
} else {
eventsUnmapped.Inc()
metricName = mapper.EscapeMetricName(event.MetricName())
}
switch ev := event.(type) {
case *CounterEvent:
// We don't accept negative values for counters. Incrementing the counter with a negative number
// will cause the exporter to panic. Instead we will warn and continue to the next event.
if event.Value() < 0.0 {
level.Debug(b.logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", event.Value())
errorEventStats.WithLabelValues("illegal_negative_counter").Inc()
return
}
counter, err := b.registry.getCounter(metricName, prometheusLabels, help, mapping)
if err == nil {
counter.Add(event.Value())
eventStats.WithLabelValues("counter").Inc()
} else {
level.Debug(b.logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("counter").Inc()
}
case *GaugeEvent:
gauge, err := b.registry.getGauge(metricName, prometheusLabels, help, mapping)
if err == nil {
if ev.relative {
gauge.Add(event.Value())
} else {
gauge.Set(event.Value())
}
eventStats.WithLabelValues("gauge").Inc()
} else {
level.Debug(b.logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("gauge").Inc()
}
case *TimerEvent:
t := mapper.TimerTypeDefault
if mapping != nil {
t = mapping.TimerType
}
if t == mapper.TimerTypeDefault {
t = b.mapper.Defaults.TimerType
}
switch t {
case mapper.TimerTypeHistogram:
histogram, err := b.registry.getHistogram(metricName, prometheusLabels, help, mapping)
if err == nil {
histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
eventStats.WithLabelValues("timer").Inc()
} else {
level.Debug(b.logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("timer").Inc()
}
case mapper.TimerTypeDefault, mapper.TimerTypeSummary:
summary, err := b.registry.getSummary(metricName, prometheusLabels, help, mapping)
if err == nil {
summary.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
eventStats.WithLabelValues("timer").Inc()
} else {
level.Debug(b.logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("timer").Inc()
}
default:
level.Error(b.logger).Log("msg", "unknown timer type", "type", t)
os.Exit(1)
}
default:
level.Debug(b.logger).Log("msg", "Unsupported event type")
eventStats.WithLabelValues("illegal").Inc()
}
}
func NewExporter(mapper *mapper.MetricMapper, logger log.Logger) *Exporter {
return &Exporter{
mapper: mapper,
registry: newRegistry(mapper),
logger: logger,
}
}
func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (Event, error) {
switch statType {
case "c":
return &CounterEvent{
metricName: metric,
value: float64(value),
labels: labels,
}, nil
case "g":
return &GaugeEvent{
metricName: metric,
value: float64(value),
relative: relative,
labels: labels,
}, nil
case "ms", "h", "d":
return &TimerEvent{
metricName: metric,
value: float64(value),
labels: labels,
}, nil
case "s":
return nil, fmt.Errorf("no support for StatsD sets")
default:
return nil, fmt.Errorf("bad stat type %s", statType)
}
}
func parseTag(component, tag string, separator rune, labels map[string]string, logger log.Logger) {
// Entirely empty tag is an error
if len(tag) == 0 {
tagErrors.Inc()
level.Debug(logger).Log("msg", "Empty name tag", "component", component)
return
}
for i, c := range tag {
if c == separator {
k := tag[:i]
v := tag[i+1:]
if len(k) == 0 || len(v) == 0 {
// Empty key or value is an error
tagErrors.Inc()
level.Debug(logger).Log("msg", "Malformed name tag", "k", k, "v", v, "component", component)
} else {
labels[mapper.EscapeMetricName(k)] = v
}
return
}
}
// Missing separator (no value) is an error
tagErrors.Inc()
level.Debug(logger).Log("msg", "Malformed name tag", "tag", tag, "component", component)
}
func parseNameTags(component string, labels map[string]string, logger log.Logger) {
lastTagEndIndex := 0
for i, c := range component {
if c == ',' {
tag := component[lastTagEndIndex:i]
lastTagEndIndex = i + 1
parseTag(component, tag, '=', labels, logger)
}
}
// If we're not off the end of the string, add the last tag
if lastTagEndIndex < len(component) {
tag := component[lastTagEndIndex:]
parseTag(component, tag, '=', labels, logger)
}
}
func trimLeftHash(s string) string {
if s != "" && s[0] == '#' {
return s[1:]
}
return s
}
func parseDogStatsDTags(component string, labels map[string]string, logger log.Logger) {
lastTagEndIndex := 0
for i, c := range component {
if c == ',' {
tag := component[lastTagEndIndex:i]
lastTagEndIndex = i + 1
parseTag(component, trimLeftHash(tag), ':', labels, logger)
}
}
// If we're not off the end of the string, add the last tag
if lastTagEndIndex < len(component) {
tag := component[lastTagEndIndex:]
parseTag(component, trimLeftHash(tag), ':', labels, logger)
}
}
func parseNameAndTags(name string, labels map[string]string, logger log.Logger) string {
for i, c := range name {
// `#` delimits start of tags by Librato
// https://www.librato.com/docs/kb/collect/collection_agents/stastd/#stat-level-tags
// `,` delimits start of tags by InfluxDB
// https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/#introducing-influx-statsd
if c == '#' || c == ',' {
parseNameTags(name[i+1:], labels, logger)
return name[:i]
}
}
return name
}
func lineToEvents(line string, logger log.Logger) Events {
events := Events{}
if line == "" {
return events
}
elements := strings.SplitN(line, ":", 2)
if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) {
sampleErrors.WithLabelValues("malformed_line").Inc()
level.Debug(logger).Log("msg", "Bad line from StatsD", "line", line)
return events
}
labels := map[string]string{}
metric := parseNameAndTags(elements[0], labels, logger)
var samples []string
if strings.Contains(elements[1], "|#") {
// using DogStatsD tags
// don't allow mixed tagging styles
if len(labels) > 0 {
sampleErrors.WithLabelValues("mixed_tagging_styles").Inc()
level.Debug(logger).Log("msg", "Bad line (multiple tagging styles) from StatsD", "line", line)
return events
}
// disable multi-metrics
samples = elements[1:]
} else {
samples = strings.Split(elements[1], ":")
}
samples:
for _, sample := range samples {
samplesReceived.Inc()
components := strings.Split(sample, "|")
samplingFactor := 1.0
if len(components) < 2 || len(components) > 4 {
sampleErrors.WithLabelValues("malformed_component").Inc()
level.Debug(logger).Log("msg", "Bad component", "line", line)
continue
}
valueStr, statType := components[0], components[1]
var relative = false
if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 {
relative = true
}
value, err := strconv.ParseFloat(valueStr, 64)
if err != nil {
level.Debug(logger).Log("msg", "Bad value", "value", valueStr, "line", line)
sampleErrors.WithLabelValues("malformed_value").Inc()
continue
}
multiplyEvents := 1
if len(components) >= 3 {
for _, component := range components[2:] {
if len(component) == 0 {
level.Debug(logger).Log("msg", "Empty component", "line", line)
sampleErrors.WithLabelValues("malformed_component").Inc()
continue samples
}
}
for _, component := range components[2:] {
switch component[0] {
case '@':
samplingFactor, err = strconv.ParseFloat(component[1:], 64)
if err != nil {
level.Debug(logger).Log("msg", "Invalid sampling factor", "component", component[1:], "line", line)
sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
}
if samplingFactor == 0 {
samplingFactor = 1
}
if statType == "g" {
continue
} else if statType == "c" {
value /= samplingFactor
} else if statType == "ms" || statType == "h" || statType == "d" {
multiplyEvents = int(1 / samplingFactor)
}
case '#':
parseDogStatsDTags(component[1:], labels, logger)
default:
level.Debug(logger).Log("msg", "Invalid sampling factor or tag section", "component", components[2], "line", line)
sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
continue
}
}
}
if len(labels) > 0 {
tagsReceived.Inc()
}
for i := 0; i < multiplyEvents; i++ {
event, err := buildEvent(statType, metric, value, relative, labels)
if err != nil {
level.Debug(logger).Log("msg", "Error building event", "line", line, "error", err)
sampleErrors.WithLabelValues("illegal_event").Inc()
continue
}
events = append(events, event)
}
}
return events
}
type StatsDUDPListener struct {
conn *net.UDPConn
eventHandler eventHandler
logger log.Logger
}
func (l *StatsDUDPListener) SetEventHandler(eh eventHandler) {
l.eventHandler = eh
}
func (l *StatsDUDPListener) Listen() {
buf := make([]byte, 65535)
for {
n, _, err := l.conn.ReadFromUDP(buf)
if err != nil {
// https://github.com/golang/go/issues/4373
// ignore net: errClosing error as it will occur during shutdown
if strings.HasSuffix(err.Error(), "use of closed network connection") {
return
}
level.Error(l.logger).Log("error", err)
return
}
l.handlePacket(buf[0:n])
}
}
func (l *StatsDUDPListener) handlePacket(packet []byte) {
udpPackets.Inc()
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
linesReceived.Inc()
l.eventHandler.queue(lineToEvents(line, l.logger))
}
}
type StatsDTCPListener struct {
conn *net.TCPListener
eventHandler eventHandler
logger log.Logger
}
func (l *StatsDTCPListener) SetEventHandler(eh eventHandler) {
l.eventHandler = eh
}
func (l *StatsDTCPListener) Listen() {
for {
c, err := l.conn.AcceptTCP()
if err != nil {
// https://github.com/golang/go/issues/4373
// ignore net: errClosing error as it will occur during shutdown
if strings.HasSuffix(err.Error(), "use of closed network connection") {
return
}
level.Error(l.logger).Log("msg", "AcceptTCP failed", "error", err)
os.Exit(1)
}
go l.handleConn(c)
}
}
func (l *StatsDTCPListener) handleConn(c *net.TCPConn) {
defer c.Close()
tcpConnections.Inc()
r := bufio.NewReader(c)
for {
line, isPrefix, err := r.ReadLine()
if err != nil {
if err != io.EOF {
tcpErrors.Inc()
level.Debug(l.logger).Log("msg", "Read failed", "addr", c.RemoteAddr(), "error", err)
}
break
}
if isPrefix {
tcpLineTooLong.Inc()
level.Debug(l.logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr())
break
}
linesReceived.Inc()
l.eventHandler.queue(lineToEvents(string(line), l.logger))
}
}
type StatsDUnixgramListener struct {
conn *net.UnixConn
eventHandler eventHandler
logger log.Logger
}
func (l *StatsDUnixgramListener) SetEventHandler(eh eventHandler) {
l.eventHandler = eh
}
func (l *StatsDUnixgramListener) Listen() {
buf := make([]byte, 65535)
for {
n, _, err := l.conn.ReadFromUnix(buf)
if err != nil {
// https://github.com/golang/go/issues/4373
// ignore net: errClosing error as it will occur during shutdown
if strings.HasSuffix(err.Error(), "use of closed network connection") {
return
}
level.Error(l.logger).Log(err)
os.Exit(1)
}
l.handlePacket(buf[:n])
}
}
func (l *StatsDUnixgramListener) handlePacket(packet []byte) {
unixgramPackets.Inc()
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
linesReceived.Inc()
l.eventHandler.queue(lineToEvents(string(line), l.logger))
}
}

View file

@ -18,6 +18,9 @@ import (
"testing"
"github.com/go-kit/kit/log"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)
@ -42,12 +45,12 @@ func benchmarkUDPListener(times int, b *testing.B) {
}
for n := 0; n < b.N; n++ {
// there are more events than input lines, need bigger buffer
events := make(chan Events, len(bytesInput)*times*2)
l := StatsDUDPListener{eventHandler: &unbufferedEventHandler{c: events}}
events := make(chan event.Events, len(bytesInput)*times*2)
l := listener.StatsDUDPListener{EventHandler: &event.UnbufferedEventHandler{C: events}}
for i := 0; i < times; i++ {
for _, line := range bytesInput {
l.handlePacket([]byte(line))
l.HandlePacket([]byte(line))
}
}
}
@ -64,52 +67,52 @@ func BenchmarkUDPListener50(b *testing.B) {
}
func BenchmarkExporterListener(b *testing.B) {
events := Events{
&CounterEvent{ // simple counter
metricName: "counter",
value: 2,
events := event.Events{
&event.CounterEvent{ // simple counter
CMetricName: "counter",
CValue: 2,
},
&GaugeEvent{ // simple gauge
metricName: "gauge",
value: 10,
&event.GaugeEvent{ // simple gauge
GMetricName: "gauge",
GValue: 10,
},
&TimerEvent{ // simple timer
metricName: "timer",
value: 200,
&event.TimerEvent{ // simple timer
TMetricName: "timer",
TValue: 200,
},
&TimerEvent{ // simple histogram
metricName: "histogram.test",
value: 200,
&event.TimerEvent{ // simple histogram
TMetricName: "histogram.test",
TValue: 200,
},
&CounterEvent{ // simple_tags
metricName: "simple_tags",
value: 100,
labels: map[string]string{
&event.CounterEvent{ // simple_tags
CMetricName: "simple_tags",
CValue: 100,
CLabels: map[string]string{
"alpha": "bar",
"bravo": "baz",
},
},
&CounterEvent{ // slightly different tags
metricName: "simple_tags",
value: 100,
labels: map[string]string{
&event.CounterEvent{ // slightly different tags
CMetricName: "simple_tags",
CValue: 100,
CLabels: map[string]string{
"alpha": "bar",
"charlie": "baz",
},
},
&CounterEvent{ // and even more different tags
metricName: "simple_tags",
value: 100,
labels: map[string]string{
&event.CounterEvent{ // and even more different tags
CMetricName: "simple_tags",
CValue: 100,
CLabels: map[string]string{
"alpha": "bar",
"bravo": "baz",
"golf": "looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong",
},
},
&CounterEvent{ // datadog tag extension with complex tags
metricName: "foo",
value: 100,
labels: map[string]string{
&event.CounterEvent{ // datadog tag extension with complex tags
CMetricName: "foo",
CValue: 100,
CLabels: map[string]string{
"action": "test",
"application": "testapp",
"application_component": "testcomp",
@ -139,9 +142,9 @@ mappings:
b.Fatalf("Config load error: %s %s", config, err)
}
ex := NewExporter(testMapper, log.NewNopLogger())
ex := exporter.NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
for i := 0; i < b.N; i++ {
ec := make(chan Events, 1000)
ec := make(chan event.Events, 1000)
go func() {
for i := 0; i < 1000; i++ {
ec <- events

261
main.go
View file

@ -15,7 +15,6 @@ package main
import (
"bufio"
"fmt"
"net"
"net/http"
_ "net/http/pprof"
@ -33,11 +32,171 @@ import (
"github.com/prometheus/common/version"
"gopkg.in/alecthomas/kingpin.v2"
"github.com/prometheus/statsd_exporter/pkg/address"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)
const (
defaultHelp = "Metric autogenerated by statsd_exporter."
regErrF = "Failed to update metric"
)
var (
eventStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_total",
Help: "The total number of StatsD events seen.",
},
[]string{"type"},
)
eventsFlushed = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_event_queue_flushed_total",
Help: "Number of times events were flushed to exporter",
},
)
eventsUnmapped = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_events_unmapped_total",
Help: "The total number of StatsD events no mapping was found for.",
})
udpPackets = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_udp_packets_total",
Help: "The total number of StatsD packets received over UDP.",
},
)
tcpConnections = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connections_total",
Help: "The total number of TCP connections handled.",
},
)
tcpErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connection_errors_total",
Help: "The number of errors encountered reading from TCP.",
},
)
tcpLineTooLong = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_too_long_lines_total",
Help: "The number of lines discarded due to being too long.",
},
)
unixgramPackets = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_unixgram_packets_total",
Help: "The total number of StatsD packets received over Unixgram.",
},
)
linesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_lines_total",
Help: "The total number of StatsD lines received.",
},
)
samplesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_samples_total",
Help: "The total number of StatsD samples received.",
},
)
sampleErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_sample_errors_total",
Help: "The total number of errors parsing StatsD samples.",
},
[]string{"reason"},
)
tagsReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tags_total",
Help: "The total number of DogStatsD tags processed.",
},
)
tagErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tag_errors_total",
Help: "The number of errors parsing DogStatsD tags.",
},
)
configLoads = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_config_reloads_total",
Help: "The number of configuration reloads.",
},
[]string{"outcome"},
)
mappingsCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "statsd_exporter_loaded_mappings",
Help: "The current number of configured metric mappings.",
})
conflictingEventStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_conflict_total",
Help: "The total number of StatsD events with conflicting names.",
},
[]string{"type"},
)
errorEventStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_error_total",
Help: "The total number of StatsD events discarded due to errors.",
},
[]string{"reason"},
)
eventsActions = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_actions_total",
Help: "The total number of StatsD events by action.",
},
[]string{"action"},
)
metricsCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "statsd_exporter_metrics_total",
Help: "The total number of metrics.",
},
[]string{"type"},
)
)
func init() {
prometheus.MustRegister(version.NewCollector("statsd_exporter"))
prometheus.MustRegister(eventStats)
prometheus.MustRegister(eventsFlushed)
prometheus.MustRegister(eventsUnmapped)
prometheus.MustRegister(udpPackets)
prometheus.MustRegister(tcpConnections)
prometheus.MustRegister(tcpErrors)
prometheus.MustRegister(tcpLineTooLong)
prometheus.MustRegister(unixgramPackets)
prometheus.MustRegister(linesReceived)
prometheus.MustRegister(samplesReceived)
prometheus.MustRegister(sampleErrors)
prometheus.MustRegister(tagsReceived)
prometheus.MustRegister(tagErrors)
prometheus.MustRegister(configLoads)
prometheus.MustRegister(mappingsCount)
prometheus.MustRegister(conflictingEventStats)
prometheus.MustRegister(errorEventStats)
prometheus.MustRegister(eventsActions)
prometheus.MustRegister(metricsCount)
}
// uncheckedCollector wraps a Collector but its Describe method yields no Desc.
// This allows incoming metrics to have inconsistent label sets
type uncheckedCollector struct {
c prometheus.Collector
}
func (u uncheckedCollector) Describe(_ chan<- *prometheus.Desc) {}
func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
u.c.Collect(c)
}
func serveHTTP(listenAddress, metricsEndpoint string, logger log.Logger) {
@ -55,52 +214,6 @@ func serveHTTP(listenAddress, metricsEndpoint string, logger log.Logger) {
os.Exit(1)
}
func ipPortFromString(addr string) (*net.IPAddr, int, error) {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, 0, fmt.Errorf("bad StatsD listening address: %s", addr)
}
if host == "" {
host = "0.0.0.0"
}
ip, err := net.ResolveIPAddr("ip", host)
if err != nil {
return nil, 0, fmt.Errorf("Unable to resolve %s: %s", host, err)
}
port, err := strconv.Atoi(portStr)
if err != nil || port < 0 || port > 65535 {
return nil, 0, fmt.Errorf("Bad port %s: %s", portStr, err)
}
return ip, port, nil
}
func udpAddrFromString(addr string) (*net.UDPAddr, error) {
ip, port, err := ipPortFromString(addr)
if err != nil {
return nil, err
}
return &net.UDPAddr{
IP: ip.IP,
Port: port,
Zone: ip.Zone,
}, nil
}
func tcpAddrFromString(addr string) (*net.TCPAddr, error) {
ip, port, err := ipPortFromString(addr)
if err != nil {
return nil, err
}
return &net.TCPAddr{
IP: ip.IP,
Port: port,
Zone: ip.Zone,
}, nil
}
func configReloader(fileName string, mapper *mapper.MetricMapper, cacheSize int, logger log.Logger, option mapper.CacheOption) {
signals := make(chan os.Signal, 1)
@ -177,12 +290,12 @@ func main() {
go serveHTTP(*listenAddress, *metricsEndpoint, logger)
events := make(chan Events, *eventQueueSize)
events := make(chan event.Events, *eventQueueSize)
defer close(events)
eventQueue := newEventQueue(events, *eventFlushThreshold, *eventFlushInterval)
eventQueue := event.NewEventQueue(events, *eventFlushThreshold, *eventFlushInterval, eventsFlushed)
if *statsdListenUDP != "" {
udpListenAddr, err := udpAddrFromString(*statsdListenUDP)
udpListenAddr, err := address.UDPAddrFromString(*statsdListenUDP)
if err != nil {
level.Error(logger).Log("msg", "invalid UDP listen address", "address", *statsdListenUDP, "error", err)
os.Exit(1)
@ -201,12 +314,24 @@ func main() {
}
}
ul := &StatsDUDPListener{conn: uconn, eventHandler: eventQueue, logger: logger}
ul := &listener.StatsDUDPListener{
Conn: uconn,
EventHandler: eventQueue,
Logger: logger,
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
}
go ul.Listen()
}
if *statsdListenTCP != "" {
tcpListenAddr, err := tcpAddrFromString(*statsdListenTCP)
tcpListenAddr, err := address.TCPAddrFromString(*statsdListenTCP)
if err != nil {
level.Error(logger).Log("msg", "invalid TCP listen address", "address", *statsdListenUDP, "error", err)
os.Exit(1)
@ -218,7 +343,21 @@ func main() {
}
defer tconn.Close()
tl := &StatsDTCPListener{conn: tconn, eventHandler: eventQueue, logger: logger}
tl := &listener.StatsDTCPListener{
Conn: tconn,
EventHandler: eventQueue,
Logger: logger,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
}
go tl.Listen()
}
@ -247,7 +386,19 @@ func main() {
}
}
ul := &StatsDUnixgramListener{conn: uxgconn, eventHandler: eventQueue, logger: logger}
ul := &listener.StatsDUnixgramListener{
Conn: uxgconn,
EventHandler: eventQueue,
Logger: logger,
UnixgramPackets: unixgramPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
}
go ul.Listen()
// if it's an abstract unix domain socket, it won't exist on fs
@ -291,7 +442,7 @@ func main() {
go configReloader(*mappingConfig, mapper, *cacheSize, logger, cacheOption)
exporter := NewExporter(mapper, logger)
exporter := exporter.NewExporter(mapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

66
pkg/address/address.go Normal file
View file

@ -0,0 +1,66 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package address
import (
"fmt"
"net"
"strconv"
)
func IPPortFromString(addr string) (*net.IPAddr, int, error) {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, 0, fmt.Errorf("bad StatsD listening address: %s", addr)
}
if host == "" {
host = "0.0.0.0"
}
ip, err := net.ResolveIPAddr("ip", host)
if err != nil {
return nil, 0, fmt.Errorf("Unable to resolve %s: %s", host, err)
}
port, err := strconv.Atoi(portStr)
if err != nil || port < 0 || port > 65535 {
return nil, 0, fmt.Errorf("Bad port %s: %s", portStr, err)
}
return ip, port, nil
}
func UDPAddrFromString(addr string) (*net.UDPAddr, error) {
ip, port, err := IPPortFromString(addr)
if err != nil {
return nil, err
}
return &net.UDPAddr{
IP: ip.IP,
Port: port,
Zone: ip.Zone,
}, nil
}
func TCPAddrFromString(addr string) (*net.TCPAddr, error) {
ip, port, err := IPPortFromString(addr)
if err != nil {
return nil, err
}
return &net.TCPAddr{
IP: ip.IP,
Port: port,
Zone: ip.Zone,
}, nil
}

View file

@ -11,12 +11,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package main
package event
import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)
@ -29,105 +30,108 @@ type Event interface {
}
type CounterEvent struct {
metricName string
value float64
labels map[string]string
CMetricName string
CValue float64
CLabels map[string]string
}
func (c *CounterEvent) MetricName() string { return c.metricName }
func (c *CounterEvent) Value() float64 { return c.value }
func (c *CounterEvent) Labels() map[string]string { return c.labels }
func (c *CounterEvent) MetricName() string { return c.CMetricName }
func (c *CounterEvent) Value() float64 { return c.CValue }
func (c *CounterEvent) Labels() map[string]string { return c.CLabels }
func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter }
type GaugeEvent struct {
metricName string
value float64
relative bool
labels map[string]string
GMetricName string
GValue float64
GRelative bool
GLabels map[string]string
}
func (g *GaugeEvent) MetricName() string { return g.metricName }
func (g *GaugeEvent) Value() float64 { return g.value }
func (c *GaugeEvent) Labels() map[string]string { return c.labels }
func (g *GaugeEvent) MetricName() string { return g.GMetricName }
func (g *GaugeEvent) Value() float64 { return g.GValue }
func (c *GaugeEvent) Labels() map[string]string { return c.GLabels }
func (c *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge }
type TimerEvent struct {
metricName string
value float64
labels map[string]string
TMetricName string
TValue float64
TLabels map[string]string
}
func (t *TimerEvent) MetricName() string { return t.metricName }
func (t *TimerEvent) Value() float64 { return t.value }
func (c *TimerEvent) Labels() map[string]string { return c.labels }
func (t *TimerEvent) MetricName() string { return t.TMetricName }
func (t *TimerEvent) Value() float64 { return t.TValue }
func (c *TimerEvent) Labels() map[string]string { return c.TLabels }
func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTimer }
type Events []Event
type eventQueue struct {
c chan Events
type EventQueue struct {
C chan Events
q Events
m sync.Mutex
flushThreshold int
flushTicker *time.Ticker
flushThreshold int
flushInterval time.Duration
eventsFlushed prometheus.Counter
}
type eventHandler interface {
queue(event Events)
type EventHandler interface {
Queue(event Events)
}
func newEventQueue(c chan Events, flushThreshold int, flushInterval time.Duration) *eventQueue {
func NewEventQueue(c chan Events, flushThreshold int, flushInterval time.Duration, eventsFlushed prometheus.Counter) *EventQueue {
ticker := clock.NewTicker(flushInterval)
eq := &eventQueue{
c: c,
eq := &EventQueue{
C: c,
flushThreshold: flushThreshold,
flushTicker: ticker,
q: make([]Event, 0, flushThreshold),
eventsFlushed: eventsFlushed,
}
go func() {
for {
<-ticker.C
eq.flush()
eq.Flush()
}
}()
return eq
}
func (eq *eventQueue) queue(events Events) {
func (eq *EventQueue) Queue(events Events) {
eq.m.Lock()
defer eq.m.Unlock()
for _, e := range events {
eq.q = append(eq.q, e)
if len(eq.q) >= eq.flushThreshold {
eq.flushUnlocked()
eq.FlushUnlocked()
}
}
}
func (eq *eventQueue) flush() {
func (eq *EventQueue) Flush() {
eq.m.Lock()
defer eq.m.Unlock()
eq.flushUnlocked()
eq.FlushUnlocked()
}
func (eq *eventQueue) flushUnlocked() {
eq.c <- eq.q
func (eq *EventQueue) FlushUnlocked() {
eq.C <- eq.q
eq.q = make([]Event, 0, cap(eq.q))
eventsFlushed.Inc()
eq.eventsFlushed.Inc()
}
func (eq *eventQueue) len() int {
func (eq *EventQueue) Len() int {
eq.m.Lock()
defer eq.m.Unlock()
return len(eq.q)
}
type unbufferedEventHandler struct {
c chan Events
type UnbufferedEventHandler struct {
C chan Events
}
func (ueh *unbufferedEventHandler) queue(events Events) {
ueh.c <- events
func (ueh *UnbufferedEventHandler) Queue(events Events) {
ueh.C <- events
}

View file

@ -11,22 +11,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package main
package event
import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/clock"
)
var eventsFlushed = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_event_queue_flushed_total",
Help: "Number of times events were flushed to exporter",
},
)
func TestEventThresholdFlush(t *testing.T) {
c := make(chan Events, 100)
// We're not going to flush during this test, so the duration doesn't matter.
eq := newEventQueue(c, 5, time.Second)
eq := NewEventQueue(c, 5, time.Second, eventsFlushed)
e := make(Events, 13)
go func() {
eq.queue(e)
eq.Queue(e)
}()
batch := <-c
@ -52,25 +60,25 @@ func TestEventIntervalFlush(t *testing.T) {
clock.ClockInstance.Instant = time.Unix(0, 0)
c := make(chan Events, 100)
eq := newEventQueue(c, 1000, time.Second*1000)
eq := NewEventQueue(c, 1000, time.Second*1000, eventsFlushed)
e := make(Events, 10)
eq.queue(e)
eq.Queue(e)
if eq.len() != 10 {
t.Fatal("Expected 10 events to be queued, but got", eq.len())
if eq.Len() != 10 {
t.Fatal("Expected 10 events to be queued, but got", eq.Len())
}
if len(eq.c) != 0 {
t.Fatal("Expected 0 events in the event channel, but got", len(eq.c))
if len(eq.C) != 0 {
t.Fatal("Expected 0 events in the event channel, but got", len(eq.C))
}
// Tick time forward to trigger a flush
clock.ClockInstance.Instant = time.Unix(10000, 0)
clock.ClockInstance.TickerCh <- time.Unix(10000, 0)
events := <-eq.c
if eq.len() != 0 {
t.Fatal("Expected 0 events to be queued, but got", eq.len())
events := <-eq.C
if eq.Len() != 0 {
t.Fatal("Expected 0 events to be queued, but got", eq.Len())
}
if len(events) != 10 {

196
pkg/exporter/exporter.go Normal file
View file

@ -0,0 +1,196 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package exporter
import (
"os"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/prometheus/statsd_exporter/pkg/registry"
)
const (
defaultHelp = "Metric autogenerated by statsd_exporter."
regErrF = "Failed to update metric"
)
type Exporter struct {
Mapper *mapper.MetricMapper
Registry *registry.Registry
Logger log.Logger
EventsActions *prometheus.CounterVec
EventsUnmapped prometheus.Counter
ErrorEventStats *prometheus.CounterVec
EventStats *prometheus.CounterVec
ConflictingEventStats *prometheus.CounterVec
MetricsCount *prometheus.GaugeVec
}
// Listen handles all events sent to the given channel sequentially. It
// terminates when the channel is closed.
func (b *Exporter) Listen(e <-chan event.Events) {
removeStaleMetricsTicker := clock.NewTicker(time.Second)
for {
select {
case <-removeStaleMetricsTicker.C:
b.Registry.RemoveStaleMetrics()
case events, ok := <-e:
if !ok {
level.Debug(b.Logger).Log("msg", "Channel is closed. Break out of Exporter.Listener.")
removeStaleMetricsTicker.Stop()
return
}
for _, event := range events {
b.handleEvent(event)
}
}
}
}
// handleEvent processes a single Event according to the configured mapping.
func (b *Exporter) handleEvent(thisEvent event.Event) {
mapping, labels, present := b.Mapper.GetMapping(thisEvent.MetricName(), thisEvent.MetricType())
if mapping == nil {
mapping = &mapper.MetricMapping{}
if b.Mapper.Defaults.Ttl != 0 {
mapping.Ttl = b.Mapper.Defaults.Ttl
}
}
if mapping.Action == mapper.ActionTypeDrop {
b.EventsActions.WithLabelValues("drop").Inc()
return
}
metricName := ""
help := defaultHelp
if mapping.HelpText != "" {
help = mapping.HelpText
}
prometheusLabels := thisEvent.Labels()
if present {
if mapping.Name == "" {
level.Debug(b.Logger).Log("msg", "The mapping generates an empty metric name", "metric_name", thisEvent.MetricName(), "match", mapping.Match)
b.ErrorEventStats.WithLabelValues("empty_metric_name").Inc()
return
}
metricName = mapper.EscapeMetricName(mapping.Name)
for label, value := range labels {
prometheusLabels[label] = value
}
b.EventsActions.WithLabelValues(string(mapping.Action)).Inc()
} else {
b.EventsUnmapped.Inc()
metricName = mapper.EscapeMetricName(thisEvent.MetricName())
}
switch ev := thisEvent.(type) {
case *event.CounterEvent:
// We don't accept negative values for counters. Incrementing the counter with a negative number
// will cause the exporter to panic. Instead we will warn and continue to the next event.
if thisEvent.Value() < 0.0 {
level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", thisEvent.Value())
b.ErrorEventStats.WithLabelValues("illegal_negative_counter").Inc()
return
}
counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil {
counter.Add(thisEvent.Value())
b.EventStats.WithLabelValues("counter").Inc()
} else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
b.ConflictingEventStats.WithLabelValues("counter").Inc()
}
case *event.GaugeEvent:
gauge, err := b.Registry.GetGauge(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil {
if ev.GRelative {
gauge.Add(thisEvent.Value())
} else {
gauge.Set(thisEvent.Value())
}
b.EventStats.WithLabelValues("gauge").Inc()
} else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
b.ConflictingEventStats.WithLabelValues("gauge").Inc()
}
case *event.TimerEvent:
t := mapper.TimerTypeDefault
if mapping != nil {
t = mapping.TimerType
}
if t == mapper.TimerTypeDefault {
t = b.Mapper.Defaults.TimerType
}
switch t {
case mapper.TimerTypeHistogram:
histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil {
histogram.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond
b.EventStats.WithLabelValues("timer").Inc()
} else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
b.ConflictingEventStats.WithLabelValues("timer").Inc()
}
case mapper.TimerTypeDefault, mapper.TimerTypeSummary:
summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, b.MetricsCount)
if err == nil {
summary.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond
b.EventStats.WithLabelValues("timer").Inc()
} else {
level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err)
b.ConflictingEventStats.WithLabelValues("timer").Inc()
}
default:
level.Error(b.Logger).Log("msg", "unknown timer type", "type", t)
os.Exit(1)
}
default:
level.Debug(b.Logger).Log("msg", "Unsupported event type")
b.EventStats.WithLabelValues("illegal").Inc()
}
}
func NewExporter(mapper *mapper.MetricMapper, logger log.Logger, eventsActions *prometheus.CounterVec, eventsUnmapped prometheus.Counter, errorEventStats *prometheus.CounterVec, eventStats *prometheus.CounterVec, conflictingEventStats *prometheus.CounterVec, metricsCount *prometheus.GaugeVec) *Exporter {
return &Exporter{
Mapper: mapper,
Registry: registry.NewRegistry(mapper),
Logger: logger,
EventsActions: eventsActions,
EventsUnmapped: eventsUnmapped,
ErrorEventStats: errorEventStats,
EventStats: eventStats,
ConflictingEventStats: conflictingEventStats,
MetricsCount: metricsCount,
}
}

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package main
package exporter
import (
"fmt"
@ -24,7 +24,132 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/line"
"github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/prometheus/statsd_exporter/pkg/registry"
)
var (
eventStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_total",
Help: "The total number of StatsD events seen.",
},
[]string{"type"},
)
eventsFlushed = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_event_queue_flushed_total",
Help: "Number of times events were flushed to exporter",
},
)
eventsUnmapped = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_events_unmapped_total",
Help: "The total number of StatsD events no mapping was found for.",
})
udpPackets = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_udp_packets_total",
Help: "The total number of StatsD packets received over UDP.",
},
)
tcpConnections = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connections_total",
Help: "The total number of TCP connections handled.",
},
)
tcpErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connection_errors_total",
Help: "The number of errors encountered reading from TCP.",
},
)
tcpLineTooLong = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_too_long_lines_total",
Help: "The number of lines discarded due to being too long.",
},
)
unixgramPackets = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_unixgram_packets_total",
Help: "The total number of StatsD packets received over Unixgram.",
},
)
linesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_lines_total",
Help: "The total number of StatsD lines received.",
},
)
samplesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_samples_total",
Help: "The total number of StatsD samples received.",
},
)
sampleErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_sample_errors_total",
Help: "The total number of errors parsing StatsD samples.",
},
[]string{"reason"},
)
tagsReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tags_total",
Help: "The total number of DogStatsD tags processed.",
},
)
tagErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tag_errors_total",
Help: "The number of errors parsing DogStatsD tags.",
},
)
configLoads = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_config_reloads_total",
Help: "The number of configuration reloads.",
},
[]string{"outcome"},
)
mappingsCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "statsd_exporter_loaded_mappings",
Help: "The current number of configured metric mappings.",
})
conflictingEventStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_conflict_total",
Help: "The total number of StatsD events with conflicting names.",
},
[]string{"type"},
)
errorEventStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_error_total",
Help: "The total number of StatsD events discarded due to errors.",
},
[]string{"reason"},
)
eventsActions = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_actions_total",
Help: "The total number of StatsD events by action.",
},
[]string{"action"},
)
metricsCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "statsd_exporter_metrics_total",
Help: "The total number of metrics.",
},
[]string{"type"},
)
)
// TestNegativeCounter validates when we send a negative
@ -41,12 +166,12 @@ func TestNegativeCounter(t *testing.T) {
}
}()
events := make(chan Events)
events := make(chan event.Events)
go func() {
c := Events{
&CounterEvent{
metricName: "foo",
value: -1,
c := event.Events{
&event.CounterEvent{
CMetricName: "foo",
CValue: -1,
},
}
events <- c
@ -59,7 +184,7 @@ func TestNegativeCounter(t *testing.T) {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger())
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
updated := getTelemetryCounterValue(errorCounter)
@ -80,48 +205,48 @@ func TestInconsistentLabelSets(t *testing.T) {
secondLabelSet["foo"] = "1"
secondLabelSet["bar"] = "2"
events := make(chan Events)
events := make(chan event.Events)
go func() {
c := Events{
&CounterEvent{
metricName: "counter_test",
value: 1,
labels: firstLabelSet,
c := event.Events{
&event.CounterEvent{
CMetricName: "counter_test",
CValue: 1,
CLabels: firstLabelSet,
},
&CounterEvent{
metricName: "counter_test",
value: 1,
labels: secondLabelSet,
&event.CounterEvent{
CMetricName: "counter_test",
CValue: 1,
CLabels: secondLabelSet,
},
&GaugeEvent{
metricName: "gauge_test",
value: 1,
labels: firstLabelSet,
&event.GaugeEvent{
GMetricName: "gauge_test",
GValue: 1,
GLabels: firstLabelSet,
},
&GaugeEvent{
metricName: "gauge_test",
value: 1,
labels: secondLabelSet,
&event.GaugeEvent{
GMetricName: "gauge_test",
GValue: 1,
GLabels: secondLabelSet,
},
&TimerEvent{
metricName: "histogram.test",
value: 1,
labels: firstLabelSet,
&event.TimerEvent{
TMetricName: "histogram.test",
TValue: 1,
TLabels: firstLabelSet,
},
&TimerEvent{
metricName: "histogram.test",
value: 1,
labels: secondLabelSet,
&event.TimerEvent{
TMetricName: "histogram.test",
TValue: 1,
TLabels: secondLabelSet,
},
&TimerEvent{
metricName: "summary_test",
value: 1,
labels: firstLabelSet,
&event.TimerEvent{
TMetricName: "summary_test",
TValue: 1,
TLabels: firstLabelSet,
},
&TimerEvent{
metricName: "summary_test",
value: 1,
labels: secondLabelSet,
&event.TimerEvent{
TMetricName: "summary_test",
TValue: 1,
TLabels: secondLabelSet,
},
}
events <- c
@ -140,7 +265,7 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err)
}
ex := NewExporter(testMapper, log.NewNopLogger())
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
metrics, err := prometheus.DefaultGatherer.Gather()
@ -166,18 +291,18 @@ mappings:
func TestLabelParsing(t *testing.T) {
codes := [2]string{"200", "300"}
events := make(chan Events)
events := make(chan event.Events)
go func() {
c := Events{
&CounterEvent{
metricName: "counter.test.200",
value: 1,
labels: make(map[string]string),
c := event.Events{
&event.CounterEvent{
CMetricName: "counter.test.200",
CValue: 1,
CLabels: make(map[string]string),
},
&CounterEvent{
metricName: "counter.test.300",
value: 1,
labels: make(map[string]string),
&event.CounterEvent{
CMetricName: "counter.test.300",
CValue: 1,
CLabels: make(map[string]string),
},
}
events <- c
@ -198,7 +323,7 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err)
}
ex := NewExporter(testMapper, log.NewNopLogger())
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
metrics, err := prometheus.DefaultGatherer.Gather()
@ -222,173 +347,173 @@ func TestConflictingMetrics(t *testing.T) {
scenarios := []struct {
name string
expected []float64
in Events
in event.Events
}{
{
name: "counter vs gauge",
expected: []float64{1},
in: Events{
&CounterEvent{
metricName: "cvg_test",
value: 1,
in: event.Events{
&event.CounterEvent{
CMetricName: "cvg_test",
CValue: 1,
},
&GaugeEvent{
metricName: "cvg_test",
value: 2,
&event.GaugeEvent{
GMetricName: "cvg_test",
GValue: 2,
},
},
},
{
name: "counter vs gauge with different labels",
expected: []float64{1, 2},
in: Events{
&CounterEvent{
metricName: "cvgl_test",
value: 1,
labels: map[string]string{"tag": "1"},
in: event.Events{
&event.CounterEvent{
CMetricName: "cvgl_test",
CValue: 1,
CLabels: map[string]string{"tag": "1"},
},
&CounterEvent{
metricName: "cvgl_test",
value: 2,
labels: map[string]string{"tag": "2"},
&event.CounterEvent{
CMetricName: "cvgl_test",
CValue: 2,
CLabels: map[string]string{"tag": "2"},
},
&GaugeEvent{
metricName: "cvgl_test",
value: 3,
labels: map[string]string{"tag": "1"},
&event.GaugeEvent{
GMetricName: "cvgl_test",
GValue: 3,
GLabels: map[string]string{"tag": "1"},
},
},
},
{
name: "counter vs gauge with same labels",
expected: []float64{3},
in: Events{
&CounterEvent{
metricName: "cvgsl_test",
value: 1,
labels: map[string]string{"tag": "1"},
in: event.Events{
&event.CounterEvent{
CMetricName: "cvgsl_test",
CValue: 1,
CLabels: map[string]string{"tag": "1"},
},
&CounterEvent{
metricName: "cvgsl_test",
value: 2,
labels: map[string]string{"tag": "1"},
&event.CounterEvent{
CMetricName: "cvgsl_test",
CValue: 2,
CLabels: map[string]string{"tag": "1"},
},
&GaugeEvent{
metricName: "cvgsl_test",
value: 3,
labels: map[string]string{"tag": "1"},
&event.GaugeEvent{
GMetricName: "cvgsl_test",
GValue: 3,
GLabels: map[string]string{"tag": "1"},
},
},
},
{
name: "gauge vs counter",
expected: []float64{2},
in: Events{
&GaugeEvent{
metricName: "gvc_test",
value: 2,
in: event.Events{
&event.GaugeEvent{
GMetricName: "gvc_test",
GValue: 2,
},
&CounterEvent{
metricName: "gvc_test",
value: 1,
&event.CounterEvent{
CMetricName: "gvc_test",
CValue: 1,
},
},
},
{
name: "counter vs histogram",
expected: []float64{1},
in: Events{
&CounterEvent{
metricName: "histogram_test1",
value: 1,
in: event.Events{
&event.CounterEvent{
CMetricName: "histogram_test1",
CValue: 1,
},
&TimerEvent{
metricName: "histogram.test1",
value: 2,
&event.TimerEvent{
TMetricName: "histogram.test1",
TValue: 2,
},
},
},
{
name: "counter vs histogram sum",
expected: []float64{1},
in: Events{
&CounterEvent{
metricName: "histogram_test1_sum",
value: 1,
in: event.Events{
&event.CounterEvent{
CMetricName: "histogram_test1_sum",
CValue: 1,
},
&TimerEvent{
metricName: "histogram.test1",
value: 2,
&event.TimerEvent{
TMetricName: "histogram.test1",
TValue: 2,
},
},
},
{
name: "counter vs histogram count",
expected: []float64{1},
in: Events{
&CounterEvent{
metricName: "histogram_test2_count",
value: 1,
in: event.Events{
&event.CounterEvent{
CMetricName: "histogram_test2_count",
CValue: 1,
},
&TimerEvent{
metricName: "histogram.test2",
value: 2,
&event.TimerEvent{
TMetricName: "histogram.test2",
TValue: 2,
},
},
},
{
name: "counter vs histogram bucket",
expected: []float64{1},
in: Events{
&CounterEvent{
metricName: "histogram_test3_bucket",
value: 1,
in: event.Events{
&event.CounterEvent{
CMetricName: "histogram_test3_bucket",
CValue: 1,
},
&TimerEvent{
metricName: "histogram.test3",
value: 2,
&event.TimerEvent{
TMetricName: "histogram.test3",
TValue: 2,
},
},
},
{
name: "counter vs summary quantile",
expected: []float64{1},
in: Events{
&CounterEvent{
metricName: "cvsq_test",
value: 1,
in: event.Events{
&event.CounterEvent{
CMetricName: "cvsq_test",
CValue: 1,
},
&TimerEvent{
metricName: "cvsq_test",
value: 2,
&event.TimerEvent{
TMetricName: "cvsq_test",
TValue: 2,
},
},
},
{
name: "counter vs summary count",
expected: []float64{1},
in: Events{
&CounterEvent{
metricName: "cvsc_count",
value: 1,
in: event.Events{
&event.CounterEvent{
CMetricName: "cvsc_count",
CValue: 1,
},
&TimerEvent{
metricName: "cvsc",
value: 2,
&event.TimerEvent{
TMetricName: "cvsc",
TValue: 2,
},
},
},
{
name: "counter vs summary sum",
expected: []float64{1},
in: Events{
&CounterEvent{
metricName: "cvss_sum",
value: 1,
in: event.Events{
&event.CounterEvent{
CMetricName: "cvss_sum",
CValue: 1,
},
&TimerEvent{
metricName: "cvss",
value: 2,
&event.TimerEvent{
TMetricName: "cvss",
TValue: 2,
},
},
},
@ -408,12 +533,12 @@ mappings:
t.Fatalf("Config load error: %s %s", config, err)
}
events := make(chan Events)
events := make(chan event.Events)
go func() {
events <- s.in
close(events)
}()
ex := NewExporter(testMapper, log.NewNopLogger())
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
metrics, err := prometheus.DefaultGatherer.Gather()
@ -441,12 +566,12 @@ mappings:
// being the empty string after applying the match replacements
// tha we don't panic the Exporter Listener.
func TestEmptyStringMetric(t *testing.T) {
events := make(chan Events)
events := make(chan event.Events)
go func() {
c := Events{
&CounterEvent{
metricName: "foo_bar",
value: 1,
c := event.Events{
&event.CounterEvent{
CMetricName: "foo_bar",
CValue: 1,
},
}
events <- c
@ -468,7 +593,7 @@ mappings:
errorCounter := errorEventStats.WithLabelValues("empty_metric_name")
prev := getTelemetryCounterValue(errorCounter)
ex := NewExporter(testMapper, log.NewNopLogger())
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
updated := getTelemetryCounterValue(errorCounter)
@ -489,13 +614,37 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
}
}()
events := make(chan Events)
ueh := &unbufferedEventHandler{c: events}
events := make(chan event.Events)
ueh := &event.UnbufferedEventHandler{C: events}
go func() {
for _, l := range []statsDPacketHandler{&StatsDUDPListener{nil, nil, log.NewNopLogger()}, &mockStatsDTCPListener{StatsDTCPListener{nil, nil, log.NewNopLogger()}, log.NewNopLogger()}} {
for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
}, &mockStatsDTCPListener{listener.StatsDTCPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
}, log.NewNopLogger()}} {
l.SetEventHandler(ueh)
l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"))
l.HandlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"))
}
close(events)
}()
@ -503,7 +652,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger())
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}
@ -512,24 +661,24 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
// as well as the sum/count metrics
func TestSummaryWithQuantilesEmptyMapping(t *testing.T) {
// Start exporter with a synchronous channel
events := make(chan Events)
events := make(chan event.Events)
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger())
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()
name := "default_foo"
c := Events{
&TimerEvent{
metricName: name,
value: 300,
c := event.Events{
&event.TimerEvent{
TMetricName: name,
TValue: 300,
},
}
events <- c
events <- Events{}
events <- event.Events{}
close(events)
metrics, err := prometheus.DefaultGatherer.Gather()
@ -557,26 +706,26 @@ func TestSummaryWithQuantilesEmptyMapping(t *testing.T) {
func TestHistogramUnits(t *testing.T) {
// Start exporter with a synchronous channel
events := make(chan Events)
events := make(chan event.Events)
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger())
ex.mapper.Defaults.TimerType = mapper.TimerTypeHistogram
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Mapper.Defaults.TimerType = mapper.TimerTypeHistogram
ex.Listen(events)
}()
// Synchronously send a statsd event to wait for handleEvent execution.
// Then close events channel to stop a listener.
name := "foo"
c := Events{
&TimerEvent{
metricName: name,
value: 300,
c := event.Events{
&event.TimerEvent{
TMetricName: name,
TValue: 300,
},
}
events <- c
events <- Events{}
events <- event.Events{}
close(events)
// Check histogram value
@ -596,11 +745,11 @@ func TestHistogramUnits(t *testing.T) {
}
func TestCounterIncrement(t *testing.T) {
// Start exporter with a synchronous channel
events := make(chan Events)
events := make(chan event.Events)
go func() {
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper, log.NewNopLogger())
ex := NewExporter(&testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()
@ -610,21 +759,21 @@ func TestCounterIncrement(t *testing.T) {
labels := map[string]string{
"foo": "bar",
}
c := Events{
&CounterEvent{
metricName: name,
value: 1,
labels: labels,
c := event.Events{
&event.CounterEvent{
CMetricName: name,
CValue: 1,
CLabels: labels,
},
&CounterEvent{
metricName: name,
value: 1,
labels: labels,
&event.CounterEvent{
CMetricName: name,
CValue: 1,
CLabels: labels,
},
}
events <- c
// Push empty event so that we block until the first event is consumed.
events <- Events{}
events <- event.Events{}
close(events)
// Check histogram value
@ -642,16 +791,16 @@ func TestCounterIncrement(t *testing.T) {
}
type statsDPacketHandler interface {
handlePacket(packet []byte)
SetEventHandler(eh eventHandler)
HandlePacket(packet []byte)
SetEventHandler(eh event.EventHandler)
}
type mockStatsDTCPListener struct {
StatsDTCPListener
listener.StatsDTCPListener
log.Logger
}
func (ml *mockStatsDTCPListener) handlePacket(packet []byte) {
func (ml *mockStatsDTCPListener) HandlePacket(packet []byte) {
// Forcing IPv4 because the TravisCI build environment does not have IPv6
// addresses.
lc, err := net.ListenTCP("tcp4", nil)
@ -679,7 +828,7 @@ func (ml *mockStatsDTCPListener) handlePacket(packet []byte) {
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err))
}
ml.handleConn(sc)
ml.HandleConn(sc)
}
// TestTtlExpiration validates expiration of time series.
@ -706,23 +855,23 @@ mappings:
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
events := make(chan Events)
events := make(chan event.Events)
defer close(events)
go func() {
ex := NewExporter(testMapper, log.NewNopLogger())
ex := NewExporter(testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}()
ev := Events{
ev := event.Events{
// event with default ttl = 1s
&GaugeEvent{
metricName: "foobar",
value: 200,
&event.GaugeEvent{
GMetricName: "foobar",
GValue: 200,
},
// event with ttl = 2s from a mapping
&TimerEvent{
metricName: "bazqux.main",
value: 42000,
&event.TimerEvent{
TMetricName: "bazqux.main",
TValue: 42000,
},
}
@ -735,7 +884,7 @@ mappings:
// saveLabelValues will use fake instant as a lastRegisteredAt time.
clock.ClockInstance.Instant = time.Unix(0, 0)
events <- ev
events <- Events{}
events <- event.Events{}
// Check values
metrics, err = prometheus.DefaultGatherer.Gather()
@ -757,7 +906,7 @@ mappings:
// Step 2. Increase Instant to emulate metrics expiration after 1s
clock.ClockInstance.Instant = time.Unix(1, 10)
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
events <- Events{}
events <- event.Events{}
// Check values
metrics, err = prometheus.DefaultGatherer.Gather()
@ -779,7 +928,7 @@ mappings:
// Step 3. Increase Instant to emulate metrics expiration after 2s
clock.ClockInstance.Instant = time.Unix(2, 200)
clock.ClockInstance.TickerCh <- time.Unix(0, 0)
events <- Events{}
events <- event.Events{}
// Check values
metrics, err = prometheus.DefaultGatherer.Gather()
@ -797,32 +946,32 @@ mappings:
}
func TestHashLabelNames(t *testing.T) {
r := newRegistry(nil)
r := registry.NewRegistry(nil)
// Validate value hash changes and name has doesn't when just the value changes.
hash1, _ := r.hashLabels(map[string]string{
hash1, _ := r.HashLabels(map[string]string{
"label": "value1",
})
hash2, _ := r.hashLabels(map[string]string{
hash2, _ := r.HashLabels(map[string]string{
"label": "value2",
})
if hash1.names != hash2.names {
if hash1.Names != hash2.Names {
t.Fatal("Hash of label names should match, but doesn't")
}
if hash1.values == hash2.values {
if hash1.Values == hash2.Values {
t.Fatal("Hash of label names shouldn't match, but do")
}
// Validate value and name hashes change when the name changes.
hash1, _ = r.hashLabels(map[string]string{
hash1, _ = r.HashLabels(map[string]string{
"label1": "value",
})
hash2, _ = r.hashLabels(map[string]string{
hash2, _ = r.HashLabels(map[string]string{
"label2": "value",
})
if hash1.names == hash2.names {
if hash1.Names == hash2.Names {
t.Fatal("Hash of label names shouldn't match, but do")
}
if hash1.values == hash2.values {
if hash1.Values == hash2.Values {
t.Fatal("Hash of label names shouldn't match, but do")
}
}
@ -916,7 +1065,7 @@ func BenchmarkParseDogStatsDTags(b *testing.B) {
b.Run(name, func(b *testing.B) {
for n := 0; n < b.N; n++ {
labels := map[string]string{}
parseDogStatsDTags(tags, labels, log.NewNopLogger())
line.ParseDogStatsDTags(tags, labels, tagErrors, log.NewNopLogger())
}
})
}
@ -953,11 +1102,11 @@ func BenchmarkHashNameAndLabels(b *testing.B) {
},
}
r := newRegistry(nil)
r := registry.NewRegistry(nil)
for _, s := range scenarios {
b.Run(s.name, func(b *testing.B) {
for n := 0; n < b.N; n++ {
r.hashLabels(s.labels)
r.HashLabels(s.labels)
}
})
}

254
pkg/line/line.go Normal file
View file

@ -0,0 +1,254 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package line
import (
"fmt"
"strconv"
"strings"
"unicode/utf8"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)
func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (event.Event, error) {
switch statType {
case "c":
return &event.CounterEvent{
CMetricName: metric,
CValue: float64(value),
CLabels: labels,
}, nil
case "g":
return &event.GaugeEvent{
GMetricName: metric,
GValue: float64(value),
GRelative: relative,
GLabels: labels,
}, nil
case "ms", "h", "d":
return &event.TimerEvent{
TMetricName: metric,
TValue: float64(value),
TLabels: labels,
}, nil
case "s":
return nil, fmt.Errorf("no support for StatsD sets")
default:
return nil, fmt.Errorf("bad stat type %s", statType)
}
}
func parseTag(component, tag string, separator rune, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) {
// Entirely empty tag is an error
if len(tag) == 0 {
tagErrors.Inc()
level.Debug(logger).Log("msg", "Empty name tag", "component", component)
return
}
for i, c := range tag {
if c == separator {
k := tag[:i]
v := tag[i+1:]
if len(k) == 0 || len(v) == 0 {
// Empty key or value is an error
tagErrors.Inc()
level.Debug(logger).Log("msg", "Malformed name tag", "k", k, "v", v, "component", component)
} else {
labels[mapper.EscapeMetricName(k)] = v
}
return
}
}
// Missing separator (no value) is an error
tagErrors.Inc()
level.Debug(logger).Log("msg", "Malformed name tag", "tag", tag, "component", component)
}
func parseNameTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) {
lastTagEndIndex := 0
for i, c := range component {
if c == ',' {
tag := component[lastTagEndIndex:i]
lastTagEndIndex = i + 1
parseTag(component, tag, '=', labels, tagErrors, logger)
}
}
// If we're not off the end of the string, add the last tag
if lastTagEndIndex < len(component) {
tag := component[lastTagEndIndex:]
parseTag(component, tag, '=', labels, tagErrors, logger)
}
}
func trimLeftHash(s string) string {
if s != "" && s[0] == '#' {
return s[1:]
}
return s
}
func ParseDogStatsDTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) {
lastTagEndIndex := 0
for i, c := range component {
if c == ',' {
tag := component[lastTagEndIndex:i]
lastTagEndIndex = i + 1
parseTag(component, trimLeftHash(tag), ':', labels, tagErrors, logger)
}
}
// If we're not off the end of the string, add the last tag
if lastTagEndIndex < len(component) {
tag := component[lastTagEndIndex:]
parseTag(component, trimLeftHash(tag), ':', labels, tagErrors, logger)
}
}
func parseNameAndTags(name string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) string {
for i, c := range name {
// `#` delimits start of tags by Librato
// https://www.librato.com/docs/kb/collect/collection_agents/stastd/#stat-level-tags
// `,` delimits start of tags by InfluxDB
// https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/#introducing-influx-statsd
if c == '#' || c == ',' {
parseNameTags(name[i+1:], labels, tagErrors, logger)
return name[:i]
}
}
return name
}
func LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter, logger log.Logger) event.Events {
events := event.Events{}
if line == "" {
return events
}
elements := strings.SplitN(line, ":", 2)
if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) {
sampleErrors.WithLabelValues("malformed_line").Inc()
level.Debug(logger).Log("msg", "Bad line from StatsD", "line", line)
return events
}
labels := map[string]string{}
metric := parseNameAndTags(elements[0], labels, tagErrors, logger)
var samples []string
if strings.Contains(elements[1], "|#") {
// using DogStatsD tags
// don't allow mixed tagging styles
if len(labels) > 0 {
sampleErrors.WithLabelValues("mixed_tagging_styles").Inc()
level.Debug(logger).Log("msg", "Bad line (multiple tagging styles) from StatsD", "line", line)
return events
}
// disable multi-metrics
samples = elements[1:]
} else {
samples = strings.Split(elements[1], ":")
}
samples:
for _, sample := range samples {
samplesReceived.Inc()
components := strings.Split(sample, "|")
samplingFactor := 1.0
if len(components) < 2 || len(components) > 4 {
sampleErrors.WithLabelValues("malformed_component").Inc()
level.Debug(logger).Log("msg", "Bad component", "line", line)
continue
}
valueStr, statType := components[0], components[1]
var relative = false
if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 {
relative = true
}
value, err := strconv.ParseFloat(valueStr, 64)
if err != nil {
level.Debug(logger).Log("msg", "Bad value", "value", valueStr, "line", line)
sampleErrors.WithLabelValues("malformed_value").Inc()
continue
}
multiplyEvents := 1
if len(components) >= 3 {
for _, component := range components[2:] {
if len(component) == 0 {
level.Debug(logger).Log("msg", "Empty component", "line", line)
sampleErrors.WithLabelValues("malformed_component").Inc()
continue samples
}
}
for _, component := range components[2:] {
switch component[0] {
case '@':
samplingFactor, err = strconv.ParseFloat(component[1:], 64)
if err != nil {
level.Debug(logger).Log("msg", "Invalid sampling factor", "component", component[1:], "line", line)
sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
}
if samplingFactor == 0 {
samplingFactor = 1
}
if statType == "g" {
continue
} else if statType == "c" {
value /= samplingFactor
} else if statType == "ms" || statType == "h" || statType == "d" {
multiplyEvents = int(1 / samplingFactor)
}
case '#':
ParseDogStatsDTags(component[1:], labels, tagErrors, logger)
default:
level.Debug(logger).Log("msg", "Invalid sampling factor or tag section", "component", components[2], "line", line)
sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
continue
}
}
}
if len(labels) > 0 {
tagsReceived.Inc()
}
for i := 0; i < multiplyEvents; i++ {
event, err := buildEvent(statType, metric, value, relative, labels)
if err != nil {
level.Debug(logger).Log("msg", "Error building event", "line", line, "error", err)
sampleErrors.WithLabelValues("illegal_event").Inc()
continue
}
events = append(events, event)
}
}
return events
}

174
pkg/listener/listener.go Normal file
View file

@ -0,0 +1,174 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package listener
import (
"bufio"
"io"
"net"
"os"
"strings"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/event"
pkgLine "github.com/prometheus/statsd_exporter/pkg/line"
)
type StatsDUDPListener struct {
Conn *net.UDPConn
EventHandler event.EventHandler
Logger log.Logger
UDPPackets prometheus.Counter
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
TagsReceived prometheus.Counter
}
func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) {
l.EventHandler = eh
}
func (l *StatsDUDPListener) Listen() {
buf := make([]byte, 65535)
for {
n, _, err := l.Conn.ReadFromUDP(buf)
if err != nil {
// https://github.com/golang/go/issues/4373
// ignore net: errClosing error as it will occur during shutdown
if strings.HasSuffix(err.Error(), "use of closed network connection") {
return
}
level.Error(l.Logger).Log("error", err)
return
}
l.HandlePacket(buf[0:n])
}
}
func (l *StatsDUDPListener) HandlePacket(packet []byte) {
l.UDPPackets.Inc()
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
l.LinesReceived.Inc()
l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
}
}
type StatsDTCPListener struct {
Conn *net.TCPListener
EventHandler event.EventHandler
Logger log.Logger
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
TagsReceived prometheus.Counter
TCPConnections prometheus.Counter
TCPErrors prometheus.Counter
TCPLineTooLong prometheus.Counter
}
func (l *StatsDTCPListener) SetEventHandler(eh event.EventHandler) {
l.EventHandler = eh
}
func (l *StatsDTCPListener) Listen() {
for {
c, err := l.Conn.AcceptTCP()
if err != nil {
// https://github.com/golang/go/issues/4373
// ignore net: errClosing error as it will occur during shutdown
if strings.HasSuffix(err.Error(), "use of closed network connection") {
return
}
level.Error(l.Logger).Log("msg", "AcceptTCP failed", "error", err)
os.Exit(1)
}
go l.HandleConn(c)
}
}
func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) {
defer c.Close()
l.TCPConnections.Inc()
r := bufio.NewReader(c)
for {
line, isPrefix, err := r.ReadLine()
if err != nil {
if err != io.EOF {
l.TCPErrors.Inc()
level.Debug(l.Logger).Log("msg", "Read failed", "addr", c.RemoteAddr(), "error", err)
}
break
}
if isPrefix {
l.TCPLineTooLong.Inc()
level.Debug(l.Logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr())
break
}
l.LinesReceived.Inc()
l.EventHandler.Queue(pkgLine.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
}
}
type StatsDUnixgramListener struct {
Conn *net.UnixConn
EventHandler event.EventHandler
Logger log.Logger
UnixgramPackets prometheus.Counter
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
TagsReceived prometheus.Counter
}
func (l *StatsDUnixgramListener) SetEventHandler(eh event.EventHandler) {
l.EventHandler = eh
}
func (l *StatsDUnixgramListener) Listen() {
buf := make([]byte, 65535)
for {
n, _, err := l.Conn.ReadFromUnix(buf)
if err != nil {
// https://github.com/golang/go/issues/4373
// ignore net: errClosing error as it will occur during shutdown
if strings.HasSuffix(err.Error(), "use of closed network connection") {
return
}
level.Error(l.Logger).Log(err)
os.Exit(1)
}
l.HandlePacket(buf[:n])
}
}
func (l *StatsDUnixgramListener) HandlePacket(packet []byte) {
l.UnixgramPackets.Inc()
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
l.LinesReceived.Inc()
l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
}
}

67
pkg/metrics/metrics.go Normal file
View file

@ -0,0 +1,67 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
type MetricType int
const (
CounterMetricType MetricType = iota
GaugeMetricType
SummaryMetricType
HistogramMetricType
)
type NameHash uint64
type ValueHash uint64
type LabelHash struct {
// This is a hash over the label names
Names NameHash
// This is a hash over the label names + label values
Values ValueHash
}
type MetricHolder interface{}
type VectorHolder interface {
Delete(label prometheus.Labels) bool
}
type Vector struct {
Holder VectorHolder
RefCount uint64
}
type Metric struct {
MetricType MetricType
// Vectors key is the hash of the label names
Vectors map[NameHash]*Vector
// Metrics key is a hash of the label names + label values
Metrics map[ValueHash]*RegisteredMetric
}
type RegisteredMetric struct {
LastRegisteredAt time.Time
Labels prometheus.Labels
TTL time.Duration
Metric MetricHolder
VecKey NameHash
}

383
pkg/registry/registry.go Normal file
View file

@ -0,0 +1,383 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package registry
import (
"bytes"
"fmt"
"hash"
"hash/fnv"
"sort"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/prometheus/statsd_exporter/pkg/metrics"
)
// uncheckedCollector wraps a Collector but its Describe method yields no Desc.
// This allows incoming metrics to have inconsistent label sets
type uncheckedCollector struct {
c prometheus.Collector
}
func (u uncheckedCollector) Describe(_ chan<- *prometheus.Desc) {}
func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
u.c.Collect(c)
}
type Registry struct {
Metrics map[string]metrics.Metric
Mapper *mapper.MetricMapper
// The below value and label variables are allocated in the registry struct
// so that we don't have to allocate them every time have to compute a label
// hash.
ValueBuf, NameBuf bytes.Buffer
Hasher hash.Hash64
}
func NewRegistry(mapper *mapper.MetricMapper) *Registry {
return &Registry{
Metrics: make(map[string]metrics.Metric),
Mapper: mapper,
Hasher: fnv.New64a(),
}
}
func (r *Registry) MetricConflicts(metricName string, metricType metrics.MetricType) bool {
vector, hasMetrics := r.Metrics[metricName]
if !hasMetrics {
// No metrics.Metric with this name exists
return false
}
if vector.MetricType == metricType {
// We've found a copy of this metrics.Metric with this type, but different
// labels, so it's safe to create a new one.
return false
}
// The metrics.Metric exists, but it's of a different type than we're trying to
// create.
return true
}
func (r *Registry) StoreCounter(metricName string, hash metrics.LabelHash, labels prometheus.Labels, vec *prometheus.CounterVec, c prometheus.Counter, ttl time.Duration) {
r.Store(metricName, hash, labels, vec, c, metrics.CounterMetricType, ttl)
}
func (r *Registry) StoreGauge(metricName string, hash metrics.LabelHash, labels prometheus.Labels, vec *prometheus.GaugeVec, g prometheus.Counter, ttl time.Duration) {
r.Store(metricName, hash, labels, vec, g, metrics.GaugeMetricType, ttl)
}
func (r *Registry) StoreHistogram(metricName string, hash metrics.LabelHash, labels prometheus.Labels, vec *prometheus.HistogramVec, o prometheus.Observer, ttl time.Duration) {
r.Store(metricName, hash, labels, vec, o, metrics.HistogramMetricType, ttl)
}
func (r *Registry) StoreSummary(metricName string, hash metrics.LabelHash, labels prometheus.Labels, vec *prometheus.SummaryVec, o prometheus.Observer, ttl time.Duration) {
r.Store(metricName, hash, labels, vec, o, metrics.SummaryMetricType, ttl)
}
func (r *Registry) Store(metricName string, hash metrics.LabelHash, labels prometheus.Labels, vh metrics.VectorHolder, mh metrics.MetricHolder, metricType metrics.MetricType, ttl time.Duration) {
metric, hasMetrics := r.Metrics[metricName]
if !hasMetrics {
metric.MetricType = metricType
metric.Vectors = make(map[metrics.NameHash]*metrics.Vector)
metric.Metrics = make(map[metrics.ValueHash]*metrics.RegisteredMetric)
r.Metrics[metricName] = metric
}
v, ok := metric.Vectors[hash.Names]
if !ok {
v = &metrics.Vector{Holder: vh}
metric.Vectors[hash.Names] = v
}
rm, ok := metric.Metrics[hash.Values]
if !ok {
rm = &metrics.RegisteredMetric{
Labels: labels,
TTL: ttl,
Metric: mh,
VecKey: hash.Names,
}
metric.Metrics[hash.Values] = rm
v.RefCount++
}
now := clock.Now()
rm.LastRegisteredAt = now
// Update ttl from mapping
rm.TTL = ttl
}
func (r *Registry) Get(metricName string, hash metrics.LabelHash, metricType metrics.MetricType) (metrics.VectorHolder, metrics.MetricHolder) {
metric, hasMetric := r.Metrics[metricName]
if !hasMetric {
return nil, nil
}
if metric.MetricType != metricType {
return nil, nil
}
rm, ok := metric.Metrics[hash.Values]
if ok {
now := clock.Now()
rm.LastRegisteredAt = now
return metric.Vectors[hash.Names].Holder, rm.Metric
}
vector, ok := metric.Vectors[hash.Names]
if ok {
return vector.Holder, nil
}
return nil, nil
}
func (r *Registry) GetCounter(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping, metricsCount *prometheus.GaugeVec) (prometheus.Counter, error) {
hash, labelNames := r.HashLabels(labels)
vh, mh := r.Get(metricName, hash, metrics.CounterMetricType)
if mh != nil {
return mh.(prometheus.Counter), nil
}
if r.MetricConflicts(metricName, metrics.CounterMetricType) {
return nil, fmt.Errorf("Metric with name %s is already registered", metricName)
}
var counterVec *prometheus.CounterVec
if vh == nil {
metricsCount.WithLabelValues("counter").Inc()
counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: metricName,
Help: help,
}, labelNames)
if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil {
return nil, err
}
} else {
counterVec = vh.(*prometheus.CounterVec)
}
var counter prometheus.Counter
var err error
if counter, err = counterVec.GetMetricWith(labels); err != nil {
return nil, err
}
r.StoreCounter(metricName, hash, labels, counterVec, counter, mapping.Ttl)
return counter, nil
}
func (r *Registry) GetGauge(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping, metricsCount *prometheus.GaugeVec) (prometheus.Gauge, error) {
hash, labelNames := r.HashLabels(labels)
vh, mh := r.Get(metricName, hash, metrics.GaugeMetricType)
if mh != nil {
return mh.(prometheus.Gauge), nil
}
if r.MetricConflicts(metricName, metrics.GaugeMetricType) {
return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName)
}
var gaugeVec *prometheus.GaugeVec
if vh == nil {
metricsCount.WithLabelValues("gauge").Inc()
gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: metricName,
Help: help,
}, labelNames)
if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil {
return nil, err
}
} else {
gaugeVec = vh.(*prometheus.GaugeVec)
}
var gauge prometheus.Gauge
var err error
if gauge, err = gaugeVec.GetMetricWith(labels); err != nil {
return nil, err
}
r.StoreGauge(metricName, hash, labels, gaugeVec, gauge, mapping.Ttl)
return gauge, nil
}
func (r *Registry) GetHistogram(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping, metricsCount *prometheus.GaugeVec) (prometheus.Observer, error) {
hash, labelNames := r.HashLabels(labels)
vh, mh := r.Get(metricName, hash, metrics.HistogramMetricType)
if mh != nil {
return mh.(prometheus.Observer), nil
}
if r.MetricConflicts(metricName, metrics.HistogramMetricType) {
return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName)
}
if r.MetricConflicts(metricName+"_sum", metrics.HistogramMetricType) {
return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName)
}
if r.MetricConflicts(metricName+"_count", metrics.HistogramMetricType) {
return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName)
}
if r.MetricConflicts(metricName+"_bucket", metrics.HistogramMetricType) {
return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName)
}
var histogramVec *prometheus.HistogramVec
if vh == nil {
metricsCount.WithLabelValues("histogram").Inc()
buckets := r.Mapper.Defaults.Buckets
if mapping.HistogramOptions != nil && len(mapping.HistogramOptions.Buckets) > 0 {
buckets = mapping.HistogramOptions.Buckets
}
histogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: metricName,
Help: help,
Buckets: buckets,
}, labelNames)
if err := prometheus.Register(uncheckedCollector{histogramVec}); err != nil {
return nil, err
}
} else {
histogramVec = vh.(*prometheus.HistogramVec)
}
var observer prometheus.Observer
var err error
if observer, err = histogramVec.GetMetricWith(labels); err != nil {
return nil, err
}
r.StoreHistogram(metricName, hash, labels, histogramVec, observer, mapping.Ttl)
return observer, nil
}
func (r *Registry) GetSummary(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping, metricsCount *prometheus.GaugeVec) (prometheus.Observer, error) {
hash, labelNames := r.HashLabels(labels)
vh, mh := r.Get(metricName, hash, metrics.SummaryMetricType)
if mh != nil {
return mh.(prometheus.Observer), nil
}
if r.MetricConflicts(metricName, metrics.SummaryMetricType) {
return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName)
}
if r.MetricConflicts(metricName+"_sum", metrics.SummaryMetricType) {
return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName)
}
if r.MetricConflicts(metricName+"_count", metrics.SummaryMetricType) {
return nil, fmt.Errorf("metrics.Metric with name %s is already registered", metricName)
}
var summaryVec *prometheus.SummaryVec
if vh == nil {
metricsCount.WithLabelValues("summary").Inc()
quantiles := r.Mapper.Defaults.Quantiles
if mapping != nil && mapping.SummaryOptions != nil && len(mapping.SummaryOptions.Quantiles) > 0 {
quantiles = mapping.SummaryOptions.Quantiles
}
summaryOptions := mapper.SummaryOptions{}
if mapping != nil && mapping.SummaryOptions != nil {
summaryOptions = *mapping.SummaryOptions
}
objectives := make(map[float64]float64)
for _, q := range quantiles {
objectives[q.Quantile] = q.Error
}
// In the case of no mapping file, explicitly define the default quantiles
if len(objectives) == 0 {
objectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}
}
summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: metricName,
Help: help,
Objectives: objectives,
MaxAge: summaryOptions.MaxAge,
AgeBuckets: summaryOptions.AgeBuckets,
BufCap: summaryOptions.BufCap,
}, labelNames)
if err := prometheus.Register(uncheckedCollector{summaryVec}); err != nil {
return nil, err
}
} else {
summaryVec = vh.(*prometheus.SummaryVec)
}
var observer prometheus.Observer
var err error
if observer, err = summaryVec.GetMetricWith(labels); err != nil {
return nil, err
}
r.StoreSummary(metricName, hash, labels, summaryVec, observer, mapping.Ttl)
return observer, nil
}
func (r *Registry) RemoveStaleMetrics() {
now := clock.Now()
// delete timeseries with expired ttl
for _, metric := range r.Metrics {
for hash, rm := range metric.Metrics {
if rm.TTL == 0 {
continue
}
if rm.LastRegisteredAt.Add(rm.TTL).Before(now) {
metric.Vectors[rm.VecKey].Holder.Delete(rm.Labels)
metric.Vectors[rm.VecKey].RefCount--
delete(metric.Metrics, hash)
}
}
}
}
// Calculates a hash of both the label names and the label names and values.
func (r *Registry) HashLabels(labels prometheus.Labels) (metrics.LabelHash, []string) {
r.Hasher.Reset()
r.NameBuf.Reset()
r.ValueBuf.Reset()
labelNames := make([]string, 0, len(labels))
for labelName := range labels {
labelNames = append(labelNames, labelName)
}
sort.Strings(labelNames)
r.ValueBuf.WriteByte(model.SeparatorByte)
for _, labelName := range labelNames {
r.ValueBuf.WriteString(labels[labelName])
r.ValueBuf.WriteByte(model.SeparatorByte)
r.NameBuf.WriteString(labelName)
r.NameBuf.WriteByte(model.SeparatorByte)
}
lh := metrics.LabelHash{}
r.Hasher.Write(r.NameBuf.Bytes())
lh.Names = metrics.NameHash(r.Hasher.Sum64())
// Now add the values to the names we've already hashed.
r.Hasher.Write(r.ValueBuf.Bytes())
lh.Values = metrics.ValueHash(r.Hasher.Sum64())
return lh, labelNames
}

View file

@ -1,416 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"fmt"
"hash"
"hash/fnv"
"sort"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)
type metricType int
const (
CounterMetricType metricType = iota
GaugeMetricType
SummaryMetricType
HistogramMetricType
)
type nameHash uint64
type valueHash uint64
type labelHash struct {
// This is a hash over the label names
names nameHash
// This is a hash over the label names + label values
values valueHash
}
type metricHolder interface{}
type registeredMetric struct {
lastRegisteredAt time.Time
labels prometheus.Labels
ttl time.Duration
metric metricHolder
vecKey nameHash
}
type vectorHolder interface {
Delete(label prometheus.Labels) bool
}
type vector struct {
holder vectorHolder
refCount uint64
}
type metric struct {
metricType metricType
// Vectors key is the hash of the label names
vectors map[nameHash]*vector
// Metrics key is a hash of the label names + label values
metrics map[valueHash]*registeredMetric
}
type registry struct {
metrics map[string]metric
mapper *mapper.MetricMapper
// The below value and label variables are allocated in the registry struct
// so that we don't have to allocate them every time have to compute a label
// hash.
valueBuf, nameBuf bytes.Buffer
hasher hash.Hash64
}
func newRegistry(mapper *mapper.MetricMapper) *registry {
return &registry{
metrics: make(map[string]metric),
mapper: mapper,
hasher: fnv.New64a(),
}
}
func (r *registry) metricConflicts(metricName string, metricType metricType) bool {
vector, hasMetric := r.metrics[metricName]
if !hasMetric {
// No metric with this name exists
return false
}
if vector.metricType == metricType {
// We've found a copy of this metric with this type, but different
// labels, so it's safe to create a new one.
return false
}
// The metric exists, but it's of a different type than we're trying to
// create.
return true
}
func (r *registry) storeCounter(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.CounterVec, c prometheus.Counter, ttl time.Duration) {
r.store(metricName, hash, labels, vec, c, CounterMetricType, ttl)
}
func (r *registry) storeGauge(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.GaugeVec, g prometheus.Counter, ttl time.Duration) {
r.store(metricName, hash, labels, vec, g, GaugeMetricType, ttl)
}
func (r *registry) storeHistogram(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.HistogramVec, o prometheus.Observer, ttl time.Duration) {
r.store(metricName, hash, labels, vec, o, HistogramMetricType, ttl)
}
func (r *registry) storeSummary(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.SummaryVec, o prometheus.Observer, ttl time.Duration) {
r.store(metricName, hash, labels, vec, o, SummaryMetricType, ttl)
}
func (r *registry) store(metricName string, hash labelHash, labels prometheus.Labels, vh vectorHolder, mh metricHolder, metricType metricType, ttl time.Duration) {
metric, hasMetric := r.metrics[metricName]
if !hasMetric {
metric.metricType = metricType
metric.vectors = make(map[nameHash]*vector)
metric.metrics = make(map[valueHash]*registeredMetric)
r.metrics[metricName] = metric
}
v, ok := metric.vectors[hash.names]
if !ok {
v = &vector{holder: vh}
metric.vectors[hash.names] = v
}
rm, ok := metric.metrics[hash.values]
if !ok {
rm = &registeredMetric{
labels: labels,
ttl: ttl,
metric: mh,
vecKey: hash.names,
}
metric.metrics[hash.values] = rm
v.refCount++
}
now := clock.Now()
rm.lastRegisteredAt = now
// Update ttl from mapping
rm.ttl = ttl
}
func (r *registry) get(metricName string, hash labelHash, metricType metricType) (vectorHolder, metricHolder) {
metric, hasMetric := r.metrics[metricName]
if !hasMetric {
return nil, nil
}
if metric.metricType != metricType {
return nil, nil
}
rm, ok := metric.metrics[hash.values]
if ok {
now := clock.Now()
rm.lastRegisteredAt = now
return metric.vectors[hash.names].holder, rm.metric
}
vector, ok := metric.vectors[hash.names]
if ok {
return vector.holder, nil
}
return nil, nil
}
func (r *registry) getCounter(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Counter, error) {
hash, labelNames := r.hashLabels(labels)
vh, mh := r.get(metricName, hash, CounterMetricType)
if mh != nil {
return mh.(prometheus.Counter), nil
}
if r.metricConflicts(metricName, CounterMetricType) {
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
}
var counterVec *prometheus.CounterVec
if vh == nil {
metricsCount.WithLabelValues("counter").Inc()
counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: metricName,
Help: help,
}, labelNames)
if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil {
return nil, err
}
} else {
counterVec = vh.(*prometheus.CounterVec)
}
var counter prometheus.Counter
var err error
if counter, err = counterVec.GetMetricWith(labels); err != nil {
return nil, err
}
r.storeCounter(metricName, hash, labels, counterVec, counter, mapping.Ttl)
return counter, nil
}
func (r *registry) getGauge(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Gauge, error) {
hash, labelNames := r.hashLabels(labels)
vh, mh := r.get(metricName, hash, GaugeMetricType)
if mh != nil {
return mh.(prometheus.Gauge), nil
}
if r.metricConflicts(metricName, GaugeMetricType) {
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
}
var gaugeVec *prometheus.GaugeVec
if vh == nil {
metricsCount.WithLabelValues("gauge").Inc()
gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: metricName,
Help: help,
}, labelNames)
if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil {
return nil, err
}
} else {
gaugeVec = vh.(*prometheus.GaugeVec)
}
var gauge prometheus.Gauge
var err error
if gauge, err = gaugeVec.GetMetricWith(labels); err != nil {
return nil, err
}
r.storeGauge(metricName, hash, labels, gaugeVec, gauge, mapping.Ttl)
return gauge, nil
}
func (r *registry) getHistogram(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) {
hash, labelNames := r.hashLabels(labels)
vh, mh := r.get(metricName, hash, HistogramMetricType)
if mh != nil {
return mh.(prometheus.Observer), nil
}
if r.metricConflicts(metricName, HistogramMetricType) {
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
}
if r.metricConflicts(metricName+"_sum", HistogramMetricType) {
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
}
if r.metricConflicts(metricName+"_count", HistogramMetricType) {
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
}
if r.metricConflicts(metricName+"_bucket", HistogramMetricType) {
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
}
var histogramVec *prometheus.HistogramVec
if vh == nil {
metricsCount.WithLabelValues("histogram").Inc()
buckets := r.mapper.Defaults.Buckets
if mapping.HistogramOptions != nil && len(mapping.HistogramOptions.Buckets) > 0 {
buckets = mapping.HistogramOptions.Buckets
}
histogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: metricName,
Help: help,
Buckets: buckets,
}, labelNames)
if err := prometheus.Register(uncheckedCollector{histogramVec}); err != nil {
return nil, err
}
} else {
histogramVec = vh.(*prometheus.HistogramVec)
}
var observer prometheus.Observer
var err error
if observer, err = histogramVec.GetMetricWith(labels); err != nil {
return nil, err
}
r.storeHistogram(metricName, hash, labels, histogramVec, observer, mapping.Ttl)
return observer, nil
}
func (r *registry) getSummary(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) {
hash, labelNames := r.hashLabels(labels)
vh, mh := r.get(metricName, hash, SummaryMetricType)
if mh != nil {
return mh.(prometheus.Observer), nil
}
if r.metricConflicts(metricName, SummaryMetricType) {
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
}
if r.metricConflicts(metricName+"_sum", SummaryMetricType) {
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
}
if r.metricConflicts(metricName+"_count", SummaryMetricType) {
return nil, fmt.Errorf("metric with name %s is already registered", metricName)
}
var summaryVec *prometheus.SummaryVec
if vh == nil {
metricsCount.WithLabelValues("summary").Inc()
quantiles := r.mapper.Defaults.Quantiles
if mapping != nil && mapping.SummaryOptions != nil && len(mapping.SummaryOptions.Quantiles) > 0 {
quantiles = mapping.SummaryOptions.Quantiles
}
summaryOptions := mapper.SummaryOptions{}
if mapping != nil && mapping.SummaryOptions != nil {
summaryOptions = *mapping.SummaryOptions
}
objectives := make(map[float64]float64)
for _, q := range quantiles {
objectives[q.Quantile] = q.Error
}
// In the case of no mapping file, explicitly define the default quantiles
if len(objectives) == 0 {
objectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}
}
summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: metricName,
Help: help,
Objectives: objectives,
MaxAge: summaryOptions.MaxAge,
AgeBuckets: summaryOptions.AgeBuckets,
BufCap: summaryOptions.BufCap,
}, labelNames)
if err := prometheus.Register(uncheckedCollector{summaryVec}); err != nil {
return nil, err
}
} else {
summaryVec = vh.(*prometheus.SummaryVec)
}
var observer prometheus.Observer
var err error
if observer, err = summaryVec.GetMetricWith(labels); err != nil {
return nil, err
}
r.storeSummary(metricName, hash, labels, summaryVec, observer, mapping.Ttl)
return observer, nil
}
func (r *registry) removeStaleMetrics() {
now := clock.Now()
// delete timeseries with expired ttl
for _, metric := range r.metrics {
for hash, rm := range metric.metrics {
if rm.ttl == 0 {
continue
}
if rm.lastRegisteredAt.Add(rm.ttl).Before(now) {
metric.vectors[rm.vecKey].holder.Delete(rm.labels)
metric.vectors[rm.vecKey].refCount--
delete(metric.metrics, hash)
}
}
}
}
// Calculates a hash of both the label names and the label names and values.
func (r *registry) hashLabels(labels prometheus.Labels) (labelHash, []string) {
r.hasher.Reset()
r.nameBuf.Reset()
r.valueBuf.Reset()
labelNames := make([]string, 0, len(labels))
for labelName := range labels {
labelNames = append(labelNames, labelName)
}
sort.Strings(labelNames)
r.valueBuf.WriteByte(model.SeparatorByte)
for _, labelName := range labelNames {
r.valueBuf.WriteString(labels[labelName])
r.valueBuf.WriteByte(model.SeparatorByte)
r.nameBuf.WriteString(labelName)
r.nameBuf.WriteByte(model.SeparatorByte)
}
lh := labelHash{}
r.hasher.Write(r.nameBuf.Bytes())
lh.names = nameHash(r.hasher.Sum64())
// Now add the values to the names we've already hashed.
r.hasher.Write(r.valueBuf.Bytes())
lh.values = valueHash(r.hasher.Sum64())
return lh, labelNames
}

View file

@ -1,160 +0,0 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
eventStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_total",
Help: "The total number of StatsD events seen.",
},
[]string{"type"},
)
eventsFlushed = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_event_queue_flushed_total",
Help: "Number of times events were flushed to exporter",
},
)
eventsUnmapped = prometheus.NewCounter(prometheus.CounterOpts{
Name: "statsd_exporter_events_unmapped_total",
Help: "The total number of StatsD events no mapping was found for.",
})
udpPackets = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_udp_packets_total",
Help: "The total number of StatsD packets received over UDP.",
},
)
tcpConnections = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connections_total",
Help: "The total number of TCP connections handled.",
},
)
tcpErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connection_errors_total",
Help: "The number of errors encountered reading from TCP.",
},
)
tcpLineTooLong = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_too_long_lines_total",
Help: "The number of lines discarded due to being too long.",
},
)
unixgramPackets = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_unixgram_packets_total",
Help: "The total number of StatsD packets received over Unixgram.",
},
)
linesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_lines_total",
Help: "The total number of StatsD lines received.",
},
)
samplesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_samples_total",
Help: "The total number of StatsD samples received.",
},
)
sampleErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_sample_errors_total",
Help: "The total number of errors parsing StatsD samples.",
},
[]string{"reason"},
)
tagsReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tags_total",
Help: "The total number of DogStatsD tags processed.",
},
)
tagErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tag_errors_total",
Help: "The number of errors parsing DogStatsD tags.",
},
)
configLoads = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_config_reloads_total",
Help: "The number of configuration reloads.",
},
[]string{"outcome"},
)
mappingsCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "statsd_exporter_loaded_mappings",
Help: "The current number of configured metric mappings.",
})
conflictingEventStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_conflict_total",
Help: "The total number of StatsD events with conflicting names.",
},
[]string{"type"},
)
errorEventStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_error_total",
Help: "The total number of StatsD events discarded due to errors.",
},
[]string{"reason"},
)
eventsActions = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "statsd_exporter_events_actions_total",
Help: "The total number of StatsD events by action.",
},
[]string{"action"},
)
metricsCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "statsd_exporter_metrics_total",
Help: "The total number of metrics.",
},
[]string{"type"},
)
)
func init() {
prometheus.MustRegister(eventStats)
prometheus.MustRegister(eventsFlushed)
prometheus.MustRegister(eventsUnmapped)
prometheus.MustRegister(udpPackets)
prometheus.MustRegister(tcpConnections)
prometheus.MustRegister(tcpErrors)
prometheus.MustRegister(tcpLineTooLong)
prometheus.MustRegister(unixgramPackets)
prometheus.MustRegister(linesReceived)
prometheus.MustRegister(samplesReceived)
prometheus.MustRegister(sampleErrors)
prometheus.MustRegister(tagsReceived)
prometheus.MustRegister(tagErrors)
prometheus.MustRegister(configLoads)
prometheus.MustRegister(mappingsCount)
prometheus.MustRegister(conflictingEventStats)
prometheus.MustRegister(errorEventStats)
prometheus.MustRegister(eventsActions)
prometheus.MustRegister(metricsCount)
}

View file

@ -1,14 +1,14 @@
version: "{build}"
platform: x64
clone_folder: c:\gopath\src\github.com\sirupsen\logrus
environment:
GOPATH: c:\gopath
branches:
only:
- master
install:
- set PATH=%GOPATH%\bin;c:\go\bin;%PATH%
- go version
build_script:
- go get -t
- go test
version: "{build}"
platform: x64
clone_folder: c:\gopath\src\github.com\sirupsen\logrus
environment:
GOPATH: c:\gopath
branches:
only:
- master
install:
- set PATH=%GOPATH%\bin;c:\go\bin;%PATH%
- go version
build_script:
- go get -t
- go test