From a75e588cf0a65a6acf3490e35680c69f0670270a Mon Sep 17 00:00:00 2001 From: Chai Nadig Date: Wed, 13 May 2020 00:30:01 -0700 Subject: [PATCH] Log Ingested Lines Signed-off-by: Chai Nadig --- main.go | 2 +- pkg/event/event.go | 1 + pkg/listener/listener.go | 3 +++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 7a31261..9180df6 100644 --- a/main.go +++ b/main.go @@ -257,7 +257,7 @@ func main() { statsdListenUDP = kingpin.Flag("statsd.listen-udp", "The UDP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() statsdListenTCP = kingpin.Flag("statsd.listen-tcp", "The TCP address on which to receive statsd metric lines. \"\" disables it.").Default(":9125").String() statsdListenUnixgram = kingpin.Flag("statsd.listen-unixgram", "The Unixgram socket path to receive statsd metric lines in datagram. \"\" disables it.").Default("").String() - // not using Int here because flag diplays default in decimal, 0755 will show as 493 + // not using Int here because flag displays default in decimal, 0755 will show as 493 statsdUnixSocketMode = kingpin.Flag("statsd.unixsocket-mode", "The permission mode of the unix socket.").Default("755").String() mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String() readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() diff --git a/pkg/event/event.go b/pkg/event/event.go index bc22629..54139be 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -84,6 +84,7 @@ func NewEventQueue(c chan Events, flushThreshold int, flushInterval time.Duratio eq := &EventQueue{ C: c, flushThreshold: flushThreshold, + flushInterval: flushInterval, flushTicker: ticker, q: make([]Event, 0, flushThreshold), eventsFlushed: eventsFlushed, diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index 9b9eb7d..0df7788 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -65,6 +65,7 @@ func (l *StatsDUDPListener) HandlePacket(packet []byte) { l.UDPPackets.Inc() lines := strings.Split(string(packet), "\n") for _, line := range lines { + level.Debug(l.Logger).Log("msg", "Incoming line", "line", line) l.LinesReceived.Inc() l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) } @@ -120,6 +121,7 @@ func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) { } break } + level.Debug(l.Logger).Log("msg", "Incoming line", "line", line) if isPrefix { l.TCPLineTooLong.Inc() level.Debug(l.Logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr()) @@ -168,6 +170,7 @@ func (l *StatsDUnixgramListener) HandlePacket(packet []byte) { l.UnixgramPackets.Inc() lines := strings.Split(string(packet), "\n") for _, line := range lines { + level.Debug(l.Logger).Log("msg", "Incoming line", "line", line) l.LinesReceived.Inc() l.EventHandler.Queue(pkgLine.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) }