diff --git a/bridge_test.go b/bridge_test.go index c986c48..bc92e48 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -543,6 +543,7 @@ func TestHandlePacket(t *testing.T) { Logger: log.NewNopLogger(), LineParser: parser, UDPPackets: udpPackets, + UDPPacketDrops: udpPacketDrops, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, SampleErrors: *sampleErrors, diff --git a/exporter_benchmark_test.go b/exporter_benchmark_test.go index 9a26760..2e13627 100644 --- a/exporter_benchmark_test.go +++ b/exporter_benchmark_test.go @@ -65,6 +65,7 @@ func benchmarkUDPListener(times int, b *testing.B) { // there are more events than input lines, need bigger buffer events := make(chan event.Events, len(bytesInput)*times*2) + udpChan := make(chan []byte, len(bytesInput)*times*2) l := listener.StatsDUDPListener{ EventHandler: &event.UnbufferedEventHandler{C: events}, @@ -74,6 +75,7 @@ func benchmarkUDPListener(times int, b *testing.B) { LinesReceived: linesReceived, SamplesReceived: samplesReceived, TagsReceived: tagsReceived, + UdpPacketQueue: udpChan, } // resume benchmark timer diff --git a/main.go b/main.go index dd8092c..4a33c65 100644 --- a/main.go +++ b/main.go @@ -71,6 +71,12 @@ var ( Help: "The total number of StatsD packets received over UDP.", }, ) + udpPacketDrops = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_udp_packet_drops_total", + Help: "The total number of dropped StatsD packets which received over UDP.", + }, + ) tcpConnections = promauto.NewCounter( prometheus.CounterOpts{ Name: "statsd_exporter_tcp_connections_total", @@ -261,6 +267,7 @@ func main() { signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. Enabled by default.").Default("true").Bool() relayAddr = kingpin.Flag("statsd.relay.address", "The UDP relay target address (host:port)").String() relayPacketLen = kingpin.Flag("statsd.relay.packet-length", "Maximum relay output packet length to avoid fragmentation").Default("1400").Uint() + udpPacketQueueSize = kingpin.Flag("statsd.udp-packet-queue-size", "Size of internal queue for processing UDP packets.").Default("10000").Int() ) promlogConfig := &promlog.Config{} @@ -368,12 +375,15 @@ func main() { } } + udpPacketQueue := make(chan []byte, *udpPacketQueueSize) + ul := &listener.StatsDUDPListener{ Conn: uconn, EventHandler: eventQueue, Logger: logger, LineParser: parser, UDPPackets: udpPackets, + UDPPacketDrops: udpPacketDrops, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, Relay: relayTarget, @@ -381,6 +391,7 @@ func main() { SamplesReceived: samplesReceived, TagErrors: tagErrors, TagsReceived: tagsReceived, + UdpPacketQueue: udpPacketQueue, } go ul.Listen() diff --git a/pkg/exporter/exporter_test.go b/pkg/exporter/exporter_test.go index a0bdce4..a2bb948 100644 --- a/pkg/exporter/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -56,6 +56,12 @@ var ( Help: "The total number of StatsD packets received over UDP.", }, ) + udpPacketDrops = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_udp_packet_drops_total", + Help: "The total number of dropped StatsD packets which received over UDP.", + }, + ) tcpConnections = prometheus.NewCounter( prometheus.CounterOpts{ Name: "statsd_exporter_tcp_connections_total", @@ -683,6 +689,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { Logger: log.NewNopLogger(), LineParser: parser, UDPPackets: udpPackets, + UDPPacketDrops: udpPacketDrops, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, SampleErrors: *sampleErrors, diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index c8acc0a..99e5792 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -38,6 +38,7 @@ type StatsDUDPListener struct { Logger log.Logger LineParser Parser UDPPackets prometheus.Counter + UDPPacketDrops prometheus.Counter LinesReceived prometheus.Counter EventsFlushed prometheus.Counter Relay *relay.Relay @@ -45,6 +46,7 @@ type StatsDUDPListener struct { SamplesReceived prometheus.Counter TagErrors prometheus.Counter TagsReceived prometheus.Counter + UdpPacketQueue chan []byte } func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) { @@ -53,6 +55,7 @@ func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) { func (l *StatsDUDPListener) Listen() { buf := make([]byte, 65535) + go l.ProcessUdpPacketQueue() for { n, _, err := l.Conn.ReadFromUDP(buf) if err != nil { @@ -64,12 +67,31 @@ func (l *StatsDUDPListener) Listen() { level.Error(l.Logger).Log("error", err) return } - l.HandlePacket(buf[0:n]) + + l.EnqueueUdpPacket(buf, n) + } +} + +func (l *StatsDUDPListener) EnqueueUdpPacket(packet []byte, n int) { + l.UDPPackets.Inc() + packetCopy := make([]byte, n) + copy(packetCopy, packet) + select { + case l.UdpPacketQueue <- packetCopy: + // do nothing + default: + l.UDPPacketDrops.Inc() + } +} + +func (l *StatsDUDPListener) ProcessUdpPacketQueue() { + for { + packet := <-l.UdpPacketQueue + l.HandlePacket(packet) } } func (l *StatsDUDPListener) HandlePacket(packet []byte) { - l.UDPPackets.Inc() lines := strings.Split(string(packet), "\n") for _, line := range lines { level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line)