Add TCP StatsD listener support (#71)

* add TCP StatsD listener support

* add listen-tcp flag to control UDP/TCP mode on same port

* statsdListenUDP/statsdListenTCP as string, and alias listen-address to listen-udp

* add stats for tcp error/line_too_long

* add test for TCP listener
This commit is contained in:
jwfang 2017-08-01 18:21:00 +08:00 committed by Tobias Schmidt
parent 8b40f781ef
commit 07543ac557
5 changed files with 275 additions and 151 deletions

View file

@ -271,24 +271,26 @@ func TestHandlePacket(t *testing.T) {
},
}
l := StatsDListener{}
events := make(chan Events, 32)
for i, scenario := range scenarios {
l.handlePacket([]byte(scenario.in), events)
for k, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
events := make(chan Events, 32)
for i, scenario := range scenarios {
l.handlePacket([]byte(scenario.in), events)
// Flatten actual events.
actual := Events{}
for i := 0; i < len(events); i++ {
actual = append(actual, <-events...)
}
le := len(events)
// Flatten actual events.
actual := Events{}
for i := 0; i < le; i++ {
actual = append(actual, <-events...)
}
if len(actual) != len(scenario.out) {
t.Fatalf("%d. Expected %d events, got %d in scenario '%s'", i, len(scenario.out), len(actual), scenario.name)
}
if len(actual) != len(scenario.out) {
t.Fatalf("%d.%d. Expected %d events, got %d in scenario '%s'", k, i, len(scenario.out), len(actual), scenario.name)
}
for j, expected := range scenario.out {
if !reflect.DeepEqual(&expected, &actual[j]) {
t.Fatalf("%d.%d. Expected %#v, got %#v in scenario '%s'", i, j, expected, actual[j], scenario.name)
for j, expected := range scenario.out {
if !reflect.DeepEqual(&expected, &actual[j]) {
t.Fatalf("%d.%d.%d. Expected %#v, got %#v in scenario '%s'", k, i, j, expected, actual[j], scenario.name)
}
}
}
}

View file

@ -14,10 +14,12 @@
package main
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"hash/fnv"
"io"
"net"
"regexp"
"strconv"
@ -304,10 +306,6 @@ func NewExporter(mapper *metricMapper, addSuffix bool) *Exporter {
}
}
type StatsDListener struct {
conn *net.UDPConn
}
func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (Event, error) {
switch statType {
case "c":
@ -336,17 +334,6 @@ func buildEvent(statType, metric string, value float64, relative bool, labels ma
}
}
func (l *StatsDListener) Listen(e chan<- Events) {
buf := make([]byte, 65535)
for {
n, _, err := l.conn.ReadFromUDP(buf)
if err != nil {
log.Fatal(err)
}
l.handlePacket(buf[0:n], e)
}
}
func parseDogStatsDTagsToLabels(component string) map[string]string {
labels := map[string]string{}
networkStats.WithLabelValues("dogstatsd_tags").Inc()
@ -366,105 +353,162 @@ func parseDogStatsDTagsToLabels(component string) map[string]string {
return labels
}
func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) {
func lineToEvents(line string) Events {
events := Events{}
if line == "" {
return events
}
elements := strings.SplitN(line, ":", 2)
if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) {
networkStats.WithLabelValues("malformed_line").Inc()
log.Errorln("Bad line from StatsD:", line)
return events
}
metric := elements[0]
var samples []string
if strings.Contains(elements[1], "|#") {
// using datadog extensions, disable multi-metrics
samples = elements[1:]
} else {
samples = strings.Split(elements[1], ":")
}
samples:
for _, sample := range samples {
components := strings.Split(sample, "|")
samplingFactor := 1.0
if len(components) < 2 || len(components) > 4 {
networkStats.WithLabelValues("malformed_component").Inc()
log.Errorln("Bad component on line:", line)
continue
}
valueStr, statType := components[0], components[1]
var relative = false
if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 {
relative = true
}
value, err := strconv.ParseFloat(valueStr, 64)
if err != nil {
log.Errorf("Bad value %s on line: %s", valueStr, line)
networkStats.WithLabelValues("malformed_value").Inc()
continue
}
multiplyEvents := 1
labels := map[string]string{}
if len(components) >= 3 {
for _, component := range components[2:] {
if len(component) == 0 {
log.Errorln("Empty component on line: ", line)
networkStats.WithLabelValues("malformed_component").Inc()
continue samples
}
}
for _, component := range components[2:] {
switch component[0] {
case '@':
if statType != "c" && statType != "ms" {
log.Errorln("Illegal sampling factor for non-counter metric on line", line)
networkStats.WithLabelValues("illegal_sample_factor").Inc()
continue
}
samplingFactor, err = strconv.ParseFloat(component[1:], 64)
if err != nil {
log.Errorf("Invalid sampling factor %s on line %s", component[1:], line)
networkStats.WithLabelValues("invalid_sample_factor").Inc()
}
if samplingFactor == 0 {
samplingFactor = 1
}
if statType == "c" {
value /= samplingFactor
} else if statType == "ms" {
multiplyEvents = int(1 / samplingFactor)
}
case '#':
labels = parseDogStatsDTagsToLabels(component)
default:
log.Errorf("Invalid sampling factor or tag section %s on line %s", components[2], line)
networkStats.WithLabelValues("invalid_sample_factor").Inc()
continue
}
}
}
for i := 0; i < multiplyEvents; i++ {
event, err := buildEvent(statType, metric, value, relative, labels)
if err != nil {
log.Errorf("Error building event on line %s: %s", line, err)
networkStats.WithLabelValues("illegal_event").Inc()
continue
}
events = append(events, event)
}
networkStats.WithLabelValues("legal").Inc()
}
return events
}
type StatsDUDPListener struct {
conn *net.UDPConn
}
func (l *StatsDUDPListener) Listen(e chan<- Events) {
buf := make([]byte, 65535)
for {
n, _, err := l.conn.ReadFromUDP(buf)
if err != nil {
log.Fatal(err)
}
l.handlePacket(buf[0:n], e)
}
}
func (l *StatsDUDPListener) handlePacket(packet []byte, e chan<- Events) {
lines := strings.Split(string(packet), "\n")
events := Events{}
for _, line := range lines {
if line == "" {
continue
}
elements := strings.SplitN(line, ":", 2)
if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) {
networkStats.WithLabelValues("malformed_line").Inc()
log.Errorln("Bad line from StatsD:", line)
continue
}
metric := elements[0]
var samples []string
if strings.Contains(elements[1], "|#") {
// using datadog extensions, disable multi-metrics
samples = elements[1:]
} else {
samples = strings.Split(elements[1], ":")
}
samples:
for _, sample := range samples {
components := strings.Split(sample, "|")
samplingFactor := 1.0
if len(components) < 2 || len(components) > 4 {
networkStats.WithLabelValues("malformed_component").Inc()
log.Errorln("Bad component on line:", line)
continue
}
valueStr, statType := components[0], components[1]
var relative = false
if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 {
relative = true
}
value, err := strconv.ParseFloat(valueStr, 64)
if err != nil {
log.Errorf("Bad value %s on line: %s", valueStr, line)
networkStats.WithLabelValues("malformed_value").Inc()
continue
}
multiplyEvents := 1
labels := map[string]string{}
if len(components) >= 3 {
for _, component := range components[2:] {
if len(component) == 0 {
log.Errorln("Empty component on line: ", line)
networkStats.WithLabelValues("malformed_component").Inc()
continue samples
}
}
for _, component := range components[2:] {
switch component[0] {
case '@':
if statType != "c" && statType != "ms" {
log.Errorln("Illegal sampling factor for non-counter metric on line", line)
networkStats.WithLabelValues("illegal_sample_factor").Inc()
continue
}
samplingFactor, err = strconv.ParseFloat(component[1:], 64)
if err != nil {
log.Errorf("Invalid sampling factor %s on line %s", component[1:], line)
networkStats.WithLabelValues("invalid_sample_factor").Inc()
}
if samplingFactor == 0 {
samplingFactor = 1
}
if statType == "c" {
value /= samplingFactor
} else if statType == "ms" {
multiplyEvents = int(1 / samplingFactor)
}
case '#':
labels = parseDogStatsDTagsToLabels(component)
default:
log.Errorf("Invalid sampling factor or tag section %s on line %s", components[2], line)
networkStats.WithLabelValues("invalid_sample_factor").Inc()
continue
}
}
}
for i := 0; i < multiplyEvents; i++ {
event, err := buildEvent(statType, metric, value, relative, labels)
if err != nil {
log.Errorf("Error building event on line %s: %s", line, err)
networkStats.WithLabelValues("illegal_event").Inc()
continue
}
events = append(events, event)
}
networkStats.WithLabelValues("legal").Inc()
}
events = append(events, lineToEvents(line)...)
}
e <- events
}
type StatsDTCPListener struct {
conn *net.TCPListener
}
func (l *StatsDTCPListener) Listen(e chan<- Events) {
for {
c, err := l.conn.AcceptTCP()
if err != nil {
log.Fatalf("AcceptTCP failed: %v", err)
}
go l.handleConn(c, e)
}
}
func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) {
defer c.Close()
r := bufio.NewReader(c)
for {
line, isPrefix, err := r.ReadLine()
if err != nil {
if err != io.EOF {
networkStats.WithLabelValues("tcp_error").Inc()
log.Errorf("Read %s failed: %v", c.RemoteAddr(), err)
}
break
}
if isPrefix {
networkStats.WithLabelValues("tcp_line_too_long").Inc()
log.Errorf("Read %s failed: line too long", c.RemoteAddr())
break
}
e <- lineToEvents(string(line))
}
}

