process udp packets in a separate goroutine so that udp packets are not dropped

Signed-off-by: kullanici0606 <yakup.turgut@btk.gov.tr>
This commit is contained in:
kullanici0606 2023-09-18 14:31:33 +03:00
parent 871e2d8df1
commit 03255bf218
2 changed files with 21 additions and 1 deletions

View file

@ -261,6 +261,7 @@ func main() {
signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. Enabled by default.").Default("true").Bool()
relayAddr = kingpin.Flag("statsd.relay.address", "The UDP relay target address (host:port)").String()
relayPacketLen = kingpin.Flag("statsd.relay.packet-length", "Maximum relay output packet length to avoid fragmentation").Default("1400").Uint()
udpPacketQueueSize = kingpin.Flag("statsd.udp-packet-queue-size", "Size of internal queue for processing udp packets.").Default("10000").Int()
)
promlogConfig := &promlog.Config{}
@ -368,6 +369,8 @@ func main() {
}
}
udpPacketQueue := make(chan []byte, *udpPacketQueueSize)
ul := &listener.StatsDUDPListener{
Conn: uconn,
EventHandler: eventQueue,
@ -381,6 +384,7 @@ func main() {
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
UdpPacketQueue: udpPacketQueue,
}
go ul.Listen()

View file

@ -45,6 +45,7 @@ type StatsDUDPListener struct {
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
TagsReceived prometheus.Counter
UdpPacketQueue chan []byte
}
func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) {
@ -53,6 +54,7 @@ func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) {
func (l *StatsDUDPListener) Listen() {
buf := make([]byte, 65535)
go l.ProcessUdpPacketQueue()
for {
n, _, err := l.Conn.ReadFromUDP(buf)
if err != nil {
@ -64,7 +66,21 @@ func (l *StatsDUDPListener) Listen() {
level.Error(l.Logger).Log("error", err)
return
}
l.HandlePacket(buf[0:n])
l.EnqueueUdpPacket(buf[0:n])
}
}
func (l *StatsDUDPListener) EnqueueUdpPacket(packet []byte) {
packetCopy := make([]byte, len(packet))
copy(packetCopy, packet)
l.UdpPacketQueue <- packetCopy
}
func (l *StatsDUDPListener) ProcessUdpPacketQueue() {
level.Info(l.Logger).Log("msg", "Running in pipelining mode")
for {
packet := <-l.UdpPacketQueue
l.HandlePacket(packet)
}
}