diff --git a/go.sum b/go.sum index bd2f8b0..5799ea8 100644 --- a/go.sum +++ b/go.sum @@ -56,7 +56,6 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= -github.com/prometheus/client_golang v1.5.0 h1:Ctq0iGpCmr3jeP77kbF2UxgvRwzWWz+4Bh9/vJTyg1A= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= diff --git a/pkg/event.go~ b/pkg/event.go~ deleted file mode 100644 index 8c9cc50..0000000 --- a/pkg/event.go~ +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "sync" - "time" - - "github.com/prometheus/statsd_exporter/pkg/clock" - "github.com/prometheus/statsd_exporter/pkg/mapper" -) - -type Event interface { - MetricName() string - Value() float64 - Labels() map[string]string - MetricType() mapper.MetricType -} - -type CounterEvent struct { - metricName string - value float64 - labels map[string]string -} - -func (c *CounterEvent) MetricName() string { return c.metricName } -func (c *CounterEvent) Value() float64 { return c.value } -func (c *CounterEvent) Labels() map[string]string { return c.labels } -func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter } - -type GaugeEvent struct { - metricName string - value float64 - relative bool - labels map[string]string -} - -func (g *GaugeEvent) MetricName() string { return g.metricName } -func (g *GaugeEvent) Value() float64 { return g.value } -func (c *GaugeEvent) Labels() map[string]string { return c.labels } -func (c *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge } - -type TimerEvent struct { - metricName string - value float64 - labels map[string]string -} - -func (t *TimerEvent) MetricName() string { return t.metricName } -func (t *TimerEvent) Value() float64 { return t.value } -func (c *TimerEvent) Labels() map[string]string { return c.labels } -func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTimer } - -type Events []Event - -type eventQueue struct { - c chan Events - q Events - m sync.Mutex - flushThreshold int - flushTicker *time.Ticker -} - -type eventHandler interface { - queue(event Events) -} - -func newEventQueue(c chan Events, flushThreshold int, flushInterval time.Duration) *eventQueue { - ticker := clock.NewTicker(flushInterval) - eq := &eventQueue{ - c: c, - flushThreshold: flushThreshold, - flushTicker: ticker, - q: make([]Event, 0, flushThreshold), - } - go func() { - for { - <-ticker.C - eq.flush() - } - }() - return eq -} - -func (eq *eventQueue) queue(events Events) { - eq.m.Lock() - defer eq.m.Unlock() - - for _, e := range events { - eq.q = append(eq.q, e) - if len(eq.q) >= eq.flushThreshold { - eq.flushUnlocked() - } - } -} - -func (eq *eventQueue) flush() { - eq.m.Lock() - defer eq.m.Unlock() - eq.flushUnlocked() -} - -func (eq *eventQueue) flushUnlocked() { - eq.c <- eq.q - eq.q = make([]Event, 0, cap(eq.q)) - eventsFlushed.Inc() -} - -func (eq *eventQueue) len() int { - eq.m.Lock() - defer eq.m.Unlock() - - return len(eq.q) -} - -type unbufferedEventHandler struct { - c chan Events -} - -func (ueh *unbufferedEventHandler) queue(events Events) { - ueh.c <- events -} diff --git a/pkg/event/event.go~ b/pkg/event/event.go~ deleted file mode 100644 index 14c70b2..0000000 --- a/pkg/event/event.go~ +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package event - -import ( - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/statsd_exporter/pkg/clock" - "github.com/prometheus/statsd_exporter/pkg/mapper" -) - -type Event interface { - MetricName() string - Value() float64 - Labels() map[string]string - MetricType() mapper.MetricType -} - -type CounterEvent struct { - CMetricName string - CValue float64 - CLabels map[string]string -} - -func (c *CounterEvent) MetricName() string { return c.CMetricName } -func (c *CounterEvent) Value() float64 { return c.CValue } -func (c *CounterEvent) Labels() map[string]string { return c.CLabels } -func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter } - -type GaugeEvent struct { - GMetricName string - GValue float64 - GRelative bool - GLabels map[string]string -} - -func (g *GaugeEvent) MetricName() string { return g.GMetricName } -func (g *GaugeEvent) Value() float64 { return g.GValue } -func (c *GaugeEvent) Labels() map[string]string { return c.GLabels } -func (c *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge } - -type TimerEvent struct { - TMetricName string - TValue float64 - TLabels map[string]string -} - -func (t *TimerEvent) MetricName() string { return t.TMetricName } -func (t *TimerEvent) Value() float64 { return t.TValue } -func (c *TimerEvent) Labels() map[string]string { return c.TLabels } -func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTimer } - -type Events []Event - -type EventQueue struct { - C chan Events - q Events - m sync.Mutex - flushThreshold int - flushTicker *time.Ticker -} - -type EventHandler interface { - Queue(event Events, eventsFlushed *prometheus.Counter) -} - -func NewEventQueue(c chan Events, flushThreshold int, flushInterval time.Duration, eventsFlushed prometheus.Counter) *EventQueue { - ticker := clock.NewTicker(flushInterval) - eq := &EventQueue{ - C: c, - flushThreshold: flushThreshold, - flushTicker: ticker, - q: make([]Event, 0, flushThreshold), - } - go func() { - for { - <-ticker.C - eq.Flush(eventsFlushed) - } - }() - return eq -} - -func (eq *EventQueue) Queue(events Events, eventsFlushed *prometheus.Counter) { - eq.m.Lock() - defer eq.m.Unlock() - - for _, e := range events { - eq.q = append(eq.q, e) - if len(eq.q) >= eq.flushThreshold { - eq.FlushUnlocked(*eventsFlushed) - } - } -} - -func (eq *EventQueue) Flush(eventsFlushed prometheus.Counter) { - eq.m.Lock() - defer eq.m.Unlock() - eq.FlushUnlocked(eventsFlushed) -} - -func (eq *EventQueue) FlushUnlocked(eventsFlushed prometheus.Counter) { - eq.C <- eq.q - eq.q = make([]Event, 0, cap(eq.q)) - eventsFlushed.Inc() -} - -func (eq *EventQueue) Len() int { - eq.m.Lock() - defer eq.m.Unlock() - - return len(eq.q) -} - -type UnbufferedEventHandler struct { - C chan Events -} - -func (ueh *UnbufferedEventHandler) Queue(events Events) { - ueh.C <- events -} diff --git a/pkg/exporter/exporter.go~ b/pkg/exporter/exporter.go~ deleted file mode 100644 index cdc5e2c..0000000 --- a/pkg/exporter/exporter.go~ +++ /dev/null @@ -1,172 +0,0 @@ -package exporter - -import ( - "os" - "time" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/statsd_exporter/pkg/clock" - "github.com/prometheus/statsd_exporter/pkg/event" - "github.com/prometheus/statsd_exporter/pkg/mapper" - "github.com/prometheus/statsd_exporter/pkg/registry" -) - -const ( - defaultHelp = "Metric autogenerated by statsd_exporter." - regErrF = "Failed to update metric" -) - -type Exporter struct { - Mapper *mapper.MetricMapper - Registry *registry.Registry - Logger log.Logger -} - -// Listen handles all events sent to the given channel sequentially. It -// terminates when the channel is closed. -func (b *Exporter) Listen(e <-chan event.Events, thisEvent event.Event, eventsActions prometheus.GaugeVec, eventsUnmapped prometheus.Gauge, - errorEventStats prometheus.GaugeVec, eventStats prometheus.GaugeVec, conflictingEventStats prometheus.GaugeVec, metricsCount prometheus.GaugeVec, l func(string, log.Logger)) { - removeStaleMetricsTicker := clock.NewTicker(time.Second) - - for { - select { - case <-removeStaleMetricsTicker.C: - b.Registry.RemoveStaleMetrics() - case events, ok := <-e: - if !ok { - level.Debug(b.Logger).Log("msg", "Channel is closed. Break out of Exporter.Listener.") - removeStaleMetricsTicker.Stop() - return - } - for _, event := range events { - b.handleEvent(event, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount, l) - } - } - } -} - -// handleEvent processes a single Event according to the configured mapping. -func (b *Exporter) handleEvent(thisEvent event.Event, eventsActions prometheus.GaugeVec, eventsUnmapped prometheus.Gauge, - errorEventStats prometheus.GaugeVec, eventStats prometheus.GaugeVec, conflictingEventStats prometheus.GaugeVec, metricsCount prometheus.GaugeVec, l func(string, log.Logger)) { - - mapping, labels, present := b.Mapper.GetMapping(thisEvent.MetricName(), thisEvent.MetricType()) - if mapping == nil { - mapping = &mapper.MetricMapping{} - if b.Mapper.Defaults.Ttl != 0 { - mapping.Ttl = b.Mapper.Defaults.Ttl - } - } - - if mapping.Action == mapper.ActionTypeDrop { - eventsActions.WithLabelValues("drop").Inc() - return - } - - metricName := "" - - help := defaultHelp - if mapping.HelpText != "" { - help = mapping.HelpText - } - - prometheusLabels := thisEvent.Labels() - if present { - if mapping.Name == "" { - level.Debug(b.Logger).Log("msg", "The mapping generates an empty metric name", "metric_name", thisEvent.MetricName(), "match", mapping.Match) - errorEventStats.WithLabelValues("empty_metric_name").Inc() - return - } - metricName = mapper.EscapeMetricName(mapping.Name) - for label, value := range labels { - prometheusLabels[label] = value - } - eventsActions.WithLabelValues(string(mapping.Action)).Inc() - } else { - eventsUnmapped.Inc() - metricName = mapper.EscapeMetricName(thisEvent.MetricName()) - } - - switch ev := thisEvent.(type) { - case *event.CounterEvent: - // We don't accept negative values for counters. Incrementing the counter with a negative number - // will cause the exporter to panic. Instead we will warn and continue to the next event. - if thisEvent.Value() < 0.0 { - level.Debug(b.Logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", thisEvent.Value()) - errorEventStats.WithLabelValues("illegal_negative_counter").Inc() - return - } - - counter, err := b.Registry.GetCounter(metricName, prometheusLabels, help, mapping, &metricsCount) - if err == nil { - counter.Add(thisEvent.Value()) - eventStats.WithLabelValues("counter").Inc() - } else { - level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) - conflictingEventStats.WithLabelValues("counter").Inc() - } - - case *event.GaugeEvent: - gauge, err := b.Registry.GetGauge(metricName, prometheusLabels, help, mapping, &metricsCount) - - if err == nil { - if ev.GRelative { - gauge.Add(thisEvent.Value()) - } else { - gauge.Set(thisEvent.Value()) - } - eventStats.WithLabelValues("gauge").Inc() - } else { - level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) - conflictingEventStats.WithLabelValues("gauge").Inc() - } - - case *event.TimerEvent: - t := mapper.TimerTypeDefault - if mapping != nil { - t = mapping.TimerType - } - if t == mapper.TimerTypeDefault { - t = b.Mapper.Defaults.TimerType - } - - switch t { - case mapper.TimerTypeHistogram: - histogram, err := b.Registry.GetHistogram(metricName, prometheusLabels, help, mapping, &metricsCount) - if err == nil { - histogram.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond - eventStats.WithLabelValues("timer").Inc() - } else { - level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) - conflictingEventStats.WithLabelValues("timer").Inc() - } - - case mapper.TimerTypeDefault, mapper.TimerTypeSummary: - summary, err := b.Registry.GetSummary(metricName, prometheusLabels, help, mapping, &metricsCount) - if err == nil { - summary.Observe(thisEvent.Value() / 1000) // prometheus presumes seconds, statsd millisecond - eventStats.WithLabelValues("timer").Inc() - } else { - level.Debug(b.Logger).Log("msg", regErrF, "metric", metricName, "error", err) - conflictingEventStats.WithLabelValues("timer").Inc() - } - - default: - level.Error(b.Logger).Log("msg", "unknown timer type", "type", t) - os.Exit(1) - } - - default: - level.Debug(b.Logger).Log("msg", "Unsupported event type") - eventStats.WithLabelValues("illegal").Inc() - } -} - -func NewExporter(mapper *mapper.MetricMapper, logger log.Logger) *Exporter { - return &Exporter{ - Mapper: mapper, - Registry: registry.NewRegistry(mapper), - Logger: logger, - } -} diff --git a/pkg/line/line.go~ b/pkg/line/line.go~ deleted file mode 100644 index 2b26cb2..0000000 --- a/pkg/line/line.go~ +++ /dev/null @@ -1,241 +0,0 @@ -package line - -import ( - "fmt" - "strconv" - "strings" - "unicode/utf8" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/statsd_exporter/pkg/event" - "github.com/prometheus/statsd_exporter/pkg/mapper" -) - -func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (event.Event, error) { - switch statType { - case "c": - return &event.CounterEvent{ - CMetricName: metric, - CValue: float64(value), - CLabels: labels, - }, nil - case "g": - return &event.GaugeEvent{ - GMetricName: metric, - GValue: float64(value), - GRelative: relative, - GLabels: labels, - }, nil - case "ms", "h", "d": - return &event.TimerEvent{ - TMetricName: metric, - TValue: float64(value), - TLabels: labels, - }, nil - case "s": - return nil, fmt.Errorf("no support for StatsD sets") - default: - return nil, fmt.Errorf("bad stat type %s", statType) - } -} - -func parseTag(component, tag string, separator rune, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) { - // Entirely empty tag is an error - if len(tag) == 0 { - tagErrors.Inc() - level.Debug(logger).Log("msg", "Empty name tag", "component", component) - return - } - - for i, c := range tag { - if c == separator { - k := tag[:i] - v := tag[i+1:] - - if len(k) == 0 || len(v) == 0 { - // Empty key or value is an error - tagErrors.Inc() - level.Debug(logger).Log("msg", "Malformed name tag", "k", k, "v", v, "component", component) - } else { - labels[mapper.EscapeMetricName(k)] = v - } - return - } - } - - // Missing separator (no value) is an error - tagErrors.Inc() - level.Debug(logger).Log("msg", "Malformed name tag", "tag", tag, "component", component) -} - -func parseNameTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) { - lastTagEndIndex := 0 - for i, c := range component { - if c == ',' { - tag := component[lastTagEndIndex:i] - lastTagEndIndex = i + 1 - parseTag(component, tag, '=', labels, tagErrors, logger) - } - } - - // If we're not off the end of the string, add the last tag - if lastTagEndIndex < len(component) { - tag := component[lastTagEndIndex:] - parseTag(component, tag, '=', labels, tagErrors, logger) - } -} - -func trimLeftHash(s string) string { - if s != "" && s[0] == '#' { - return s[1:] - } - return s -} - -func parseDogStatsDTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) { - lastTagEndIndex := 0 - for i, c := range component { - if c == ',' { - tag := component[lastTagEndIndex:i] - lastTagEndIndex = i + 1 - parseTag(component, trimLeftHash(tag), ':', labels, tagErrors, logger) - } - } - - // If we're not off the end of the string, add the last tag - if lastTagEndIndex < len(component) { - tag := component[lastTagEndIndex:] - parseTag(component, trimLeftHash(tag), ':', labels, tagErrors, logger) - } -} - -func parseNameAndTags(name string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) string { - for i, c := range name { - // `#` delimits start of tags by Librato - // https://www.librato.com/docs/kb/collect/collection_agents/stastd/#stat-level-tags - // `,` delimits start of tags by InfluxDB - // https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/#introducing-influx-statsd - if c == '#' || c == ',' { - parseNameTags(name[i+1:], labels, tagErrors, logger) - return name[:i] - } - } - return name -} - -func LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter, logger log.Logger) event.Events { - events := event.Events{} - if line == "" { - return events - } - - elements := strings.SplitN(line, ":", 2) - if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) { - sampleErrors.WithLabelValues("malformed_line").Inc() - level.Debug(logger).Log("msg", "Bad line from StatsD", "line", line) - return events - } - - labels := map[string]string{} - metric := parseNameAndTags(elements[0], labels, tagErrors, logger) - - var samples []string - if strings.Contains(elements[1], "|#") { - // using DogStatsD tags - - // don't allow mixed tagging styles - if len(labels) > 0 { - sampleErrors.WithLabelValues("mixed_tagging_styles").Inc() - level.Debug(logger).Log("msg", "Bad line (multiple tagging styles) from StatsD", "line", line) - return events - } - - // disable multi-metrics - samples = elements[1:] - } else { - samples = strings.Split(elements[1], ":") - } - -samples: - for _, sample := range samples { - samplesReceived.Inc() - components := strings.Split(sample, "|") - samplingFactor := 1.0 - if len(components) < 2 || len(components) > 4 { - sampleErrors.WithLabelValues("malformed_component").Inc() - level.Debug(logger).Log("msg", "Bad component", "line", line) - continue - } - valueStr, statType := components[0], components[1] - - var relative = false - if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 { - relative = true - } - - value, err := strconv.ParseFloat(valueStr, 64) - if err != nil { - level.Debug(logger).Log("msg", "Bad value", "value", valueStr, "line", line) - sampleErrors.WithLabelValues("malformed_value").Inc() - continue - } - - multiplyEvents := 1 - if len(components) >= 3 { - for _, component := range components[2:] { - if len(component) == 0 { - level.Debug(logger).Log("msg", "Empty component", "line", line) - sampleErrors.WithLabelValues("malformed_component").Inc() - continue samples - } - } - - for _, component := range components[2:] { - switch component[0] { - case '@': - - samplingFactor, err = strconv.ParseFloat(component[1:], 64) - if err != nil { - level.Debug(logger).Log("msg", "Invalid sampling factor", "component", component[1:], "line", line) - sampleErrors.WithLabelValues("invalid_sample_factor").Inc() - } - if samplingFactor == 0 { - samplingFactor = 1 - } - - if statType == "g" { - continue - } else if statType == "c" { - value /= samplingFactor - } else if statType == "ms" || statType == "h" || statType == "d" { - multiplyEvents = int(1 / samplingFactor) - } - case '#': - parseDogStatsDTags(component[1:], labels, tagErrors, logger) - default: - level.Debug(logger).Log("msg", "Invalid sampling factor or tag section", "component", components[2], "line", line) - sampleErrors.WithLabelValues("invalid_sample_factor").Inc() - continue - } - } - } - - if len(labels) > 0 { - tagsReceived.Inc() - } - - for i := 0; i < multiplyEvents; i++ { - event, err := buildEvent(statType, metric, value, relative, labels) - if err != nil { - level.Debug(logger).Log("msg", "Error building event", "line", line, "error", err) - sampleErrors.WithLabelValues("illegal_event").Inc() - continue - } - events = append(events, event) - } - } - return events -} diff --git a/pkg/listener/listener.go~ b/pkg/listener/listener.go~ deleted file mode 100644 index 04e6690..0000000 --- a/pkg/listener/listener.go~ +++ /dev/null @@ -1,138 +0,0 @@ -package listener - -import ( - "bufio" - "io" - "net" - "os" - "strings" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/statsd_exporter/pkg/event" - pkgLine "github.com/prometheus/statsd_exporter/pkg/line" -) - -type StatsDUDPListener struct { - Conn *net.UDPConn - EventHandler event.EventHandler - Logger log.Logger -} - -func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) { - l.EventHandler = eh -} - -func (l *StatsDUDPListener) Listen(udpPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { - buf := make([]byte, 65535) - for { - n, _, err := l.Conn.ReadFromUDP(buf) - if err != nil { - // https://github.com/golang/go/issues/4373 - // ignore net: errClosing error as it will occur during shutdown - if strings.HasSuffix(err.Error(), "use of closed network connection") { - return - } - level.Error(l.Logger).Log("error", err) - return - } - l.handlePacket(buf[0:n], udpPackets, linesReceived, eventsFlushed, sampleErrors, samplesReceived, tagErrors, tagsReceived) - } -} - -func (l *StatsDUDPListener) handlePacket(packet []byte, udpPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { - udpPackets.Inc() - lines := strings.Split(string(packet), "\n") - for _, line := range lines { - linesReceived.Inc() - l.EventHandler.Queue(pkgLine.LineToEvents(line, sampleErrors, samplesReceived, tagErrors, tagsReceived, l.Logger), &eventsFlushed) - } -} - -type StatsDTCPListener struct { - Conn *net.TCPListener - EventHandler event.EventHandler - Logger log.Logger -} - -func (l *StatsDTCPListener) SetEventHandler(eh event.EventHandler) { - l.EventHandler = eh -} - -func (l *StatsDTCPListener) Listen(linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, tcpConnections prometheus.Counter, tcpErrors prometheus.Counter, tcpLineTooLong prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { - for { - c, err := l.Conn.AcceptTCP() - if err != nil { - // https://github.com/golang/go/issues/4373 - // ignore net: errClosing error as it will occur during shutdown - if strings.HasSuffix(err.Error(), "use of closed network connection") { - return - } - level.Error(l.Logger).Log("msg", "AcceptTCP failed", "error", err) - os.Exit(1) - } - go l.handleConn(c, linesReceived, eventsFlushed, tcpConnections, tcpErrors, tcpLineTooLong, sampleErrors, samplesReceived, tagErrors, tagsReceived) - } -} - -func (l *StatsDTCPListener) handleConn(c *net.TCPConn, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, tcpConnections prometheus.Counter, tcpErrors prometheus.Counter, tcpLineTooLong prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { - defer c.Close() - - tcpConnections.Inc() - - r := bufio.NewReader(c) - for { - line, isPrefix, err := r.ReadLine() - if err != nil { - if err != io.EOF { - tcpErrors.Inc() - level.Debug(l.Logger).Log("msg", "Read failed", "addr", c.RemoteAddr(), "error", err) - } - break - } - if isPrefix { - tcpLineTooLong.Inc() - level.Debug(l.Logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr()) - break - } - linesReceived.Inc() - l.EventHandler.Queue(pkgLine.LineToEvents(string(line), sampleErrors, samplesReceived, tagErrors, tagsReceived, l.Logger), &eventsFlushed) - } -} - -type StatsDUnixgramListener struct { - Conn *net.UnixConn - EventHandler event.EventHandler - Logger log.Logger -} - -func (l *StatsDUnixgramListener) SetEventHandler(eh event.EventHandler) { - l.EventHandler = eh -} - -func (l *StatsDUnixgramListener) Listen(unixgramPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { - buf := make([]byte, 65535) - for { - n, _, err := l.Conn.ReadFromUnix(buf) - if err != nil { - // https://github.com/golang/go/issues/4373 - // ignore net: errClosing error as it will occur during shutdown - if strings.HasSuffix(err.Error(), "use of closed network connection") { - return - } - level.Error(l.Logger).Log(err) - os.Exit(1) - } - l.handlePacket(buf[:n], unixgramPackets, linesReceived, eventsFlushed, sampleErrors, samplesReceived, tagErrors, tagsReceived) - } -} - -func (l *StatsDUnixgramListener) handlePacket(packet []byte, unixgramPackets prometheus.Counter, linesReceived prometheus.Counter, eventsFlushed prometheus.Counter, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter) { - unixgramPackets.Inc() - lines := strings.Split(string(packet), "\n") - for _, line := range lines { - linesReceived.Inc() - l.EventHandler.Queue(pkgLine.LineToEvents(line, sampleErrors, samplesReceived, tagErrors, tagsReceived, l.Logger), &eventsFlushed) - } -} diff --git a/pkg/metrics/metrics.go~ b/pkg/metrics/metrics.go~ deleted file mode 100644 index 287318e..0000000 --- a/pkg/metrics/metrics.go~ +++ /dev/null @@ -1,42 +0,0 @@ -package metrics - -import ( - "github.com/prometheus/client_golang/prometheus" -) - -type metricType int - -const ( - CounterMetricType metricType = iota - GaugeMetricType - SummaryMetricType - HistogramMetricType -) - -type nameHash uint64 -type valueHash uint64 -type labelHash struct { - // This is a hash over the label names - names nameHash - // This is a hash over the label names + label values - values valueHash -} - -type metricHolder interface{} - -type vectorHolder interface { - Delete(label prometheus.Labels) bool -} - -type vector struct { - holder vectorHolder - refCount uint64 -} - -type metric struct { - metricType metricType - // Vectors key is the hash of the label names - vectors map[nameHash]*vector - // Metrics key is a hash of the label names + label values - metrics map[valueHash]*registeredMetric -} diff --git a/pkg/registry/registry.go~ b/pkg/registry/registry.go~ deleted file mode 100644 index 6a3c8ee..0000000 --- a/pkg/registry/registry.go~ +++ /dev/null @@ -1,356 +0,0 @@ -package registry - -import ( - "github.com/prometheus/statsd_exporter/pkg/metrics" -) - -type RegisteredMetric struct { - lastRegisteredAt time.Time - labels prometheus.Labels - ttl time.Duration - metric metricHolder - vecKey nameHash -} - -type Registry struct { - metrics map[string]metric - mapper *mapper.MetricMapper - // The below value and label variables are allocated in the registry struct - // so that we don't have to allocate them every time have to compute a label - // hash. - valueBuf, nameBuf bytes.Buffer - hasher hash.Hash64 -} - -func NewRegistry(mapper *mapper.MetricMapper) *registry { - return ®istry{ - metrics: make(map[string]metric), - mapper: mapper, - hasher: fnv.New64a(), - } -} - -func (r *registry) MetricConflicts(metricName string, metricType metricType) bool { - vector, hasMetric := r.metrics[metricName] - if !hasMetric { - // No metric with this name exists - return false - } - - if vector.metricType == metricType { - // We've found a copy of this metric with this type, but different - // labels, so it's safe to create a new one. - return false - } - - // The metric exists, but it's of a different type than we're trying to - // create. - return true -} - -func (r *registry) StoreCounter(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.CounterVec, c prometheus.Counter, ttl time.Duration) { - r.store(metricName, hash, labels, vec, c, CounterMetricType, ttl) -} - -func (r *registry) StoreGauge(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.GaugeVec, g prometheus.Counter, ttl time.Duration) { - r.store(metricName, hash, labels, vec, g, GaugeMetricType, ttl) -} - -func (r *registry) StoreHistogram(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.HistogramVec, o prometheus.Observer, ttl time.Duration) { - r.store(metricName, hash, labels, vec, o, HistogramMetricType, ttl) -} - -func (r *registry) StoreSummary(metricName string, hash labelHash, labels prometheus.Labels, vec *prometheus.SummaryVec, o prometheus.Observer, ttl time.Duration) { - r.store(metricName, hash, labels, vec, o, SummaryMetricType, ttl) -} - -func (r *registry) Store(metricName string, hash labelHash, labels prometheus.Labels, vh vectorHolder, mh metricHolder, metricType metricType, ttl time.Duration) { - metric, hasMetric := r.metrics[metricName] - if !hasMetric { - metric.metricType = metricType - metric.vectors = make(map[nameHash]*vector) - metric.metrics = make(map[valueHash]*registeredMetric) - - r.metrics[metricName] = metric - } - - v, ok := metric.vectors[hash.names] - if !ok { - v = &vector{holder: vh} - metric.vectors[hash.names] = v - } - - rm, ok := metric.metrics[hash.values] - if !ok { - rm = ®isteredMetric{ - labels: labels, - ttl: ttl, - metric: mh, - vecKey: hash.names, - } - metric.metrics[hash.values] = rm - v.refCount++ - } - now := clock.Now() - rm.lastRegisteredAt = now - // Update ttl from mapping - rm.ttl = ttl -} - -func (r *registry) Get(metricName string, hash labelHash, metricType metricType) (vectorHolder, metricHolder) { - metric, hasMetric := r.metrics[metricName] - - if !hasMetric { - return nil, nil - } - if metric.metricType != metricType { - return nil, nil - } - - rm, ok := metric.metrics[hash.values] - if ok { - now := clock.Now() - rm.lastRegisteredAt = now - return metric.vectors[hash.names].holder, rm.metric - } - - vector, ok := metric.vectors[hash.names] - if ok { - return vector.holder, nil - } - - return nil, nil -} - -func (r *registry) GetCounter(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Counter, error) { - hash, labelNames := r.hashLabels(labels) - vh, mh := r.get(metricName, hash, CounterMetricType) - if mh != nil { - return mh.(prometheus.Counter), nil - } - - if r.metricConflicts(metricName, CounterMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - - var counterVec *prometheus.CounterVec - if vh == nil { - metricsCount.WithLabelValues("counter").Inc() - counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: metricName, - Help: help, - }, labelNames) - - if err := prometheus.Register(uncheckedCollector{counterVec}); err != nil { - return nil, err - } - } else { - counterVec = vh.(*prometheus.CounterVec) - } - - var counter prometheus.Counter - var err error - if counter, err = counterVec.GetMetricWith(labels); err != nil { - return nil, err - } - r.storeCounter(metricName, hash, labels, counterVec, counter, mapping.Ttl) - - return counter, nil -} - -func (r *registry) GetGauge(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Gauge, error) { - hash, labelNames := r.hashLabels(labels) - vh, mh := r.get(metricName, hash, GaugeMetricType) - if mh != nil { - return mh.(prometheus.Gauge), nil - } - - if r.metricConflicts(metricName, GaugeMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - - var gaugeVec *prometheus.GaugeVec - if vh == nil { - metricsCount.WithLabelValues("gauge").Inc() - gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: metricName, - Help: help, - }, labelNames) - - if err := prometheus.Register(uncheckedCollector{gaugeVec}); err != nil { - return nil, err - } - } else { - gaugeVec = vh.(*prometheus.GaugeVec) - } - - var gauge prometheus.Gauge - var err error - if gauge, err = gaugeVec.GetMetricWith(labels); err != nil { - return nil, err - } - r.storeGauge(metricName, hash, labels, gaugeVec, gauge, mapping.Ttl) - - return gauge, nil -} - -func (r *registry) GetHistogram(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) { - hash, labelNames := r.hashLabels(labels) - vh, mh := r.get(metricName, hash, HistogramMetricType) - if mh != nil { - return mh.(prometheus.Observer), nil - } - - if r.metricConflicts(metricName, HistogramMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if r.metricConflicts(metricName+"_sum", HistogramMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if r.metricConflicts(metricName+"_count", HistogramMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if r.metricConflicts(metricName+"_bucket", HistogramMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - - var histogramVec *prometheus.HistogramVec - if vh == nil { - metricsCount.WithLabelValues("histogram").Inc() - buckets := r.mapper.Defaults.Buckets - if mapping.HistogramOptions != nil && len(mapping.HistogramOptions.Buckets) > 0 { - buckets = mapping.HistogramOptions.Buckets - } - histogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: metricName, - Help: help, - Buckets: buckets, - }, labelNames) - - if err := prometheus.Register(uncheckedCollector{histogramVec}); err != nil { - return nil, err - } - } else { - histogramVec = vh.(*prometheus.HistogramVec) - } - - var observer prometheus.Observer - var err error - if observer, err = histogramVec.GetMetricWith(labels); err != nil { - return nil, err - } - r.storeHistogram(metricName, hash, labels, histogramVec, observer, mapping.Ttl) - - return observer, nil -} - -func (r *registry) GetSummary(metricName string, labels prometheus.Labels, help string, mapping *mapper.MetricMapping) (prometheus.Observer, error) { - hash, labelNames := r.hashLabels(labels) - vh, mh := r.get(metricName, hash, SummaryMetricType) - if mh != nil { - return mh.(prometheus.Observer), nil - } - - if r.metricConflicts(metricName, SummaryMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if r.metricConflicts(metricName+"_sum", SummaryMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - if r.metricConflicts(metricName+"_count", SummaryMetricType) { - return nil, fmt.Errorf("metric with name %s is already registered", metricName) - } - - var summaryVec *prometheus.SummaryVec - if vh == nil { - metricsCount.WithLabelValues("summary").Inc() - quantiles := r.mapper.Defaults.Quantiles - if mapping != nil && mapping.SummaryOptions != nil && len(mapping.SummaryOptions.Quantiles) > 0 { - quantiles = mapping.SummaryOptions.Quantiles - } - summaryOptions := mapper.SummaryOptions{} - if mapping != nil && mapping.SummaryOptions != nil { - summaryOptions = *mapping.SummaryOptions - } - objectives := make(map[float64]float64) - for _, q := range quantiles { - objectives[q.Quantile] = q.Error - } - // In the case of no mapping file, explicitly define the default quantiles - if len(objectives) == 0 { - objectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001} - } - summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Name: metricName, - Help: help, - Objectives: objectives, - MaxAge: summaryOptions.MaxAge, - AgeBuckets: summaryOptions.AgeBuckets, - BufCap: summaryOptions.BufCap, - }, labelNames) - - if err := prometheus.Register(uncheckedCollector{summaryVec}); err != nil { - return nil, err - } - } else { - summaryVec = vh.(*prometheus.SummaryVec) - } - - var observer prometheus.Observer - var err error - if observer, err = summaryVec.GetMetricWith(labels); err != nil { - return nil, err - } - r.storeSummary(metricName, hash, labels, summaryVec, observer, mapping.Ttl) - - return observer, nil -} - -func (r *registry) RemoveStaleMetrics() { - now := clock.Now() - // delete timeseries with expired ttl - for _, metric := range r.metrics { - for hash, rm := range metric.metrics { - if rm.ttl == 0 { - continue - } - if rm.lastRegisteredAt.Add(rm.ttl).Before(now) { - metric.vectors[rm.vecKey].holder.Delete(rm.labels) - metric.vectors[rm.vecKey].refCount-- - delete(metric.metrics, hash) - } - } - } -} - -// Calculates a hash of both the label names and the label names and values. -func (r *registry) HashLabels(labels prometheus.Labels) (labelHash, []string) { - r.hasher.Reset() - r.nameBuf.Reset() - r.valueBuf.Reset() - labelNames := make([]string, 0, len(labels)) - - for labelName := range labels { - labelNames = append(labelNames, labelName) - } - sort.Strings(labelNames) - - r.valueBuf.WriteByte(model.SeparatorByte) - for _, labelName := range labelNames { - r.valueBuf.WriteString(labels[labelName]) - r.valueBuf.WriteByte(model.SeparatorByte) - - r.nameBuf.WriteString(labelName) - r.nameBuf.WriteByte(model.SeparatorByte) - } - - lh := labelHash{} - r.hasher.Write(r.nameBuf.Bytes()) - lh.names = nameHash(r.hasher.Sum64()) - - // Now add the values to the names we've already hashed. - r.hasher.Write(r.valueBuf.Bytes()) - lh.values = valueHash(r.hasher.Sum64()) - - return lh, labelNames -} diff --git a/pkg/util/util.go~ b/pkg/util/util.go~ deleted file mode 100644 index c02100c..0000000 --- a/pkg/util/util.go~ +++ /dev/null @@ -1,53 +0,0 @@ -package util - -import ( - "fmt" - "net" - "strconv" -) - -func IPPortFromString(addr string) (*net.IPAddr, int, error) { - host, portStr, err := net.SplitHostPort(addr) - if err != nil { - return nil, 0, fmt.Errorf("bad StatsD listening address: %s", addr) - } - - if host == "" { - host = "0.0.0.0" - } - ip, err := net.ResolveIPAddr("ip", host) - if err != nil { - return nil, 0, fmt.Errorf("Unable to resolve %s: %s", host, err) - } - - port, err := strconv.Atoi(portStr) - if err != nil || port < 0 || port > 65535 { - return nil, 0, fmt.Errorf("Bad port %s: %s", portStr, err) - } - - return ip, port, nil -} - -func UDPAddrFromString(addr string) (*net.UDPAddr, error) { - ip, port, err := ipPortFromString(addr) - if err != nil { - return nil, err - } - return &net.UDPAddr{ - IP: ip.IP, - Port: port, - Zone: ip.Zone, - }, nil -} - -func TCPAddrFromString(addr string) (*net.TCPAddr, error) { - ip, port, err := ipPortFromString(addr) - if err != nil { - return nil, err - } - return &net.TCPAddr{ - IP: ip.IP, - Port: port, - Zone: ip.Zone, - }, nil -}