Log Ingested Lines

Signed-off-by: Chai Nadig <chaitanya.nadig@gmail.com>
This commit is contained in:
Chai Nadig 2020-05-13 00:30:01 -07:00 committed by Chai Nadig
parent 2898eb8c0c
commit a75e588cf0
3 changed files with 5 additions and 1 deletions

View file

@ -257,7 +257,7 @@ func main() {
statsdListenUDP = kingpin.Flag("statsd.listen-udp", "The UDP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() statsdListenUDP = kingpin.Flag("statsd.listen-udp", "The UDP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String()
statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String()
statsdListenUnixgram = kingpin.Flag("statsd.listen-unixgram", "The Unixgram socket path to receive statsd metric lines in datagram. \"\" disables it.").Default("").String() statsdListenUnixgram = kingpin.Flag("statsd.listen-unixgram", "The Unixgram socket path to receive statsd metric lines in datagram. \"\" disables it.").Default("").String()
// not using Int here because flag diplays default in decimal, 0755 will show as 493 // not using Int here because flag displays default in decimal, 0755 will show as 493
statsdUnixSocketMode = kingpin.Flag("statsd.unixsocket-mode", "The permission mode of the unix socket.").Default("755").String() statsdUnixSocketMode = kingpin.Flag("statsd.unixsocket-mode", "The permission mode of the unix socket.").Default("755").String()
mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String() mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()

View file

@ -84,6 +84,7 @@ func NewEventQueue(c chan Events, flushThreshold int, flushInterval time.Duratio
eq := &EventQueue{ eq := &EventQueue{
C: c, C: c,
flushThreshold: flushThreshold, flushThreshold: flushThreshold,
flushInterval: flushInterval,
flushTicker: ticker, flushTicker: ticker,
q: make([]Event, 0, flushThreshold), q: make([]Event, 0, flushThreshold),
eventsFlushed: eventsFlushed, eventsFlushed: eventsFlushed,

View file

@ -65,6 +65,7 @@ func (l *StatsDUDPListener) HandlePacket(packet []byte) {
l.UDPPackets.Inc() l.UDPPackets.Inc()
lines := strings.Split(string(packet), "\n") lines := strings.Split(string(packet), "\n")
for _, line := range lines { for _, line := range lines {
level.Debug(l.Logger).Log("msg", "Incoming line", "line", line)
l.LinesReceived.Inc() l.LinesReceived.Inc()
l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
} }
@ -120,6 +121,7 @@ func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) {
} }
break break
} }
level.Debug(l.Logger).Log("msg", "Incoming line", "line", line)
if isPrefix { if isPrefix {
l.TCPLineTooLong.Inc() l.TCPLineTooLong.Inc()
level.Debug(l.Logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr()) level.Debug(l.Logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr())
@ -168,6 +170,7 @@ func (l *StatsDUnixgramListener) HandlePacket(packet []byte) {
l.UnixgramPackets.Inc() l.UnixgramPackets.Inc()
lines := strings.Split(string(packet), "\n") lines := strings.Split(string(packet), "\n")
for _, line := range lines { for _, line := range lines {
level.Debug(l.Logger).Log("msg", "Incoming line", "line", line)
l.LinesReceived.Inc() l.LinesReceived.Inc()
l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
} }