diff --git a/bridge_test.go b/bridge_test.go index e3a81e7..33d4716 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -271,24 +271,26 @@ func TestHandlePacket(t *testing.T) { }, } - l := StatsDListener{} - events := make(chan Events, 32) - for i, scenario := range scenarios { - l.handlePacket([]byte(scenario.in), events) + for k, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} { + events := make(chan Events, 32) + for i, scenario := range scenarios { + l.handlePacket([]byte(scenario.in), events) - // Flatten actual events. - actual := Events{} - for i := 0; i < len(events); i++ { - actual = append(actual, <-events...) - } + le := len(events) + // Flatten actual events. + actual := Events{} + for i := 0; i < le; i++ { + actual = append(actual, <-events...) + } - if len(actual) != len(scenario.out) { - t.Fatalf("%d. Expected %d events, got %d in scenario '%s'", i, len(scenario.out), len(actual), scenario.name) - } + if len(actual) != len(scenario.out) { + t.Fatalf("%d.%d. Expected %d events, got %d in scenario '%s'", k, i, len(scenario.out), len(actual), scenario.name) + } - for j, expected := range scenario.out { - if !reflect.DeepEqual(&expected, &actual[j]) { - t.Fatalf("%d.%d. Expected %#v, got %#v in scenario '%s'", i, j, expected, actual[j], scenario.name) + for j, expected := range scenario.out { + if !reflect.DeepEqual(&expected, &actual[j]) { + t.Fatalf("%d.%d.%d. Expected %#v, got %#v in scenario '%s'", k, i, j, expected, actual[j], scenario.name) + } } } } diff --git a/exporter.go b/exporter.go index 0603dc7..6d5d917 100644 --- a/exporter.go +++ b/exporter.go @@ -14,10 +14,12 @@ package main import ( + "bufio" "bytes" "encoding/binary" "fmt" "hash/fnv" + "io" "net" "regexp" "strconv" @@ -304,10 +306,6 @@ func NewExporter(mapper *metricMapper, addSuffix bool) *Exporter { } } -type StatsDListener struct { - conn *net.UDPConn -} - func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (Event, error) { switch statType { case "c": @@ -336,17 +334,6 @@ func buildEvent(statType, metric string, value float64, relative bool, labels ma } } -func (l *StatsDListener) Listen(e chan<- Events) { - buf := make([]byte, 65535) - for { - n, _, err := l.conn.ReadFromUDP(buf) - if err != nil { - log.Fatal(err) - } - l.handlePacket(buf[0:n], e) - } -} - func parseDogStatsDTagsToLabels(component string) map[string]string { labels := map[string]string{} networkStats.WithLabelValues("dogstatsd_tags").Inc() @@ -366,105 +353,162 @@ func parseDogStatsDTagsToLabels(component string) map[string]string { return labels } -func (l *StatsDListener) handlePacket(packet []byte, e chan<- Events) { +func lineToEvents(line string) Events { + events := Events{} + if line == "" { + return events + } + + elements := strings.SplitN(line, ":", 2) + if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) { + networkStats.WithLabelValues("malformed_line").Inc() + log.Errorln("Bad line from StatsD:", line) + return events + } + metric := elements[0] + var samples []string + if strings.Contains(elements[1], "|#") { + // using datadog extensions, disable multi-metrics + samples = elements[1:] + } else { + samples = strings.Split(elements[1], ":") + } +samples: + for _, sample := range samples { + components := strings.Split(sample, "|") + samplingFactor := 1.0 + if len(components) < 2 || len(components) > 4 { + networkStats.WithLabelValues("malformed_component").Inc() + log.Errorln("Bad component on line:", line) + continue + } + valueStr, statType := components[0], components[1] + + var relative = false + if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 { + relative = true + } + + value, err := strconv.ParseFloat(valueStr, 64) + if err != nil { + log.Errorf("Bad value %s on line: %s", valueStr, line) + networkStats.WithLabelValues("malformed_value").Inc() + continue + } + + multiplyEvents := 1 + labels := map[string]string{} + if len(components) >= 3 { + for _, component := range components[2:] { + if len(component) == 0 { + log.Errorln("Empty component on line: ", line) + networkStats.WithLabelValues("malformed_component").Inc() + continue samples + } + } + + for _, component := range components[2:] { + switch component[0] { + case '@': + if statType != "c" && statType != "ms" { + log.Errorln("Illegal sampling factor for non-counter metric on line", line) + networkStats.WithLabelValues("illegal_sample_factor").Inc() + continue + } + samplingFactor, err = strconv.ParseFloat(component[1:], 64) + if err != nil { + log.Errorf("Invalid sampling factor %s on line %s", component[1:], line) + networkStats.WithLabelValues("invalid_sample_factor").Inc() + } + if samplingFactor == 0 { + samplingFactor = 1 + } + + if statType == "c" { + value /= samplingFactor + } else if statType == "ms" { + multiplyEvents = int(1 / samplingFactor) + } + case '#': + labels = parseDogStatsDTagsToLabels(component) + default: + log.Errorf("Invalid sampling factor or tag section %s on line %s", components[2], line) + networkStats.WithLabelValues("invalid_sample_factor").Inc() + continue + } + } + } + + for i := 0; i < multiplyEvents; i++ { + event, err := buildEvent(statType, metric, value, relative, labels) + if err != nil { + log.Errorf("Error building event on line %s: %s", line, err) + networkStats.WithLabelValues("illegal_event").Inc() + continue + } + events = append(events, event) + } + networkStats.WithLabelValues("legal").Inc() + } + return events +} + +type StatsDUDPListener struct { + conn *net.UDPConn +} + +func (l *StatsDUDPListener) Listen(e chan<- Events) { + buf := make([]byte, 65535) + for { + n, _, err := l.conn.ReadFromUDP(buf) + if err != nil { + log.Fatal(err) + } + l.handlePacket(buf[0:n], e) + } +} + +func (l *StatsDUDPListener) handlePacket(packet []byte, e chan<- Events) { lines := strings.Split(string(packet), "\n") events := Events{} for _, line := range lines { - if line == "" { - continue - } - - elements := strings.SplitN(line, ":", 2) - if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) { - networkStats.WithLabelValues("malformed_line").Inc() - log.Errorln("Bad line from StatsD:", line) - continue - } - metric := elements[0] - var samples []string - if strings.Contains(elements[1], "|#") { - // using datadog extensions, disable multi-metrics - samples = elements[1:] - } else { - samples = strings.Split(elements[1], ":") - } - samples: - for _, sample := range samples { - components := strings.Split(sample, "|") - samplingFactor := 1.0 - if len(components) < 2 || len(components) > 4 { - networkStats.WithLabelValues("malformed_component").Inc() - log.Errorln("Bad component on line:", line) - continue - } - valueStr, statType := components[0], components[1] - - var relative = false - if strings.Index(valueStr, "+") == 0 || strings.Index(valueStr, "-") == 0 { - relative = true - } - - value, err := strconv.ParseFloat(valueStr, 64) - if err != nil { - log.Errorf("Bad value %s on line: %s", valueStr, line) - networkStats.WithLabelValues("malformed_value").Inc() - continue - } - - multiplyEvents := 1 - labels := map[string]string{} - if len(components) >= 3 { - for _, component := range components[2:] { - if len(component) == 0 { - log.Errorln("Empty component on line: ", line) - networkStats.WithLabelValues("malformed_component").Inc() - continue samples - } - } - - for _, component := range components[2:] { - switch component[0] { - case '@': - if statType != "c" && statType != "ms" { - log.Errorln("Illegal sampling factor for non-counter metric on line", line) - networkStats.WithLabelValues("illegal_sample_factor").Inc() - continue - } - samplingFactor, err = strconv.ParseFloat(component[1:], 64) - if err != nil { - log.Errorf("Invalid sampling factor %s on line %s", component[1:], line) - networkStats.WithLabelValues("invalid_sample_factor").Inc() - } - if samplingFactor == 0 { - samplingFactor = 1 - } - - if statType == "c" { - value /= samplingFactor - } else if statType == "ms" { - multiplyEvents = int(1 / samplingFactor) - } - case '#': - labels = parseDogStatsDTagsToLabels(component) - default: - log.Errorf("Invalid sampling factor or tag section %s on line %s", components[2], line) - networkStats.WithLabelValues("invalid_sample_factor").Inc() - continue - } - } - } - - for i := 0; i < multiplyEvents; i++ { - event, err := buildEvent(statType, metric, value, relative, labels) - if err != nil { - log.Errorf("Error building event on line %s: %s", line, err) - networkStats.WithLabelValues("illegal_event").Inc() - continue - } - events = append(events, event) - } - networkStats.WithLabelValues("legal").Inc() - } + events = append(events, lineToEvents(line)...) } e <- events } + +type StatsDTCPListener struct { + conn *net.TCPListener +} + +func (l *StatsDTCPListener) Listen(e chan<- Events) { + for { + c, err := l.conn.AcceptTCP() + if err != nil { + log.Fatalf("AcceptTCP failed: %v", err) + } + go l.handleConn(c, e) + } +} + +func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) { + defer c.Close() + + r := bufio.NewReader(c) + for { + line, isPrefix, err := r.ReadLine() + if err != nil { + if err != io.EOF { + networkStats.WithLabelValues("tcp_error").Inc() + log.Errorf("Read %s failed: %v", c.RemoteAddr(), err) + } + break + } + if isPrefix { + networkStats.WithLabelValues("tcp_line_too_long").Inc() + log.Errorf("Read %s failed: line too long", c.RemoteAddr()) + break + } + e <- lineToEvents(string(line)) + } +} diff --git a/exporter_benchmark_test.go b/exporter_benchmark_test.go index ad005da..1c3104c 100644 --- a/exporter_benchmark_test.go +++ b/exporter_benchmark_test.go @@ -38,7 +38,7 @@ func benchmarkExporter(times int, b *testing.B) { } } for n := 0; n < b.N; n++ { - l := StatsDListener{} + l := StatsDUDPListener{} // there are more events than input lines, need bigger buffer events := make(chan Events, len(bytesInput)*times*2) diff --git a/exporter_test.go b/exporter_test.go index 1eb4bf5..cbfb910 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -14,6 +14,8 @@ package main import ( + "fmt" + "net" "testing" "time" ) @@ -56,19 +58,55 @@ func TestNegativeCounter(t *testing.T) { // It sends the same tags first with a valid value, then with an invalid one. // The exporter should not panic, but drop the invalid event func TestInvalidUtf8InDatadogTagValue(t *testing.T) { - l := StatsDListener{} - events := make(chan Events, 2) - - l.handlePacket([]byte("bar:200|c|#tag:value"), events) - l.handlePacket([]byte("bar:200|c|#tag:\xc3\x28invalid"), events) - ex := NewExporter(&metricMapper{}, true) + for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} { + events := make(chan Events, 2) + + l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), events) + + // Close channel to signify we are done with the listener after a short period. + go func() { + time.Sleep(time.Millisecond * 100) + close(events) + }() + + ex.Listen(events) + } +} + +type statsDPacketHandler interface { + handlePacket(packet []byte, e chan<- Events) +} + +type mockStatsDTCPListener struct { + StatsDTCPListener +} + +func (ml *mockStatsDTCPListener) handlePacket(packet []byte, e chan<- Events) { + lc, err := net.ListenTCP("tcp", nil) + if err != nil { + panic(fmt.Sprintf("mockStatsDTCPListener: listen failed: %v", err)) + } + + defer lc.Close() - // Close channel to signify we are done with the listener after a short period. go func() { - time.Sleep(time.Millisecond * 100) - close(events) + cc, err := net.DialTCP("tcp", nil, lc.Addr().(*net.TCPAddr)) + if err != nil { + panic(fmt.Sprintf("mockStatsDTCPListener: dial failed: %v", err)) + } + + defer cc.Close() + + n, err := cc.Write(packet) + if err != nil || n != len(packet) { + panic(fmt.Sprintf("mockStatsDTCPListener: write failed: %v,%d", err, n)) + } }() - ex.Listen(events) + sc, err := lc.AcceptTCP() + if err != nil { + panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err)) + } + ml.handleConn(sc, e) } diff --git a/main.go b/main.go index e923a35..d979859 100644 --- a/main.go +++ b/main.go @@ -34,7 +34,9 @@ func init() { var ( listenAddress = flag.String("web.listen-address", ":9102", "The address on which to expose the web interface and generated Prometheus metrics.") metricsEndpoint = flag.String("web.telemetry-path", "/metrics", "Path under which to expose metrics.") - statsdListenAddress = flag.String("statsd.listen-address", ":9125", "The UDP address on which to receive statsd metric lines.") + statsdListenAddress = flag.String("statsd.listen-address", "", "The UDP address on which to receive statsd metric lines. DEPRECATED, use statsd.listen-udp instead.") + statsdListenUDP = flag.String("statsd.listen-udp", ":9125", "The UDP address on which to receive statsd metric lines. \"\" disables it.") + statsdListenTCP = flag.String("statsd.listen-tcp", ":9125", "The TCP address on which to receive statsd metric lines. \"\" disables it.") mappingConfig = flag.String("statsd.mapping-config", "", "Metric mapping configuration file name.") readBuffer = flag.Int("statsd.read-buffer", 0, "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.") addSuffix = flag.Bool("statsd.add-suffix", true, "Add the metric type (counter/gauge/timer) as suffix to the generated Prometheus metric (NOT recommended, but set by default for backward compatibility).") @@ -55,7 +57,7 @@ func serveHTTP() { log.Fatal(http.ListenAndServe(*listenAddress, nil)) } -func udpAddrFromString(addr string) *net.UDPAddr { +func ipPortFromString(addr string) (*net.IPAddr, int) { host, portStr, err := net.SplitHostPort(addr) if err != nil { log.Fatal("Bad StatsD listening address", addr) @@ -74,6 +76,11 @@ func udpAddrFromString(addr string) *net.UDPAddr { log.Fatalf("Bad port %s: %s", portStr, err) } + return ip, port +} + +func udpAddrFromString(addr string) *net.UDPAddr { + ip, port := ipPortFromString(addr) return &net.UDPAddr{ IP: ip.IP, Port: port, @@ -81,6 +88,15 @@ func udpAddrFromString(addr string) *net.UDPAddr { } } +func tcpAddrFromString(addr string) *net.TCPAddr { + ip, port := ipPortFromString(addr) + return &net.TCPAddr{ + IP: ip.IP, + Port: port, + Zone: ip.Zone, + } +} + func watchConfig(fileName string, mapper *metricMapper) { watcher, err := fsnotify.NewWatcher() if err != nil { @@ -122,12 +138,22 @@ func main() { os.Exit(0) } + if *statsdListenAddress != "" { + log.Warnln("Warning: statsd.listen-address is DEPRECATED, please use statsd.listen-udp instead.") + *statsdListenUDP = *statsdListenAddress + } + + if *statsdListenUDP == "" && *statsdListenTCP == "" { + log.Fatalln("At least one of UDP/TCP listeners must be specified.") + } + if *addSuffix { log.Warnln("Warning: Using -statsd.add-suffix is discouraged. We recommend explicitly naming metrics appropriately in the mapping configuration.") } + log.Infoln("Starting StatsD -> Prometheus Exporter", version.Info()) log.Infoln("Build context", version.BuildContext()) - log.Infoln("Accepting StatsD Traffic on", *statsdListenAddress) + log.Infof("Accepting StatsD Traffic: UDP %v, TCP %v", *statsdListenUDP, *statsdListenTCP) log.Infoln("Accepting Prometheus Requests on", *listenAddress) go serveHTTP() @@ -135,21 +161,35 @@ func main() { events := make(chan Events, 1024) defer close(events) - listenAddr := udpAddrFromString(*statsdListenAddress) - conn, err := net.ListenUDP("udp", listenAddr) - if err != nil { - log.Fatal(err) - } - - if *readBuffer != 0 { - err = conn.SetReadBuffer(*readBuffer) + if *statsdListenUDP != "" { + udpListenAddr := udpAddrFromString(*statsdListenUDP) + uconn, err := net.ListenUDP("udp", udpListenAddr) if err != nil { - log.Fatal("Error setting UDP read buffer:", err) + log.Fatal(err) } + + if *readBuffer != 0 { + err = uconn.SetReadBuffer(*readBuffer) + if err != nil { + log.Fatal("Error setting UDP read buffer:", err) + } + } + + ul := &StatsDUDPListener{conn: uconn} + go ul.Listen(events) } - l := &StatsDListener{conn: conn} - go l.Listen(events) + if *statsdListenTCP != "" { + tcpListenAddr := tcpAddrFromString(*statsdListenTCP) + tconn, err := net.ListenTCP("tcp", tcpListenAddr) + if err != nil { + log.Fatal(err) + } + defer tconn.Close() + + tl := &StatsDTCPListener{conn: tconn} + go tl.Listen(events) + } mapper := &metricMapper{} if *mappingConfig != "" {