From 03255bf2181d87f5e553fe06c4deea6ce53aafb4 Mon Sep 17 00:00:00 2001 From: kullanici0606 Date: Mon, 18 Sep 2023 14:31:33 +0300 Subject: [PATCH] process udp packets in a separate goroutine so that udp packets are not dropped Signed-off-by: kullanici0606 --- main.go | 4 ++++ pkg/listener/listener.go | 18 +++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index dd8092c..1fc5762 100644 --- a/main.go +++ b/main.go @@ -261,6 +261,7 @@ func main() { signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. Enabled by default.").Default("true").Bool() relayAddr = kingpin.Flag("statsd.relay.address", "The UDP relay target address (host:port)").String() relayPacketLen = kingpin.Flag("statsd.relay.packet-length", "Maximum relay output packet length to avoid fragmentation").Default("1400").Uint() + udpPacketQueueSize = kingpin.Flag("statsd.udp-packet-queue-size", "Size of internal queue for processing udp packets.").Default("10000").Int() ) promlogConfig := &promlog.Config{} @@ -368,6 +369,8 @@ func main() { } } + udpPacketQueue := make(chan []byte, *udpPacketQueueSize) + ul := &listener.StatsDUDPListener{ Conn: uconn, EventHandler: eventQueue, @@ -381,6 +384,7 @@ func main() { SamplesReceived: samplesReceived, TagErrors: tagErrors, TagsReceived: tagsReceived, + UdpPacketQueue: udpPacketQueue, } go ul.Listen() diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index c8acc0a..ce03944 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -45,6 +45,7 @@ type StatsDUDPListener struct { SamplesReceived prometheus.Counter TagErrors prometheus.Counter TagsReceived prometheus.Counter + UdpPacketQueue chan []byte } func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) { @@ -53,6 +54,7 @@ func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) { func (l *StatsDUDPListener) Listen() { buf := make([]byte, 65535) + go l.ProcessUdpPacketQueue() for { n, _, err := l.Conn.ReadFromUDP(buf) if err != nil { @@ -64,7 +66,21 @@ func (l *StatsDUDPListener) Listen() { level.Error(l.Logger).Log("error", err) return } - l.HandlePacket(buf[0:n]) + l.EnqueueUdpPacket(buf[0:n]) + } +} + +func (l *StatsDUDPListener) EnqueueUdpPacket(packet []byte) { + packetCopy := make([]byte, len(packet)) + copy(packetCopy, packet) + l.UdpPacketQueue <- packetCopy +} + +func (l *StatsDUDPListener) ProcessUdpPacketQueue() { + level.Info(l.Logger).Log("msg", "Running in pipelining mode") + for { + packet := <-l.UdpPacketQueue + l.HandlePacket(packet) } }