Merge pull request #511 from kullanici0606/master

Performance improvement - process udp packets in separate goroutine in order to minimize packet drops
This commit is contained in:
Matthias Rampke 2023-10-23 18:04:04 +02:00 committed by GitHub
commit 12d14bb1d8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 45 additions and 2 deletions

View file

@ -543,6 +543,7 @@ func TestHandlePacket(t *testing.T) {
Logger: log.NewNopLogger(), Logger: log.NewNopLogger(),
LineParser: parser, LineParser: parser,
UDPPackets: udpPackets, UDPPackets: udpPackets,
UDPPacketDrops: udpPacketDrops,
LinesReceived: linesReceived, LinesReceived: linesReceived,
EventsFlushed: eventsFlushed, EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors, SampleErrors: *sampleErrors,

View file

@ -65,6 +65,7 @@ func benchmarkUDPListener(times int, b *testing.B) {
// there are more events than input lines, need bigger buffer // there are more events than input lines, need bigger buffer
events := make(chan event.Events, len(bytesInput)*times*2) events := make(chan event.Events, len(bytesInput)*times*2)
udpChan := make(chan []byte, len(bytesInput)*times*2)
l := listener.StatsDUDPListener{ l := listener.StatsDUDPListener{
EventHandler: &event.UnbufferedEventHandler{C: events}, EventHandler: &event.UnbufferedEventHandler{C: events},
@ -74,6 +75,7 @@ func benchmarkUDPListener(times int, b *testing.B) {
LinesReceived: linesReceived, LinesReceived: linesReceived,
SamplesReceived: samplesReceived, SamplesReceived: samplesReceived,
TagsReceived: tagsReceived, TagsReceived: tagsReceived,
UdpPacketQueue: udpChan,
} }
// resume benchmark timer // resume benchmark timer

11
main.go
View file

@ -71,6 +71,12 @@ var (
Help: "The total number of StatsD packets received over UDP.", Help: "The total number of StatsD packets received over UDP.",
}, },
) )
udpPacketDrops = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_udp_packet_drops_total",
Help: "The total number of dropped StatsD packets which received over UDP.",
},
)
tcpConnections = promauto.NewCounter( tcpConnections = promauto.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connections_total", Name: "statsd_exporter_tcp_connections_total",
@ -261,6 +267,7 @@ func main() {
signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. Enabled by default.").Default("true").Bool() signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. Enabled by default.").Default("true").Bool()
relayAddr = kingpin.Flag("statsd.relay.address", "The UDP relay target address (host:port)").String() relayAddr = kingpin.Flag("statsd.relay.address", "The UDP relay target address (host:port)").String()
relayPacketLen = kingpin.Flag("statsd.relay.packet-length", "Maximum relay output packet length to avoid fragmentation").Default("1400").Uint() relayPacketLen = kingpin.Flag("statsd.relay.packet-length", "Maximum relay output packet length to avoid fragmentation").Default("1400").Uint()
udpPacketQueueSize = kingpin.Flag("statsd.udp-packet-queue-size", "Size of internal queue for processing UDP packets.").Default("10000").Int()
) )
promlogConfig := &promlog.Config{} promlogConfig := &promlog.Config{}
@ -368,12 +375,15 @@ func main() {
} }
} }
udpPacketQueue := make(chan []byte, *udpPacketQueueSize)
ul := &listener.StatsDUDPListener{ ul := &listener.StatsDUDPListener{
Conn: uconn, Conn: uconn,
EventHandler: eventQueue, EventHandler: eventQueue,
Logger: logger, Logger: logger,
LineParser: parser, LineParser: parser,
UDPPackets: udpPackets, UDPPackets: udpPackets,
UDPPacketDrops: udpPacketDrops,
LinesReceived: linesReceived, LinesReceived: linesReceived,
EventsFlushed: eventsFlushed, EventsFlushed: eventsFlushed,
Relay: relayTarget, Relay: relayTarget,
@ -381,6 +391,7 @@ func main() {
SamplesReceived: samplesReceived, SamplesReceived: samplesReceived,
TagErrors: tagErrors, TagErrors: tagErrors,
TagsReceived: tagsReceived, TagsReceived: tagsReceived,
UdpPacketQueue: udpPacketQueue,
} }
go ul.Listen() go ul.Listen()

View file

@ -56,6 +56,12 @@ var (
Help: "The total number of StatsD packets received over UDP.", Help: "The total number of StatsD packets received over UDP.",
}, },
) )
udpPacketDrops = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_udp_packet_drops_total",
Help: "The total number of dropped StatsD packets which received over UDP.",
},
)
tcpConnections = prometheus.NewCounter( tcpConnections = prometheus.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connections_total", Name: "statsd_exporter_tcp_connections_total",
@ -683,6 +689,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
Logger: log.NewNopLogger(), Logger: log.NewNopLogger(),
LineParser: parser, LineParser: parser,
UDPPackets: udpPackets, UDPPackets: udpPackets,
UDPPacketDrops: udpPacketDrops,
LinesReceived: linesReceived, LinesReceived: linesReceived,
EventsFlushed: eventsFlushed, EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors, SampleErrors: *sampleErrors,

View file

@ -38,6 +38,7 @@ type StatsDUDPListener struct {
Logger log.Logger Logger log.Logger
LineParser Parser LineParser Parser
UDPPackets prometheus.Counter UDPPackets prometheus.Counter
UDPPacketDrops prometheus.Counter
LinesReceived prometheus.Counter LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter EventsFlushed prometheus.Counter
Relay *relay.Relay Relay *relay.Relay
@ -45,6 +46,7 @@ type StatsDUDPListener struct {
SamplesReceived prometheus.Counter SamplesReceived prometheus.Counter
TagErrors prometheus.Counter TagErrors prometheus.Counter
TagsReceived prometheus.Counter TagsReceived prometheus.Counter
UdpPacketQueue chan []byte
} }
func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) { func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) {
@ -53,6 +55,7 @@ func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) {
func (l *StatsDUDPListener) Listen() { func (l *StatsDUDPListener) Listen() {
buf := make([]byte, 65535) buf := make([]byte, 65535)
go l.ProcessUdpPacketQueue()
for { for {
n, _, err := l.Conn.ReadFromUDP(buf) n, _, err := l.Conn.ReadFromUDP(buf)
if err != nil { if err != nil {
@ -64,12 +67,31 @@ func (l *StatsDUDPListener) Listen() {
level.Error(l.Logger).Log("error", err) level.Error(l.Logger).Log("error", err)
return return
} }
l.HandlePacket(buf[0:n])
l.EnqueueUdpPacket(buf, n)
}
}
func (l *StatsDUDPListener) EnqueueUdpPacket(packet []byte, n int) {
l.UDPPackets.Inc()
packetCopy := make([]byte, n)
copy(packetCopy, packet)
select {
case l.UdpPacketQueue <- packetCopy:
// do nothing
default:
l.UDPPacketDrops.Inc()
}
}
func (l *StatsDUDPListener) ProcessUdpPacketQueue() {
for {
packet := <-l.UdpPacketQueue
l.HandlePacket(packet)
} }
} }
func (l *StatsDUDPListener) HandlePacket(packet []byte) { func (l *StatsDUDPListener) HandlePacket(packet []byte) {
l.UDPPackets.Inc()
lines := strings.Split(string(packet), "\n") lines := strings.Split(string(packet), "\n")
for _, line := range lines { for _, line := range lines {
level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line) level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line)