Merge pull request #227 from claytono/event-queuing

Add internal event queuing and flushing
This commit is contained in:
Matthias Rampke 2019-06-07 15:50:25 +02:00 committed by GitHub
commit 5832aa9bcf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 296 additions and 88 deletions

View file

@ -69,12 +69,21 @@ NOTE: Version 0.7.0 switched to the [kingpin](https://github.com/alecthomas/king
--statsd.mapping-config=STATSD.MAPPING-CONFIG --statsd.mapping-config=STATSD.MAPPING-CONFIG
Metric mapping configuration file name. Metric mapping configuration file name.
--statsd.read-buffer=STATSD.READ-BUFFER --statsd.read-buffer=STATSD.READ-BUFFER
Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified. Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to
a value greater than the value specified.
--statsd.cache-size=1000 Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.
--statsd.event-queue-size=10000
Size of internal queue for processing events
--statsd.event-flush-threshold=1000
Number of events to hold in queue before flushing
--statsd.event-flush-interval=200ms
Number of events to hold in queue before flushing
--debug.dump-fsm="" The path to dump internal FSM generated for glob matching as Dot file. --debug.dump-fsm="" The path to dump internal FSM generated for glob matching as Dot file.
--log.level="info" Only log messages with the given severity or above. Valid levels: [debug, info, warn, error, fatal] --log.level="info" Only log messages with the given severity or above. Valid levels: [debug, info, warn, error, fatal]
--log.format="logger:stderr" --log.format="logger:stderr"
Set the log target and format. Example: "logger:syslog?appname=bob&local=7" or "logger:stdout?json=true" Set the log target and format. Example: "logger:syslog?appname=bob& local=7" or "logger:stdout?json=true"
--version Show application version. --version Show application version.
``` ```
## Tests ## Tests
@ -373,6 +382,10 @@ metrics that do not expire.
expire a metric only by changing the mapping configuration. At least one expire a metric only by changing the mapping configuration. At least one
sample must be received for updated mappings to take effect. sample must be received for updated mappings to take effect.
### Event flushing configuration
Internally `statsd_exporter` runs a goroutine for each network listener (UDP, TCP & Unix Socket). These each receive and parse metrics received into an event. For performance purposes, these events are queued internally and flushed to the main exporter goroutine periodically in batches. The size of this queue and the flush criteria can be tuned with the `--statsd.event-queue-size`, `--statsd.event-flush-threshold` and `--statsd.event-flush-interval`. However, the defaults should perform well even for very high traffic environments.
## Using Docker ## Using Docker
You can deploy this exporter using the [prom/statsd-exporter](https://registry.hub.docker.com/u/prom/statsd-exporter/) Docker image. You can deploy this exporter using the [prom/statsd-exporter](https://registry.hub.docker.com/u/prom/statsd-exporter/) Docker image.

View file

@ -293,8 +293,9 @@ func TestHandlePacket(t *testing.T) {
for k, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} { for k, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
events := make(chan Events, 32) events := make(chan Events, 32)
l.SetEventHandler(&unbufferedEventHandler{c: events})
for i, scenario := range scenarios { for i, scenario := range scenarios {
l.handlePacket([]byte(scenario.in), events) l.handlePacket([]byte(scenario.in))
le := len(events) le := len(events)
// Flatten actual events. // Flatten actual events.

132
event.go Normal file
View file

@ -0,0 +1,132 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"sync"
"time"
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)
type Event interface {
MetricName() string
Value() float64
Labels() map[string]string
MetricType() mapper.MetricType
}
type CounterEvent struct {
metricName string
value float64
labels map[string]string
}
func (c *CounterEvent) MetricName() string { return c.metricName }
func (c *CounterEvent) Value() float64 { return c.value }
func (c *CounterEvent) Labels() map[string]string { return c.labels }
func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter }
type GaugeEvent struct {
metricName string
value float64
relative bool
labels map[string]string
}
func (g *GaugeEvent) MetricName() string { return g.metricName }
func (g *GaugeEvent) Value() float64 { return g.value }
func (c *GaugeEvent) Labels() map[string]string { return c.labels }
func (c *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge }
type TimerEvent struct {
metricName string
value float64
labels map[string]string
}
func (t *TimerEvent) MetricName() string { return t.metricName }
func (t *TimerEvent) Value() float64 { return t.value }
func (c *TimerEvent) Labels() map[string]string { return c.labels }
func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTimer }
type Events []Event
type eventQueue struct {
c chan Events
q Events
m sync.Mutex
flushThreshold int
flushTicker *time.Ticker
}
type eventHandler interface {
queue(event Events)
}
func newEventQueue(c chan Events, flushThreshold int, flushInterval time.Duration) *eventQueue {
ticker := clock.NewTicker(flushInterval)
eq := &eventQueue{
c: c,
flushThreshold: flushThreshold,
flushTicker: ticker,
}
go func() {
for {
<-ticker.C
eq.flush()
}
}()
return eq
}
func (eq *eventQueue) queue(events Events) {
eq.m.Lock()
defer eq.m.Unlock()
for _, e := range events {
eq.q = append(eq.q, e)
if len(eq.q) >= eq.flushThreshold {
eq.flushUnlocked()
}
}
}
func (eq *eventQueue) flush() {
eq.m.Lock()
defer eq.m.Unlock()
eq.flushUnlocked()
}
func (eq *eventQueue) flushUnlocked() {
eq.c <- eq.q
eq.q = eq.q[:0]
eventsFlushed.Inc()
}
func (eq *eventQueue) len() int {
eq.m.Lock()
defer eq.m.Unlock()
return len(eq.q)
}
type unbufferedEventHandler struct {
c chan Events
}
func (ueh *unbufferedEventHandler) queue(events Events) {
ueh.c <- events
}

80
event_test.go Normal file
View file

@ -0,0 +1,80 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"testing"
"time"
"github.com/prometheus/statsd_exporter/pkg/clock"
)
func TestEventThresholdFlush(t *testing.T) {
c := make(chan Events, 100)
// We're not going to flush during this test, so the duration doesn't matter.
eq := newEventQueue(c, 5, time.Second)
e := make(Events, 13)
go func() {
eq.queue(e)
}()
batch := <-c
if len(batch) != 5 {
t.Fatalf("Expected event batch to be 5 elements, but got %v", len(batch))
}
batch = <-c
if len(batch) != 5 {
t.Fatalf("Expected event batch to be 5 elements, but got %v", len(batch))
}
batch = <-c
if len(batch) != 3 {
t.Fatalf("Expected event batch to be 3 elements, but got %v", len(batch))
}
}
func TestEventIntervalFlush(t *testing.T) {
// Mock a time.NewTicker
tickerCh := make(chan time.Time)
clock.ClockInstance = &clock.Clock{
TickerCh: tickerCh,
}
clock.ClockInstance.Instant = time.Unix(0, 0)
c := make(chan Events, 100)
eq := newEventQueue(c, 1000, time.Second*1000)
e := make(Events, 10)
eq.queue(e)
if eq.len() != 10 {
t.Fatal("Expected 10 events to be queued, but got", eq.len())
}
if len(eq.c) != 0 {
t.Fatal("Expected 0 events in the event channel, but got", len(eq.c))
}
// Tick time forward to trigger a flush
clock.ClockInstance.Instant = time.Unix(10000, 0)
clock.ClockInstance.TickerCh <- time.Unix(10000, 0)
events := <-eq.c
if eq.len() != 0 {
t.Fatal("Expected 0 events to be queued, but got", eq.len())
}
if len(events) != 10 {
t.Fatal("Expected 10 events in the event channel, but got", len(events))
}
}

View file

@ -46,49 +46,6 @@ func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {
u.c.Collect(c) u.c.Collect(c)
} }
type Event interface {
MetricName() string
Value() float64
Labels() map[string]string
MetricType() mapper.MetricType
}
type CounterEvent struct {
metricName string
value float64
labels map[string]string
}
func (c *CounterEvent) MetricName() string { return c.metricName }
func (c *CounterEvent) Value() float64 { return c.value }
func (c *CounterEvent) Labels() map[string]string { return c.labels }
func (c *CounterEvent) MetricType() mapper.MetricType { return mapper.MetricTypeCounter }
type GaugeEvent struct {
metricName string
value float64
relative bool
labels map[string]string
}
func (g *GaugeEvent) MetricName() string { return g.metricName }
func (g *GaugeEvent) Value() float64 { return g.value }
func (c *GaugeEvent) Labels() map[string]string { return c.labels }
func (c *GaugeEvent) MetricType() mapper.MetricType { return mapper.MetricTypeGauge }
type TimerEvent struct {
metricName string
value float64
labels map[string]string
}
func (t *TimerEvent) MetricName() string { return t.metricName }
func (t *TimerEvent) Value() float64 { return t.value }
func (c *TimerEvent) Labels() map[string]string { return c.labels }
func (c *TimerEvent) MetricType() mapper.MetricType { return mapper.MetricTypeTimer }
type Events []Event
type Exporter struct { type Exporter struct {
mapper *mapper.MetricMapper mapper *mapper.MetricMapper
registry *registry registry *registry
@ -476,9 +433,14 @@ samples:
type StatsDUDPListener struct { type StatsDUDPListener struct {
conn *net.UDPConn conn *net.UDPConn
eventHandler eventHandler
} }
func (l *StatsDUDPListener) Listen(e chan<- Events) { func (l *StatsDUDPListener) SetEventHandler(eh eventHandler) {
l.eventHandler = eh
}
func (l *StatsDUDPListener) Listen() {
buf := make([]byte, 65535) buf := make([]byte, 65535)
for { for {
n, _, err := l.conn.ReadFromUDP(buf) n, _, err := l.conn.ReadFromUDP(buf)
@ -491,26 +453,29 @@ func (l *StatsDUDPListener) Listen(e chan<- Events) {
log.Error(err) log.Error(err)
return return
} }
l.handlePacket(buf[0:n], e) l.handlePacket(buf[0:n])
} }
} }
func (l *StatsDUDPListener) handlePacket(packet []byte, e chan<- Events) { func (l *StatsDUDPListener) handlePacket(packet []byte) {
udpPackets.Inc() udpPackets.Inc()
lines := strings.Split(string(packet), "\n") lines := strings.Split(string(packet), "\n")
events := Events{}
for _, line := range lines { for _, line := range lines {
linesReceived.Inc() linesReceived.Inc()
events = append(events, lineToEvents(line)...) l.eventHandler.queue(lineToEvents(line))
} }
e <- events
} }
type StatsDTCPListener struct { type StatsDTCPListener struct {
conn *net.TCPListener conn *net.TCPListener
eventHandler eventHandler
} }
func (l *StatsDTCPListener) Listen(e chan<- Events) { func (l *StatsDTCPListener) SetEventHandler(eh eventHandler) {
l.eventHandler = eh
}
func (l *StatsDTCPListener) Listen() {
for { for {
c, err := l.conn.AcceptTCP() c, err := l.conn.AcceptTCP()
if err != nil { if err != nil {
@ -521,11 +486,11 @@ func (l *StatsDTCPListener) Listen(e chan<- Events) {
} }
log.Fatalf("AcceptTCP failed: %v", err) log.Fatalf("AcceptTCP failed: %v", err)
} }
go l.handleConn(c, e) go l.handleConn(c)
} }
} }
func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) { func (l *StatsDTCPListener) handleConn(c *net.TCPConn) {
defer c.Close() defer c.Close()
tcpConnections.Inc() tcpConnections.Inc()
@ -546,15 +511,20 @@ func (l *StatsDTCPListener) handleConn(c *net.TCPConn, e chan<- Events) {
break break
} }
linesReceived.Inc() linesReceived.Inc()
e <- lineToEvents(string(line)) l.eventHandler.queue(lineToEvents(string(line)))
} }
} }
type StatsDUnixgramListener struct { type StatsDUnixgramListener struct {
conn *net.UnixConn conn *net.UnixConn
eventHandler eventHandler
} }
func (l *StatsDUnixgramListener) Listen(e chan<- Events) { func (l *StatsDUnixgramListener) SetEventHandler(eh eventHandler) {
l.eventHandler = eh
}
func (l *StatsDUnixgramListener) Listen() {
buf := make([]byte, 65535) buf := make([]byte, 65535)
for { for {
n, _, err := l.conn.ReadFromUnix(buf) n, _, err := l.conn.ReadFromUnix(buf)
@ -566,17 +536,15 @@ func (l *StatsDUnixgramListener) Listen(e chan<- Events) {
} }
log.Fatal(err) log.Fatal(err)
} }
l.handlePacket(buf[:n], e) l.handlePacket(buf[:n])
} }
} }
func (l *StatsDUnixgramListener) handlePacket(packet []byte, e chan<- Events) { func (l *StatsDUnixgramListener) handlePacket(packet []byte) {
unixgramPackets.Inc() unixgramPackets.Inc()
lines := strings.Split(string(packet), "\n") lines := strings.Split(string(packet), "\n")
events := Events{}
for _, line := range lines { for _, line := range lines {
linesReceived.Inc() linesReceived.Inc()
events = append(events, lineToEvents(line)...) l.eventHandler.queue(lineToEvents(string(line)))
} }
e <- events
} }

