Introducing primary queue for raw packets and secondary for events

Fixing linting problems

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>
This commit is contained in:
Pedro Tanaka 2022-08-19 15:15:30 +02:00
parent e85098da3f
commit 5f3413f906
8 changed files with 304 additions and 212 deletions

View file

@ -20,6 +20,9 @@ import (
"testing"
"time"
"github.com/prometheus/statsd_exporter/pkg/parser"
"github.com/stretchr/testify/require"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
@ -531,67 +534,77 @@ func TestHandlePacket(t *testing.T) {
},
}
parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()
testParser := line.NewParser()
testParser.EnableDogstatsdParsing()
testParser.EnableInfluxdbParsing()
testParser.EnableLibratoParsing()
testParser.EnableSignalFXParsing()
for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LineParser: parser,
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
Conn: nil,
Logger: log.NewNopLogger(),
UDPPackets: udpPackets,
}, &mockStatsDTCPListener{listener.StatsDTCPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LineParser: parser,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
Conn: nil,
Logger: log.NewNopLogger(),
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
}, log.NewNopLogger()}} {
packets := make(chan string, 32)
events := make(chan event.Events, 32)
l.SetEventHandler(&event.UnbufferedEventHandler{C: events})
workers := make([]*parser.Worker, 1)
for i := range workers {
workers[i] = parser.NewWorker(
log.NewNopLogger(),
&event.UnbufferedEventHandler{C: events},
testParser,
nil,
linesReceived,
*sampleErrors,
samplesReceived,
tagErrors,
tagsReceived,
)
}
go workers[0].Consume(packets)
l.SetPacketBuffer(packets)
for i, scenario := range scenarios {
l.HandlePacket([]byte(scenario.in))
t.Run(scenario.name, func(t *testing.T) {
l.HandlePacket([]byte(scenario.in))
le := len(events)
// Flatten actual events.
actual := event.Events{}
for j := 0; j < le; j++ {
actual = append(actual, <-events...)
}
if len(actual) != len(scenario.out) {
t.Fatalf("%d.%d. Expected %d events, got %d in scenario '%s'", k, i, len(scenario.out), len(actual), scenario.name)
}
for j, expected := range scenario.out {
if !reflect.DeepEqual(&expected, &actual[j]) {
t.Fatalf("%d.%d.%d. Expected %#v, got %#v in scenario '%s'", k, i, j, expected, actual[j], scenario.name)
le := 0
if len(scenario.out) != 0 {
// wait until workers produce events
require.Eventually(t, func() bool {
le = len(events)
return le > 0
}, time.Second, time.Millisecond*10)
}
}
// Flatten actual events.
actual := event.Events{}
for j := 0; j < le; j++ {
actual = append(actual, <-events...)
}
if len(actual) != len(scenario.out) {
t.Fatalf("%d.%d. Expected %d events, got %d in scenario '%s'", k, i, len(scenario.out), len(actual), scenario.name)
}
for j, expected := range scenario.out {
if !reflect.DeepEqual(&expected, &actual[j]) {
t.Fatalf("%d.%d.%d. Expected %#v, got %#v in scenario '%s'", k, i, j, expected, actual[j], scenario.name)
}
}
})
}
}
}
type statsDPacketHandler interface {
HandlePacket(packet []byte)
SetEventHandler(eh event.EventHandler)
SetPacketBuffer(pb chan string)
}
type mockStatsDTCPListener struct {

View file

@ -17,6 +17,8 @@ import (
"fmt"
"testing"
"github.com/prometheus/statsd_exporter/pkg/parser"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
@ -50,11 +52,11 @@ func benchmarkUDPListener(times int, b *testing.B) {
}
}
parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()
testParser := line.NewParser()
testParser.EnableDogstatsdParsing()
testParser.EnableInfluxdbParsing()
testParser.EnableLibratoParsing()
testParser.EnableSignalFXParsing()
// reset benchmark timer to not measure startup costs
b.ResetTimer()
@ -65,25 +67,39 @@ func benchmarkUDPListener(times int, b *testing.B) {
// there are more events than input lines, need bigger buffer
events := make(chan event.Events, len(bytesInput)*times*2)
packets := make(chan string, len(input)*times*2)
l := listener.StatsDUDPListener{
EventHandler: &event.UnbufferedEventHandler{C: events},
Logger: logger,
LineParser: parser,
UDPPackets: udpPackets,
LinesReceived: linesReceived,
SamplesReceived: samplesReceived,
TagsReceived: tagsReceived,
Logger: logger,
UDPPackets: udpPackets,
PacketBuffer: packets,
}
workers := make([]*parser.Worker, 2)
for i := 0; i < len(workers); i++ {
workers[i] = parser.NewWorker(
logger,
&event.UnbufferedEventHandler{C: events},
testParser,
nil,
linesReceived,
*sampleErrors,
samplesReceived,
tagErrors,
tagsReceived,
)
go workers[i].Consume(packets)
}
// resume benchmark timer
b.StartTimer()
for i := 0; i < times; i++ {
for _, line := range bytesInput {
l.HandlePacket([]byte(line))
for _, li := range bytesInput {
l.HandlePacket([]byte(li))
}
}
close(packets)
}
}

3
go.mod
View file

@ -8,6 +8,7 @@ require (
github.com/prometheus/client_golang v1.12.2
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.37.0
github.com/stretchr/testify v1.4.0
github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.4.0
@ -18,9 +19,11 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d // indirect
google.golang.org/protobuf v1.28.0 // indirect

100
main.go
View file

@ -24,6 +24,8 @@ import (
"strconv"
"syscall"
"github.com/prometheus/statsd_exporter/pkg/parser"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -249,6 +251,8 @@ func main() {
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int()
cacheType = kingpin.Flag("statsd.cache-type", "Metric mapping cache type. Valid options are \"lru\" and \"random\"").Default("lru").Enum("lru", "random")
parserWorkerPool = kingpin.Flag("statsd.parser-worker-pool-size", "How many workers will process raw statsd packets simultaneously.").Default("1").Uint()
packetBufferSize = kingpin.Flag("statsd.packet-buffer-size", "Size of buffer that holds raw statsd packets for parsing.").Default("5000").Uint()
eventQueueSize = kingpin.Flag("statsd.event-queue-size", "Size of internal queue for processing events.").Default("10000").Uint()
eventFlushThreshold = kingpin.Flag("statsd.event-flush-threshold", "Number of events to hold in queue before flushing.").Default("1000").Int()
eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Maximum time between event queue flushes.").Default("200ms").Duration()
@ -274,23 +278,26 @@ func main() {
}
prometheus.MustRegister(version.NewCollector("statsd_exporter"))
parser := line.NewParser()
lineParser := line.NewParser()
if *dogstatsdTagsEnabled {
parser.EnableDogstatsdParsing()
lineParser.EnableDogstatsdParsing()
}
if *influxdbTagsEnabled {
parser.EnableInfluxdbParsing()
lineParser.EnableInfluxdbParsing()
}
if *libratoTagsEnabled {
parser.EnableLibratoParsing()
lineParser.EnableLibratoParsing()
}
if *signalFXTagsEnabled {
parser.EnableSignalFXParsing()
lineParser.EnableSignalFXParsing()
}
level.Info(logger).Log("msg", "Starting StatsD -> Prometheus Exporter", "version", version.Info())
level.Info(logger).Log("msg", "Build context", "context", version.BuildContext())
packets := make(chan string, *packetBufferSize)
defer close(packets)
events := make(chan event.Events, *eventQueueSize)
defer close(events)
eventQueue := event.NewEventQueue(events, *eventFlushThreshold, *eventFlushInterval, eventsFlushed)
@ -321,7 +328,17 @@ func main() {
}
}
exporter := exporter.NewExporter(prometheus.DefaultRegisterer, thisMapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
exporterInstance := exporter.NewExporter(
prometheus.DefaultRegisterer,
thisMapper,
logger,
eventsActions,
eventsUnmapped,
errorEventStats,
eventStats,
conflictingEventStats,
metricsCount,
)
if *checkConfig {
level.Info(logger).Log("msg", "Configuration check successful, exiting")
@ -367,18 +384,11 @@ func main() {
}
ul := &listener.StatsDUDPListener{
Conn: uconn,
EventHandler: eventQueue,
Logger: logger,
LineParser: parser,
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Relay: relayTarget,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
Conn: uconn,
Logger: logger,
PacketBuffer: packets,
UDPPackets: udpPackets,
}
go ul.Listen()
@ -398,20 +408,13 @@ func main() {
defer tconn.Close()
tl := &listener.StatsDTCPListener{
Conn: tconn,
EventHandler: eventQueue,
Logger: logger,
LineParser: parser,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Relay: relayTarget,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
Conn: tconn,
Logger: logger,
PacketBuffer: packets,
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
}
go tl.Listen()
@ -443,18 +446,11 @@ func main() {
}
ul := &listener.StatsDUnixgramListener{
Conn: uxgconn,
EventHandler: eventQueue,
Logger: logger,
LineParser: parser,
Conn: uxgconn,
Logger: logger,
UnixgramPackets: unixgramPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Relay: relayTarget,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
PacketBuffer: packets,
}
go ul.Listen()
@ -477,6 +473,22 @@ func main() {
}
}
workers := make([]*parser.Worker, *parserWorkerPool)
for i := 0; i < int(*parserWorkerPool); i++ {
workers[i] = parser.NewWorker(
logger,
eventQueue,
lineParser,
relayTarget,
linesReceived,
*sampleErrors,
samplesReceived,
tagErrors,
tagsReceived,
)
go workers[i].Consume(packets)
}
mux := http.DefaultServeMux
mux.Handle(*metricsEndpoint, promhttp.Handler())
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@ -530,7 +542,7 @@ func main() {
go serveHTTP(mux, *listenAddress, logger)
go sighupConfigReloader(*mappingConfig, thisMapper, logger)
go exporter.Listen(events)
go exporterInstance.Listen(events)
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

View file

@ -118,7 +118,7 @@ func (eq *EventQueue) Flush() {
func (eq *EventQueue) FlushUnlocked() {
eq.C <- eq.q
eq.q = make([]Event, 0, cap(eq.q))
eq.q = eq.q[:0]
eq.eventsFlushed.Inc()
}

View file

@ -19,6 +19,9 @@ import (
"testing"
"time"
"github.com/prometheus/statsd_exporter/pkg/parser"
"github.com/stretchr/testify/require"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
@ -39,12 +42,6 @@ var (
},
[]string{"type"},
)
eventsFlushed = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_event_queue_flushed_total",
Help: "Number of times events were flushed to exporter",
},
)
eventsUnmapped = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_events_unmapped_total",
@ -596,51 +593,58 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
}
}()
events := make(chan event.Events)
events := make(chan event.Events, 10)
packets := make(chan string, 10)
ueh := &event.UnbufferedEventHandler{C: events}
parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()
testParser := line.NewParser()
testParser.EnableDogstatsdParsing()
testParser.EnableInfluxdbParsing()
testParser.EnableLibratoParsing()
testParser.EnableSignalFXParsing()
workers := make([]*parser.Worker, 1)
for i := range workers {
workers[i] = parser.NewWorker(
log.NewNopLogger(),
ueh,
testParser,
nil,
linesReceived,
*sampleErrors,
samplesReceived,
tagErrors,
tagsReceived,
)
}
go workers[0].Consume(packets)
go func() {
for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LineParser: parser,
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
Conn: nil,
Logger: log.NewNopLogger(),
PacketBuffer: packets,
UDPPackets: udpPackets,
}, &mockStatsDTCPListener{listener.StatsDTCPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LineParser: parser,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
TCPLineTooLong: tcpLineTooLong,
Conn: nil,
Logger: log.NewNopLogger(),
TCPConnections: tcpConnections,
TCPErrors: tcpErrors,
PacketBuffer: packets,
TCPLineTooLong: tcpLineTooLong,
}, log.NewNopLogger()}} {
l.SetEventHandler(ueh)
l.SetPacketBuffer(packets)
l.HandlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid"))
}
close(events)
}()
testMapper := mapper.MetricMapper{}
require.Eventually(t, func() bool {
return len(events) > 0
}, time.Second, time.Millisecond*10)
close(events)
close(packets)
ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount)
ex.Listen(events)
}
@ -776,7 +780,7 @@ func TestCounterIncrement(t *testing.T) {
type statsDPacketHandler interface {
HandlePacket(packet []byte)
SetEventHandler(eh event.EventHandler)
SetPacketBuffer(pb chan string)
}
type mockStatsDTCPListener struct {

View file

@ -25,7 +25,6 @@ import (
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/level"
"github.com/prometheus/statsd_exporter/pkg/relay"
)
type Parser interface {
@ -33,22 +32,14 @@ type Parser interface {
}
type StatsDUDPListener struct {
Conn *net.UDPConn
EventHandler event.EventHandler
Logger log.Logger
LineParser Parser
UDPPackets prometheus.Counter
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
Relay *relay.Relay
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
TagsReceived prometheus.Counter
Conn *net.UDPConn
Logger log.Logger
UDPPackets prometheus.Counter
PacketBuffer chan string
}
func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) {
l.EventHandler = eh
func (l *StatsDUDPListener) SetPacketBuffer(pb chan string) {
l.PacketBuffer = pb
}
func (l *StatsDUDPListener) Listen() {
@ -70,36 +61,21 @@ func (l *StatsDUDPListener) Listen() {
func (l *StatsDUDPListener) HandlePacket(packet []byte) {
l.UDPPackets.Inc()
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line)
l.LinesReceived.Inc()
if l.Relay != nil && len(line) > 0 {
l.Relay.RelayLine(line)
}
l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
}
l.PacketBuffer <- string(packet)
}
type StatsDTCPListener struct {
Conn *net.TCPListener
EventHandler event.EventHandler
Logger log.Logger
LineParser Parser
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
Relay *relay.Relay
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
TagsReceived prometheus.Counter
TCPConnections prometheus.Counter
TCPErrors prometheus.Counter
TCPLineTooLong prometheus.Counter
Conn *net.TCPListener
PacketBuffer chan string
Logger log.Logger
TCPConnections prometheus.Counter
TCPErrors prometheus.Counter
TCPLineTooLong prometheus.Counter
}
func (l *StatsDTCPListener) SetEventHandler(eh event.EventHandler) {
l.EventHandler = eh
func (l *StatsDTCPListener) SetPacketBuffer(pb chan string) {
l.PacketBuffer = pb
}
func (l *StatsDTCPListener) Listen() {
@ -139,31 +115,21 @@ func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) {
level.Debug(l.Logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr())
break
}
l.LinesReceived.Inc()
if l.Relay != nil && len(line) > 0 {
l.Relay.RelayLine(string(line))
}
l.EventHandler.Queue(l.LineParser.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
l.PacketBuffer <- string(line)
}
}
type StatsDUnixgramListener struct {
Conn *net.UnixConn
EventHandler event.EventHandler
Logger log.Logger
LineParser Parser
Conn *net.UnixConn
PacketBuffer chan string
Logger log.Logger
UnixgramPackets prometheus.Counter
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
Relay *relay.Relay
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
TagsReceived prometheus.Counter
}
func (l *StatsDUnixgramListener) SetEventHandler(eh event.EventHandler) {
l.EventHandler = eh
func (l *StatsDUnixgramListener) SetPacketBuffer(pb chan string) {
l.PacketBuffer = pb
}
func (l *StatsDUnixgramListener) Listen() {
@ -185,13 +151,5 @@ func (l *StatsDUnixgramListener) Listen() {
func (l *StatsDUnixgramListener) HandlePacket(packet []byte) {
l.UnixgramPackets.Inc()
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "unixgram", "line", line)
l.LinesReceived.Inc()
if l.Relay != nil && len(line) > 0 {
l.Relay.RelayLine(line)
}
l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger))
}
l.PacketBuffer <- string(packet)
}

86
pkg/parser/worker.go Normal file
View file

@ -0,0 +1,86 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package parser
import (
"strings"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/line"
"github.com/prometheus/statsd_exporter/pkg/relay"
)
type Worker struct {
EventHandler event.EventHandler
Logger log.Logger
LineParser *line.Parser
Relay *relay.Relay
LinesReceived prometheus.Counter
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
TagsReceived prometheus.Counter
}
func NewWorker(
logger log.Logger,
eventHandler event.EventHandler,
lineParser *line.Parser,
relay *relay.Relay,
linesReceived prometheus.Counter,
sampleErrors prometheus.CounterVec,
samplesReceived prometheus.Counter,
tagErrors prometheus.Counter,
tagsReceived prometheus.Counter,
) *Worker {
return &Worker{
EventHandler: eventHandler,
Logger: logger,
LineParser: lineParser,
Relay: relay,
LinesReceived: linesReceived,
SampleErrors: sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
}
}
func (w *Worker) Consume(c <-chan string) {
for {
bytes, ok := <-c
if !ok {
level.Debug(w.Logger).Log("msg", "channel closed, exiting consume loop")
return
}
w.handle(bytes)
}
}
func (w *Worker) handle(packet string) {
lines := strings.Split(packet, "\n")
for _, l := range lines {
level.Debug(w.Logger).Log("msg", "Incoming line", "sample", l)
w.LinesReceived.Inc()
if w.Relay != nil && len(l) > 0 {
w.Relay.RelayLine(l)
}
w.EventHandler.Queue(w.LineParser.LineToEvents(l, w.SampleErrors, w.SamplesReceived, w.TagErrors, w.TagsReceived, w.Logger))
}
}