From 05bca8429445cc2ec9d6aa48815e071d5cd92dca Mon Sep 17 00:00:00 2001 From: Wangchong Zhou Date: Fri, 5 Apr 2019 16:37:23 -0700 Subject: [PATCH 1/4] Implement listener for Unixgram sockets As requested in #189 Signed-off-by: Wangchong Zhou --- exporter.go | 26 +++++++++++++++++++++ main.go | 64 ++++++++++++++++++++++++++++++++++++++++++++-------- telemetry.go | 7 ++++++ 3 files changed, 87 insertions(+), 10 deletions(-) diff --git a/exporter.go b/exporter.go index 15c01b7..ca1e0e0 100644 --- a/exporter.go +++ b/exporter.go @@ -707,3 +707,29 @@ func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) { e <- lineToEvents(string(line)) } } + +type StatsDUnixgramListener struct { + conn *net.UnixConn +} + +func (l *StatsDUnixgramListener) Listen(e chan<- Events) { + buf := make([]byte, 65535) + for { + n, _, err := l.conn.ReadFromUnix(buf) + if err != nil { + log.Fatal(err) + } + l.handlePacket(buf[:n], e) + } +} + +func (l *StatsDUnixgramListener) handlePacket(packet []byte, e chan<- Events) { + unixgramPackets.Inc() + lines := strings.Split(string(packet), "\n") + events := Events{} + for _, line := range lines { + linesReceived.Inc() + events = append(events, lineToEvents(line)...) + } + e <- events +} diff --git a/main.go b/main.go index 7127afb..2f4af3f 100644 --- a/main.go +++ b/main.go @@ -137,13 +137,16 @@ func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string) error { func main() { var ( - listenAddress = kingpin.Flag("web.listen-address", "The address on which to expose the web interface and generated Prometheus metrics.").Default(":9102").String() - metricsEndpoint = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String() - statsdListenUDP = kingpin.Flag("statsd.listen-udp", "The UDP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() - statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() - mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String() - readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() - dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String() + listenAddress = kingpin.Flag("web.listen-address", "The address on which to expose the web interface and generated Prometheus metrics.").Default(":9102").String() + metricsEndpoint = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String() + statsdListenUDP = kingpin.Flag("statsd.listen-udp", "The UDP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() + statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() + statsdListenUnixgram = kingpin.Flag("statsd.listen-unixgram", "The Unixgram socket path to receive statsd metric lines in datagram. \"\" disables it.").Default("").String() + // not using Int here because flag diplays default in decimal, 0755 will show as 493 + statsdUnixSocketUmask = kingpin.Flag("statsd.unixsocket-umask", "The umask of the unix socket.").Default("755").String() + mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String() + readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() + dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String() ) log.AddFlags(kingpin.CommandLine) @@ -151,13 +154,13 @@ func main() { kingpin.HelpFlag.Short('h') kingpin.Parse() - if *statsdListenUDP == "" && *statsdListenTCP == "" { - log.Fatalln("At least one of UDP/TCP listeners must be specified.") + if *statsdListenUDP == "" && *statsdListenTCP == "" && *statsdListenUnixgram == "" { + log.Fatalln("At least one of UDP/TCP/Unixgram listeners must be specified.") } log.Infoln("Starting StatsD -> Prometheus Exporter", version.Info()) log.Infoln("Build context", version.BuildContext()) - log.Infof("Accepting StatsD Traffic: UDP %v, TCP %v", *statsdListenUDP, *statsdListenTCP) + log.Infof("Accepting StatsD Traffic: UDP %v, TCP %v, Unixgram %v", *statsdListenUDP, *statsdListenTCP, *statsdListenUnixgram) log.Infoln("Accepting Prometheus Requests on", *listenAddress) go serveHTTP(*listenAddress, *metricsEndpoint) @@ -195,6 +198,47 @@ func main() { go tl.Listen(events) } + if *statsdListenUnixgram != "" { + if _, err := os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) { + log.Fatalf("Unixgram socket \"%s\" already exists", *statsdListenUnixgram) + } + uxgconn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{ + Net: "unixgram", + Name: *statsdListenUnixgram, + }) + if err != nil { + log.Fatal(err) + } + + defer uxgconn.Close() + + if *readBuffer != 0 { + err = uxgconn.SetReadBuffer(*readBuffer) + if err != nil { + log.Fatal("Error setting Unixgram read buffer:", err) + } + } + + ul := &StatsDUnixgramListener{conn: uxgconn} + go ul.Listen(events) + + // if it's an abstract unix domain socket, it won't exist on fs + // so we can't chmod it either + if _, err := os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) { + // convert the string to octet + perm, err := strconv.ParseInt("0"+string(*statsdUnixSocketUmask), 8, 32) + if err != nil { + log.Warnf("Bad permission %s: %v, ignoring\n", *statsdUnixSocketUmask, err) + } else { + err = os.Chmod(*statsdListenUnixgram, os.FileMode(perm)) + if err != nil { + log.Warnf("Failed to change unixgram socket permission: %v", err) + } + } + } + + } + mapper := &mapper.MetricMapper{MappingsCount: mappingsCount} if *mappingConfig != "" { err := mapper.InitFromFile(*mappingConfig) diff --git a/telemetry.go b/telemetry.go index 541140c..d430086 100644 --- a/telemetry.go +++ b/telemetry.go @@ -53,6 +53,12 @@ var ( Help: "The number of lines discarded due to being too long.", }, ) + unixgramPackets = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_unixgram_packets_total", + Help: "The total number of StatsD packets received over Unixgram.", + }, + ) linesReceived = prometheus.NewCounter( prometheus.CounterOpts{ Name: "statsd_exporter_lines_total", @@ -118,6 +124,7 @@ func init() { prometheus.MustRegister(tcpConnections) prometheus.MustRegister(tcpErrors) prometheus.MustRegister(tcpLineTooLong) + prometheus.MustRegister(unixgramPackets) prometheus.MustRegister(linesReceived) prometheus.MustRegister(samplesReceived) prometheus.MustRegister(sampleErrors) From 9ed6d59151bc5fdb285e83f6e45fe6d8ef5ce9eb Mon Sep 17 00:00:00 2001 From: Matthias Rampke Date: Tue, 9 Apr 2019 11:13:33 -0700 Subject: [PATCH 2/4] Correct to use name socket mode instead of mask Co-Authored-By: fffonion Signed-off-by: Wangchong Zhou --- main.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index 2f4af3f..30e4db3 100644 --- a/main.go +++ b/main.go @@ -143,10 +143,10 @@ func main() { statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() statsdListenUnixgram = kingpin.Flag("statsd.listen-unixgram", "The Unixgram socket path to receive statsd metric lines in datagram. \"\" disables it.").Default("").String() // not using Int here because flag diplays default in decimal, 0755 will show as 493 - statsdUnixSocketUmask = kingpin.Flag("statsd.unixsocket-umask", "The umask of the unix socket.").Default("755").String() - mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String() - readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() - dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String() + statsdUnixSocketMode = kingpin.Flag("statsd.unixsocket-mode", "The permission mode of the unix socket.").Default("755").String() + mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String() + readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() + dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String() ) log.AddFlags(kingpin.CommandLine) @@ -226,9 +226,9 @@ func main() { // so we can't chmod it either if _, err := os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) { // convert the string to octet - perm, err := strconv.ParseInt("0"+string(*statsdUnixSocketUmask), 8, 32) + perm, err := strconv.ParseInt("0"+string(*statsdUnixSocketMode), 8, 32) if err != nil { - log.Warnf("Bad permission %s: %v, ignoring\n", *statsdUnixSocketUmask, err) + log.Warnf("Bad permission %s: %v, ignoring\n", *statsdUnixSocketMode, err) } else { err = os.Chmod(*statsdListenUnixgram, os.FileMode(perm)) if err != nil { From 383ee9bd3bcc628d92722fcb123e905eddd1aea4 Mon Sep 17 00:00:00 2001 From: Wangchong Zhou Date: Mon, 22 Apr 2019 16:31:24 -0700 Subject: [PATCH 3/4] Add a signal handler to allow clean up of unixgram socket file Signed-off-by: Wangchong Zhou --- exporter.go | 18 +++++++++++++++++- main.go | 15 +++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/exporter.go b/exporter.go index ca1e0e0..6115ccd 100644 --- a/exporter.go +++ b/exporter.go @@ -652,7 +652,13 @@ func (l *StatsDUDPListener) Listen(e chan<- Events) { for { n, _, err := l.conn.ReadFromUDP(buf) if err != nil { - log.Fatal(err) + // https://github.com/golang/go/issues/4373 + // ignore net: errClosing error as it will occur during shutdown + if strings.HasSuffix(err.Error(), "use of closed network connection") { + return + } + log.Error(err) + return } l.handlePacket(buf[0:n], e) } @@ -677,6 +683,11 @@ func (l *StatsDTCPListener) Listen(e chan<- Events) { for { c, err := l.conn.AcceptTCP() if err != nil { + // https://github.com/golang/go/issues/4373 + // ignore net: errClosing error as it will occur during shutdown + if strings.HasSuffix(err.Error(), "use of closed network connection") { + return + } log.Fatalf("AcceptTCP failed: %v", err) } go l.handleConn(c, e) @@ -717,6 +728,11 @@ func (l *StatsDUnixgramListener) Listen(e chan<- Events) { for { n, _, err := l.conn.ReadFromUnix(buf) if err != nil { + // https://github.com/golang/go/issues/4373 + // ignore net: errClosing error as it will occur during shutdown + if strings.HasSuffix(err.Error(), "use of closed network connection") { + return + } log.Fatal(err) } l.handlePacket(buf[:n], e) diff --git a/main.go b/main.go index 30e4db3..5b11c7e 100644 --- a/main.go +++ b/main.go @@ -18,7 +18,9 @@ import ( "net" "net/http" "os" + "os/signal" "strconv" + "syscall" "github.com/howeyc/fsnotify" "github.com/prometheus/client_golang/prometheus" @@ -199,7 +201,8 @@ func main() { } if *statsdListenUnixgram != "" { - if _, err := os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) { + var err error + if _, err = os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) { log.Fatalf("Unixgram socket \"%s\" already exists", *statsdListenUnixgram) } uxgconn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{ @@ -225,6 +228,8 @@ func main() { // if it's an abstract unix domain socket, it won't exist on fs // so we can't chmod it either if _, err := os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) { + defer os.Remove(*statsdListenUnixgram) + // convert the string to octet perm, err := strconv.ParseInt("0"+string(*statsdUnixSocketMode), 8, 32) if err != nil { @@ -254,5 +259,11 @@ func main() { go watchConfig(*mappingConfig, mapper) } exporter := NewExporter(mapper) - exporter.Listen(events) + + signals := make(chan os.Signal) + signal.Notify(signals, os.Interrupt, syscall.SIGTERM) + + go exporter.Listen(events) + + <-signals } From 47b5ef9be0fbdc2629b4820d6640c86f3110db96 Mon Sep 17 00:00:00 2001 From: Wangchong Zhou Date: Mon, 22 Apr 2019 16:35:12 -0700 Subject: [PATCH 4/4] Buffer signals channel Signed-off-by: Wangchong Zhou --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 5b11c7e..55e4896 100644 --- a/main.go +++ b/main.go @@ -260,7 +260,7 @@ func main() { } exporter := NewExporter(mapper) - signals := make(chan os.Signal) + signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt, syscall.SIGTERM) go exporter.Listen(events)