Add statsd relay

Add a simple statsd packet output relay that buffers and forwards raw
statsd lines.
* Only supports UDP output.
* Has a hard-coded buffering timeout of 1 second.

Closes: https://github.com/prometheus/statsd_exporter/issues/95

Signed-off-by: SuperQ <superq@gmail.com>
This commit is contained in:
SuperQ 2021-07-21 14:46:19 +02:00
parent 6c43ebbc79
commit 0b84893d4a
No known key found for this signature in database
GPG key ID: C646B23C9E3245F1
4 changed files with 167 additions and 2 deletions

View file

@ -148,6 +148,10 @@ NOTE: Version 0.7.0 switched to the [kingpin](https://github.com/alecthomas/king
Parse Librato style tags. Enabled by default.
--statsd.parse-signalfx-tags
Parse SignalFX style tags. Enabled by default.
--statsd.relay.address=STATSD.RELAY.ADDRESS
The UDP relay target address (host:port)
--statsd.relay.packet-length=1400
Maximum relay output packet length to avoid fragmentation
--log.level=info Only log messages with the given severity or
above. One of: [debug, info, warn, error]
--log.format=logfmt Output format of log messages. One of: [logfmt,
@ -160,6 +164,10 @@ NOTE: Version 0.7.0 switched to the [kingpin](https://github.com/alecthomas/king
The `statsd_exporter` has an optional lifecycle API (disabled by default) that can be used to reload or quit the exporter
by sending a `PUT` or `POST` request to the `/-/reload` or `/-/quit` endpoints.
## Relay
The `statsd_exporter` has an optional mode that will buffer and relay incoming statsd lines to a remote server. This is useful to "tee" the data when migrating to using the exporter. The relay will flush the buffer at least once per second to avoid delaying delivery of metrics.
## Tests
$ go test
@ -560,4 +568,4 @@ We encourage re-use of these packages and welcome [issues](https://github.com/pr
[travis]: https://travis-ci.org/prometheus/statsd_exporter
[circleci]: https://circleci.com/gh/prometheus/statsd_exporter
[quay]: https://quay.io/repository/prometheus/statsd-exporter
[hub]: https://hub.docker.com/r/prom/statsd-exporter/
[hub]: https://hub.docker.com/r/prom/statsd-exporter/

18
main.go
View file

@ -41,6 +41,7 @@ import (
"github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/prometheus/statsd_exporter/pkg/mappercache/lru"
"github.com/prometheus/statsd_exporter/pkg/mappercache/randomreplacement"
"github.com/prometheus/statsd_exporter/pkg/relay"
)
const (
@ -286,7 +287,7 @@ func main() {
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int()
cacheType = kingpin.Flag("statsd.cache-type", "Metric mapping cache type. Valid options are \"lru\" and \"random\"").Default("lru").Enum("lru", "random")
eventQueueSize = kingpin.Flag("statsd.event-queue-size", "Size of internal queue for processing events.").Default("10000").Int()
eventQueueSize = kingpin.Flag("statsd.event-queue-size", "Size of internal queue for processing events.").Default("10000").Uint()
eventFlushThreshold = kingpin.Flag("statsd.event-flush-threshold", "Number of events to hold in queue before flushing.").Default("1000").Int()
eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Maximum time between event queue flushes.").Default("200ms").Duration()
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
@ -295,6 +296,8 @@ func main() {
influxdbTagsEnabled = kingpin.Flag("statsd.parse-influxdb-tags", "Parse InfluxDB style tags. Enabled by default.").Default("true").Bool()
libratoTagsEnabled = kingpin.Flag("statsd.parse-librato-tags", "Parse Librato style tags. Enabled by default.").Default("true").Bool()
signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. Enabled by default.").Default("true").Bool()
relayAddr = kingpin.Flag("statsd.relay.address", "The UDP relay target address (host:port)").String()
relayPacketLen = kingpin.Flag("statsd.relay.packet-length", "Maximum relay output packet length to avoid fragmentation").Default("1400").Uint()
)
promlogConfig := &promlog.Config{}
@ -362,6 +365,16 @@ func main() {
return
}
var relayTarget *relay.Relay
if *relayAddr != "" {
var err error
relayTarget, err = relay.NewRelay(logger, *relayAddr, *relayPacketLen)
if err != nil {
level.Error(logger).Log("msg", "Unable to create relay", "err", err)
os.Exit(1)
}
}
level.Info(logger).Log("msg", "Accepting StatsD Traffic", "udp", *statsdListenUDP, "tcp", *statsdListenTCP, "unixgram", *statsdListenUnixgram)
level.Info(logger).Log("msg", "Accepting Prometheus Requests", "addr", *listenAddress)
@ -398,6 +411,7 @@ func main() {
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Relay: relayTarget,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
@ -427,6 +441,7 @@ func main() {
LineParser: parser,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Relay: relayTarget,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
@ -472,6 +487,7 @@ func main() {
UnixgramPackets: unixgramPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Relay: relayTarget,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,

View file

@ -25,6 +25,7 @@ import (
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/level"
"github.com/prometheus/statsd_exporter/pkg/relay"
)
type Parser interface {
@ -39,6 +40,7 @@ type StatsDUDPListener struct {
UDPPackets prometheus.Counter
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
Relay *relay.Relay
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
@ -72,6 +74,9 @@ func (l *StatsDUDPListener) HandlePacket(packet []byte) {
for _, line := range lines {
level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line)
l.LinesReceived.Inc()
if l.Relay != nil && len(line) > 0 {
l.Relay.RelayLine(line)
}
l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
}
}
@ -83,6 +88,7 @@ type StatsDTCPListener struct {
LineParser Parser
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
Relay *relay.Relay
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
@ -134,6 +140,9 @@ func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) {
break
}
l.LinesReceived.Inc()
if l.Relay != nil && len(line) > 0 {
l.Relay.RelayLine(string(line))
}
l.EventHandler.Queue(l.LineParser.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
}
}
@ -146,6 +155,7 @@ type StatsDUnixgramListener struct {
UnixgramPackets prometheus.Counter
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
Relay *relay.Relay
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
@ -179,6 +189,9 @@ func (l *StatsDUnixgramListener) HandlePacket(packet []byte) {
for _, line := range lines {
level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "unixgram", "line", line)
l.LinesReceived.Inc()
if l.Relay != nil && len(line) > 0 {
l.Relay.RelayLine(line)
}
l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
}
}

128
pkg/relay/relay.go Normal file
View file

@ -0,0 +1,128 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package relay
import (
"bytes"
"fmt"
"net"
"strings"
"time"
"github.com/go-kit/log"
"github.com/prometheus/statsd_exporter/pkg/level"
)
type Relay struct {
addr *net.UDPAddr
bufferChannel chan []byte
conn *net.UDPConn
logger log.Logger
packetLength uint
}
// NewRelay creates a statsd UDP relay. It can be used to send copies of statsd raw
// lines to a separate service.
func NewRelay(l log.Logger, target string, packetLength uint) (*Relay, error) {
addr, err := net.ResolveUDPAddr("udp", target)
if err != nil {
return nil, fmt.Errorf("unable to resolve target %s, err: %w", target, err)
}
conn, err := net.ListenUDP("udp", nil)
if err != nil {
return nil, fmt.Errorf("unable to listen on UDP, err: %w", err)
}
c := make(chan []byte, 100)
r := Relay{
addr: addr,
bufferChannel: c,
conn: conn,
logger: l,
packetLength: packetLength,
}
// Startup the UDP sender.
go r.relayOutput()
return &r, nil
}
// relayOutput buffers statsd lines and sends them to the relay target.
func (r *Relay) relayOutput() {
var buffer bytes.Buffer
var err error
relayInterval := time.NewTicker(1 * time.Second)
defer relayInterval.Stop()
for {
select {
case <-relayInterval.C:
err = r.sendPacket(buffer.Bytes())
if err != nil {
level.Error(r.logger).Log("msg", "Error sending UDP packet", "error", err)
return
}
// Clear out the buffer.
buffer.Reset()
case b := <-r.bufferChannel:
if uint(len(b)+buffer.Len()) > r.packetLength {
level.Debug(r.logger).Log("msg", "Buffer full, sending packet", "length", buffer.Len())
err = r.sendPacket(buffer.Bytes())
if err != nil {
level.Error(r.logger).Log("msg", "Error sending UDP packet", "error", err)
return
}
// Seed the new buffer with the new line.
buffer.Reset()
buffer.Write(b)
} else {
level.Debug(r.logger).Log("msg", "Adding line to buffer", "line", b)
buffer.Write(b)
}
}
}
}
// sendPacket sends a single relay line to the destination target.
func (r *Relay) sendPacket(buf []byte) error {
if len(buf) == 0 {
level.Debug(r.logger).Log("msg", "Empty buffer, nothing to send")
return nil
}
level.Debug(r.logger).Log("msg", "Sending packet", "length", len(buf), "data", buf)
_, err := r.conn.WriteToUDP(buf, r.addr)
return err
}
// RelayLine processes a single statsd line and forwards it to the relay target.
func (r *Relay) RelayLine(l string) {
lineLength := uint(len(l))
if lineLength == 0 {
level.Debug(r.logger).Log("msg", "Empty line, not relaying")
return
}
if lineLength > r.packetLength-1 {
level.Warn(r.logger).Log("msg", "line too long, not relaying", "length", lineLength, "max", r.packetLength)
return
}
level.Debug(r.logger).Log("msg", "Relaying line", "line", l)
if !strings.HasSuffix(l, "\n") {
l = l + "\n"
}
r.bufferChannel <- []byte(l)
}