From 0b84893d4a6d1ed139eba91f1d1515cbe9134430 Mon Sep 17 00:00:00 2001 From: SuperQ Date: Wed, 21 Jul 2021 14:46:19 +0200 Subject: [PATCH] Add statsd relay Add a simple statsd packet output relay that buffers and forwards raw statsd lines. * Only supports UDP output. * Has a hard-coded buffering timeout of 1 second. Closes: https://github.com/prometheus/statsd_exporter/issues/95 Signed-off-by: SuperQ --- README.md | 10 ++- main.go | 18 +++++- pkg/listener/listener.go | 13 ++++ pkg/relay/relay.go | 128 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 pkg/relay/relay.go diff --git a/README.md b/README.md index 1a62ed6..a0db516 100644 --- a/README.md +++ b/README.md @@ -148,6 +148,10 @@ NOTE: Version 0.7.0 switched to the [kingpin](https://github.com/alecthomas/king Parse Librato style tags. Enabled by default. --statsd.parse-signalfx-tags Parse SignalFX style tags. Enabled by default. + --statsd.relay.address=STATSD.RELAY.ADDRESS + The UDP relay target address (host:port) + --statsd.relay.packet-length=1400 + Maximum relay output packet length to avoid fragmentation --log.level=info Only log messages with the given severity or above. One of: [debug, info, warn, error] --log.format=logfmt Output format of log messages. One of: [logfmt, @@ -160,6 +164,10 @@ NOTE: Version 0.7.0 switched to the [kingpin](https://github.com/alecthomas/king The `statsd_exporter` has an optional lifecycle API (disabled by default) that can be used to reload or quit the exporter by sending a `PUT` or `POST` request to the `/-/reload` or `/-/quit` endpoints. +## Relay + +The `statsd_exporter` has an optional mode that will buffer and relay incoming statsd lines to a remote server. This is useful to "tee" the data when migrating to using the exporter. The relay will flush the buffer at least once per second to avoid delaying delivery of metrics. 
+ ## Tests $ go test @@ -560,4 +568,4 @@ We encourage re-use of these packages and welcome [issues](https://github.com/pr [travis]: https://travis-ci.org/prometheus/statsd_exporter [circleci]: https://circleci.com/gh/prometheus/statsd_exporter [quay]: https://quay.io/repository/prometheus/statsd-exporter -[hub]: https://hub.docker.com/r/prom/statsd-exporter/ \ No newline at end of file +[hub]: https://hub.docker.com/r/prom/statsd-exporter/ diff --git a/main.go b/main.go index 87e8221..05e6481 100644 --- a/main.go +++ b/main.go @@ -41,6 +41,7 @@ import ( "github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/prometheus/statsd_exporter/pkg/mappercache/lru" "github.com/prometheus/statsd_exporter/pkg/mappercache/randomreplacement" + "github.com/prometheus/statsd_exporter/pkg/relay" ) const ( @@ -286,7 +287,7 @@ func main() { readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int() cacheType = kingpin.Flag("statsd.cache-type", "Metric mapping cache type. 
Valid options are \"lru\" and \"random\"").Default("lru").Enum("lru", "random") - eventQueueSize = kingpin.Flag("statsd.event-queue-size", "Size of internal queue for processing events.").Default("10000").Int() + eventQueueSize = kingpin.Flag("statsd.event-queue-size", "Size of internal queue for processing events.").Default("10000").Uint() eventFlushThreshold = kingpin.Flag("statsd.event-flush-threshold", "Number of events to hold in queue before flushing.").Default("1000").Int() eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Maximum time between event queue flushes.").Default("200ms").Duration() dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String() @@ -295,6 +296,8 @@ func main() { influxdbTagsEnabled = kingpin.Flag("statsd.parse-influxdb-tags", "Parse InfluxDB style tags. Enabled by default.").Default("true").Bool() libratoTagsEnabled = kingpin.Flag("statsd.parse-librato-tags", "Parse Librato style tags. Enabled by default.").Default("true").Bool() signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. 
Enabled by default.").Default("true").Bool() + relayAddr = kingpin.Flag("statsd.relay.address", "The UDP relay target address (host:port)").String() + relayPacketLen = kingpin.Flag("statsd.relay.packet-length", "Maximum relay output packet length to avoid fragmentation").Default("1400").Uint() ) promlogConfig := &promlog.Config{} @@ -362,6 +365,16 @@ func main() { return } + var relayTarget *relay.Relay + if *relayAddr != "" { + var err error + relayTarget, err = relay.NewRelay(logger, *relayAddr, *relayPacketLen) + if err != nil { + level.Error(logger).Log("msg", "Unable to create relay", "err", err) + os.Exit(1) + } + } + level.Info(logger).Log("msg", "Accepting StatsD Traffic", "udp", *statsdListenUDP, "tcp", *statsdListenTCP, "unixgram", *statsdListenUnixgram) level.Info(logger).Log("msg", "Accepting Prometheus Requests", "addr", *listenAddress) @@ -398,6 +411,7 @@ func main() { UDPPackets: udpPackets, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, + Relay: relayTarget, SampleErrors: *sampleErrors, SamplesReceived: samplesReceived, TagErrors: tagErrors, @@ -427,6 +441,7 @@ func main() { LineParser: parser, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, + Relay: relayTarget, SampleErrors: *sampleErrors, SamplesReceived: samplesReceived, TagErrors: tagErrors, @@ -472,6 +487,7 @@ func main() { UnixgramPackets: unixgramPackets, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, + Relay: relayTarget, SampleErrors: *sampleErrors, SamplesReceived: samplesReceived, TagErrors: tagErrors, diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index b1a57f5..09f9dbf 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/level" + "github.com/prometheus/statsd_exporter/pkg/relay" ) type Parser interface { @@ -39,6 +40,7 @@ type StatsDUDPListener struct { UDPPackets prometheus.Counter LinesReceived 
prometheus.Counter EventsFlushed prometheus.Counter + Relay *relay.Relay SampleErrors prometheus.CounterVec SamplesReceived prometheus.Counter TagErrors prometheus.Counter @@ -72,6 +74,9 @@ func (l *StatsDUDPListener) HandlePacket(packet []byte) { for _, line := range lines { level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line) l.LinesReceived.Inc() + if l.Relay != nil && len(line) > 0 { + l.Relay.RelayLine(line) + } l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) } } @@ -83,6 +88,7 @@ type StatsDTCPListener struct { LineParser Parser LinesReceived prometheus.Counter EventsFlushed prometheus.Counter + Relay *relay.Relay SampleErrors prometheus.CounterVec SamplesReceived prometheus.Counter TagErrors prometheus.Counter @@ -134,6 +140,9 @@ func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) { break } l.LinesReceived.Inc() + if l.Relay != nil && len(line) > 0 { + l.Relay.RelayLine(string(line)) + } l.EventHandler.Queue(l.LineParser.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) } } @@ -146,6 +155,7 @@ type StatsDUnixgramListener struct { UnixgramPackets prometheus.Counter LinesReceived prometheus.Counter EventsFlushed prometheus.Counter + Relay *relay.Relay SampleErrors prometheus.CounterVec SamplesReceived prometheus.Counter TagErrors prometheus.Counter @@ -179,6 +189,9 @@ func (l *StatsDUnixgramListener) HandlePacket(packet []byte) { for _, line := range lines { level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "unixgram", "line", line) l.LinesReceived.Inc() + if l.Relay != nil && len(line) > 0 { + l.Relay.RelayLine(line) + } l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) } } diff --git a/pkg/relay/relay.go b/pkg/relay/relay.go new file mode 100644 index 0000000..d023616 --- /dev/null +++ b/pkg/relay/relay.go @@ -0,0 
+1,128 @@
+// Copyright 2021 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package relay
+
+import (
+	"bytes"
+	"fmt"
+	"net"
+	"strings"
+	"time"
+
+	"github.com/go-kit/log"
+
+	"github.com/prometheus/statsd_exporter/pkg/level"
+)
+
+// Relay buffers raw statsd lines and forwards them to a single UDP target.
+type Relay struct {
+	addr          *net.UDPAddr // resolved address of the relay target
+	bufferChannel chan []byte  // lines queued by RelayLine for relayOutput
+	conn          *net.UDPConn // local UDP socket used to send packets
+	logger        log.Logger   // receives the relay's log messages
+	packetLength  uint         // maximum size in bytes of one outgoing packet
+}
+
+// NewRelay creates a statsd UDP relay for sending copies of raw statsd lines to
+// target, batched into packets bounded by packetLength. It starts the sender
+// goroutine, and fails if target does not resolve or no UDP socket can be opened.
+func NewRelay(l log.Logger, target string, packetLength uint) (*Relay, error) {
+	addr, err := net.ResolveUDPAddr("udp", target)
+	if err != nil {
+		return nil, fmt.Errorf("unable to resolve target %s, err: %w", target, err)
+	}
+	conn, err := net.ListenUDP("udp", nil)
+	if err != nil {
+		return nil, fmt.Errorf("unable to listen on UDP, err: %w", err)
+	}
+
+	c := make(chan []byte, 100) // RelayLine blocks once 100 lines are waiting
+	r := Relay{
+		addr:          addr,
+		bufferChannel: c,
+		conn:          conn,
+		logger:        l,
+		packetLength:  packetLength,
+	}
+
+	// Startup the UDP sender.
+	go r.relayOutput()
+	return &r, nil
+}
+
+// relayOutput buffers statsd lines and sends them to the relay target.
+func (r *Relay) relayOutput() {
+	var buffer bytes.Buffer
+
+	relayInterval := time.NewTicker(1 * time.Second)
+	defer relayInterval.Stop()
+
+	// flush sends whatever is buffered, then empties the buffer. A failed send is
+	// logged and its data dropped instead of ending this goroutine: once the sender
+	// stops draining bufferChannel, RelayLine blocks as soon as the channel fills.
+	flush := func() {
+		if err := r.sendPacket(buffer.Bytes()); err != nil {
+			level.Error(r.logger).Log("msg", "Error sending UDP packet", "error", err)
+		}
+		buffer.Reset()
+	}
+
+	for {
+		select {
+		case <-relayInterval.C:
+			flush()
+		case b := <-r.bufferChannel:
+			if uint(len(b)+buffer.Len()) > r.packetLength {
+				level.Debug(r.logger).Log("msg", "Buffer full, sending packet", "length", buffer.Len())
+				flush()
+			} else {
+				level.Debug(r.logger).Log("msg", "Adding line to buffer", "line", b)
+			}
+			// After a flush this seeds the new packet with the line that did not fit.
+			buffer.Write(b)
+		}
+	}
+}
+
+// sendPacket writes one buffered packet, which may hold several statsd lines, to
+// the relay target. An empty buffer is skipped and reported as success.
+func (r *Relay) sendPacket(buf []byte) error {
+	if len(buf) == 0 {
+		level.Debug(r.logger).Log("msg", "Empty buffer, nothing to send")
+		return nil
+	}
+	level.Debug(r.logger).Log("msg", "Sending packet", "length", len(buf), "data", buf)
+	_, err := r.conn.WriteToUDP(buf, r.addr)
+	return err
+}
+
+// RelayLine queues one statsd line for the relay target, adding a trailing newline
+// if needed. Empty lines, and lines that would not fit in a packet, are dropped.
+func (r *Relay) RelayLine(l string) {
+	lineLength := uint(len(l))
+	if lineLength == 0 {
+		level.Debug(r.logger).Log("msg", "Empty line, not relaying")
+		return
+	}
+	// ">=" instead of "> packetLength-1": the unsigned subtraction wraps at 0.
+	if lineLength >= r.packetLength {
+		level.Warn(r.logger).Log("msg", "line too long, not relaying", "length", lineLength, "max", r.packetLength)
+		return
+	}
+	level.Debug(r.logger).Log("msg", "Relaying line", "line", l)
+	if !strings.HasSuffix(l, "\n") {
+		l = l + "\n"
+	}
+	r.bufferChannel <- []byte(l)
+}