// Copyright 2013 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "bufio" "net" "net/http" _ "net/http/pprof" "os" "os/signal" "strconv" "syscall" "github.com/howeyc/fsnotify" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/log" "github.com/prometheus/common/version" "gopkg.in/alecthomas/kingpin.v2" "github.com/prometheus/statsd_exporter/pkg/mapper" ) func init() { prometheus.MustRegister(version.NewCollector("statsd_exporter")) } func serveHTTP(listenAddress, metricsEndpoint string) { http.Handle(metricsEndpoint, promhttp.Handler()) http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte(` StatsD Exporter

StatsD Exporter

Metrics

`)) }) log.Fatal(http.ListenAndServe(listenAddress, nil)) } func ipPortFromString(addr string) (*net.IPAddr, int) { host, portStr, err := net.SplitHostPort(addr) if err != nil { log.Fatal("Bad StatsD listening address", addr) } if host == "" { host = "0.0.0.0" } ip, err := net.ResolveIPAddr("ip", host) if err != nil { log.Fatalf("Unable to resolve %s: %s", host, err) } port, err := strconv.Atoi(portStr) if err != nil || port < 0 || port > 65535 { log.Fatalf("Bad port %s: %s", portStr, err) } return ip, port } func udpAddrFromString(addr string) *net.UDPAddr { ip, port := ipPortFromString(addr) return &net.UDPAddr{ IP: ip.IP, Port: port, Zone: ip.Zone, } } func tcpAddrFromString(addr string) *net.TCPAddr { ip, port := ipPortFromString(addr) return &net.TCPAddr{ IP: ip.IP, Port: port, Zone: ip.Zone, } } func watchConfig(fileName string, mapper *mapper.MetricMapper, cacheSize int) { watcher, err := fsnotify.NewWatcher() if err != nil { log.Fatal(err) } err = watcher.WatchFlags(fileName, fsnotify.FSN_MODIFY) if err != nil { log.Fatal(err) } for { select { case ev := <-watcher.Event: log.Infof("Config file changed (%s), attempting reload", ev) err = mapper.InitFromFile(fileName, cacheSize) if err != nil { log.Errorln("Error reloading config:", err) configLoads.WithLabelValues("failure").Inc() } else { log.Infoln("Config reloaded successfully") configLoads.WithLabelValues("success").Inc() } // Re-add the file watcher since it can get lost on some changes. E.g. // saving a file with vim results in a RENAME-MODIFY-DELETE event // sequence, after which the newly written file is no longer watched. _ = watcher.WatchFlags(fileName, fsnotify.FSN_MODIFY) case err := <-watcher.Error: log.Errorln("Error watching config:", err) } } } func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string) error { f, err := os.Create(dumpFilename) if err != nil { return err } log.Infoln("Start dumping FSM to", dumpFilename) w := bufio.NewWriter(f) mapper.FSM.DumpFSM(w) w.Flush() f.Close() log.Infoln("Finish dumping FSM") return nil } func main() { var ( listenAddress = kingpin.Flag("web.listen-address", "The address on which to expose the web interface and generated Prometheus metrics.").Default(":9102").String() metricsEndpoint = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String() statsdListenUDP = kingpin.Flag("statsd.listen-udp", "The UDP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() statsdListenUnixgram = kingpin.Flag("statsd.listen-unixgram", "The Unixgram socket path to receive statsd metric lines in datagram. \"\" disables it.").Default("").String() // not using Int here because flag diplays default in decimal, 0755 will show as 493 statsdUnixSocketMode = kingpin.Flag("statsd.unixsocket-mode", "The permission mode of the unix socket.").Default("755").String() mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String() readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int() eventQueueSize = kingpin.Flag("statsd.event-queue-size", "Size of internal queue for processing events").Default("10000").Int() eventFlushThreshold = kingpin.Flag("statsd.event-flush-threshold", "Number of events to hold in queue before flushing").Default("1000").Int() eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Number of events to hold in queue before flushing").Default("200ms").Duration() dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String() ) log.AddFlags(kingpin.CommandLine) kingpin.Version(version.Print("statsd_exporter")) kingpin.HelpFlag.Short('h') kingpin.Parse() if *statsdListenUDP == "" && *statsdListenTCP == "" && *statsdListenUnixgram == "" { log.Fatalln("At least one of UDP/TCP/Unixgram listeners must be specified.") } log.Infoln("Starting StatsD -> Prometheus Exporter", version.Info()) log.Infoln("Build context", version.BuildContext()) log.Infof("Accepting StatsD Traffic: UDP %v, TCP %v, Unixgram %v", *statsdListenUDP, *statsdListenTCP, *statsdListenUnixgram) log.Infoln("Accepting Prometheus Requests on", *listenAddress) go serveHTTP(*listenAddress, *metricsEndpoint) events := make(chan Events, *eventQueueSize) defer close(events) eventQueue := newEventQueue(events, *eventFlushThreshold, *eventFlushInterval) if *statsdListenUDP != "" { udpListenAddr := udpAddrFromString(*statsdListenUDP) uconn, err := net.ListenUDP("udp", udpListenAddr) if err != nil { log.Fatal(err) } if *readBuffer != 0 { err = uconn.SetReadBuffer(*readBuffer) if err != nil { log.Fatal("Error setting UDP read buffer:", err) } } ul := &StatsDUDPListener{conn: uconn, eventHandler: eventQueue} go ul.Listen() } if *statsdListenTCP != "" { tcpListenAddr := tcpAddrFromString(*statsdListenTCP) tconn, err := net.ListenTCP("tcp", tcpListenAddr) if err != nil { log.Fatal(err) } defer tconn.Close() tl := &StatsDTCPListener{conn: tconn, eventHandler: eventQueue} go tl.Listen() } if *statsdListenUnixgram != "" { var err error if _, err = os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) { log.Fatalf("Unixgram socket \"%s\" already exists", *statsdListenUnixgram) } uxgconn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{ Net: "unixgram", Name: *statsdListenUnixgram, }) if err != nil { log.Fatal(err) } defer uxgconn.Close() if *readBuffer != 0 { err = uxgconn.SetReadBuffer(*readBuffer) if err != nil { log.Fatal("Error setting Unixgram read buffer:", err) } } ul := &StatsDUnixgramListener{conn: uxgconn} go ul.Listen() // if it's an abstract unix domain socket, it won't exist on fs // so we can't chmod it either if _, err := os.Stat(*statsdListenUnixgram); !os.IsNotExist(err) { defer os.Remove(*statsdListenUnixgram) // convert the string to octet perm, err := strconv.ParseInt("0"+string(*statsdUnixSocketMode), 8, 32) if err != nil { log.Warnf("Bad permission %s: %v, ignoring\n", *statsdUnixSocketMode, err) } else { err = os.Chmod(*statsdListenUnixgram, os.FileMode(perm)) if err != nil { log.Warnf("Failed to change unixgram socket permission: %v", err) } } } } mapper := &mapper.MetricMapper{MappingsCount: mappingsCount} if *mappingConfig != "" { err := mapper.InitFromFile(*mappingConfig, *cacheSize) if err != nil { log.Fatal("Error loading config:", err) } if *dumpFSMPath != "" { err := dumpFSM(mapper, *dumpFSMPath) if err != nil { log.Fatal("Error dumping FSM:", err) } } go watchConfig(*mappingConfig, mapper, *cacheSize) } else { mapper.InitCache(*cacheSize) } exporter := NewExporter(mapper) signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt, syscall.SIGTERM) go exporter.Listen(events) <-signals }