From 03255bf2181d87f5e553fe06c4deea6ce53aafb4 Mon Sep 17 00:00:00 2001 From: kullanici0606 Date: Mon, 18 Sep 2023 14:31:33 +0300 Subject: [PATCH 1/6] process udp packets in a separate goroutine so that udp packets are not dropped Signed-off-by: kullanici0606 --- main.go | 4 ++++ pkg/listener/listener.go | 18 +++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index dd8092c..1fc5762 100644 --- a/main.go +++ b/main.go @@ -261,6 +261,7 @@ func main() { signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. Enabled by default.").Default("true").Bool() relayAddr = kingpin.Flag("statsd.relay.address", "The UDP relay target address (host:port)").String() relayPacketLen = kingpin.Flag("statsd.relay.packet-length", "Maximum relay output packet length to avoid fragmentation").Default("1400").Uint() + udpPacketQueueSize = kingpin.Flag("statsd.udp-packet-queue-size", "Size of internal queue for processing udp packets.").Default("10000").Int() ) promlogConfig := &promlog.Config{} @@ -368,6 +369,8 @@ func main() { } } + udpPacketQueue := make(chan []byte, *udpPacketQueueSize) + ul := &listener.StatsDUDPListener{ Conn: uconn, EventHandler: eventQueue, @@ -381,6 +384,7 @@ func main() { SamplesReceived: samplesReceived, TagErrors: tagErrors, TagsReceived: tagsReceived, + UdpPacketQueue: udpPacketQueue, } go ul.Listen() diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index c8acc0a..ce03944 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -45,6 +45,7 @@ type StatsDUDPListener struct { SamplesReceived prometheus.Counter TagErrors prometheus.Counter TagsReceived prometheus.Counter + UdpPacketQueue chan []byte } func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) { @@ -53,6 +54,7 @@ func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) { func (l *StatsDUDPListener) Listen() { buf := make([]byte, 65535) + go l.ProcessUdpPacketQueue() for { n, _, err := l.Conn.ReadFromUDP(buf) if err != nil { @@ -64,7 +66,21 @@ func (l *StatsDUDPListener) Listen() { level.Error(l.Logger).Log("error", err) return } - l.HandlePacket(buf[0:n]) + l.EnqueueUdpPacket(buf[0:n]) + } +} + +func (l *StatsDUDPListener) EnqueueUdpPacket(packet []byte) { + packetCopy := make([]byte, len(packet)) + copy(packetCopy, packet) + l.UdpPacketQueue <- packetCopy +} + +func (l *StatsDUDPListener) ProcessUdpPacketQueue() { + level.Info(l.Logger).Log("msg", "Running in pipelining mode") + for { + packet := <-l.UdpPacketQueue + l.HandlePacket(packet) } } From ccb3eb6277197d469eb4d1bbdad934270afb85c2 Mon Sep 17 00:00:00 2001 From: kullanici0606 Date: Mon, 18 Sep 2023 14:34:06 +0300 Subject: [PATCH 2/6] remove unnecessary log Signed-off-by: kullanici0606 --- pkg/listener/listener.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index ce03944..b0526ae 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -77,7 +77,6 @@ func (l *StatsDUDPListener) EnqueueUdpPacket(packet []byte) { } func (l *StatsDUDPListener) ProcessUdpPacketQueue() { - level.Info(l.Logger).Log("msg", "Running in pipelining mode") for { packet := <-l.UdpPacketQueue l.HandlePacket(packet) From b0c6d983e1609848315af56919e0f96abba99359 Mon Sep 17 00:00:00 2001 From: kullanici0606 Date: Mon, 18 Sep 2023 14:40:29 +0300 Subject: [PATCH 3/6] fix udp metrics increment Signed-off-by: kullanici0606 --- pkg/listener/listener.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index b0526ae..6c05e28 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -71,6 +71,7 @@ func (l *StatsDUDPListener) Listen() { } func (l *StatsDUDPListener) EnqueueUdpPacket(packet []byte) { + l.UDPPackets.Inc() packetCopy := make([]byte, len(packet)) copy(packetCopy, packet) l.UdpPacketQueue <- packetCopy @@ -84,7 +85,6 @@ func (l *StatsDUDPListener) ProcessUdpPacketQueue() { } func (l *StatsDUDPListener) HandlePacket(packet []byte) { - l.UDPPackets.Inc() lines := strings.Split(string(packet), "\n") for _, line := range lines { level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line) From aaaf26c074112ea2403688578bb8cd5d5519feed Mon Sep 17 00:00:00 2001 From: kullanici0606 Date: Wed, 20 Sep 2023 10:05:50 +0300 Subject: [PATCH 4/6] avoid making copies of slices since we need to minimize the time spent in packet read in order not to drop packets Signed-off-by: kullanici0606 --- exporter_benchmark_test.go | 2 ++ pkg/listener/listener.go | 7 ++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/exporter_benchmark_test.go b/exporter_benchmark_test.go index 9a26760..2e13627 100644 --- a/exporter_benchmark_test.go +++ b/exporter_benchmark_test.go @@ -65,6 +65,7 @@ func benchmarkUDPListener(times int, b *testing.B) { // there are more events than input lines, need bigger buffer events := make(chan event.Events, len(bytesInput)*times*2) + udpChan := make(chan []byte, len(bytesInput)*times*2) l := listener.StatsDUDPListener{ EventHandler: &event.UnbufferedEventHandler{C: events}, @@ -74,6 +75,7 @@ func benchmarkUDPListener(times int, b *testing.B) { LinesReceived: linesReceived, SamplesReceived: samplesReceived, TagsReceived: tagsReceived, + UdpPacketQueue: udpChan, } // resume benchmark timer diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index 6c05e28..c3f1520 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -66,13 +66,14 @@ func (l *StatsDUDPListener) Listen() { level.Error(l.Logger).Log("error", err) return } - l.EnqueueUdpPacket(buf[0:n]) + // avoid making copies of slices since we need to minimize the time spent here in order not to drop packets + l.EnqueueUdpPacket(buf, n) } } -func (l *StatsDUDPListener) EnqueueUdpPacket(packet []byte) { +func (l *StatsDUDPListener) EnqueueUdpPacket(packet []byte, n int) { l.UDPPackets.Inc() - packetCopy := make([]byte, len(packet)) + packetCopy := make([]byte, n) copy(packetCopy, packet) l.UdpPacketQueue <- packetCopy } From c246633aec59cd2dadd8253e6e50f5b8ea9dd168 Mon Sep 17 00:00:00 2001 From: kullanici0606 Date: Mon, 23 Oct 2023 10:00:25 +0300 Subject: [PATCH 5/6] add counter for dropped UDP packets. Remove unnecessary and misleading comment Signed-off-by: kullanici0606 --- bridge_test.go | 1 + main.go | 7 +++++++ pkg/exporter/exporter_test.go | 7 +++++++ pkg/listener/listener.go | 10 ++++++++-- 4 files changed, 23 insertions(+), 2 deletions(-) diff --git a/bridge_test.go b/bridge_test.go index c986c48..bc92e48 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -543,6 +543,7 @@ func TestHandlePacket(t *testing.T) { Logger: log.NewNopLogger(), LineParser: parser, UDPPackets: udpPackets, + UDPPacketDrops: udpPacketDrops, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, SampleErrors: *sampleErrors, diff --git a/main.go b/main.go index 1fc5762..5d115da 100644 --- a/main.go +++ b/main.go @@ -71,6 +71,12 @@ var ( Help: "The total number of StatsD packets received over UDP.", }, ) + udpPacketDrops = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_udp_packet_drops_total", + Help: "The total number of dropped StatsD packets which received over UDP.", + }, + ) tcpConnections = promauto.NewCounter( prometheus.CounterOpts{ Name: "statsd_exporter_tcp_connections_total", @@ -377,6 +383,7 @@ func main() { Logger: logger, LineParser: parser, UDPPackets: udpPackets, + UDPPacketDrops: udpPacketDrops, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, Relay: relayTarget, diff --git a/pkg/exporter/exporter_test.go b/pkg/exporter/exporter_test.go index a0bdce4..a2bb948 100644 --- a/pkg/exporter/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -56,6 +56,12 @@ var ( Help: "The total number of StatsD packets received over UDP.", }, ) + udpPacketDrops = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "statsd_exporter_udp_packet_drops_total", + Help: "The total number of dropped StatsD packets which received over UDP.", + }, + ) tcpConnections = prometheus.NewCounter( prometheus.CounterOpts{ Name: "statsd_exporter_tcp_connections_total", @@ -683,6 +689,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { Logger: log.NewNopLogger(), LineParser: parser, UDPPackets: udpPackets, + UDPPacketDrops: udpPacketDrops, LinesReceived: linesReceived, EventsFlushed: eventsFlushed, SampleErrors: *sampleErrors, diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index c3f1520..99e5792 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -38,6 +38,7 @@ type StatsDUDPListener struct { Logger log.Logger LineParser Parser UDPPackets prometheus.Counter + UDPPacketDrops prometheus.Counter LinesReceived prometheus.Counter EventsFlushed prometheus.Counter Relay *relay.Relay @@ -66,7 +67,7 @@ func (l *StatsDUDPListener) Listen() { level.Error(l.Logger).Log("error", err) return } - // avoid making copies of slices since we need to minimize the time spent here in order not to drop packets + l.EnqueueUdpPacket(buf, n) } } @@ -75,7 +76,12 @@ func (l *StatsDUDPListener) EnqueueUdpPacket(packet []byte, n int) { l.UDPPackets.Inc() packetCopy := make([]byte, n) copy(packetCopy, packet) - l.UdpPacketQueue <- packetCopy + select { + case l.UdpPacketQueue <- packetCopy: + // do nothing + default: + l.UDPPacketDrops.Inc() + } } func (l *StatsDUDPListener) ProcessUdpPacketQueue() { From 04ed8c69a92f160de4f8cb398ed1ff1d30b7b16c Mon Sep 17 00:00:00 2001 From: kullanici0606 <35795498+kullanici0606@users.noreply.github.com> Date: Mon, 23 Oct 2023 10:08:22 +0300 Subject: [PATCH 6/6] fix typo Co-authored-by: Matthias Rampke Signed-off-by: kullanici0606 <35795498+kullanici0606@users.noreply.github.com> --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 5d115da..4a33c65 100644 --- a/main.go +++ b/main.go @@ -267,7 +267,7 @@ func main() { signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. Enabled by default.").Default("true").Bool() relayAddr = kingpin.Flag("statsd.relay.address", "The UDP relay target address (host:port)").String() relayPacketLen = kingpin.Flag("statsd.relay.packet-length", "Maximum relay output packet length to avoid fragmentation").Default("1400").Uint() - udpPacketQueueSize = kingpin.Flag("statsd.udp-packet-queue-size", "Size of internal queue for processing udp packets.").Default("10000").Int() + udpPacketQueueSize = kingpin.Flag("statsd.udp-packet-queue-size", "Size of internal queue for processing UDP packets.").Default("10000").Int() ) promlogConfig := &promlog.Config{}