Implement listener for Unixgram sockets

As requested in #189

Signed-off-by: Wangchong Zhou <fffonion@gmail.com>
This commit is contained in:
Wangchong Zhou 2019-04-05 16:37:23 -07:00
parent 71df5a3198
commit 05bca84294
No known key found for this signature in database
GPG key ID: B607274584E8D5E5
3 changed files with 87 additions and 10 deletions

View file

@ -707,3 +707,29 @@ func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) {
e <- lineToEvents(string(line))
}
}
type StatsDUnixgramListener struct {
conn *net.UnixConn
}
func (l *StatsDUnixgramListener) Listen(e chan<- Events) {
buf := make([]byte, 65535)
for {
n, _, err := l.conn.ReadFromUnix(buf)
if err != nil {
log.Fatal(err)
}
l.handlePacket(buf[:n], e)
}
}
func (l *StatsDUnixgramListener) handlePacket(packet []byte, e chan<- Events) {
unixgramPackets.Inc()
lines := strings.Split(string(packet), "\n")
events := Events{}
for _, line := range lines {
linesReceived.Inc()
events = append(events, lineToEvents(line)...)
}
e <- events
}

64
main.go
View file

@ -137,13 +137,16 @@ func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string) error {
func main() {
var (
listenAddress = kingpin.Flag("web.listen-address", "The address on which to expose the web interface and generated Prometheus metrics.").Default(":9102").String()
metricsEndpoint = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String()
statsdListenUDP = kingpin.Flag("statsd.listen-udp", "The UDP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String()
statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String()
mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
listenAddress = kingpin.Flag("web.listen-address", "The address on which to expose the web interface and generated Prometheus metrics.").Default(":9102").String()
metricsEndpoint = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String()
statsdListenUDP = kingpin.Flag("statsd.listen-udp", "The UDP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String()
statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String()
statsdListenUnixgram = kingpin.Flag("statsd.listen-unixgram", "The Unixgram socket path to receive statsd metric lines in datagram. \"\" disables it.").Default("").String()
// not using Int here because flag diplays default in decimal, 0755 will show as 493
statsdUnixSocketUmask = kingpin.Flag("statsd.unixsocket-umask", "The umask of the unix socket.").Default("755").String()
mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
)
log.AddFlags(kingpin.CommandLine)
@ -151,13 +154,13 @@ func main() {
kingpin.HelpFlag.Short('h')
kingpin.Parse()
if *statsdListenUDP == "" && *statsdListenTCP == "" {
log.Fatalln("At least one of UDP/TCP listeners must be specified.")
if *statsdListenUDP == "" && *statsdListenTCP == "" && *statsdListenUnixgram == "" {
log.Fatalln("At least one of UDP/TCP/Unixgram listeners must be specified.")
}
log.Infoln("Starting StatsD -> Prometheus Exporter", version.Info())
log.Infoln("Build context", version.BuildContext())
log.Infof("Accepting StatsD Traffic: UDP %v, TCP %v", *statsdListenUDP, *statsdListenTCP)
log.Infof("Accepting StatsD Traffic: UDP %v, TCP %v, Unixgram %v", *statsdListenUDP, *statsdListenTCP, *statsdListenUnixgram)
log.Infoln("Accepting Prometheus Requests on", *listenAddress)
go serveHTTP(*listenAddress, *metricsEndpoint)
@ -195,6 +198,47 @@ func main() {
go tl.Listen(events)
}
if *statsdListenUnixgram != "" {
if _, err := os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) {
log.Fatalf("Unixgram socket \"%s\" already exists", *statsdListenUnixgram)
}
uxgconn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
Net: "unixgram",
Name: *statsdListenUnixgram,
})
if err != nil {
log.Fatal(err)
}
defer uxgconn.Close()
if *readBuffer != 0 {
err = uxgconn.SetReadBuffer(*readBuffer)
if err != nil {
log.Fatal("Error setting Unixgram read buffer:", err)
}
}
ul := &StatsDUnixgramListener{conn: uxgconn}
go ul.Listen(events)
// if it's an abstract unix domain socket, it won't exist on fs
// so we can't chmod it either
if _, err := os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) {
// convert the string to octet
perm, err := strconv.ParseInt("0"+string(*statsdUnixSocketUmask), 8, 32)
if err != nil {
log.Warnf("Bad permission %s: %v, ignoring\n", *statsdUnixSocketUmask, err)
} else {
err = os.Chmod(*statsdListenUnixgram, os.FileMode(perm))
if err != nil {
log.Warnf("Failed to change unixgram socket permission: %v", err)
}
}
}
}
mapper := &mapper.MetricMapper{MappingsCount: mappingsCount}
if *mappingConfig != "" {
err := mapper.InitFromFile(*mappingConfig)

View file

@ -53,6 +53,12 @@ var (
Help: "The number of lines discarded due to being too long.",
},
)
unixgramPackets = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_unixgram_packets_total",
Help: "The total number of StatsD packets received over Unixgram.",
},
)
linesReceived = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_lines_total",
@ -118,6 +124,7 @@ func init() {
prometheus.MustRegister(tcpConnections)
prometheus.MustRegister(tcpErrors)
prometheus.MustRegister(tcpLineTooLong)
prometheus.MustRegister(unixgramPackets)
prometheus.MustRegister(linesReceived)
prometheus.MustRegister(samplesReceived)
prometheus.MustRegister(sampleErrors)