View file

@ -40,13 +40,13 @@ func benchmarkUDPListener(times int, b *testing.B) {
} }
} }
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
l := StatsDUDPListener{}
// there are more events than input lines, need bigger buffer // there are more events than input lines, need bigger buffer
events := make(chan Events, len(bytesInput)*times*2) events := make(chan Events, len(bytesInput)*times*2)
l := StatsDUDPListener{eventHandler: &unbufferedEventHandler{c: events}}
for i := 0; i < times; i++ { for i := 0; i < times; i++ {
for _, line := range bytesInput { for _, line := range bytesInput {
l.handlePacket([]byte(line), events) l.handlePacket([]byte(line))
} }
} }
} }

View file

@ -489,10 +489,12 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
}() }()
events := make(chan Events) events := make(chan Events)
ueh := &unbufferedEventHandler{c: events}
go func() { go func() {
for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} { for _, l := range []statsDPacketHandler{&StatsDUDPListener{}, &mockStatsDTCPListener{}} {
l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"), events) l.SetEventHandler(ueh)
l.handlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"))
} }
close(events) close(events)
}() }()
@ -639,14 +641,15 @@ func TestCounterIncrement(t *testing.T) {
} }
type statsDPacketHandler interface { type statsDPacketHandler interface {
handlePacket(packet []byte, e chan<- Events) handlePacket(packet []byte)
SetEventHandler(eh eventHandler)
} }
type mockStatsDTCPListener struct { type mockStatsDTCPListener struct {
StatsDTCPListener StatsDTCPListener
} }
func (ml *mockStatsDTCPListener) handlePacket(packet []byte, e chan<- Events) { func (ml *mockStatsDTCPListener) handlePacket(packet []byte) {
// Forcing IPv4 because the TravisCI build environment does not have IPv6 // Forcing IPv4 because the TravisCI build environment does not have IPv6
// addresses. // addresses.
lc, err := net.ListenTCP("tcp4", nil) lc, err := net.ListenTCP("tcp4", nil)
@ -674,7 +677,7 @@ func (ml *mockStatsDTCPListener) handlePacket(packet []byte, e chan<- Events) {
if err != nil { if err != nil {
panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err)) panic(fmt.Sprintf("mockStatsDTCPListener: accept failed: %v", err))
} }
ml.handleConn(sc, e) ml.handleConn(sc)
} }
func TestEscapeMetricName(t *testing.T) { func TestEscapeMetricName(t *testing.T) {

14
main.go
View file

@ -150,6 +150,9 @@ func main() {
mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String() mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int() cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int()
eventQueueSize = kingpin.Flag("statsd.event-queue-size", "Size of internal queue for processing events").Default("10000").Int()
eventFlushThreshold = kingpin.Flag("statsd.event-flush-threshold", "Number of events to hold in queue before flushing").Default("1000").Int()
eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Number of events to hold in queue before flushing").Default("200ms").Duration()
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String() dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
) )
@ -169,8 +172,9 @@ func main() {
go serveHTTP(*listenAddress, *metricsEndpoint) go serveHTTP(*listenAddress, *metricsEndpoint)
events := make(chan Events, 1024) events := make(chan Events, *eventQueueSize)
defer close(events) defer close(events)
eventQueue := newEventQueue(events, *eventFlushThreshold, *eventFlushInterval)
if *statsdListenUDP != "" { if *statsdListenUDP != "" {
udpListenAddr := udpAddrFromString(*statsdListenUDP) udpListenAddr := udpAddrFromString(*statsdListenUDP)
@ -186,8 +190,8 @@ func main() {
} }
} }
ul := &StatsDUDPListener{conn: uconn} ul := &StatsDUDPListener{conn: uconn, eventHandler: eventQueue}
go ul.Listen(events) go ul.Listen()
} }
if *statsdListenTCP != "" { if *statsdListenTCP != "" {
@ -199,7 +203,7 @@ func main() {
defer tconn.Close() defer tconn.Close()
tl := &StatsDTCPListener{conn: tconn} tl := &StatsDTCPListener{conn: tconn}
go tl.Listen(events) go tl.Listen()
} }
if *statsdListenUnixgram != "" { if *statsdListenUnixgram != "" {
@ -225,7 +229,7 @@ func main() {
} }
ul := &StatsDUnixgramListener{conn: uxgconn} ul := &StatsDUnixgramListener{conn: uxgconn}
go ul.Listen(events) go ul.Listen()
// if it's an abstract unix domain socket, it won't exist on fs // if it's an abstract unix domain socket, it won't exist on fs
// so we can't chmod it either // so we can't chmod it either

View file

@ -25,6 +25,12 @@ var (
}, },
[]string{"type"}, []string{"type"},
) )
eventsFlushed = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_event_queue_flushed_total",
Help: "Number of times events were flushed to exporter",
},
)
eventsUnmapped = prometheus.NewCounter(prometheus.CounterOpts{ eventsUnmapped = prometheus.NewCounter(prometheus.CounterOpts{
Name: "statsd_exporter_events_unmapped_total", Name: "statsd_exporter_events_unmapped_total",
Help: "The total number of StatsD events no mapping was found for.", Help: "The total number of StatsD events no mapping was found for.",
@ -87,7 +93,7 @@ var (
tagErrors = prometheus.NewCounter( tagErrors = prometheus.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "statsd_exporter_tag_errors_total", Name: "statsd_exporter_tag_errors_total",
Help: "The number of errors parsign DogStatsD tags.", Help: "The number of errors parsing DogStatsD tags.",
}, },
) )
configLoads = prometheus.NewCounterVec( configLoads = prometheus.NewCounterVec(
@ -133,6 +139,7 @@ var (
func init() { func init() {
prometheus.MustRegister(eventStats) prometheus.MustRegister(eventStats)
prometheus.MustRegister(eventsFlushed)
prometheus.MustRegister(eventsUnmapped) prometheus.MustRegister(eventsUnmapped)
prometheus.MustRegister(udpPackets) prometheus.MustRegister(udpPackets)
prometheus.MustRegister(tcpConnections) prometheus.MustRegister(tcpConnections)