mirror of
https://github.com/prometheus/statsd_exporter.git
synced 2024-12-26 07:20:30 +00:00
Add internal event queuing and flushing
At high traffic levels, the locking around sending on channels can cause a large amount of blocking and CPU usage. These adds an event queue mechanism so that events are queued for short period of time, and flushed in batches to the main exporter goroutine periodically. The default is is to flush every 1000 events, or every 200ms, whichever happens first. Signed-off-by: Clayton O'Neill <claytono@github.com>
This commit is contained in:
parent
50d5932124
commit
c7e76967c8
9 changed files with 296 additions and 88 deletions
39
README.md
39
README.md
|
@ -53,28 +53,37 @@ NOTE: Version 0.7.0 switched to the [kingpin](https://github.com/alecthomas/king
|
|||
usage: statsd_exporter [<flags>]
|
||||
|
||||
Flags:
|
||||
-h, --help Show context-sensitive help (also try --help-long and --help-man).
|
||||
-h, --help Show context-sensitive help (also try --help-long and --help-man).
|
||||
--web.listen-address=":9102"
|
||||
The address on which to expose the web interface and generated Prometheus metrics.
|
||||
The address on which to expose the web interface and generated Prometheus metrics.
|
||||
--web.telemetry-path="/metrics"
|
||||
Path under which to expose metrics.
|
||||
Path under which to expose metrics.
|
||||
--statsd.listen-udp=":9125"
|
||||
The UDP address on which to receive statsd metric lines. "" disables it.
|
||||
The UDP address on which to receive statsd metric lines. "" disables it.
|
||||
--statsd.listen-tcp=":9125"
|
||||
The TCP address on which to receive statsd metric lines. "" disables it.
|
||||
The TCP address on which to receive statsd metric lines. "" disables it.
|
||||
--statsd.listen-unixgram=""
|
||||
The Unixgram socket path to receive statsd metric lines in datagram. "" disables it.
|
||||
The Unixgram socket path to receive statsd metric lines in datagram. "" disables it.
|
||||
--statsd.unixsocket-mode="755"
|
||||
The permission mode of the unix socket.
|
||||
The permission mode of the unix socket.
|
||||
--statsd.mapping-config=STATSD.MAPPING-CONFIG
|
||||
Metric mapping configuration file name.
|
||||
Metric mapping configuration file name.
|
||||
--statsd.read-buffer=STATSD.READ-BUFFER
|
||||
Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.
|
||||
--debug.dump-fsm="" The path to dump internal FSM generated for glob matching as Dot file.
|
||||
--log.level="info" Only log messages with the given severity or above. Valid levels: [debug, info, warn, error, fatal]
|
||||
Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to
|
||||
a value greater than the value specified.
|
||||
--statsd.cache-size=1000 Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.
|
||||
--statsd.event-queue-size=10000
|
||||
Size of internal queue for processing events
|
||||
--statsd.event-flush-threshold=1000
|
||||
Number of events to hold in queue before flushing
|
||||
--statsd.event-flush-interval=200ms
|
||||
Number of events to hold in queue before flushing
|
||||
--debug.dump-fsm="" The path to dump internal FSM generated for glob matching as Dot file.
|
||||
--log.level="info" Only log messages with the given severity or above. Valid levels: [debug, info, warn, error, fatal]
|
||||
--log.format="logger:stderr"
|
||||
Set the log target and format. Example: "logger:syslog?appname=bob&local=7" or "logger:stdout?json=true"
|
||||
--version Show application version.
|
||||
Set the log target and format. Example: "logger:syslog?appname=bob& local=7" or "logger:stdout?json=true"
|
||||
--version Show application version.
|
||||
|
||||
```
|
||||
|
||||
## Tests
|
||||
|
@ -373,6 +382,10 @@ metrics that do not expire.
|
|||
expire a metric only by changing the mapping configuration. At least one
|
||||
sample must be received for updated mappings to take effect.
|
||||
|
||||
### Event flushing configuration
|
||||
|
||||
Internally `statsd_exporter` runs a goroutine for each network listener (UDP, TCP & Unix Socket). These each receive and parse metrics received into an event. For performance purposes, these events are queued internally and flushed to the main exporter goroutine periodically in batches. The size of this queue and the flush criteria can be tuned with the `--statsd.event-queue-size`, `--statsd.event-flush-threshold` and `--statsd.event-flush-interval`. However, the defaults should perform well even for very high traffic environments.
|
||||
|
||||
## Using Docker
|
||||
|
||||
You can deploy this exporter using the [prom/statsd-exporter](https://registry.hub.docker.com/u/prom/statsd-exporter/) Docker image.
|
||||
|
|
|
@ -293,8 +293,9 @@ func TestHandlePacket(t *testing.T) {
|
|||
|
||||
for k, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
|
||||
events := make(chan Events, 32)
|
||||
l.SetEventHandler(&unbufferedEventHandler{c: events})
|
||||
for i, scenario := range scenarios {
|
||||
l.handlePacket([]byte(scenario.in), events)
|
||||
l.handlePacket([]byte(scenario.in))
|
||||
|
||||
le := len(events)
|
||||
// Flatten actual events.
|
||||
|
|
132
event.go
Normal file
132
event.go
Normal file
|
@ -0,0 +1,132 @@
|
|||
// Copyright 2013 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/statsd_exporter/pkg/clock"
|
||||
"github.com/prometheus/statsd_exporter/pkg/mapper"
|
||||
)
|
||||
|
||||
type Event interface {
|
||||
MetricName() string
|
||||
Value() float64
|
||||
Labels() map[string]string
|
||||
MetricType() mapper.MetricType
|
||||
}
|
||||
|
||||
type CounterEvent struct {
|
||||
metricName string
|
||||
value float64
|
||||
labels map[string]string
|
||||
}
|
||||
|
||||
func (c *CounterEvent) MetricName() string { return c.metricName }
|
||||
func (c *CounterEvent) Value() float64 { return c.value }
|
||||
func (c *CounterEvent) Labels() map[string]string { return c.labels }
|
||||
func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter }
|
||||
|
||||
type GaugeEvent struct {
|
||||
metricName string
|
||||
value float64
|
||||
relative bool
|
||||
labels map[string]string
|
||||
}
|
||||
|
||||
func (g *GaugeEvent) MetricName() string { return g.metricName }
|
||||
func (g *GaugeEvent) Value() float64 { return g.value }
|
||||
func (c *GaugeEvent) Labels() map[string]string { return c.labels }
|
||||
func (c *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge }
|
||||
|
||||
type TimerEvent struct {
|
||||
metricName string
|
||||
value float64
|
||||
labels map[string]string
|
||||
}
|
||||
|
||||
func (t *TimerEvent) MetricName() string { return t.metricName }
|
||||
func (t *TimerEvent) Value() float64 { return t.value }
|
||||
func (c *TimerEvent) Labels() map[string]string { return c.labels }
|
||||
func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTimer }
|
||||
|
||||
type Events []Event
|
||||
|
||||
type eventQueue struct {
|
||||
c chan Events
|
||||
q Events
|
||||
m sync.Mutex
|
||||
flushThreshold int
|
||||
flushTicker *time.Ticker
|
||||
}
|
||||
|
||||
type eventHandler interface {
|
||||
queue(event Events)
|
||||
}
|
||||
|
||||
func newEventQueue(c chan Events, flushThreshold int, flushInterval time.Duration) *eventQueue {
|
||||
ticker := clock.NewTicker(flushInterval)
|
||||
eq := &eventQueue{
|
||||
c: c,
|
||||
flushThreshold: flushThreshold,
|
||||
flushTicker: ticker,
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
<-ticker.C
|
||||
eq.flush()
|
||||
}
|
||||
}()
|
||||
return eq
|
||||
}
|
||||
|
||||
func (eq *eventQueue) queue(events Events) {
|
||||
eq.m.Lock()
|
||||
defer eq.m.Unlock()
|
||||
|
||||
for _, e := range events {
|
||||
eq.q = append(eq.q, e)
|
||||
if len(eq.q) >= eq.flushThreshold {
|
||||
eq.flushUnlocked()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (eq *eventQueue) flush() {
|
||||
eq.m.Lock()
|
||||
defer eq.m.Unlock()
|
||||
eq.flushUnlocked()
|
||||
}
|
||||
|
||||
func (eq *eventQueue) flushUnlocked() {
|
||||
eq.c <- eq.q
|
||||
eq.q = eq.q[:0]
|
||||
eventsFlushed.Inc()
|
||||
}
|
||||
|
||||
func (eq *eventQueue) len() int {
|
||||
eq.m.Lock()
|
||||
defer eq.m.Unlock()
|
||||
|
||||
return len(eq.q)
|
||||
}
|
||||
|
||||
type unbufferedEventHandler struct {
|
||||
c chan Events
|
||||
}
|
||||
|
||||
func (ueh *unbufferedEventHandler) queue(events Events) {
|
||||
ueh.c <- events
|
||||
}
|
80
event_test.go
Normal file
80
event_test.go
Normal file
|
@ -0,0 +1,80 @@
|
|||
// Copyright 2013 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/statsd_exporter/pkg/clock"
|
||||
)
|
||||
|
||||
func TestEventThresholdFlush(t *testing.T) {
|
||||
c := make(chan Events, 100)
|
||||
// We're not going to flush during this test, so the duration doesn't matter.
|
||||
eq := newEventQueue(c, 5, time.Second)
|
||||
e := make(Events, 13)
|
||||
go func() {
|
||||
eq.queue(e)
|
||||
}()
|
||||
|
||||
batch := <-c
|
||||
if len(batch) != 5 {
|
||||
t.Fatalf("Expected event batch to be 5 elements, but got %v", len(batch))
|
||||
}
|
||||
batch = <-c
|
||||
if len(batch) != 5 {
|
||||
t.Fatalf("Expected event batch to be 5 elements, but got %v", len(batch))
|
||||
}
|
||||
batch = <-c
|
||||
if len(batch) != 3 {
|
||||
t.Fatalf("Expected event batch to be 3 elements, but got %v", len(batch))
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventIntervalFlush(t *testing.T) {
|
||||
// Mock a time.NewTicker
|
||||
tickerCh := make(chan time.Time)
|
||||
clock.ClockInstance = &clock.Clock{
|
||||
TickerCh: tickerCh,
|
||||
}
|
||||
clock.ClockInstance.Instant = time.Unix(0, 0)
|
||||
|
||||
c := make(chan Events, 100)
|
||||
eq := newEventQueue(c, 1000, time.Second*1000)
|
||||
e := make(Events, 10)
|
||||
eq.queue(e)
|
||||
|
||||
if eq.len() != 10 {
|
||||
t.Fatal("Expected 10 events to be queued, but got", eq.len())
|
||||
}
|
||||
|
||||
if len(eq.c) != 0 {
|
||||
t.Fatal("Expected 0 events in the event channel, but got", len(eq.c))
|
||||
}
|
||||
|
||||
// Tick time forward to trigger a flush
|
||||
clock.ClockInstance.Instant = time.Unix(10000, 0)
|
||||
clock.ClockInstance.TickerCh <- time.Unix(10000, 0)
|
||||
|
||||
events := <-eq.c
|
||||
if eq.len() != 0 {
|
||||
t.Fatal("Expected 0 events to be queued, but got", eq.len())
|
||||
}
|
||||
|
||||
if len(events) != 10 {
|
||||
t.Fatal("Expected 10 events in the event channel, but got", len(events))
|
||||
}
|
||||
|
||||
}
|
92
exporter.go
92
exporter.go
|
@ -46,49 +46,6 @@ func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
|
|||
u.c.Collect(c)
|
||||
}
|
||||
|
||||
type Event interface {
|
||||
MetricName() string
|
||||
Value() float64
|
||||
Labels() map[string]string
|
||||
MetricType() mapper.MetricType
|
||||
}
|
||||
|
||||
type CounterEvent struct {
|
||||
metricName string
|
||||
value float64
|
||||
labels map[string]string
|
||||
}
|
||||
|
||||
func (c *CounterEvent) MetricName() string { return c.metricName }
|
||||
func (c *CounterEvent) Value() float64 { return c.value }
|
||||
func (c *CounterEvent) Labels() map[string]string { return c.labels }
|
||||
func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter }
|
||||
|
||||
type GaugeEvent struct {
|
||||
metricName string
|
||||
value float64
|
||||
relative bool
|
||||
labels map[string]string
|
||||
}
|
||||
|
||||
func (g *GaugeEvent) MetricName() string { return g.metricName }
|
||||
func (g *GaugeEvent) Value() float64 { return g.value }
|
||||
func (c *GaugeEvent) Labels() map[string]string { return c.labels }
|
||||
func (c *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge }
|
||||
|
||||
type TimerEvent struct {
|
||||
metricName string
|
||||
value float64
|
||||
labels map[string]string
|
||||
}
|
||||
|
||||
func (t *TimerEvent) MetricName() string { return t.metricName }
|
||||
func (t *TimerEvent) Value() float64 { return t.value }
|
||||
func (c *TimerEvent) Labels() map[string]string { return c.labels }
|
||||
func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTimer }
|
||||
|
||||
type Events []Event
|
||||
|
||||
type Exporter struct {
|
||||
mapper *mapper.MetricMapper
|
||||
registry *registry
|
||||
|
@ -475,10 +432,15 @@ samples:
|
|||
}
|
||||
|
||||
type StatsDUDPListener struct {
|
||||
conn *net.UDPConn
|
||||
conn *net.UDPConn
|
||||
eventHandler eventHandler
|
||||
}
|
||||
|
||||
func (l *StatsDUDPListener) Listen(e chan<- Events) {
|
||||
func (l *StatsDUDPListener) SetEventHandler(eh eventHandler) {
|
||||
l.eventHandler = eh
|
||||
}
|
||||
|
||||
func (l *StatsDUDPListener) Listen() {
|
||||
buf := make([]byte, 65535)
|
||||
for {
|
||||
n, _, err := l.conn.ReadFromUDP(buf)
|
||||
|
@ -491,26 +453,29 @@ func (l *StatsDUDPListener) Listen(e chan<- Events) {
|
|||
log.Error(err)
|
||||
return
|
||||
}
|
||||
l.handlePacket(buf[0:n], e)
|
||||
l.handlePacket(buf[0:n])
|
||||
}
|
||||
}
|
||||
|
||||
func (l *StatsDUDPListener) handlePacket(packet []byte, e chan<- Events) {
|
||||
func (l *StatsDUDPListener) handlePacket(packet []byte) {
|
||||
udpPackets.Inc()
|
||||
lines := strings.Split(string(packet), "\n")
|
||||
events := Events{}
|
||||
for _, line := range lines {
|
||||
linesReceived.Inc()
|
||||
events = append(events, lineToEvents(line)...)
|
||||
l.eventHandler.queue(lineToEvents(line))
|
||||
}
|
||||
e <- events
|
||||
}
|
||||
|
||||
type StatsDTCPListener struct {
|
||||
conn *net.TCPListener
|
||||
conn *net.TCPListener
|
||||
eventHandler eventHandler
|
||||
}
|
||||
|
||||
func (l *StatsDTCPListener) Listen(e chan<- Events) {
|
||||
func (l *StatsDTCPListener) SetEventHandler(eh eventHandler) {
|
||||
l.eventHandler = eh
|
||||
}
|
||||
|
||||
func (l *StatsDTCPListener) Listen() {
|
||||
for {
|
||||
c, err := l.conn.AcceptTCP()
|
||||
if err != nil {
|
||||
|
@ -521,11 +486,11 @@ func (l *StatsDTCPListener) Listen(e chan<- Events) {
|
|||
}
|
||||
log.Fatalf("AcceptTCP failed: %v", err)
|
||||
}
|
||||
go l.handleConn(c, e)
|
||||
go l.handleConn(c)
|
||||
}
|
||||
}
|
||||
|
||||
func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) {
|
||||
func (l *StatsDTCPListener) handleConn(c *net.TCPConn) {
|
||||
defer c.Close()
|
||||
|
||||
tcpConnections.Inc()
|
||||
|
@ -546,15 +511,20 @@ func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) {
|
|||
break
|
||||
}
|
||||
linesReceived.Inc()
|
||||
e <- lineToEvents(string(line))
|
||||
l.eventHandler.queue(lineToEvents(string(line)))
|
||||
}
|
||||
}
|
||||
|
||||
type StatsDUnixgramListener struct {
|
||||
conn *net.UnixConn
|
||||
conn *net.UnixConn
|
||||
eventHandler eventHandler
|
||||
}
|
||||
|
||||
func (l *StatsDUnixgramListener) Listen(e chan<- Events) {
|
||||
func (l *StatsDUnixgramListener) SetEventHandler(eh eventHandler) {
|
||||
l.eventHandler = eh
|
||||
}
|
||||
|
||||
func (l *StatsDUnixgramListener) Listen() {
|
||||
buf := make([]byte, 65535)
|
||||
for {
|
||||
n, _, err := l.conn.ReadFromUnix(buf)
|
||||
|
@ -566,17 +536,15 @@ func (l *StatsDUnixgramListener) Listen(e chan<- Events) {
|
|||
}
|
||||
log.Fatal(err)
|
||||
}
|
||||
l.handlePacket(buf[:n], e)
|
||||
l.handlePacket(buf[:n])
|
||||
}
|
||||
}
|
||||
|
||||
func (l *StatsDUnixgramListener) handlePacket(packet []byte, e chan<- Events) {
|
||||
func (l *StatsDUnixgramListener) handlePacket(packet []byte) {
|
||||
unixgramPackets.Inc()
|
||||
lines := strings.Split(string(packet), "\n")
|
||||
events := Events{}
|
||||
for _, line := range lines {
|
||||
linesReceived.Inc()
|
||||
events = append(events, lineToEvents(line)...)
|
||||
l.eventHandler.queue(lineToEvents(string(line)))
|
||||
}
|
||||
e <- events
|
||||
}
|
||||
|
|
|
@ -40,13 +40,13 @@ func benchmarkUDPListener(times int, b *testing.B) {
|
|||
}
|
||||
}
|
||||
for n := 0; n < b.N; n++ {
|
||||
l := StatsDUDPListener{}
|
||||
// there are more events than input lines, need bigger buffer
|
||||
events := make(chan Events, len(bytesInput)*times*2)
|
||||
l := StatsDUDPListener{eventHandler: &unbufferedEventHandler{c: events}}
|
||||
|
||||
for i := 0; i < times; i++ {
|
||||
for _, line := range bytesInput {
|
||||
l.handlePacket([]byte(line), events)
|
||||
l.handlePacket([]byte(line))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -489,10 +489,12 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
|
|||
}()
|
||||
|
||||
events := make(chan Events)
|
||||
ueh := &unbufferedEventHandler{c: events}
|
||||
|
||||
go func() {
|
||||
for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
|
||||
l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), events)
|
||||
l.SetEventHandler(ueh)
|
||||
l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"))
|
||||
}
|
||||
close(events)
|
||||
}()
|
||||
|
@ -639,14 +641,15 @@ func TestCounterIncrement(t *testing.T) {
|
|||
}
|
||||
|
||||
type statsDPacketHandler interface {
|
||||
handlePacket(packet []byte, e chan<- Events)
|
||||
handlePacket(packet []byte)
|
||||
SetEventHandler(eh eventHandler)
|
||||
}
|
||||
|
||||
type mockStatsDTCPListener struct {
|
||||
StatsDTCPListener
|
||||
}
|
||||
|
||||
func (ml *mockStatsDTCPListener) handlePacket(packet []byte, e chan<- Events) {
|
||||
func (ml *mockStatsDTCPListener) handlePacket(packet []byte) {
|
||||
// Forcing IPv4 because the TravisCI build environment does not have IPv6
|
||||
// addresses.
|
||||
lc, err := net.ListenTCP("tcp4", nil)
|
||||
|
@ -674,7 +677,7 @@ func (ml *mockStatsDTCPListener) handlePacket(packet []byte, e chan<- Events) {
|
|||
if err != nil {
|
||||
panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err))
|
||||
}
|
||||
ml.handleConn(sc, e)
|
||||
ml.handleConn(sc)
|
||||
}
|
||||
|
||||
func TestEscapeMetricName(t *testing.T) {
|
||||
|
|
14
main.go
14
main.go
|
@ -150,6 +150,9 @@ func main() {
|
|||
mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
|
||||
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
|
||||
cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int()
|
||||
eventQueueSize = kingpin.Flag("statsd.event-queue-size", "Size of internal queue for processing events").Default("10000").Int()
|
||||
eventFlushThreshold = kingpin.Flag("statsd.event-flush-threshold", "Number of events to hold in queue before flushing").Default("1000").Int()
|
||||
eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Number of events to hold in queue before flushing").Default("200ms").Duration()
|
||||
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
|
||||
)
|
||||
|
||||
|
@ -169,8 +172,9 @@ func main() {
|
|||
|
||||
go serveHTTP(*listenAddress, *metricsEndpoint)
|
||||
|
||||
events := make(chan Events, 1024)
|
||||
events := make(chan Events, *eventQueueSize)
|
||||
defer close(events)
|
||||
eventQueue := newEventQueue(events, *eventFlushThreshold, *eventFlushInterval)
|
||||
|
||||
if *statsdListenUDP != "" {
|
||||
udpListenAddr := udpAddrFromString(*statsdListenUDP)
|
||||
|
@ -186,8 +190,8 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
ul := &StatsDUDPListener{conn: uconn}
|
||||
go ul.Listen(events)
|
||||
ul := &StatsDUDPListener{conn: uconn, eventHandler: eventQueue}
|
||||
go ul.Listen()
|
||||
}
|
||||
|
||||
if *statsdListenTCP != "" {
|
||||
|
@ -199,7 +203,7 @@ func main() {
|
|||
defer tconn.Close()
|
||||
|
||||
tl := &StatsDTCPListener{conn: tconn}
|
||||
go tl.Listen(events)
|
||||
go tl.Listen()
|
||||
}
|
||||
|
||||
if *statsdListenUnixgram != "" {
|
||||
|
@ -225,7 +229,7 @@ func main() {
|
|||
}
|
||||
|
||||
ul := &StatsDUnixgramListener{conn: uxgconn}
|
||||
go ul.Listen(events)
|
||||
go ul.Listen()
|
||||
|
||||
// if it's an abstract unix domain socket, it won't exist on fs
|
||||
// so we can't chmod it either
|
||||
|
|
|
@ -25,6 +25,12 @@ var (
|
|||
},
|
||||
[]string{"type"},
|
||||
)
|
||||
eventsFlushed = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "statsd_exporter_events_flushed_total",
|
||||
Help: "Number of times events were flushed to exporter",
|
||||
},
|
||||
)
|
||||
eventsUnmapped = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "statsd_exporter_events_unmapped_total",
|
||||
Help: "The total number of StatsD events no mapping was found for.",
|
||||
|
@ -87,7 +93,7 @@ var (
|
|||
tagErrors = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "statsd_exporter_tag_errors_total",
|
||||
Help: "The number of errors parsign DogStatsD tags.",
|
||||
Help: "The number of errors parsing DogStatsD tags.",
|
||||
},
|
||||
)
|
||||
configLoads = prometheus.NewCounterVec(
|
||||
|
@ -133,6 +139,7 @@ var (
|
|||
|
||||
func init() {
|
||||
prometheus.MustRegister(eventStats)
|
||||
prometheus.MustRegister(eventsFlushed)
|
||||
prometheus.MustRegister(eventsUnmapped)
|
||||
prometheus.MustRegister(udpPackets)
|
||||
prometheus.MustRegister(tcpConnections)
|
||||
|
|
Loading…
Reference in a new issue