View file

@ -38,7 +38,7 @@ func benchmarkExporter(times int, b *testing.B) {
}
}
for n := 0; n < b.N; n++ {
l := StatsDListener{}
l := StatsDUDPListener{}
// there are more events than input lines, need bigger buffer
events := make(chan Events, len(bytesInput)*times*2)

View file

@ -14,6 +14,8 @@
package main
import (
"fmt"
"net"
"testing"
"time"
)
@ -56,19 +58,55 @@ func TestNegativeCounter(t *testing.T) {
// It sends the same tags first with a valid value, then with an invalid one.
// The exporter should not panic, but drop the invalid event
func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
l := StatsDListener{}
events := make(chan Events, 2)
l.handlePacket([]byte("bar:200|c|#tag:value"), events)
l.handlePacket([]byte("bar:200|c|#tag:\xc3\x28invalid"), events)
ex := NewExporter(&metricMapper{}, true)
for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
events := make(chan Events, 2)
l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), events)
// Close channel to signify we are done with the listener after a short period.
go func() {
time.Sleep(time.Millisecond * 100)
close(events)
}()
ex.Listen(events)
}
}
type statsDPacketHandler interface {
handlePacket(packet []byte, e chan<- Events)
}
type mockStatsDTCPListener struct {
StatsDTCPListener
}
func (ml *mockStatsDTCPListener) handlePacket(packet []byte, e chan<- Events) {
lc, err := net.ListenTCP("tcp", nil)
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: listen failed: %v", err))
}
defer lc.Close()
// Close channel to signify we are done with the listener after a short period.
go func() {
time.Sleep(time.Millisecond * 100)
close(events)
cc, err := net.DialTCP("tcp", nil, lc.Addr().(*net.TCPAddr))
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: dial failed: %v", err))
}
defer cc.Close()
n, err := cc.Write(packet)
if err != nil || n != len(packet) {
panic(fmt.Sprintf("mockStatsDTCPListener: write failed: %v,%d", err, n))
}
}()
ex.Listen(events)
sc, err := lc.AcceptTCP()
if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err))
}
ml.handleConn(sc, e)
}

