forked from mirrors/statsd_exporter
Merge pull request #199 from Kong/feat/unixgram-socket
Implement listener for Unixgram sockets
This commit is contained in:
commit
e3d6050616
3 changed files with 116 additions and 12 deletions
44
exporter.go
44
exporter.go
|
@ -712,7 +712,13 @@ func (l *StatsDUDPListener) Listen(e chan<- Events) {
|
||||||
for {
|
for {
|
||||||
n, _, err := l.conn.ReadFromUDP(buf)
|
n, _, err := l.conn.ReadFromUDP(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
// https://github.com/golang/go/issues/4373
|
||||||
|
// ignore net: errClosing error as it will occur during shutdown
|
||||||
|
if strings.HasSuffix(err.Error(), "use of closed network connection") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Error(err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
l.handlePacket(buf[0:n], e)
|
l.handlePacket(buf[0:n], e)
|
||||||
}
|
}
|
||||||
|
@ -737,6 +743,11 @@ func (l *StatsDTCPListener) Listen(e chan<- Events) {
|
||||||
for {
|
for {
|
||||||
c, err := l.conn.AcceptTCP()
|
c, err := l.conn.AcceptTCP()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// https://github.com/golang/go/issues/4373
|
||||||
|
// ignore net: errClosing error as it will occur during shutdown
|
||||||
|
if strings.HasSuffix(err.Error(), "use of closed network connection") {
|
||||||
|
return
|
||||||
|
}
|
||||||
log.Fatalf("AcceptTCP failed: %v", err)
|
log.Fatalf("AcceptTCP failed: %v", err)
|
||||||
}
|
}
|
||||||
go l.handleConn(c, e)
|
go l.handleConn(c, e)
|
||||||
|
@ -767,3 +778,34 @@ func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) {
|
||||||
e <- lineToEvents(string(line))
|
e <- lineToEvents(string(line))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type StatsDUnixgramListener struct {
|
||||||
|
conn *net.UnixConn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *StatsDUnixgramListener) Listen(e chan<- Events) {
|
||||||
|
buf := make([]byte, 65535)
|
||||||
|
for {
|
||||||
|
n, _, err := l.conn.ReadFromUnix(buf)
|
||||||
|
if err != nil {
|
||||||
|
// https://github.com/golang/go/issues/4373
|
||||||
|
// ignore net: errClosing error as it will occur during shutdown
|
||||||
|
if strings.HasSuffix(err.Error(), "use of closed network connection") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
l.handlePacket(buf[:n], e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *StatsDUnixgramListener) handlePacket(packet []byte, e chan<- Events) {
|
||||||
|
unixgramPackets.Inc()
|
||||||
|
lines := strings.Split(string(packet), "\n")
|
||||||
|
events := Events{}
|
||||||
|
for _, line := range lines {
|
||||||
|
linesReceived.Inc()
|
||||||
|
events = append(events, lineToEvents(line)...)
|
||||||
|
}
|
||||||
|
e <- events
|
||||||
|
}
|
||||||
|
|
77
main.go
77
main.go
|
@ -18,7 +18,9 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"os/signal"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
"github.com/howeyc/fsnotify"
|
"github.com/howeyc/fsnotify"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
@ -137,13 +139,16 @@ func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string) error {
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
var (
|
var (
|
||||||
listenAddress = kingpin.Flag("web.listen-address", "The address on which to expose the web interface and generated Prometheus metrics.").Default(":9102").String()
|
listenAddress = kingpin.Flag("web.listen-address", "The address on which to expose the web interface and generated Prometheus metrics.").Default(":9102").String()
|
||||||
metricsEndpoint = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String()
|
metricsEndpoint = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String()
|
||||||
statsdListenUDP = kingpin.Flag("statsd.listen-udp", "The UDP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String()
|
statsdListenUDP = kingpin.Flag("statsd.listen-udp", "The UDP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String()
|
||||||
statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String()
|
statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String()
|
||||||
mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
|
statsdListenUnixgram = kingpin.Flag("statsd.listen-unixgram", "The Unixgram socket path to receive statsd metric lines in datagram. \"\" disables it.").Default("").String()
|
||||||
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
|
// not using Int here because flag diplays default in decimal, 0755 will show as 493
|
||||||
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
|
statsdUnixSocketMode = kingpin.Flag("statsd.unixsocket-mode", "The permission mode of the unix socket.").Default("755").String()
|
||||||
|
mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
|
||||||
|
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
|
||||||
|
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
|
||||||
)
|
)
|
||||||
|
|
||||||
log.AddFlags(kingpin.CommandLine)
|
log.AddFlags(kingpin.CommandLine)
|
||||||
|
@ -151,13 +156,13 @@ func main() {
|
||||||
kingpin.HelpFlag.Short('h')
|
kingpin.HelpFlag.Short('h')
|
||||||
kingpin.Parse()
|
kingpin.Parse()
|
||||||
|
|
||||||
if *statsdListenUDP == "" && *statsdListenTCP == "" {
|
if *statsdListenUDP == "" && *statsdListenTCP == "" && *statsdListenUnixgram == "" {
|
||||||
log.Fatalln("At least one of UDP/TCP listeners must be specified.")
|
log.Fatalln("At least one of UDP/TCP/Unixgram listeners must be specified.")
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infoln("Starting StatsD -> Prometheus Exporter", version.Info())
|
log.Infoln("Starting StatsD -> Prometheus Exporter", version.Info())
|
||||||
log.Infoln("Build context", version.BuildContext())
|
log.Infoln("Build context", version.BuildContext())
|
||||||
log.Infof("Accepting StatsD Traffic: UDP %v, TCP %v", *statsdListenUDP, *statsdListenTCP)
|
log.Infof("Accepting StatsD Traffic: UDP %v, TCP %v, Unixgram %v", *statsdListenUDP, *statsdListenTCP, *statsdListenUnixgram)
|
||||||
log.Infoln("Accepting Prometheus Requests on", *listenAddress)
|
log.Infoln("Accepting Prometheus Requests on", *listenAddress)
|
||||||
|
|
||||||
go serveHTTP(*listenAddress, *metricsEndpoint)
|
go serveHTTP(*listenAddress, *metricsEndpoint)
|
||||||
|
@ -195,6 +200,50 @@ func main() {
|
||||||
go tl.Listen(events)
|
go tl.Listen(events)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if *statsdListenUnixgram != "" {
|
||||||
|
var err error
|
||||||
|
if _, err = os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) {
|
||||||
|
log.Fatalf("Unixgram socket \"%s\" already exists", *statsdListenUnixgram)
|
||||||
|
}
|
||||||
|
uxgconn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
|
||||||
|
Net: "unixgram",
|
||||||
|
Name: *statsdListenUnixgram,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer uxgconn.Close()
|
||||||
|
|
||||||
|
if *readBuffer != 0 {
|
||||||
|
err = uxgconn.SetReadBuffer(*readBuffer)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Error setting Unixgram read buffer:", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ul := &StatsDUnixgramListener{conn: uxgconn}
|
||||||
|
go ul.Listen(events)
|
||||||
|
|
||||||
|
// if it's an abstract unix domain socket, it won't exist on fs
|
||||||
|
// so we can't chmod it either
|
||||||
|
if _, err := os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) {
|
||||||
|
defer os.Remove(*statsdListenUnixgram)
|
||||||
|
|
||||||
|
// convert the string to octet
|
||||||
|
perm, err := strconv.ParseInt("0"+string(*statsdUnixSocketMode), 8, 32)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("Bad permission %s: %v, ignoring\n", *statsdUnixSocketMode, err)
|
||||||
|
} else {
|
||||||
|
err = os.Chmod(*statsdListenUnixgram, os.FileMode(perm))
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("Failed to change unixgram socket permission: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
mapper := &mapper.MetricMapper{MappingsCount: mappingsCount}
|
mapper := &mapper.MetricMapper{MappingsCount: mappingsCount}
|
||||||
if *mappingConfig != "" {
|
if *mappingConfig != "" {
|
||||||
err := mapper.InitFromFile(*mappingConfig)
|
err := mapper.InitFromFile(*mappingConfig)
|
||||||
|
@ -210,5 +259,11 @@ func main() {
|
||||||
go watchConfig(*mappingConfig, mapper)
|
go watchConfig(*mappingConfig, mapper)
|
||||||
}
|
}
|
||||||
exporter := NewExporter(mapper)
|
exporter := NewExporter(mapper)
|
||||||
exporter.Listen(events)
|
|
||||||
|
signals := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
|
||||||
|
|
||||||
|
go exporter.Listen(events)
|
||||||
|
|
||||||
|
<-signals
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,12 @@ var (
|
||||||
Help: "The number of lines discarded due to being too long.",
|
Help: "The number of lines discarded due to being too long.",
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
unixgramPackets = prometheus.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "statsd_exporter_unixgram_packets_total",
|
||||||
|
Help: "The total number of StatsD packets received over Unixgram.",
|
||||||
|
},
|
||||||
|
)
|
||||||
linesReceived = prometheus.NewCounter(
|
linesReceived = prometheus.NewCounter(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Name: "statsd_exporter_lines_total",
|
Name: "statsd_exporter_lines_total",
|
||||||
|
@ -132,6 +138,7 @@ func init() {
|
||||||
prometheus.MustRegister(tcpConnections)
|
prometheus.MustRegister(tcpConnections)
|
||||||
prometheus.MustRegister(tcpErrors)
|
prometheus.MustRegister(tcpErrors)
|
||||||
prometheus.MustRegister(tcpLineTooLong)
|
prometheus.MustRegister(tcpLineTooLong)
|
||||||
|
prometheus.MustRegister(unixgramPackets)
|
||||||
prometheus.MustRegister(linesReceived)
|
prometheus.MustRegister(linesReceived)
|
||||||
prometheus.MustRegister(samplesReceived)
|
prometheus.MustRegister(samplesReceived)
|
||||||
prometheus.MustRegister(sampleErrors)
|
prometheus.MustRegister(sampleErrors)
|
||||||
|
|
Loading…
Reference in a new issue