68
main.go
View file

@ -34,7 +34,9 @@ func init() {
var (
listenAddress = flag.String("web.listen-address", ":9102", "The address on which to expose the web interface and generated Prometheus metrics.")
metricsEndpoint = flag.String("web.telemetry-path", "/metrics", "Path under which to expose metrics.")
statsdListenAddress = flag.String("statsd.listen-address", ":9125", "The UDP address on which to receive statsd metric lines.")
statsdListenAddress = flag.String("statsd.listen-address", "", "The UDP address on which to receive statsd metric lines. DEPRECATED, use statsd.listen-udp instead.")
statsdListenUDP = flag.String("statsd.listen-udp", ":9125", "The UDP address on which to receive statsd metric lines. \"\" disables it.")
statsdListenTCP = flag.String("statsd.listen-tcp", ":9125", "The TCP address on which to receive statsd metric lines. \"\" disables it.")
mappingConfig = flag.String("statsd.mapping-config", "", "Metric mapping configuration file name.")
readBuffer = flag.Int("statsd.read-buffer", 0, "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.")
addSuffix = flag.Bool("statsd.add-suffix", true, "Add the metric type (counter/gauge/timer) as suffix to the generated Prometheus metric (NOT recommended, but set by default for backward compatibility).")
@ -55,7 +57,7 @@ func serveHTTP() {
log.Fatal(http.ListenAndServe(*listenAddress, nil))
}
func udpAddrFromString(addr string) *net.UDPAddr {
func ipPortFromString(addr string) (*net.IPAddr, int) {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
log.Fatal("Bad StatsD listening address", addr)
@ -74,6 +76,11 @@ func udpAddrFromString(addr string) *net.UDPAddr {
log.Fatalf("Bad port %s: %s", portStr, err)
}
return ip, port
}
func udpAddrFromString(addr string) *net.UDPAddr {
ip, port := ipPortFromString(addr)
return &net.UDPAddr{
IP: ip.IP,
Port: port,
@ -81,6 +88,15 @@ func udpAddrFromString(addr string) *net.UDPAddr {
}
}
func tcpAddrFromString(addr string) *net.TCPAddr {
ip, port := ipPortFromString(addr)
return &net.TCPAddr{
IP: ip.IP,
Port: port,
Zone: ip.Zone,
}
}
func watchConfig(fileName string, mapper *metricMapper) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
@ -122,12 +138,22 @@ func main() {
os.Exit(0)
}
if *statsdListenAddress != "" {
log.Warnln("Warning: statsd.listen-address is DEPRECATED, please use statsd.listen-udp instead.")
*statsdListenUDP = *statsdListenAddress
}
if *statsdListenUDP == "" && *statsdListenTCP == "" {
log.Fatalln("At least one of UDP/TCP listeners must be specified.")
}
if *addSuffix {
log.Warnln("Warning: Using -statsd.add-suffix is discouraged. We recommend explicitly naming metrics appropriately in the mapping configuration.")
}
log.Infoln("Starting StatsD -> Prometheus Exporter", version.Info())
log.Infoln("Build context", version.BuildContext())
log.Infoln("Accepting StatsD Traffic on", *statsdListenAddress)
log.Infof("Accepting StatsD Traffic: UDP %v, TCP %v", *statsdListenUDP, *statsdListenTCP)
log.Infoln("Accepting Prometheus Requests on", *listenAddress)
go serveHTTP()
@ -135,21 +161,35 @@ func main() {
events := make(chan Events, 1024)
defer close(events)
listenAddr := udpAddrFromString(*statsdListenAddress)
conn, err := net.ListenUDP("udp", listenAddr)
if err != nil {
log.Fatal(err)
}
if *readBuffer != 0 {
err = conn.SetReadBuffer(*readBuffer)
if *statsdListenUDP != "" {
udpListenAddr := udpAddrFromString(*statsdListenUDP)
uconn, err := net.ListenUDP("udp", udpListenAddr)
if err != nil {
log.Fatal("Error setting UDP read buffer:", err)
log.Fatal(err)
}
if *readBuffer != 0 {
err = uconn.SetReadBuffer(*readBuffer)
if err != nil {
log.Fatal("Error setting UDP read buffer:", err)
}
}
ul := &StatsDUDPListener{conn: uconn}
go ul.Listen(events)
}
l := &StatsDListener{conn: conn}
go l.Listen(events)
if *statsdListenTCP != "" {
tcpListenAddr := tcpAddrFromString(*statsdListenTCP)
tconn, err := net.ListenTCP("tcp", tcpListenAddr)
if err != nil {
log.Fatal(err)
}
defer tconn.Close()
tl := &StatsDTCPListener{conn: tconn}
go tl.Listen(events)
}
mapper := &metricMapper{}
if *mappingConfig != "" {